Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

Vinllen Chen


但行好事,莫问前程

MongoDB的主从复制之全量同步

  关于增量复制,我的上一篇博客:MongoDB主从复制之增量同步 已经大体介绍过了,本文介绍MongoDB主从复制的全量复制流程,参考官方主从复制文档和4.2的源码实现。第一部分,将会过一下大体流程;第二部分将会从代码层面大概讲一下主体流程。
  说明:标题全量同步环节也叫做initial sync,但是下文提到的全量同步是全量数据的同步(data sync),其实initial sync本身除了全量数据的同步(data sync),还包括了并行的增量数据的同步(oplog sync)。这么说可能有点绕,看完这篇文章应该就懂了。

0. 前言

  对于绝大多数数据库来说,数据的复制环节都是全量环节增量环节,其中全量环节主要拷贝全量的数据,可能是打snapshot镜像复制,也可能是直接扫描表进行复制,比如mongodb里面就是挨个扫描源库的文档写到目的库,redis里面是通过psync拉取源端bgsave后的全量rdb文件;增量通常是拉取增量修改记录,在mysql里面叫binlog,在redis里面通过psync,在mongodb里面叫做oplog。
  mongodb的增量数据是存在于local库下面的oplog.rs表,所有的写操作都会产生一条oplog以表示数据库发生变化,正式因为有它的存在,才会使得下游节点(secondary)能够源源不断的获取到增量修改数据,从而使得主从数据能够保持一致。当然它是有上限的,不能无限大,对于wiredTiger来说,默认配置的是5%的磁盘大小。

1. 主要数据流

  我们先来看下主要的数据流,也就是全量同步到底干了啥?

1.1 全量同步前

全量数据同步之前,需要经历一下几个步骤:

  1. 设置initial sync的标志位:local.replset.minvaild集合里面的_initialSyncFlag置为true。如果全量阶段重启发现这个标志位为true,那么会重新从头进行全量同步。全量同步结束这个标志位设为false。
  2. 寻找一个sync source。如何寻找sync source,可以看我的上一篇博客中:同步源sync source的选择。
  3. 删除除local库以外的所有库,并重新创建oplog表
  4. 获取sync source的rollback id(RBID),用于在全量结束以后进行判断,同步的源在全量同步期间是否发生了回滚。
  5. 获取并存储sync source最新的oplog OpTime到defaultBeginFetchingOpTime变量中。这个时间戳将用于增量开始拉取的位点。
  6. 查找事务表里面所有目前还活跃的事务,最老的事务的OpTime(最老事务的开始的时间),赋值给beginFetchingTimestamp,如果没有,则节点采用defaultBeginFetchingOpTime。(相当于如果存在活跃未提交的事务,则可能存在beginFetchingTimestamp < defaultBeginFetchingOpTime,相当于oplog要从beginFetchingTimestamp开始拉取,而不能是defaultBeginFetchingOpTime)
  7. 将最新的Oplog的opTime赋值给beginApplyingTimestamp,如果目前sync source没有活跃的事务,则beginFetchingTimestamp==beginApplyingTimestamp(只有有事务的情况,这个才会有差距)。注意,这里不是重新查询sync source获取一次最新oplog的OpTime,而只是将第5步得到的defaultBeginFetchingOpTime重新赋值给beginApplyingTimestamp。
  8. 启动OplogFetcher开始拉取源端的oplog并在本地缓存,缓存不是在内存里面,而是直接写入local.temp_oplog_buffer表,对应代码是initial_syncer.h中是_oplogBuffer变量。这个是全量开始阶段就开始执行,是为了防止源端oplog表不够大,导致增量oplog被删掉。注意对于oplogBuffer的设置,initial sync和后面增量环节不一样,initialSync的时候用的是表存储oplog,后面增量环节用的是256MB的blocking queue存储。

这里有一个小问题:为什么mongodb要在initial sync环节开始就同步增量数据,而不是等到全量结束再拉取增量?
答案就是因为全量同步势必会消耗一定的时间,那么全量同步的过程中,需要维持这个增量信息,使得全量结束以后我这个增量能够顺利的接上。比如我10:00时间开始同步全量数据,11:00全量同步完毕,开始同步增量,这个时候增量不能从11:00开始拉取,而是肯定要从10:00开始拉取,否则就会丢掉10:00-11:00中间的增量数据。这个增量的数据是存在于源端的local.oplog.rs表,而这个表又是固定大小的。而全量同步可能会花费很久的时间,比如用户有几十个T的数据。所以如果全量同步的开始,就开始拉取增量就没问题了,这个增量在目的端的临时表进行存储,initial sync结束后才会应用(write+apply)这个临时表数据到正式的oplog表里面。

