TiKV 源码解析系列文章(十六)TiKV Coprocessor Executor 源码解析

邓力铭 产品技术解读 2019-12-11

在前两篇文章TiKV 源码解析系列文章(十四)Coprocessor 概览TiKV 源码解析系列文章(十五)表达式计算框架中,讲到了 TiDB 为了最大化利用分布式计算能力,会尽量将 Selection 算子、Aggregation 算子等算子下推到 TiKV 节点上,以及下推的表达式是如何在 TiKV 上做计算的。本文将在前两篇文章的基础上,介绍下推算子的执行流程并分析下推算子的部分实现细节,加深大家对 TiKV Coprocessor 的理解。

什么是下推算子

以下边的SQL为例子:

select*fromstudentswhereage>21limit2

TiDB 在解析完这条SQL语句之后,会开始制定执行计划。在这个语句中, TiDB 会向 TiKV 下推一个可以用有向无环图(DAG)来描述的查询请求:

图 1 DAG 样例

以上的DAG是一个由一系列算子组成的有向无环图,算子在 TiKV 中称为Executor。整个DAG描述了查询计划在 TiKV 的执行过程。在上边的例子中,一条查询SQL被翻译成了三个执行步骤:

  1. 扫表

  2. 选择过滤

  3. 取若干行

有了基本概念后,下面我们简单介绍一下这样的查询计划在 TiKV 内部的一个执行流程。

下推算子如何执行

绕不开的火山

TiKV 执行器是基于 Volcano Model (火山模型),一种经典的基于行的流式迭代模型。现在主流的关系型数据库都采用了这种模型,例如 Oracle,MySQL 等。

我们可以把每个算子看成一个迭代器。每次调用它的next()方法,我们就可以获得一行,然后向上返回。而每个算子都把下层算子看成一张表,返回哪些行,返回怎么样的行由算子本身决定。举个例子:

假设我们现在对一张没有主键,没有索引的表[1],执行一次全表扫描操作:

select*fromtwherea>2limit2

[1]

a(int) b(int)
3 1
1 2
5 2
2 3
1 4

那么我们就可以得到这样的一个执行计划:

图 2 DAG 执行流程样例

每个算子都实现了一个Executortrait, 所以每个算子都可以调用next()来向上返回一行。

