在前两篇文章TiKV 源码解析系列文章(十四)Coprocessor 概览、TiKV 源码解析系列文章(十五)表达式计算框架中,讲到了 TiDB 为了最大化利用分布式计算能力,会尽量将 Selection 算子、Aggregation 算子等算子下推到 TiKV 节点上,以及下推的表达式是如何在 TiKV 上做计算的。本文将在前两篇文章的基础上,介绍下推算子的执行流程并分析下推算子的部分实现细节,加深大家对 TiKV Coprocessor 的理解。
什么是下推算子
以下边的SQL
为例子:
select*fromstudentswhereage>21limit2
TiDB 在解析完这条SQL
语句之后,会开始制定执行计划。在这个语句中, TiDB 会向 TiKV 下推一个可以用有向无环图(DAG)来描述的查询请求:
以上的DAG
是一个由一系列算子组成的有向无环图,算子在 TiKV 中称为Executor
。整个DAG
描述了查询计划在 TiKV 的执行过程。在上边的例子中,一条查询SQL
被翻译成了三个执行步骤:
扫表
选择过滤
取若干行
有了基本概念后,下面我们简单介绍一下这样的查询计划在 TiKV 内部的一个执行流程。
下推算子如何执行
绕不开的火山
TiKV 执行器是基于 Volcano Model (火山模型),一种经典的基于行的流式迭代模型。现在主流的关系型数据库都采用了这种模型,例如 Oracle,MySQL 等。
我们可以把每个算子看成一个迭代器。每次调用它的next()
方法,我们就可以获得一行,然后向上返回。而每个算子都把下层算子看成一张表,返回哪些行,返回怎么样的行由算子本身决定。举个例子:
假设我们现在对一张没有主键,没有索引的表[1]
,执行一次全表扫描操作:
select*fromtwherea>2limit2
表[1]
:
a(int) |
b(int) |
---|---|
3 | 1 |
1 | 2 |
5 | 2 |
2 | 3 |
1 | 4 |
那么我们就可以得到这样的一个执行计划:
每个算子都实现了一个Executor
的trait
, 所以每个算子都可以调用next()
来向上返回一行。
pubtraitExecutor:Send{fnnext(&mutself)->Result<Option>;// ...}当以上的请求被解析之后,我们会在 ExecutorRunner 里边不断的调用最上层算子的next() 方法, 直到其无法再返回行。pubfnhandle_request(&mutself)->Result {loop{matchself.executor.next()? {Some(row) => {// Do some aggregation.},None=> {// ...returnresult; } } } }
大概的逻辑就是:Runner
调用Limit
算子的next()
方法,然后这个时候Limit
实现的next()
方法会去调用下一层算子Selection
的next()
方法要一行上来做聚合,直到达到预设的阀值,在例子中也就是两行,接着Selection
实现的next()
又会去调用下一层算子的next()
方法, 也就是TableScan
,TableScan
的next()
实现是根据请求中的KeyRange
, 向下边的MVCC
要上一行,然后返回给上层算子, 也就是第一行(3, 1)
,Selection
收到行后根据where
字句中的表达式的值做判断,如果满足条件向上返回一行, 否则继续问下层算子要一行,此时a == 3 > 2
, 满足条件向上返回,Limit
接收到一行则判断当前收到的行数时候满两行,但是现在只收到一行,所以继续问下层算子要一行。接下来TableScan
返回(1,2), Selection
发现不满足条件,继续问TableScan
要一行也就是(5,2), Selection
发现这行满足条件,然后返回这一行,Limit
接收到一行,然后在下一次调用其next()
方法时,发现接收到的行数已经满两行,此时返回None
,Runner
会开始对结果开始聚合,然会返回一个响应结果。
引入向量化的查询引擎
当前 TiKV 引入了向量化的执行引擎,所谓的向量化,就是在Executor
间传递的不再是单单的一行,而是多行,比如TableScan
在底层MVCC Snapshot
中扫上来的不再是一行,而是说多行。自然的,在算子执行计算任务的时候,计算的单元也不再是一个标量,而是一个向量。举个例子,当遇到一个表达式:a + b
的时候, 我们不是计算一行里边a
列和b
列两个标量相加的结果,而是计算a
列和b
列两列相加的结果。
为什么要引入向量化模型呢,原因有以下几点:
对于每行我们至少得调用 1 次
next()
方法,如果DAG
的最大深度很深,为了获取一行我们需要调用更多次的next()
方法,所以在传统的迭代模型中,虚函数调用的开销非常大。如果一次next()
方法就返回多行,这样平均下来每次next()
方法就可以返回多行,而不是至多一行。由于迭代的开销非常大,整个执行的循环无法被
loop-pipelining
优化,使得整个循环流水线被卡死,IPC 大大下降。返回多行之后,每个算子内部可以采用开销较小的循环,更好利用loop-pipelining
优化。
当然向量化模型也会带来一些问题:
原先最上层算子按需向下层算子拿上一行,而现在拿上多行,内存开销自然会增加。
计算模型发生变化,原来基于标量计算的表达式框架需要重构 (详见上篇文章)。
但是这样并不影响向量化查询带来的显著的性能提升,下边是引入向量化模型后一个基准测试结果:(需要注意的是,Coprocessor 计算还只是 TPC-H 中的其中一部分,所以计算任务比重很大程度上决定了开不开向量化带来的提升比例)。
引入向量化模型后,原先的Execturor
trait 就变成了BatchExecutor
, 对应的next()
方法就成了next_batch()
。自然的next_batch
不再返回一个行,而是一个BatchExecuteResult
,上边记录了扫上来的一张表physical_columns
,以及子表中哪些行应当被保留的logical_rows
和一个is_drain
用来表示下层算子是否已经没有数据可以返回。
pubtraitBatchExecutor:Send{/// 获取表的 `schema`fnschema(&self)->&[FieldType];// 向下层算子要回一张表fnnext_batch(&mutself, scan_rows:usize)->BatchExecuteResult;// ...}pubstructBatchExecuteResult{// 本轮循环 `TableScan` 扫上来的数据pubphysical_columns: LazyBatchColumnVec,/// 记录 `physical_columns` 中有效的行的下标publogical_rows:Vec<usize>,// ...// 表示下层算子是否已经没有数据可以返回pubis_drained:Result<bool>, }
在接下来的文章中,我们将简单介绍一下几种典型算子的实现细节,旨在让大家更加熟悉各个算子的工作原理。
典型算子的实现
BatchTableScanExecutor
的实现
首先我们先明确一下BatchTableScanExecutor
的功能,TableScan
实现的next_batch()
每被调用一次,它就会从底层的实现了Storage trait
的存储层中扫上指定的行数,也就是scan_rows
行。但是由于我们在计算的时候是采用向量化的计算模型,计算都是基于列进行的,所以我们会对扫上来的行进行一次行列转换,将表从行存格式转换成列存格式。
接下来我们看看BatchTableScanExecutor
现在的定义:
pubstructBatchTableScanExecutor(ScanExecutor);
从结构体的定义中我们可以看出,BatchTableScanExecutor
依赖于ScanExecutor
,而这个ScanExecutor
依赖于一个实现Storage
的类型和具体TableScanExecutorImpl
。
其中ScanExecutor
是一个通用的结构体,其作用是为了抽象出扫表和扫索引两种操作,这两种操作都需要依赖一个Storage
而区别他们具体行为的是一个实现了ScanExecutorImpl
的结构体,在上边的定义中就是:TableScanExecutorImpl
。
pubstructScanExecutor {/// 具体的扫表/扫索引实现。imp: I,/// 给定一个 `KeyRange`,扫上一行或者多行。scanner: RangesScanner,// 标记是否已经扫完了所有的行。is_ended:bool, }
BatchTableScanExecutor
中我们需要重点关注的是其实现的BatchExecutor
, 其中最为关键的就是next_batch()
,然而其依赖于内部ScanExecutor
的BatchExecutor
实现,也就是:
fnnext_batch(&mutself, scan_rows:usize)->BatchExecuteResult {// 创建一个列数组letmutlogical_columns=self.imp.build_column_vec(scan_rows);// 扫上 `scan_rows` 行, 然后按列填充到创建好的列数组中。letis_drained=self.fill_column_vec(scan_rows, &mutlogical_columns);// 创建一个 `logical_rows`, 表示当前表中所有行有效。后边可能根据 `Selection` 的结果修改这个 `logical_rows`。letlogical_rows= (0..logical_columns.rows_len()).collect();// 判断是否扫完传入的 `KeyRange`match&is_drained {// Note: `self.is_ended` is only used for assertion purpose.Err(_) |Ok(true) =>self.is_ended =true,Ok(false) => {} };// 返回 `BatchExecuteResult`BatchExecuteResult {// ...}}
值得注意的是上边fill_column_vec
的实现, 它大概的逻辑就是每次问self.scanner
要上一个Key-Value
对, 然后扔给self.imp.process_kv_pair
处理,在扫表的实现中就是将value
看成是一个行的datum
编码,然后将每列的数据解出来然后放到建好的列数组里边去。
fnfill_column_vec( &mutself, scan_rows:usize, columns: &mutLazyBatchColumnVec, )->Result<bool> {assert!(scan_rows >0);for_in0..scan_rows {letsome_row=self.scanner.next()?;ifletSome(关键字,值))= some_row {// 将扫上来的一行放入 `columns` 中self.imp.process_kv_pair(&key, &value, columns)?; }else{// 没有 `KeyRange` 可供扫描,已经完成扫表。returnOk(true); } }// 表示下层数据还没有扫完。Ok(false) }
值得注意的是,现在表中的数据都是未经解码的生数据,所谓的生数据就是还不能直接参与到表达式计算的数据,这里采用的是一种 lazy decoding 的策略,只有要参与计算的时候,我们才会解码特定的列,而不是将数据扫上来就开始解码数据,将其变成能够直接参与计算的结构。
BatchSelectionExecutor
的实现
接下来要介绍的是BatchSelectionExecutor
的实现,我们首先来看看定义:
pubstructBatchSelectionExecutor {// ...// 数据源src: Src,// 条件表达式conditions:Vec, }
首先,BatchSelectionExecutor
需要依赖一个Src
,一个BatchExecutor
来提供数据的来源,然后是一组条件表达式,当BatchSelectionExecutor
在执行的时候会对表达式进行求值,然后根据求出的值对下层数据拉上来的行做过滤聚合,然后返回过滤出的行。
观察BatchSelectionExecutor
实现的BatchExecutor
可以发现,其中的next_batch()
方法依赖于handle_src_result ()
:
#[inline]fnnext_batch(&mutself, scan_rows:usize)->BatchExecuteResult {// 从下层算子那会一块数据开始过滤letmutsrc_result=self.src.next_batch(scan_rows);// 根据表达式的值,过滤出对应的行。ifletErr(e) =self.handle_src_result(&mutsrc_result) { src_result.is_drained = src_result.is_drained.and(Err(e)); src_result.logical_rows.clear(); }else{// ...} src_result
通过观察handle_src_result
的实现,我们可以发现,它会遍历所有表达式,对其求值,表达式的值可能是一个标量,也可能是一个向量,但是我们完全是可以把标量看成是每行都一样的向量,然后根据每行的值,将其转换成bool
,如果该行的值为true
,则在logical_rows
中保留他的下标。
fnhandle_src_result(&mutself, src_result: &mutBatchExecuteResult)->Result<()> {letmutsrc_logical_rows_copy= Vec::with_capacity(src_result.logical_rows.len());letmutcondition_index=0;whilecondition_index <self.conditions.len() && !src_result.logical_rows.is_empty() {// 拷贝一份下层算子的 `logical_rows`,用做计算表达式。src_logical_rows_copy.clear(); src_logical_rows_copy.extend_from_slice(&src_result.logical_rows);// 计算表达式的值,然后根据表达式的值去更新下层算子的 `logical_rows`。matchself.conditions[condition_index].eval( &mutself.context,self.src.schema(), &mutsrc_result.physical_columns, &src_logical_rows_copy,// 表达式产生的结果如果是一列的话, 这里表示表达式应该输出的行数src_logical_rows_copy.len(), )? { RpnStackNode::Scalar { value, .. } => {// 如果表达式是一个标量,根据转换成 `bool` 的值确定是否保留该列。update_logical_rows_by_scalar_value( &mutsrc_result.logical_rows, &mutself.context, value, )?; } RpnStackNode::Vector { value, .. } => {// 根据每行的结果,确定是否保留那行。update_logical_rows_by_vector_value( &mutsrc_result.logical_rows, &mutself.context, eval_result, eval_result_logical_rows, )?; } } condition_index +=1; }Ok(()) } }
BatchFastHashAggregationExecutor
的实现
聚合算子的种类有很多种,包括:
SimpleAggregation
(没有group by
字句,只有聚合函数)
- =>
select count(*) from t where a > 1
FastHashAggregation
(只有一个group by
column)
- =>
select count(*) from t group by a
SlowHashAggregation
(多个groub by
columns, 或者表达式值不是Hashable
的)
- =>
select sum(*) from t group by a, b
StreamAggregation
这种聚合算子假设输入已经按照group by
columns 排好序。
我们这里挑出一个比较具有代表性的算子:BatchFastHashAggregationExecutor
来进行分析。
首先要明确一下BatchFastHashAggregationExecutor
大致的执行过程,首先我们会根据group by
column 里边的值给下层算子返回的表进行分组,比如:
selectcount(*)fromtgroupbya
然后,我们会遍历每个组,然后针对每个组求出每个聚合函数的值,在这里就是:
接下来就涉及到两个重要的细节:
聚合函数如何求值。
如何根据
group_by column
对行进行分组并聚合。
后续几节我们着重介绍一下这两个细节是如何实现的。
聚合函数
每个聚合函数都会实现一个AggrFunction
这个 trait:
pubtraitAggrFunction: std::fmt::Debug+Send+'static{/// The display name of the function.fnname(&self)->&'staticstr;/// Creates a new state instance. Different states aggregate independently.fncreate_state(&self)->Box<dynAggrFunctionState>; }//NOTE:AggrFunctionState 是 AggrFunctionStateUpdatePartial 的 super traitpubtraitAggrFunctionState: std::fmt::Debug+Send+'static+ AggrFunctionStateUpdatePartial + AggrFunctionStateUpdatePartial + AggrFunctionStateUpdatePartial + AggrFunctionStateUpdatePartial + AggrFunctionStateUpdatePartial + AggrFunctionStateUpdatePartial + AggrFunctionStateUpdatePartial {fnpush_result(&selfctx: &mutEvalContext, target: &mut[VectorValue])->Result< > ();}pubtraitAggrFunctionStateUpdatePartial {fnupdate(&mutselfctx: &mutEvalContext, value: &Option)->Result< > ();fnupdate_repeat( &mutselfctx: &mutEvalContext, value: &Option, repeat_times:usize, )->Result< > ();fnupdate_vector( &mutselfctx: &mutEvalContext, physical_values: &[Option], logical_rows: &[usize], )->Result< > ();}
聚合函数的求值过程分为三个步骤:
创建并初始化状态,这一过程一般是由调用者调用:
create_state
实现的。然后在不断遍历行/向量的过程中,我们会将行的内容传入
update/update_repeat/update_vector
函数(具体调用那种取决于不同的聚合函数实现),更新内部的状态,比如遇到一个非空行,数()
就会给自己内部计数器+1。当遍历结束之后,聚合函数就会将自己的状态通过 push_result(), 写入到一个列数组里边,这里之所以是列数组是因为聚合函数可能有多个输出列,比如 AVG(),在分布式的场景,我们需要返回两列:
SUM
和数
。
这个trait
可以通过#[derive(AggrFuntion)]
自动推导出实现,并且可以通过过程宏#[aggr_funtion(state = FooState::new())]
来指定create_state
创建出来的State
类型。举个例子,数
的实现:
/// The COUNT aggregate function.#[derive(Debug, AggrFunction)]#[aggr_function(state = AggrFnStateCount::new())]pubstructAggrFnCount;/// The state of the COUNT aggregate function.#[derive(Debug)]pubstructAggrFnStateCount{ count:usize, }implAggrFnStateCount{pubfnnew()->Self{Self{ count:0}}}implAggrFunctionStateUpdatePartialforAggrFnStateCount{/* .. */}implAggrFunctionStateforAggrFnStateCount{/* .. */}
这个时候,调用create_state()
的时候就会将内部状态 Box 起来然后返回。
如何根据group by
column 分组并聚合
BatchFastHashAggregationExecutor
内部会有一个Groups
的结构,其核心是一个HashTable
,根据group by
表达式具体的类型作为key
的类型,而value
的值则是一个AggrFunctionState
数组中该组对应的聚合函数状态集合的开始下标。举个例子:
Hash
值一样的行会被分配到同一个组中,每组会有若干个状态,聚合的过程其实就是根据每行的group by
column 找到其对应的分组 (HashTable::get),然后对组内的每一个状态,根据该行的内容进行更新。最后遍历每个组,将他们的状态写入到列数组即可。
将两个过程结合起来
上边两节讨论了聚合函数如何计算,如何分组以及如何对每个组做聚合的基本过程。现在我们通过代码,来探讨一下其中的具体细节。
先来看看BatchFastHashAggregationExecutor
的定义:
pubstructBatchFastHashAggregationExecutor( AggregationExecutor, );
我们发现,这个和BatchTableScanExecutor
的定义十分相似,区别每个聚合算子行为的是AggregationExecutor
里边实现了AggregationExecutorImpl
trait 的一个结构体。 我们也可以看看这个 trait 提供了哪些方法。
pubstructAggregationExecutor> { imp: I, is_ended:bool, entities: Entities, }pubtraitAggregationExecutorImpl:Send{// 根据 `group by` columns 和 聚合函数初始化 `entities` 中的 `schema`fnprepare_entities(&mutself, entities: &mutEntities);// 根据下层算子扫上来的数据做聚合和分组fnprocess_batch_input( &mutself, entities: &mutEntities, input_physical_columns: LazyBatchColumnVec, input_logical_rows: &[usize], )->Result< > ();// 将每个聚合函数的状态更新到列数组中,即写入聚合结果// 这里返回的是 `group by` column,在分布式场景如果不把 `group by` column 返回,`TiDB` 没有办法根据分组做二次聚合。fniterate_available_groups( &mutself, entities: &mutEntities, src_is_drained:bool, iteratee:implFnMut(&mutEntities, &[Box<dynAggrFunctionState>])->Result<()>, )->Result<Vec>; }
上边代码中的Entities
是记录源算子已经聚合函数元信息的一个结构体:
pubstructEntities {pubsrc: Src,// ...// 聚合后产生的 `schmea`, 包含 `group_by` columnspubschema:Vec,/// 聚合函数的集合pubeach_aggr_fn:Vec<Box<dynAggrFunction>>,/// 每个聚合函数输出的列大小,`COUNT` 是 1,`AVG` 是 2pubeach_aggr_cardinality:Vec<usize>,/// 聚合函数里边的表达式pubeach_aggr_exprs:Vec,// 每个聚合表达式输出的类型的集合puball_result_column_types:Vec, }
首先,为了观察到BatchFastHashAggregationExecutor
我们需要追踪他的next_batch()
的实现,在这里也就是:AggregationExecutor::handle_next_batch
:
fnhandle_next_batch(&mutself)->Result<(Option,bool)> {// 从下层算子取回一个 `batch`letsrc_result=self.entities .src .next_batch(crate::batch::runner::BATCH_MAX_SIZE);self.entities.context.warnings = src_result.warnings;letsrc_is_drained= src_result.is_drained?;// 如果下层返回的数据不为空,将根据每行的结果分组并聚合if!src_result.logical_rows.is_empty() {self.imp.process_batch_input( &mutself.entities, src_result.physical_columns, &src_result.logical_rows, )?; }// 在 `FastHashAggr` 中,只有下层算子没有办法再返回数据的时候,才能认为聚合已经完成,// 否则我们返回一个空数据给上层算子,等待下一次 `next_batch` 被调用。letresult=ifsrc_is_drained {Some(self.aggregate_partial_results(src_is_drained)?) }else{None};Ok((result, src_is_drained)) }
具体到FastHashAggr
中,process_batch_input
就是分组并更新每组的状态。aggregate_partial_results
就是写入最终的状态到列数组中。
总结
本文简略的介绍了 TiKV 查询引擎的实现原理和几个简单算子的实现,如果大家对其他算子也感兴趣的话,可以到tikv/components/tidb_query/src/batch/executors下边找到对应的实现,本文中出现的代码都经过一定删减,欢迎大家阅读 TiKV 的源码获取更多的细节。
点击查看更多TiKV 源码解析系列文章