1.2 全量同步时

  InitialSyncer创建AllDatabaseCloner进行数据拉取。AllDatabaseCloner会请求源端所有db的列表,每个db启动一个 DatabaseCloner进行db层面的并发复制。
  DatabaseCloner向源端请求当前Db所有collection的列表,对于每个collection启动一个CollectionCloner进行表级别的并发复制。
  CollectionCloner同时调用listIndexes命令查找sync source的索引,并且用CollectionBulkLoader在数据复制期间并行创建索引。CollectionCloner接着采用exhaust cursor调用find命令查找源端的表数据,这个区别于普通的getMore命令,源端主动推送流式的数据给当前结点,而不需要多次query+reply交互。
  如果同步期间碰到一些标记为RetriableError的错误,则会进行重试,直到重试超过initialSyncTransientErrorRetryPeriodSeconds才会标记为永久性错误。这个时候,会进行切换sync source并重试整个initial sync的流程。当然,这个切换sync source并重试的次数也有上限,就是numInitialSyncAttempts。
  如果CollectionCloner拉数据期间中断了,4.3.2版本开始是可以恢复断点续传的,这是根据$_requestResumeToken来实现的源端发送的时候携带了这个标记,接收方存储在CollectionCloner里面的内存变量中。出现中断将根据这个进行恢复。

1.3 幂等性的考虑

  先全量数据,然后再增量应用是可能会存在问题的。因为DML语句是幂等的,但是DDL并不是,比如索引就会导致问题,举个例子:

  1. initial sync开始,secondary开始全量同步,并缓存增量数据
  2. 源端在foo表中插入{a: 1, b: 1}
  3. 源端接着在foo表中插入{a: 1, b: 2}
  4. 删除foo表
  5. 重建foo表
  6. foo表上面创建a的唯一索引
  7. 全量数据同步完毕,这个时候foo表里面只有唯一索引,没有数据,尝试同步之前缓存的增量
  8. 尝试插入{a: 1, b: 1}成功,接着插入{a: 1, b: 2}失败。

MongoDB自身对于一些错误进行了忽略,比如DuplicateKey等,但是有些可能存在潜在问题的oplog,比如renameCollection导致出错,则会直接abort并重新触发一次initial sync。

1.4 全量同步完成后

  全量完成后,节点查询sync source最新的Oplog时间作为stopTimestamp,在这之前的缓存的Oplog(oplogBuffer里面)必须同步完成才能进入secondary状态。当前结点应用本地缓存的从beginFetchingTimestamp到stopTimestamp之间的oplog。
  oplog应用到stopTimestamp时候,MongoDB检查sync source的Rollback ID判断是否发生了rollback,如果有重新进行initial sync,否则则进入结束环节:

  1. 首先向存储引擎注册lastApplied以表示所有之前的oplog都被用户可见。
  2. 然后,重新构建所有prepared的事务。
  3. 接着,将local.replset.minvaild集合里面的_initialSyncFlag置为false。并告诉存储引擎,initialDataTimestamp是节点最后applied的OpTime。
  4. InitialSyncer退出,ReplicationCoordinator启动进行后面的增量同步(将会重新创建新的OplogFetcher)。

2. 代码流

  在src/mongo/db/repl/initial_syncer.h文件中,画出了initial sync阶段的调用链,这里我复制了一下加了点注释,也方便后面自己查看。

start() // 入口函数  
     |
     |
     V
_setUp_inlock() // 设置minValid等一些初始化值  
     |
     |
     V
_startInitialSyncAttemptCallback() // 启动异步task执行init sync  
     |
     |
     |<-------+
     |        |
     |        | (bad sync source) // 如果sync source状态不对,将会重新选择直到重试到一定次数
     |        |
     V        |
_chooseSyncSourceCallback() // 选择同步源,若存在问题将会重试  
     |
     |
     | (good sync source found) // 选择了一个可用的sync source,代码往下组
     |
     |
     V
_truncateOplogAndDropReplicatedDatabases() // 这个函数是清空当前结点的用户表,以及删除local下面的oplog表  
     |
     |
     V
_rollbackCheckerResetCallback() // 获取源端的RBID,用于全量同步结束以后检查源端是否发生回滚  
     |
     |
     V
lastOplogEntryFetcherCallbackForDefaultBeginFetchingOpTime() // 分配异步task获取目前最新的oplog的OpTime  
     |
     |                           // 中间源码少了一个函数:_scheduleGetBeginFetchingOpTime_inlock
     |                           // 构造find+filter+sort+readConcern+limit命令拉取活跃的最老的事务的opTime, 存入_beginFetchingOpTimeFetcher
     V
getBeginFetchingOpTimeCallback() // 上面find命令的返回回调函数,返回的值设置beginFetchingOpTime,如果不存在活跃的事务,则置该值为defaultBeginFetchingOpTime。defaultBeginFetchingOpTime为最新的oplog OpTime。  
     |                           
     |                          
     V
_lastOplogEntryFetcherCallbackForBeginApplyingTimestamp() //查询源端的fcv  
     |
     |
     V
_fcvFetcherCallback() // 获取源端fcv的回调函数  
     |
     |
     +------------------------------+
     |                              |
     |                              |
     V                              V