pubtraitExecutor:Send{fnnext(&mutself)->Result<Option>;// ...}当以上的请求被解析之后,我们会在 ExecutorRunner 里边不断的调用最上层算子的next() 方法, 直到其无法再返回行。pubfnhandle_request(&mutself)->Result {loop{matchself.executor.next()? {Some(row) => {// Do some aggregation.},None=> {// ...returnresult; } } } }

大概的逻辑就是:Runner调用Limit算子的next()方法,然后这个时候Limit实现的next()方法会去调用下一层算子Selectionnext()方法要一行上来做聚合,直到达到预设的阀值,在例子中也就是两行,接着Selection实现的next()又会去调用下一层算子的next()方法, 也就是TableScanTableScannext()实现是根据请求中的KeyRange, 向下边的MVCC要上一行,然后返回给上层算子, 也就是第一行(3, 1)Selection收到行后根据where字句中的表达式的值做判断,如果满足条件向上返回一行, 否则继续问下层算子要一行,此时a == 3 > 2, 满足条件向上返回,Limit接收到一行则判断当前收到的行数时候满两行,但是现在只收到一行,所以继续问下层算子要一行。接下来TableScan返回(1,2), Selection发现不满足条件,继续问TableScan要一行也就是(5,2), Selection发现这行满足条件,然后返回这一行,Limit接收到一行,然后在下一次调用其next()方法时,发现接收到的行数已经满两行,此时返回NoneRunner会开始对结果开始聚合,然会返回一个响应结果。

引入向量化的查询引擎

当前 TiKV 引入了向量化的执行引擎,所谓的向量化,就是在Executor间传递的不再是单单的一行,而是多行,比如TableScan在底层MVCC Snapshot中扫上来的不再是一行,而是说多行。自然的,在算子执行计算任务的时候,计算的单元也不再是一个标量,而是一个向量。举个例子,当遇到一个表达式:a + b的时候, 我们不是计算一行里边a列和b列两个标量相加的结果,而是计算a列和b列两列相加的结果。

图 3 向量化计算模型与标量计算模型对比

为什么要引入向量化模型呢,原因有以下几点:

  1. 对于每行我们至少得调用 1 次next()方法,如果DAG的最大深度很深,为了获取一行我们需要调用更多次的next()方法,所以在传统的迭代模型中,虚函数调用的开销非常大。如果一次next()方法就返回多行,这样平均下来每次next()方法就可以返回多行,而不是至多一行。

  2. 由于迭代的开销非常大,整个执行的循环无法被loop-pipelining优化,使得整个循环流水线被卡死,IPC 大大下降。返回多行之后,每个算子内部可以采用开销较小的循环,更好利用loop-pipelining优化。

当然向量化模型也会带来一些问题:

  1. 原先最上层算子按需向下层算子拿上一行,而现在拿上多行,内存开销自然会增加。

  2. 计算模型发生变化,原来基于标量计算的表达式框架需要重构 (详见上篇文章)。

但是这样并不影响向量化查询带来的显著的性能提升,下边是引入向量化模型后一个基准测试结果:(需要注意的是,Coprocessor 计算还只是 TPC-H 中的其中一部分,所以计算任务比重很大程度上决定了开不开向量化带来的提升比例)。

图 4 向量化查询引擎benchmark

引入向量化模型后,原先的Execturortrait 就变成了BatchExecutor, 对应的next()方法就成了next_batch()。自然的next_batch不再返回一个行,而是一个BatchExecuteResult,上边记录了扫上来的一张表physical_columns,以及子表中哪些行应当被保留的logical_rows和一个is_drain用来表示下层算子是否已经没有数据可以返回。

pubtraitBatchExecutor:Send{/// 获取表的 `schema`fnschema(&self)->&[FieldType];// 向下层算子要回一张表fnnext_batch(&mutself, scan_rows:usize)->BatchExecuteResult;// ...}pubstructBatchExecuteResult{// 本轮循环 `TableScan` 扫上来的数据pubphysical_columns: LazyBatchColumnVec,/// 记录 `physical_columns` 中有效的行的下标publogical_rows:Vec<usize>,// ...// 表示下层算子是否已经没有数据可以返回pubis_drained:Result<bool>, }

在接下来的文章中,我们将简单介绍一下几种典型算子的实现细节,旨在让大家更加熟悉各个算子的工作原理。

典型算子的实现

BatchTableScanExecutor的实现

首先我们先明确一下BatchTableScanExecutor的功能,TableScan实现的next_batch()每被调用一次,它就会从底层的实现了Storage trait的存储层中扫上指定的行数,也就是scan_rows行。但是由于我们在计算的时候是采用向量化的计算模型,计算都是基于列进行的,所以我们会对扫上来的行进行一次行列转换,将表从行存格式转换成列存格式。

图 5 行存转列存

接下来我们看看BatchTableScanExecutor现在的定义:

pubstructBatchTableScanExecutor(ScanExecutor);

从结构体的定义中我们可以看出,BatchTableScanExecutor依赖于ScanExecutor,而这个ScanExecutor依赖于一个实现Storage的类型和具体TableScanExecutorImpl

其中ScanExecutor是一个通用的结构体,其作用是为了抽象出扫表和扫索引两种操作,这两种操作都需要依赖一个Storage而区别他们具体行为的是一个实现了ScanExecutorImpl的结构体,在上边的定义中就是:TableScanExecutorImpl

pubstructScanExecutor {/// 具体的扫表/扫索引实现。imp: I,/// 给定一个 `KeyRange`,扫上一行或者多行。scanner: RangesScanner,// 标记是否已经扫完了所有的行。is_ended:bool, }

BatchTableScanExecutor中我们需要重点关注的是其实现的BatchExecutor, 其中最为关键的就是next_batch(),然而其依赖于内部ScanExecutorBatchExecutor实现,也就是:

fnnext_batch(&mutself, scan_rows:usize)->BatchExecuteResult {// 创建一个列数组letmutlogical_columns=self.imp.build_column_vec(scan_rows);// 扫上 `scan_rows` 行, 然后按列填充到创建好的列数组中。letis_drained=self.fill_column_vec(scan_rows, &mutlogical_columns);// 创建一个 `logical_rows`, 表示当前表中所有行有效。后边可能根据 `Selection` 的结果修改这个 `logical_rows`。letlogical_rows= (0..logical_columns.rows_len()).collect();// 判断是否扫完传入的 `KeyRange`match&is_drained {// Note: `self.is_ended` is only used for assertion purpose.Err(_) |Ok(true) =>self.is_ended =true,Ok(false) => {} };// 返回 `BatchExecuteResult`BatchExecuteResult {// ...}}

值得注意的是上边fill_column_vec的实现, 它大概的逻辑就是每次问self.scanner要上一个Key-Value对, 然后扔给self.imp.process_kv_pair处理,在扫表的实现中就是将value看成是一个行的datum编码,然后将每列的数据解出来然后放到建好的列数组里边去。

fnfill_column_vec( &mutself, scan_rows:usize, columns: &mutLazyBatchColumnVec, )->Result<bool> {assert!(scan_rows >0);for_in0..scan_rows {letsome_row=self.scanner.next()?;ifletSome(关键字,值))= some_row {// 将扫上来的一行放入 `columns` 中self.imp.process_kv_pair(&key, &value, columns)?; }else{// 没有 `KeyRange` 可供扫描,已经完成扫表。returnOk(true); } }// 表示下层数据还没有扫完。Ok(false) }

值得注意的是,现在表中的数据都是未经解码的生数据,所谓的生数据就是还不能直接参与到表达式计算的数据,这里采用的是一种 lazy decoding 的策略,只有要参与计算的时候,我们才会解码特定的列,而不是将数据扫上来就开始解码数据,将其变成能够直接参与计算的结构。

BatchSelectionExecutor的实现

接下来要介绍的是BatchSelectionExecutor的实现,我们首先来看看定义:

pubstructBatchSelectionExecutor {// ...// 数据源src: Src,// 条件表达式conditions:Vec, }

首先,BatchSelectionExecutor需要依赖一个Src,一个BatchExecutor来提供数据的来源,然后是一组条件表达式,当BatchSelectionExecutor在执行的时候会对表达式进行求值,然后根据求出的值对下层数据拉上来的行做过滤聚合,然后返回过滤出的行。

观察BatchSelectionExecutor实现的BatchExecutor可以发现,其中的next_batch()方法依赖于handle_src_result ()

#[inline]fnnext_batch(&mutself, scan_rows:usize)->BatchExecuteResult {// 从下层算子那会一块数据开始过滤letmutsrc_result=self.src.next_batch(scan_rows);// 根据表达式的值,过滤出对应的行。ifletErr(e) =self.handle_src_result(&mutsrc_result) { src_result.is_drained = src_result.is_drained.and(Err(e)); src_result.logical_rows.clear(); }else{// ...} src_result

通过观察handle_src_result的实现,我们可以发现,它会遍历所有表达式,对其求值,表达式的值可能是一个标量,也可能是一个向量,但是我们完全是可以把标量看成是每行都一样的向量,然后根据每行的值,将其转换成bool,如果该行的值为true,则在logical_rows中保留他的下标。

fnhandle_src_result(&mutself, src_result: &mutBatchExecuteResult)->Result<()> {letmutsrc_logical_rows_copy= Vec::with_capacity(src_result.logical_rows.len());letmutcondition_index=0;whilecondition_index <self.conditions.len() && !src_result.logical_rows.is_empty() {// 拷贝一份下层算子的 `logical_rows`,用做计算表达式。src_logical_rows_copy.clear(); src_logical_rows_copy.extend_from_slice(&src_result.logical_rows);// 计算表达式的值,然后根据表达式的值去更新下层算子的 `logical_rows`。matchself.conditions[condition_index].eval( &mutself.context,self.src.schema(), &mutsrc_result.physical_columns, &src_logical_rows_copy,// 表达式产生的结果如果是一列的话, 这里表示表达式应该输出的行数src_logical_rows_copy.len(), )? { RpnStackNode::Scalar { value, .. } => {// 如果表达式是一个标量,根据转换成 `bool` 的值确定是否保留该列。update_logical_rows_by_scalar_value( &mutsrc_result.logical_rows, &mutself.context, value, )?; } RpnStackNode::Vector { value, .. } => {// 根据每行的结果,确定是否保留那行。update_logical_rows_by_vector_value( &mutsrc_result.logical_rows, &mutself.context, eval_result, eval_result_logical_rows, )?; } } condition_index +=1; }Ok(()) } }

BatchFastHashAggregationExecutor的实现

聚合算子的种类有很多种,包括:

  1. SimpleAggregation(没有group by字句,只有聚合函数)
  • =>select count(*) from t where a > 1
  1. FastHashAggregation(只有一个group bycolumn)
  • =>select count(*) from t group by a
  1. SlowHashAggregation(多个groub bycolumns, 或者表达式值不是Hashable的)
  • =>select sum(*) from t group by a, b
  1. StreamAggregation这种聚合算子假设输入已经按照group bycolumns 排好序。

我们这里挑出一个比较具有代表性的算子:BatchFastHashAggregationExecutor来进行分析。

首先要明确一下BatchFastHashAggregationExecutor大致的执行过程,首先我们会根据group bycolumn 里边的值给下层算子返回的表进行分组,比如:

selectcount(*)fromtgroupbya

图 6 根据 `a` 列的值对行进行分组

然后,我们会遍历每个组,然后针对每个组求出每个聚合函数的值,在这里就是:

图 7 遍历每个组求出聚合函数的值

接下来就涉及到两个重要的细节:

  1. 聚合函数如何求值。

  2. 如何根据group_by column对行进行分组并聚合。

后续几节我们着重介绍一下这两个细节是如何实现的。

聚合函数

每个聚合函数都会实现一个AggrFunction这个 trait:

pubtraitAggrFunction: std::fmt::Debug+Send+'static{/// The display name of the function.fnname(&self)->&'staticstr;/// Creates a new state instance. Different states aggregate independently.fncreate_state(&self)->Box<dynAggrFunctionState>; }//NOTE:AggrFunctionState 是 AggrFunctionStateUpdatePartial 的 super traitpubtraitAggrFunctionState: std::fmt::Debug+Send+'static+ AggrFunctionStateUpdatePartial + AggrFunctionStateUpdatePartial + AggrFunctionStateUpdatePartial + AggrFunctionStateUpdatePartial + AggrFunctionStateUpdatePartial + AggrFunctionStateUpdatePartial + AggrFunctionStateUpdatePartial {fnpush_result(&selfctx: &mutEvalContext, target: &mut[VectorValue])->Result< > ();}pubtraitAggrFunctionStateUpdatePartial {fnupdate(&mutselfctx: &mutEvalContext, value: &Option)->Result< > ();fnupdate_repeat( &mutselfctx: &mutEvalContext, value: &Option, repeat_times:usize, )->Result< > ();fnupdate_vector( &mutselfctx: &mutEvalContext, physical_values: &[Option], logical_rows: &[usize], )->Result< > ();}

聚合函数的求值过程分为三个步骤:

  1. 创建并初始化状态,这一过程一般是由调用者调用:create_state实现的。

  2. 然后在不断遍历行/向量的过程中,我们会将行的内容传入update/update_repeat/update_vector函数(具体调用那种取决于不同的聚合函数实现),更新内部的状态,比如遇到一个非空行,数()就会给自己内部计数器+1。

  3. 当遍历结束之后,聚合函数就会将自己的状态通过 push_result(), 写入到一个列数组里边,这里之所以是列数组是因为聚合函数可能有多个输出列,比如 AVG(),在分布式的场景,我们需要返回两列:SUM

这个trait可以通过#[derive(AggrFuntion)]自动推导出实现,并且可以通过过程宏#[aggr_funtion(state = FooState::new())]来指定create_state创建出来的State类型。举个例子,的实现:

/// The COUNT aggregate function.#[derive(Debug, AggrFunction)]#[aggr_function(state = AggrFnStateCount::new())]pubstructAggrFnCount;/// The state of the COUNT aggregate function.#[derive(Debug)]pubstructAggrFnStateCount{ count:usize, }implAggrFnStateCount{pubfnnew()->Self{Self{ count:0}}}implAggrFunctionStateUpdatePartialforAggrFnStateCount{/* .. */}implAggrFunctionStateforAggrFnStateCount{/* .. */}

这个时候,调用create_state()的时候就会将内部状态 Box 起来然后返回。

如何根据group bycolumn 分组并聚合

BatchFastHashAggregationExecutor内部会有一个Groups的结构,其核心是一个HashTable,根据group by表达式具体的类型作为key的类型,而value的值则是一个AggrFunctionState数组中该组对应的聚合函数状态集合的开始下标。举个例子:

图 8 遍历所有聚合函数创建的状态

Hash值一样的行会被分配到同一个组中,每组会有若干个状态,聚合的过程其实就是根据每行的group bycolumn 找到其对应的分组 (HashTable::get),然后对组内的每一个状态,根据该行的内容进行更新。最后遍历每个组,将他们的状态写入到列数组即可。

将两个过程结合起来

上边两节讨论了聚合函数如何计算,如何分组以及如何对每个组做聚合的基本过程。现在我们通过代码,来探讨一下其中的具体细节。

先来看看BatchFastHashAggregationExecutor的定义:

pubstructBatchFastHashAggregationExecutor( AggregationExecutor, );

我们发现,这个和BatchTableScanExecutor的定义十分相似,区别每个聚合算子行为的是AggregationExecutor里边实现了AggregationExecutorImpltrait 的一个结构体。 我们也可以看看这个 trait 提供了哪些方法。

pubstructAggregationExecutor> { imp: I, is_ended:bool, entities: Entities, }pubtraitAggregationExecutorImpl:Send{// 根据 `group by` columns 和 聚合函数初始化 `entities` 中的 `schema`fnprepare_entities(&mutself, entities: &mutEntities);// 根据下层算子扫上来的数据做聚合和分组fnprocess_batch_input( &mutself, entities: &mutEntities, input_physical_columns: LazyBatchColumnVec, input_logical_rows: &[usize], )->Result< > ();// 将每个聚合函数的状态更新到列数组中,即写入聚合结果// 这里返回的是 `group by` column,在分布式场景如果不把 `group by` column 返回,`TiDB` 没有办法根据分组做二次聚合。fniterate_available_groups( &mutself, entities: &mutEntities, src_is_drained:bool, iteratee:implFnMut(&mutEntities, &[Box<dynAggrFunctionState>])->Result<()>, )->Result<Vec>; }

上边代码中的Entities是记录源算子已经聚合函数元信息的一个结构体:

pubstructEntities {pubsrc: Src,// ...// 聚合后产生的 `schmea`, 包含 `group_by` columnspubschema:Vec,/// 聚合函数的集合pubeach_aggr_fn:Vec<Box<dynAggrFunction>>,/// 每个聚合函数输出的列大小,`COUNT` 是 1,`AVG` 是 2pubeach_aggr_cardinality:Vec<usize>,/// 聚合函数里边的表达式pubeach_aggr_exprs:Vec,// 每个聚合表达式输出的类型的集合puball_result_column_types:Vec, }

首先,为了观察到BatchFastHashAggregationExecutor我们需要追踪他的next_batch()的实现,在这里也就是:AggregationExecutor::handle_next_batch

fnhandle_next_batch(&mutself)->Result<(Option,bool)> {// 从下层算子取回一个 `batch`letsrc_result=self.entities .src .next_batch(crate::batch::runner::BATCH_MAX_SIZE);self.entities.context.warnings = src_result.warnings;letsrc_is_drained= src_result.is_drained?;// 如果下层返回的数据不为空,将根据每行的结果分组并聚合if!src_result.logical_rows.is_empty() {self.imp.process_batch_input( &mutself.entities, src_result.physical_columns, &src_result.logical_rows, )?; }// 在 `FastHashAggr` 中,只有下层算子没有办法再返回数据的时候,才能认为聚合已经完成,// 否则我们返回一个空数据给上层算子,等待下一次 `next_batch` 被调用。letresult=ifsrc_is_drained {Some(self.aggregate_partial_results(src_is_drained)?) }else{None};Ok((result, src_is_drained)) }

具体到FastHashAggr中,process_batch_input就是分组并更新每组的状态。aggregate_partial_results就是写入最终的状态到列数组中。

总结

本文简略的介绍了 TiKV 查询引擎的实现原理和几个简单算子的实现,如果大家对其他算子也感兴趣的话,可以到tikv/components/tidb_query/src/batch/executors下边找到对应的实现,本文中出现的代码都经过一定删减,欢迎大家阅读 TiKV 的源码获取更多的细节。

点击查看更多TiKV 源码解析系列文章

目录