注意:本文基于 MyCat2 main 分支 ced134b 版本源码进行学习研究,其他版本可能会存在实现逻辑差异,对源码感兴趣的读者请注意版本选择 。
前言MyCat 是曾经较为流行的一款分库分表中间件,能够支持海量数据的水平分片,以及读写分离、分布式事务等功能。MyCat2 在原有功能的基础上增加了分布式查询引擎,该引擎基于 Calcite 项目实现,能够将 SQL 编译为关系代数表达式,并基于规则优化引擎和代价优化引擎,生成物理执行计划,实现对跨库、跨实例的分布式 SQL 的支持 。虽然 MyCat 项目已经停止维护,但是分布式查询引擎功能
仍然值得我们学习,本文将带领大家一起探索 Apache Calcite
在 MyCat2
中的实践,学习如何基于 Calcite 构建分布式查询引擎。
MyCat2 环境搭建首先,我们需要本地启动 MyCat2 服务,参考入门 MyCat2 ,MyCat2 配置分为服务器配置和 Schema 配置。服务器配置 server.json
中可以指定 MyCat2 对外提供服务的 IP 和端口,serverVersion
用于模拟 MySQL 版本,此处我们将 serverVersion
调整为 8.0.40-mycat-2.0
。
1 2 3 4 5 6 7 8 9 { "server" : { "ip" : "127.0.0.1" , "mycatId" : 1 , "port" : 8066 , "serverVersion" : "8.0.40-mycat-2.0" } }
Schema 对应了 MySQL 中的库,MyCat2 Schema 配置包含了库与表的配置,它是建立在集群的基础上,而集群则是建立在数据源的基础上。因此,我们在配置时,需要自下而上进行配置,先配置数据源,再加数据源构建为集群,然后在集群上配置库与表。
MyCat2 中将 Schema 划分为 2 类:原型库和业务库 ,原型库 prototype
用于支持 MySQL 的兼容性 SQL 和系统表 SQL ,这些 SQL 通常是由客户端或者 DAO 框架请求,普通用户一般不会使用。业务库顾名思义,就是指用户业务数据存储的库,通常会对这些库表进行水平分片、读写分离的配置。原型库和业务库都遵循上面的 Schema 配置方式,都可以配置在集群之上。
原型库配置按照前文所属,我们先配置下 prototype
原型库的数据源,修改 prototypeDs.datasource.json
文件,将 MySQL 中的系统库 mysql
注册进来,数据源的名称为 prototypeDs
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 { "dbType" : "mysql" , "idleTimeout" : 60000 , "initSqls" : [ ] , "initSqlsGetConnection" : true , "instanceType" : "READ_WRITE" , "maxCon" : 1000 , "maxConnectTimeout" : 30000 , "maxRetryCount" : 5 , "minCon" : 1 , "name" : "prototypeDs" , "password" : "123456" , "type" : "JDBC" , "url" : "jdbc:mysql://localhost:3306/mysql?useUnicode=true&serverTimezone=Asia/Shanghai&characterEncoding=UTF-8" , "user" : "root" , "weight" : 0 }
然后修改 prototype.cluster.json
,将 prototypeDs
数据源构建为原型库集群,prototype.cluster.json
配置文件如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 { "clusterType" : "MASTER_SLAVE" , "heartbeat" : { "heartbeatTimeout" : 1000 , "maxRetry" : 3 , "minSwitchTimeInterval" : 300 , "slaveThreshold" : 0 } , "masters" : [ "prototypeDs" ] , "maxCon" : 200 , "name" : "prototype" , "readBalanceType" : "BALANCE_ALL" , "switchType" : "SWITCH" }
配置完成后,我们搜索 MycatCore
类进行本地启动,出现如下的日志表示启动成功。
启动成功后,可以使用 mysql -h127.0.0.1 -uroot -p -P8066 --binary-as-hex=0 -c -A
命令连接 MyCat2,密码为 123456
。通过 SHOW DATABASES
可以看到,MyCat2 通过原型库默认提供了 3 个系统库。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 > mysql - h127.0 .0 .1 - uroot - p - P8066 Enter password: Welcome to the MySQL monitor. Commands end with ; or \g. Your MySQL connection id is 0 Server version: 8.0 .40 - mycat-2.0 MySQL Community Server - GPL Copyright (c) 2000 , 2024 , Oracle and / or its affiliates. Oracle is a registered trademark of Oracle Corporation and / or its affiliates. Other names may be trademarks of their respective owners. Type 'help;' or '\h' for help. Type '\c' to clear the current input statement. mysql> SHOW DATABASES; + | `Database` | + | information_schema | | mysql | | performance_schema | + 3 rows in set (0.14 sec)
业务库配置完成原型库配置后,我们再来配置业务库。和原型库的配置类似,我们同样需要先注册 MySQL 数据源,然后将数据源构建为集群。MyCat2 提供了一种注释 SQL,用来注册数据源和集群。我们使用 mysql -h127.0.0.1 -uroot -p -P8066 --binary-as-hex=0 -c -A
连接 MyCat2 服务,并执行以下 SQL 注册数据源。为了模拟 MySQL 主从同步,我们将从库的数据库设置为和主库相同。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 ; ; ; ;
然后,我们基于创建的数据源构建集群,执行以下 SQL 创建集群:
创建完集群后,我们就可以创建一些不同维度的分片表,并通过这些表来观察 MyCat2 是如何支持分布式 SQL 的,执行如下 SQL 创建分片表。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 CREATE DATABASE sharding_db;USE sharding_db; CREATE TABLE `sbtest_sharding_id` ( `id` int (11 ) NOT NULL AUTO_INCREMENT, `k` int (11 ) NOT NULL DEFAULT '0' , `c` char (120 ) NOT NULL DEFAULT '' , `pad` char (60 ) NOT NULL DEFAULT '' , PRIMARY KEY (`id`) ) DBPARTITION BY HASH(id) DBPARTITIONS 2 ; CREATE TABLE `sbtest_sharding_k` ( `id` int (11 ) NOT NULL AUTO_INCREMENT, `k` int (11 ) NOT NULL DEFAULT '0' , `c` char (120 ) NOT NULL DEFAULT '' , `pad` char (60 ) NOT NULL DEFAULT '' , PRIMARY KEY (`id`) ) DBPARTITION BY HASH(k) DBPARTITIONS 2 ; CREATE TABLE `sbtest_sharding_c` ( `id` int (11 ) NOT NULL AUTO_INCREMENT, `k` int (11 ) NOT NULL DEFAULT '0' , `c` char (120 ) NOT NULL DEFAULT '' , `pad` char (60 ) NOT NULL DEFAULT '' , PRIMARY KEY (`id`) ) DBPARTITION BY HASH(c) DBPARTITIONS 2 ;
初始化数据创建好分片表后,我们再使用 sysbench 工具向 sbtest1
表插入 10w 条数据,执行如下脚本初始化数据:
1 sysbench /opt/homebrew/Cellar/sysbench/1.0.20_6/share/sysbench/oltp_read_write.lua --tables=1 --table_size=100000 --mysql-user=root --mysql-password=123456 --mysql-host=127.0.0.1 --mysql-port=8066 --mysql-db=sharding_db prepare
由于 MyCat2 不支持 INSERT ... SELECT ...
语句,因此需要先使用 mysqldump
将 sbtest1
中的数据导出到文件。
1 mysqldump -h127.0.0.1 -uroot -p -P8066 sharding_db sbtest1 > sbtest1.sql
然后修改 sbtest1.sql
文件,注释掉文件中除了 INSERT
外的语句,并将 sbtest1
分别修改为 sbtest_sharding_id
、sbtest_sharding_k
和 sbtest_sharding_c
,然后执行 mysql> source ~/sbtest1.sql
导入数据到目标表。使用 SELECT COUNT(1)
检查各个表的数据量,都是 10w 条记录,符合我们的预期。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 mysql> SELECT COUNT (1 ) FROM sbtest_sharding_id; + | COUNT (1 ) | + | 100000 | + mysql> SELECT COUNT (1 ) FROM sbtest_sharding_k; + | COUNT (1 ) | + | 100000 | + mysql> SELECT COUNT (1 ) FROM sbtest_sharding_c; + | COUNT (1 ) | + | 100000 | +
MyCat2 Calcite 实践探究 执行流程概览参考 MyCat2 SQL 编写指导 ,MyCat2 SQL 执行流程如下,服务端接收到 SQL 字符串或模板化 SQL 后,会先将 SQL 解析为 SQL AST,然后使用 Hack Router
进行路由判断,如果是一些简单的单节点 SQL,Hack Router
会直接将 SQL 路由到 DB 中执行,其他复杂的 SQL 则会进入 DRDS 处理流程。DRDS 处理流程中,会使用 Calcite 对 SQL 语句进行编译,然后生成关系代数树,并经过逻辑优化和物理优化两步,最终执行返回 SQL 结果。
初看 SQL 执行由于本文主要关注 MyCat2 对于 Calcite 的应用,因此后续介绍中其他流程不会过多探究,感兴趣的朋友可以下载源码自行研究。我们执行如下的 SQL 示例,来跟踪 MyCat2 的执行流程,并探索在 SQL 执行过程中,Calcite 查询引擎都进行了哪些优化。
1 SELECT * FROM sbtest_sharding_id i INNER JOIN sbtest_sharding_k k ON i.id = k.id INNER JOIN sbtest_sharding_c c ON k.id = c.id LIMIT 10 ;
首先,我们可以执行 EXPLAIN
语句,先观察下这条语句的执行计划(省略了执行计划中生成的执行代码 Code
部分)。对于这 3 张表的 JOIN 处理,MyCat2 优化器选择了 SortMergeJoin
的方式,从 MySQL 中查询 sharding_db.sbtest_sharding_id
和 sharding_db.sbtest_sharding_k
表时,会使用 Join Key
进行排序,对于已经排序的结果集,再拉取到内存中进行 SortMergeJoin
。处理完 Join 后,会对结果集进行一次内存排序,然后和 sharding_db.sbtest_sharding_c
表再进行一次 SortMergeJoin
,最终的结果集经过内存排序后获取出前 10 条结果。
可以看到,MyCat2 中将分片的逻辑表封装为 MycatView,MycatView 在内部下推执行时,会根据分片的规则改写为不同的真实 SQL,执行计划中的 Each
部分展示了下推 DB 执行的 SQL 语句,由于使用了 SortMergeJoin
,因此下推语句中包含了 ORDER BY
排序处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 mysql> EXPLAIN SELECT * FROM sbtest_sharding_id i INNER JOIN sbtest_sharding_k k ON i.id = k.id INNER JOIN sbtest_sharding_c c ON k.id = c.id LIMIT 10 ; + | plan | + | Plan: | | MycatMemSort(fetch = [?0 ]) | | MycatSortMergeJoin(condition = [= ($4 , $8 )], joinType= [inner ]) | | MycatMemSort(sort0= [$4 ], dir0= [ASC ]) | | MycatSortMergeJoin(condition = [= ($0 , $4 )], joinType= [inner ]) | | MycatView(distribution= [[sharding_db.sbtest_sharding_id]], mergeSort= [true ]) | | MycatView(distribution= [[sharding_db.sbtest_sharding_k]], mergeSort= [true ]) | | MycatView(distribution= [[sharding_db.sbtest_sharding_c]], mergeSort= [true ]) | | Each (targetName= c0, sql = SELECT * FROM sharding_db_0.sbtest_sharding_id_0 AS `sbtest_sharding_id` ORDER BY (`sbtest_sharding_id`.`id` IS NULL ), `sbtest_sharding_id`.`id`) | | Each (targetName= c1, sql = SELECT * FROM sharding_db_1.sbtest_sharding_id_0 AS `sbtest_sharding_id` ORDER BY (`sbtest_sharding_id`.`id` IS NULL ), `sbtest_sharding_id`.`id`) | | Each (targetName= c0, sql = SELECT * FROM sharding_db_0.sbtest_sharding_k_0 AS `sbtest_sharding_k` ORDER BY (`sbtest_sharding_k`.`id` IS NULL ), `sbtest_sharding_k`.`id`) | | Each (targetName= c1, sql = SELECT * FROM sharding_db_1.sbtest_sharding_k_0 AS `sbtest_sharding_k` ORDER BY (`sbtest_sharding_k`.`id` IS NULL ), `sbtest_sharding_k`.`id`) | | Each (targetName= c0, sql = SELECT * FROM sharding_db_0.sbtest_sharding_c_0 AS `sbtest_sharding_c` ORDER BY (`sbtest_sharding_c`.`id` IS NULL ), `sbtest_sharding_c`.`id`) | | Each (targetName= c1, sql = SELECT * FROM sharding_db_1.sbtest_sharding_c_0 AS `sbtest_sharding_c` ORDER BY (`sbtest_sharding_c`.`id` IS NULL ), `sbtest_sharding_c`.`id`) | + 170 rows in set (0.46 sec)
从 SQL 到执行计划通过 MyCat2 的执行计划,我们对于分片表的多表关联查询有了初步的认识,下面我们来探究下 MyCat2 的代码实现,看看一条 SQL 是如何转换为执行计划的。我们执行如下的 SQL 语句:
1 SELECT * FROM sbtest_sharding_id i INNER JOIN sbtest_sharding_k k ON i.id = k.id INNER JOIN sbtest_sharding_c c ON k.id = c.id LIMIT 10 ;
MyCat2 SQL 执行的入口在 MycatdbCommand 类中,它会根据 SQL 语句的类型生成不同的 Handler 类,SQLSelectStatement
查询语句对应的是 ShardingSQLHandler
。
获取到 ShardingSQLHandler
后,会调用 AbstractSQLHandler#execute
方法,最终会调用到 ShardingSQLHandler#onExecute
方法中,方法内部会使用 hackRouter
的 analyse
方法进行分析,用来决定 SQL 直接执行还是走 DRDS 执行。analyse
方法内部会先提取出语句中的表,然后调用 checkVaildNormalRoute
方法,对不同表的路由进行 check
并记录数据分布结果,最后根据数据分布结果决定执行方式。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class ShardingSQLHandler extends AbstractSQLHandler <SQLSelectStatement> { @Override protected Future<Void> onExecute (SQLRequest<SQLSelectStatement> request, MycatDataContext dataContext, Response response) { Optional<Future<Void>> op = Optional.empty(); ... DrdsSqlWithParams drdsSqlWithParams = DrdsRunnerHelper.preParse(request.getAst(), dataContext.getDefaultSchema()); HackRouter hackRouter = new HackRouter (drdsSqlWithParams.getParameterizedStatement(), dataContext); try { if (hackRouter.analyse()) { Pair<String, String> plan = hackRouter.getPlan(); return response.proxySelect(Collections.singletonList(plan.getKey()), plan.getValue(), drdsSqlWithParams.getParams()); } else { return DrdsRunnerHelper.runOnDrds(dataContext, drdsSqlWithParams, response); } } catch (Throwable throwable) { LOGGER.error(request.getAst().toString(), throwable); return Future.failedFuture(throwable); } } }
DrdsRunnerHelper#runOnDrds
方法逻辑如下,getPlan
用于获取 SQL 对应的执行计划,然后再调用 getPlanImplementor
获取执行计划的执行器,并执行 SQL 语句,然后返回 Future 对象等待返回结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public static Future<Void> runOnDrds (MycatDataContext dataContext, DrdsSqlWithParams drdsSqlWithParams, Response response) { PlanImpl plan = getPlan(drdsSqlWithParams); PlanImplementor planImplementor = getPlanImplementor(dataContext, response, drdsSqlWithParams); return impl(plan, planImplementor); } @NotNull public static PlanImpl getPlan (DrdsSqlWithParams drdsSqlWithParams) { QueryPlanner planner = MetaClusterCurrent.wrapper(QueryPlanner.class); PlanImpl plan; ParamHolder paramHolder = ParamHolder.CURRENT_THREAD_LOCAL.get(); try { paramHolder.setData(drdsSqlWithParams.getParams(), drdsSqlWithParams.getTypeNames()); CodeExecuterContext codeExecuterContext = planner.innerComputeMinCostCodeExecuterContext(drdsSqlWithParams); plan = new PlanImpl (codeExecuterContext.getMycatRel(), codeExecuterContext, drdsSqlWithParams.getAliasList()); } finally { } return plan; }
我们先重点关注 getPlan
方法是如何生成执行计划的,该方法内部调用的是 QueryPlanner#innerComputeMinCostCodeExecuterContext
方法,它负责从缓存中获取 MyCatRelList
执行计划,如果缓存中不存在则调用 add
方法生成执行计划,并将执行计划添加到缓存中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public CodeExecuterContext innerComputeMinCostCodeExecuterContext (DrdsSql sqlSelectStatement) { RelOptCluster relOptCluster = DrdsSqlCompiler.newCluster(); List<CodeExecuterContext> codeExecuterContexts = getAcceptedMycatRelList(sqlSelectStatement); int size = codeExecuterContexts.size(); } public List<CodeExecuterContext> getAcceptedMycatRelList (DrdsSql drdsSql) { List<CodeExecuterContext> acceptedMycatRelList = planCache.getAcceptedMycatRelList(drdsSql); if (acceptedMycatRelList.isEmpty()) { synchronized (this ) { acceptedMycatRelList = planCache.getAcceptedMycatRelList(drdsSql); if (!acceptedMycatRelList.isEmpty()) { return acceptedMycatRelList; } else { PlanResultSet add = planCache.add(false , drdsSql); return Collections.singletonList(add.getContext()); } } } else { return acceptedMycatRelList; } }
add
方法内部首先会获取 SQL 执行计划的基线,用于提供稳定的执行计划,然后调用 drdsSqlCompiler#dispatch
方法,内部包含了 CBO 和 RBO 优化,会生成 MycatRel
执行计划树。生成的执行计划树通过 RelJsonWriter
工具类转换为字符串,存储到新的执行计划基线中,最终调用 saveBaselinePlan
保存下来。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public synchronized PlanResultSet add (boolean fix, DrdsSql drdsSql) { Long baselineId = null ; Baseline baseline = this .getBaseline(drdsSql); DrdsSqlCompiler drdsSqlCompiler = MetaClusterCurrent.wrapper(DrdsSqlCompiler.class); OptimizationContext optimizationContext = new OptimizationContext (); MycatRel mycatRel = drdsSqlCompiler.dispatch(optimizationContext, drdsSql); RelJsonWriter relJsonWriter = new RelJsonWriter (); mycatRel.explain(relJsonWriter); long hash = planIds.nextPlanId(); BaselinePlan newBaselinePlan = new BaselinePlan (drdsSql.getParameterizedSQL(), relJsonWriter.asString(), hash, baselineId = baseline.getBaselineId(), null ); getCodeExecuterContext(baseline,newBaselinePlan,optimizationContext, mycatRel); return saveBaselinePlan(fix, false , baseline, newBaselinePlan); }
drdsSqlCompiler#dispatch
方法负责将不同的 SQL 语句进行转换处理,如果是 SQLSelectStatement
,则会调用 compileQuery
方法,方法实现逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public MycatRel compileQuery (OptimizationContext optimizationContext, SchemaPlus plus, DrdsSql drdsSql) { RelNode logPlan; RelNodeContext relNodeContext = null ; { relNodeContext = getRelRoot(plus, drdsSql); logPlan = relNodeContext.getRoot().rel; optimizationContext.relNodeContext = relNodeContext; } RelDataType finalRowType = logPlan.getRowType(); RelNode rboLogPlan = optimizeWithRBO(logPlan); MycatRel mycatRel = optimizeWithCBO(rboLogPlan, Collections.emptyList()); if (!RelOptUtil.areRowTypesEqual(mycatRel.getRowType(), finalRowType, true )) { Project relNode = (Project) relNodeContext.getRelBuilder().push(mycatRel).rename(finalRowType.getFieldNames()).build(); mycatRel = MycatProject.create(relNode.getInput(), relNode.getProjects(), relNode.getRowType()); } return mycatRel; }
optimizeWithRBO
主要进行逻辑优化,包括:子查询优化、聚合查询优化、JOIN 顺序优化、其他优化(包括 MyCat2 自定义的优化),逻辑优化基本都采用了 HepPlanner
优化器,通过 HepProgramBuilder
添加的优化规则,builder 中可以调用 addMatchLimit
设置最大匹配次数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 private RelNode optimizeWithRBO (RelNode logPlan) { Program subQueryProgram = getSubQueryProgram(); RelNode unSubQuery = subQueryProgram.run(null , logPlan, null , Collections.emptyList(), Collections.emptyList()); RelNode unAvg = resolveAggExpr(unSubQuery); ... RelNode joinClustering = toMultiJoin(unAvg).map(relNode -> { HepProgramBuilder builder = new HepProgramBuilder (); builder.addMatchLimit(1024 ); builder.addGroupBegin(); builder.addRuleInstance(MycatHepJoinClustering.Config.DEFAULT.toRule()); builder.addGroupEnd(); builder.addMatchLimit(64 ); builder.addGroupBegin(); builder.addRuleInstance(CoreRules.MULTI_JOIN_OPTIMIZE); builder.addGroupEnd(); HepPlanner planner = new HepPlanner (builder.build()); planner.setRoot(relNode); RelNode bestExp = planner.findBestExp(); return bestExp; }...; HepProgramBuilder builder = new HepProgramBuilder (); builder.addMatchLimit(1024 ); builder.addGroupBegin().addRuleCollection(ImmutableList.of(AggregateExpandDistinctAggregatesRule.Config.DEFAULT.toRule(), CoreRules.AGGREGATE_ANY_PULL_UP_CONSTANTS, CoreRules.PROJECT_MERGE, CoreRules.PROJECT_CORRELATE_TRANSPOSE, CoreRules.PROJECT_SET_OP_TRANSPOSE, CoreRules.PROJECT_JOIN_TRANSPOSE, CoreRules.PROJECT_WINDOW_TRANSPOSE, CoreRules.PROJECT_FILTER_TRANSPOSE, ProjectRemoveRule.Config.DEFAULT.toRule())).addGroupEnd().addMatchOrder(HepMatchOrder.BOTTOM_UP); builder.addMatchLimit(1024 ); builder.addGroupBegin().addRuleCollection(FILTER).addGroupEnd().addMatchOrder(HepMatchOrder.BOTTOM_UP); builder.addMatchLimit(1024 ); builder.addGroupBegin().addRuleInstance(CoreRules.PROJECT_MERGE).addGroupEnd().addMatchOrder(HepMatchOrder.ARBITRARY); builder.addMatchLimit(1024 ); builder.addGroupBegin().addRuleCollection(LocalRules.RBO_RULES).addRuleInstance(MycatAggDistinctRule.Config.DEFAULT.toRule()).addGroupEnd().addMatchOrder(HepMatchOrder.BOTTOM_UP); builder.addMatchLimit(1024 ); HepPlanner planner = new HepPlanner (builder.build()); planner.setRoot(joinClustering); RelNode bestExp = planner.findBestExp(); return bestExp; }
optimizeWithCBO
主要进行物理优化,它根据这种方案的 Cost 选择最优的执行计划。optimizeWithCBO
逻辑如下,如果 logPlan
已经是 MycatRel
,则直接返回,否则继续执行进行优化。MyCat2 物理优化中使用了许多 Calcite 内置的优化规则,同时也扩展了一些适合于 MyCat2 的规则,例如:MycatTableLookupSemiJoinRule
、MycatJoinTableLookupTransposeRule
,感兴趣的朋友可以构造相关的 SQL 研究具体的优化规则逻辑。
optimizeWithCBO
方法最后使用 MatierialRewriter
对执行计划树进行改写,MatierialRewriter
主要用于处理计算过程中需要消耗较多内存、网络调用的场景,例如:MycatNestedLoopJoin
,会将右表 right
替换为 MycatMatierial
,然后在执行时 MycatMatierial
会一次读取右表(Inner)的数据,并写入到本地文件中,这样计算 MycatNestedLoopJoin
时,MyCat2 就无需频繁地去 MySQL 中获取 Inner 表中的数据,直接从本地文件就可以快速获取。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 public MycatRel optimizeWithCBO (RelNode logPlan, Collection<RelOptRule> relOptRules) { if (logPlan instanceof MycatRel) { return (MycatRel) logPlan; } else { RelOptCluster cluster = logPlan.getCluster(); RelOptPlanner planner = cluster.getPlanner(); planner.clear(); MycatConvention.INSTANCE.register(planner); ImmutableList.Builder<RelOptRule> listBuilder = ImmutableList.builder(); listBuilder.addAll(MycatExtraSortRule.RULES); listBuilder.addAll(LocalRules.CBO_RULES); listBuilder.add(CoreRules.JOIN_PUSH_EXPRESSIONS); listBuilder.add(CoreRules.FILTER_INTO_JOIN); listBuilder.add(CoreRules.SORT_JOIN_TRANSPOSE.config.withOperandFor(MycatTopN.class, Join.class).toRule()); listBuilder.add(CoreRules.FILTER_SET_OP_TRANSPOSE.config.toRule()); listBuilder.add(CoreRules.AGGREGATE_JOIN_TRANSPOSE.config.withOperandFor(Aggregate.class, Join.class, false ).toRule()); listBuilder.add(CoreRules.SORT_PROJECT_TRANSPOSE.config.withOperandFor(Sort.class, Project.class).toRule()); listBuilder.add(MycatViewIndexViewRule.DEFAULT_CONFIG.toRule()); if (DrdsSqlCompiler.RBO_BKA_JOIN) { listBuilder.add(MycatTableLookupSemiJoinRule.INSTANCE); listBuilder.add(MycatTableLookupCombineRule.INSTANCE); listBuilder.add(MycatJoinTableLookupTransposeRule.LEFT_INSTANCE); listBuilder.add(MycatJoinTableLookupTransposeRule.RIGHT_INSTANCE); listBuilder.add(MycatValuesJoinRule.INSTANCE); } listBuilder.build().forEach(c -> planner.addRule(c)); MycatConvention.INSTANCE.register(planner); if (relOptRules != null ) { for (RelOptRule relOptRule : relOptRules) { planner.addRule(relOptRule); } } ... logPlan = planner.changeTraits(logPlan, cluster.traitSetOf(MycatConvention.INSTANCE)); planner.setRoot(logPlan); RelNode bestExp = planner.findBestExp(); RelNode accept = bestExp.accept(new MatierialRewriter ()); return (MycatRel) accept; } }
TODO
执行代码生成1 2 3 4 5 public static Future<Void> runOnDrds (MycatDataContext dataContext, DrdsSqlWithParams drdsSqlWithParams, Response response) { PlanImpl plan = getPlan(drdsSqlWithParams); PlanImplementor planImplementor = getPlanImplementor(dataContext, response, drdsSqlWithParams); return impl(plan, planImplementor); }
TODO
结语笔者因为工作原因接触到 Calcite,前期学习过程中,深感 Calcite 学习资料之匮乏,因此创建了 Calcite 从入门到精通知识星球 ,希望能够将学习过程中的资料和经验沉淀下来,为更多想要学习 Calcite 的朋友提供一些帮助。
欢迎关注「端小强的博客 」微信公众号,会不定期分享日常学习和工作经验,欢迎大家关注交流。