_oplogFetcherCallback()         _databasesClonerCallback  
     |                              |
     |                              |
     |                              V
     |                          _lastOplogEntryFetcherCallbackForStopTimestamp()
     |                              |       |
     |                              |       |
     |            (no ops to apply) |       | (have ops to apply)
     |                              |       |
     |                              |       V
     |                              |   _getNextApplierBatchCallback()<-----+
     |                              |       |                       ^       |
     |                              |       |                       |       |
     |                              |       |      (no docs fetched |       |
     |                              |       |       and end ts not  |       |
     |                              |       |       reached)        |       |
     |                              |       |                       |       |
     |                              |       V                       |       |
     |                              |   _multiApplierCallback()-----+       |
     |                              |       |       |                       |
     |                              |       |       |                       |
     |                              |       |       | (docs fetched)        | (end ts not
     |                              |       |       |                       |  reached)
     |                              |       |       V                       |
     |                              |       |   _lastOplogEntryFetcherCallbackAfter-
     |                              |       |       FetchingMissingDocuments()
     |                              |       |       |
     |                              |       |       |
     |                           (reached end timestamp)
     |                              |       |       |
     |                              V       V       V
     |                          _rollbackCheckerCheckForRollbackCallback()
     |                              |
     |                              |
     +------------------------------+
     |
     |
     V
_finishInitialSyncAttempt() // 右边的全量同步结束后,将会关闭左边的增量同步  
     |
     |
     V
_finishCallback() // 这里会关闭oplog_buffer

3. Q&A

Oplog是什么时候拉取的,全量开始就拉取,还是全量结束才拉取?

A: 全量开始就拉取,并缓存在本地的local.temp_oplog_buffer表中,而不是存在内存的blocking queue。增量环境的oplog是缓存在blocking queue。

全量同步中断是否会断点续传?

A: 在4.2及以前版本只有部分标记RetriableError才可以重试,重试一定次数后还失败将会触发重新全量同步。当前节点发生重启肯定是不支持断点续传的。
  在4.3.2版本开始,增加了$_requestResumeToken,可以支持真正意义上的断点续传,相当于记录表扫描的位点。

全量同步关于oplog一共有几个时间戳?

A: 真正意义上就三个:

  • 全量开始前的beginFetchingTimestamp表示开始拉取的时间戳,若不存在活跃的事务就是最新的oplog,若存在就是等于活跃事务最老的opTime(可能小于最新的oplog的opTime)
  • 全量开始前的beginApplyingTimestamp表示最新的opTime。
  • 全量结束后的stopTimestamp表示最新的opTime。当oplog应用到这个opTime,整个initial sync就结束了,节点状态由STARTUP2进入到SECONDARY。

也就是说,不存在事务情况下,beginFetchingTimestamp==beginApplyingTimestamp。 强约束:beginFetchingTimestamp<=beginApplyingTimestamp
如果beginApplyingTimestamp==stopTimestamp表示全量期间源端Oplog为空,没有产生任何增量。

oplogBuffer到底是blocking queue,还是collection缓存?

A: 在src/mongo/db/repl/repl_server_parameters.idl里面指定的,初始是collection,在3.4版本是直接用的oplog buffer(大小为256M),4.2里面是用collection,这个collection对应的也就是上面我说的local.temp_oplog_buffer临时表。

全量同步结束后,oplogFetcher会退出吗,是否会复用全量的oplogBuffer?

A: 是会退出的,不会复用。增量会新起一个oplogFetcher,包含的oplogBuffer的类型不是collection,而是blocking queue,也就是写满256MB就会阻塞住。

全量同步对应的节点状态是STARTUP还是STARTUP2?

A: STARTUP2,全量同步完毕进入SECONDARY。STARTUP是刚启动,一旦初始化加载配置文件后就进入STARTUP2。

全量阶段单表有没有并发拉取?

A: CollectionCloner初始化的时候有个maxNumClonerCursors参数,如果是1将会调用find命令从源端进行拉取,如果是大于1,将会调用parallelCollectionScan进行拉取。那么这个参数传入到底是几?答案是初始加载的是1。而且这个parallelCollectionScan看起来从4.1开始就废弃了,而且要求存储引擎是MMAPv1。参考:https://jira.mongodb.org/browse/SERVER-33998
但实际上,wiredTiger引擎本身是支持表切片的,只是目前没有server层没有支持,这是一个可以优化的点。

索引是在表的数据同步前还是同步后创建的?是前台还是后台索引?

A: 表数据同步前创建后台索引。

说明

装载请注明出错:http://vinllen.com/mongodbde-zhu-cong-quan-liang-fu-zhi-guo-cheng/

参考

https://github.com/mongodb/mongo/blob/master/src/mongo/db/repl/README.md
http://vinllen.com/mongodbzhu-cong-fu-zhi-zhi-zeng-liang-tong-bu/
https://jira.mongodb.org/browse/SERVER-33998
https://jira.mongodb.org/browse/SERVER-43268


About the author

vinllen chen

Beijing, China

格物致知


Discussions

comments powered by Disqus