TiKV 源码解析系列文章(十)Snapshot 的发送和接收

黄梦龙 产品技术解读 2019-07-09

背景知识

TiKV 使用 Raft 算法来提供高可用且具有强一致性的存储服务。在 Raft 中,Snapshot 指的是整个 State Machine 数据的一份快照,大体上有以下这几种情况需要用到 Snapshot:

  1. 正常情况下 leader 与 follower/learner 之间是通过 append log 的方式进行同步的,出于空间和效率的考虑,leader 会定期清理过老的 log。假如 follower/learner 出现宕机或者网络隔离,恢复以后可能所缺的 log 已经在 leader 节点被清理掉了,此时只能通过 Snapshot 的方式进行同步。
  2. 筏加入新的节点的,由于新节点没同步过任何日志,只能通过接收快照的方式来同步。实际上这也可以认为是 1 的一种特殊情形。
  3. 出于备份/恢复等需求,应用层需要 dump 一份 State Machine 的完整数据。

TiKV 涉及到的是 1 和 2 这两种情况。在我们的实现中,Snapshot 总是由 Region leader 所在的 TiKV 生成,通过网络发送给 Region follower/learner 所在的 TiKV。

理论上讲,我们完全可以把 Snapshot 当作普通的RaftMessage来发送,但这样做实践上会产生一些问题,主要是因为 Snapshot 消息的尺寸远大于其他RaftMessage

  1. Snapshot 消息需要花费更长的时间来发送,如果共用网络连接容易导致网络拥塞,进而引起其他 Region 出现 Raft 选举超时等问题。
  2. 构建待发送 Snapshot 消息需要消耗更多的内存。
  3. 过大的消息可能导致超出 gRPC 的 Message Size 限制等问题。

基于上面的原因,TiKV 对 Snapshot 的发送和接收进行了特殊处理,为每个 Snapshot 创建单独的网络连接,并将 Snapshot 拆分成 1M 大小的多个 Chunk 进行传输。

源码解读

下面我们分别从 RPC 协议、发送 Snapshot、收取 Snapshot 三个方面来解读相关源代码。本文的所有内容都基于 v3.0.0-rc.2 版本。

Snapshot RPC call 的定义

与普通的 raft message 类似,Snapshot 消息也是使用 gRPC 远程调用的方式来传输的。在pingcap/kvproto项目中可以找到相关 RPC Call 的定义,具体在tikvpb.protoraft_serverpb.proto文件中。

rpc Snapshot(stream raft_serverpb.SnapshotChunk) returns (raft_serverpb.Done) {} ... message SnapshotChunk { RaftMessage message=1;bytes data=2;} message Done {}

可以看出,Snapshot 被定义成 client streaming 调用,即对于每个 Call,客户端依次向服务器发送多个相同类型的请求,服务器接收并处理完所有请求后,向客户端返回处理结果。具体在这里,每个请求的类型是SnapshotChunk,其中包含了 Snapshot 对应的RaftMessage,或者携带一段 Snapshot 数据;回复消息是一个简单的空消息Done,因为我们在这里实际不需要返回任何信息给客户端,只需要关闭对应的 stream。

Snapshot 的发送流程

Snapshot 的发送过程的处理比较简单粗暴,直接在将要发送RaftMessage的地方截获 Snapshot 类型的消息,转而通过特殊的方式进行发送。相关代码可以在server/transport.rs中找到:

fnwrite_data(&self, store_id:u64, addr: &str, msg: RaftMessage) {ifmsg.get_message().has_snapshot() {returnself.send_snapshot_sock(addr, msg); }ifletErr(e) =self.raft_client.wl().send(store_id, addr, msg) { error!("send raft msg err";"err"=> ?e); } }fnsend_snapshot_sock(&self, addr: &str, msg: RaftMessage) { ...ifletErr(e) =self.snap_scheduler.schedule(SnapTask::Send{ addr: addr.to_owned(), msg, cb, }) { ... } }

从代码中可以看出,这里简单地把对应的RaftMessage包装成一个SnapTask::Send任务,并将其交给独立的snap-worker去处理。值得注意的是,这里的RaftMessage只包含 Snapshot 的元信息,而不包括真正的快照数据。TiKV 中有一个单独的模块叫做SnapManager,用来专门处理数据快照的生成与转存,稍后我们将会看到从SnapManager模块读取 Snapshot 数据块并进行发送的相关代码。

我们不妨顺藤摸瓜来看看snap-worker是如何处理这个任务的,相关代码在server/snap.rs,精简掉非核心逻辑后的代码引用如下:

fnrun(&mutself, task: Task) {matchtask { Task::Recv { stream, sink } => { ...letf=recv_snap(stream, sink, ...).then(move|result| { ... });self.pool.spawn(f).forget(); } Task::Send{ addr, msg, cb } => { ...letf= future::result(send_snap(..., &addr, msg)) .flatten() .then(move|res| { ... });self.pool.spawn(f).forget(); } } }

snap-worker使用了future来完成收发 Snapshot 任务:通过调用send_snap()recv_snap()生成一个 future 对象,并将其交给FuturePool异步执行。

现在我们暂且只关注send_snap()实现

fnsend_snap( ... addr: &str, msg: RaftMessage, )->Result<implFuture> { ...letkey= {letsnap= msg.get_message().get_snapshot(); SnapKey::from_snap(snap)? }; ...lets= box_try!(mgr.get_snapshot_for_sending(&key));if!s.exists() {returnErr(box_err!("missing snap file: {:?}", s.path())); }lettotal_size= s.total_size()?;letchunks= {letmutfirst_chunk= SnapshotChunk::new(); first_chunk.set_message(msg); SnapChunk { first:Some(first_chunk), snap: s, remain_bytes: total_sizeasusize, } };letcb= ChannelBuilder::new(env);letchannel= security_mgr.connect(cb, addr);letclient= TikvClient::new(channel);let(sink, receiver) = client.snapshot()?;letsend= chunks.forward(sink).map_err(Error::from);letsend= send .and_then(|(s, _)| receiver.map_err(Error::from).map(|_| s)) .then(move|result| { ... });Ok(send) }

这一段流程还是比较清晰的:先是用 Snapshot 元信息从SnapManager取到待发送的快照数据,然后将RaftMessageSnap一起封装进SnapChunk结构,最后创建全新的 gRPC 连接及一个 Snapshot stream 并将SnapChunk写入。这里引入SnapChunk是为了避免将整块 Snapshot 快照一次性加载进内存,它 impl 了futures::Stream这个 trait 来达成按需加载流式发送的效果。如果感兴趣可以参考它的具体实现,本文就暂不展开了。

Snapshot 的收取流程

最后我们来简单看一下 Snapshot 的收取流程,其实也就是 gRPC Call 的 server 端对应的处理,整个流程的入口我们可以在server/service/kv.rs中找到:

fnsnapshot( &mutself, ctx: RpcContext<'_>, stream: RequestStream, sink: ClientStreamingSink, ) {lettask= SnapTask::Recv { stream, sink };ifletErr(e) =self.snap_scheduler.schedule(task) { ... } }

与发送过程类似,也是直接构建SnapTask::Recv任务并转发给snap-worker了,这里会调用上面出现过的recv_snap()函数,具体实现如下:

fnrecv_snap'static>( stream: RequestStream, sink: ClientStreamingSink, ... )->implFuture { ...letf= stream.into_future().map_err(|(e, _)| e).and_then(move|(head, chunks)|->Box<dynFuture +Send> {letcontext=matchRecvSnapContext::new(head, &snap_mgr) {Ok(context) => context,Err(e) =>returnBox::new(future::err(e)), }; ...letrecv_chunks= chunks.fold(context, |mutcontext,mutchunk|->Result<_> {letdata= chunk.take_data(); ...ifletErr(e) = context.file.as_mut().unwrap().write_all(&data) { ... }Ok(context) }); Box::new( recv_chunks .and_then(move|context| context.finish(raft_router)) .then(move|r| { snap_mgr.deregister(&context_key, &SnapEntry::Receiving); r }), ) }, ); f.then(move|res|matchres { ... }) .map_err(Error::from) }

值得留意的是 stream 中的第一个消息(其中包含有RaftMessage)被用来创建RecvSnapContext对象,其后的每个 chunk 收取后都依次写入文件,最后调用context.finish()把之前保存的RaftMessage发送给raftstore完成整个接收过程。

总结

以上就是 TiKV 发送和接收 Snapshot 相关的代码解析了。这是 TiKV 代码库中较小的一个模块,它很好地解决了由于 Snapshot 消息特殊性所带来的一系列问题,充分应用了grpc-rs组件及futures/FuturePool模型,大家可以结合本系列文章的《TiKV 源码解析系列文章(七)gRPC Server 的初始化和启动流程》《TiKV 源码解析系列文章(八)grpc-rs 的封装与实现》进一步拓展学习。

点击查看更多TiKV 源码解析系列文章

目录