关于增量复制,我的上一篇博客:MongoDB主从复制之增量同步
已经大体介绍过了,本文介绍MongoDB主从复制的全量复制流程,参考官方主从复制文档和4.2的源码实现。第一部分,将会过一下大体流程;第二部分将会从代码层面大概讲一下主体流程。
说明:标题全量同步环节也叫做initial sync,但是下文提到的全量同步是全量数据的同步(data sync),其实initial sync本身除了全量数据的同步(data sync),还包括了并行的增量数据的同步(oplog sync)。这么说可能有点绕,看完这篇文章应该就懂了。
0. 前言
对于绝大多数数据库来说,数据的复制环节都是全量环节和增量环节,其中全量环节主要拷贝全量的数据,可能是打snapshot镜像复制,也可能是直接扫描表进行复制,比如mongodb里面就是挨个扫描源库的文档写到目的库,redis里面是通过psync拉取源端bgsave后的全量rdb文件;增量通常是拉取增量修改记录,在mysql里面叫binlog,在redis里面通过psync,在mongodb里面叫做oplog。
mongodb的增量数据是存在于local库下面的oplog.rs表,所有的写操作都会产生一条oplog以表示数据库发生变化,正式因为有它的存在,才会使得下游节点(secondary)能够源源不断的获取到增量修改数据,从而使得主从数据能够保持一致。当然它是有上限的,不能无限大,对于wiredTiger来说,默认配置的是5%的磁盘大小。
1. 主要数据流
我们先来看下主要的数据流,也就是全量同步到底干了啥?
1.1 全量同步前
全量数据同步之前,需要经历一下几个步骤:
- 设置initial sync的标志位:
local.replset.minvaild
集合里面的_initialSyncFlag
置为true。如果全量阶段重启发现这个标志位为true,那么会重新从头进行全量同步。全量同步结束这个标志位设为false。 - 寻找一个sync source。如何寻找sync source,可以看我的上一篇博客中:同步源sync source的选择。
- 删除除local库以外的所有库,并重新创建oplog表
- 获取sync source的rollback id(RBID),用于在全量结束以后进行判断,同步的源在全量同步期间是否发生了回滚。
- 获取并存储sync source最新的oplog OpTime到defaultBeginFetchingOpTime变量中。这个时间戳将用于增量开始拉取的位点。
- 查找事务表里面所有目前还活跃的事务,最老的事务的OpTime(最老事务的开始的时间),赋值给beginFetchingTimestamp,如果没有,则节点采用defaultBeginFetchingOpTime。(相当于如果存在活跃未提交的事务,则可能存在
beginFetchingTimestamp < defaultBeginFetchingOpTime
,相当于oplog要从beginFetchingTimestamp开始拉取,而不能是defaultBeginFetchingOpTime) - 将最新的Oplog的opTime赋值给beginApplyingTimestamp,如果目前sync source没有活跃的事务,则
beginFetchingTimestamp==beginApplyingTimestamp
(只有有事务的情况,这个才会有差距)。注意,这里不是重新查询sync source获取一次最新oplog的OpTime,而只是将第5步得到的defaultBeginFetchingOpTime重新赋值给beginApplyingTimestamp。 - 启动OplogFetcher开始拉取源端的oplog并在本地缓存,缓存不是在内存里面,而是直接写入
local.temp_oplog_buffer
表,对应代码是initial_syncer.h
中是_oplogBuffer
变量。这个是全量开始阶段就开始执行,是为了防止源端oplog表不够大,导致增量oplog被删掉。注意对于oplogBuffer的设置,initial sync和后面增量环节不一样,initialSync的时候用的是表存储oplog,后面增量环节用的是256MB的blocking queue存储。
这里有一个小问题:为什么mongodb要在initial sync环节开始就同步增量数据,而不是等到全量结束再拉取增量?
答案就是因为全量同步势必会消耗一定的时间,那么全量同步的过程中,需要维持这个增量信息,使得全量结束以后我这个增量能够顺利的接上。比如我10:00时间开始同步全量数据,11:00全量同步完毕,开始同步增量,这个时候增量不能从11:00开始拉取,而是肯定要从10:00开始拉取,否则就会丢掉10:00-11:00中间的增量数据。这个增量的数据是存在于源端的local.oplog.rs表,而这个表又是固定大小的。而全量同步可能会花费很久的时间,比如用户有几十个T的数据。所以如果全量同步的开始,就开始拉取增量就没问题了,这个增量在目的端的临时表进行存储,initial sync结束后才会应用(write+apply)这个临时表数据到正式的oplog表里面。
1.2 全量同步时
InitialSyncer创建AllDatabaseCloner进行数据拉取。AllDatabaseCloner会请求源端所有db的列表,每个db启动一个 DatabaseCloner进行db层面的并发复制。
DatabaseCloner向源端请求当前Db所有collection的列表,对于每个collection启动一个CollectionCloner进行表级别的并发复制。
CollectionCloner同时调用listIndexes命令查找sync source的索引,并且用CollectionBulkLoader在数据复制期间并行创建索引。CollectionCloner接着采用exhaust cursor调用find命令查找源端的表数据,这个区别于普通的getMore命令,源端主动推送流式的数据给当前结点,而不需要多次query+reply交互。
如果同步期间碰到一些标记为RetriableError的错误,则会进行重试,直到重试超过initialSyncTransientErrorRetryPeriodSeconds才会标记为永久性错误。这个时候,会进行切换sync source并重试整个initial sync的流程。当然,这个切换sync source并重试的次数也有上限,就是numInitialSyncAttempts。
如果CollectionCloner拉数据期间中断了,4.3.2版本开始是可以恢复断点续传的,这是根据$_requestResumeToken
来实现的源端发送的时候携带了这个标记,接收方存储在CollectionCloner里面的内存变量中。出现中断将根据这个进行恢复。
1.3 幂等性的考虑
先全量数据,然后再增量应用是可能会存在问题的。因为DML语句是幂等的,但是DDL并不是,比如索引就会导致问题,举个例子:
- initial sync开始,secondary开始全量同步,并缓存增量数据
- 源端在foo表中插入
{a: 1, b: 1}
- 源端接着在foo表中插入
{a: 1, b: 2}
- 删除foo表
- 重建foo表
- foo表上面创建
a
的唯一索引 - 全量数据同步完毕,这个时候foo表里面只有唯一索引,没有数据,尝试同步之前缓存的增量
- 尝试插入
{a: 1, b: 1}
成功,接着插入{a: 1, b: 2}
失败。
MongoDB自身对于一些错误进行了忽略,比如DuplicateKey等,但是有些可能存在潜在问题的oplog,比如renameCollection导致出错,则会直接abort并重新触发一次initial sync。
1.4 全量同步完成后
全量完成后,节点查询sync source最新的Oplog时间作为stopTimestamp,在这之前的缓存的Oplog(oplogBuffer里面)必须同步完成才能进入secondary状态。当前结点应用本地缓存的从beginFetchingTimestamp到stopTimestamp之间的oplog。
oplog应用到stopTimestamp时候,MongoDB检查sync source的Rollback ID判断是否发生了rollback,如果有重新进行initial sync,否则则进入结束环节:
- 首先向存储引擎注册lastApplied以表示所有之前的oplog都被用户可见。
- 然后,重新构建所有prepared的事务。
- 接着,将
local.replset.minvaild
集合里面的_initialSyncFlag
置为false。并告诉存储引擎,initialDataTimestamp是节点最后applied的OpTime。 - InitialSyncer退出,ReplicationCoordinator启动进行后面的增量同步(将会重新创建新的OplogFetcher)。
2. 代码流
在src/mongo/db/repl/initial_syncer.h
文件中,画出了initial sync阶段的调用链,这里我复制了一下加了点注释,也方便后面自己查看。
start() // 入口函数
|
|
V
_setUp_inlock() // 设置minValid等一些初始化值
|
|
V
_startInitialSyncAttemptCallback() // 启动异步task执行init sync
|
|
|<-------+
| |
| | (bad sync source) // 如果sync source状态不对,将会重新选择直到重试到一定次数
| |
V |
_chooseSyncSourceCallback() // 选择同步源,若存在问题将会重试
|
|
| (good sync source found) // 选择了一个可用的sync source,代码往下组
|
|
V
_truncateOplogAndDropReplicatedDatabases() // 这个函数是清空当前结点的用户表,以及删除local下面的oplog表
|
|
V
_rollbackCheckerResetCallback() // 获取源端的RBID,用于全量同步结束以后检查源端是否发生回滚
|
|
V
lastOplogEntryFetcherCallbackForDefaultBeginFetchingOpTime() // 分配异步task获取目前最新的oplog的OpTime
|
| // 中间源码少了一个函数:_scheduleGetBeginFetchingOpTime_inlock
| // 构造find+filter+sort+readConcern+limit命令拉取活跃的最老的事务的opTime, 存入_beginFetchingOpTimeFetcher
V
getBeginFetchingOpTimeCallback() // 上面find命令的返回回调函数,返回的值设置beginFetchingOpTime,如果不存在活跃的事务,则置该值为defaultBeginFetchingOpTime。defaultBeginFetchingOpTime为最新的oplog OpTime。
|
|
V
_lastOplogEntryFetcherCallbackForBeginApplyingTimestamp() //查询源端的fcv
|
|
V
_fcvFetcherCallback() // 获取源端fcv的回调函数
|
|
+------------------------------+
| |
| |
V V
_oplogFetcherCallback() _databasesClonerCallback
| |
| |
| V
| _lastOplogEntryFetcherCallbackForStopTimestamp()
| | |
| | |
| (no ops to apply) | | (have ops to apply)
| | |
| | V
| | _getNextApplierBatchCallback()<-----+
| | | ^ |
| | | | |
| | | (no docs fetched | |
| | | and end ts not | |
| | | reached) | |
| | | | |
| | V | |
| | _multiApplierCallback()-----+ |
| | | | |
| | | | |
| | | | (docs fetched) | (end ts not
| | | | | reached)
| | | V |
| | | _lastOplogEntryFetcherCallbackAfter-
| | | FetchingMissingDocuments()
| | | |
| | | |
| (reached end timestamp)
| | | |
| V V V
| _rollbackCheckerCheckForRollbackCallback()
| |
| |
+------------------------------+
|
|
V
_finishInitialSyncAttempt() // 右边的全量同步结束后,将会关闭左边的增量同步
|
|
V
_finishCallback() // 这里会关闭oplog_buffer
3. Q&A
Oplog是什么时候拉取的,全量开始就拉取,还是全量结束才拉取?
A: 全量开始就拉取,并缓存在本地的local.temp_oplog_buffer
表中,而不是存在内存的blocking queue。增量环境的oplog是缓存在blocking queue。
全量同步中断是否会断点续传?
A: 在4.2及以前版本只有部分标记RetriableError才可以重试,重试一定次数后还失败将会触发重新全量同步。当前节点发生重启肯定是不支持断点续传的。
在4.3.2版本开始,增加了$_requestResumeToken
,可以支持真正意义上的断点续传,相当于记录表扫描的位点。
全量同步关于oplog一共有几个时间戳?
A: 真正意义上就三个:
- 全量开始前的beginFetchingTimestamp表示开始拉取的时间戳,若不存在活跃的事务就是最新的oplog,若存在就是等于活跃事务最老的opTime(可能小于最新的oplog的opTime)
- 全量开始前的beginApplyingTimestamp表示最新的opTime。
- 全量结束后的stopTimestamp表示最新的opTime。当oplog应用到这个opTime,整个initial sync就结束了,节点状态由STARTUP2进入到SECONDARY。
也就是说,不存在事务情况下,beginFetchingTimestamp==beginApplyingTimestamp
。
强约束:beginFetchingTimestamp<=beginApplyingTimestamp
。
如果beginApplyingTimestamp==stopTimestamp
表示全量期间源端Oplog为空,没有产生任何增量。
oplogBuffer到底是blocking queue,还是collection缓存?
A: 在src/mongo/db/repl/repl_server_parameters.idl
里面指定的,初始是collection,在3.4版本是直接用的oplog buffer(大小为256M),4.2里面是用collection,这个collection对应的也就是上面我说的local.temp_oplog_buffer
临时表。
全量同步结束后,oplogFetcher会退出吗,是否会复用全量的oplogBuffer?
A: 是会退出的,不会复用。增量会新起一个oplogFetcher,包含的oplogBuffer的类型不是collection,而是blocking queue,也就是写满256MB就会阻塞住。
全量同步对应的节点状态是STARTUP还是STARTUP2?
A: STARTUP2,全量同步完毕进入SECONDARY。STARTUP是刚启动,一旦初始化加载配置文件后就进入STARTUP2。
全量阶段单表有没有并发拉取?
A: CollectionCloner初始化的时候有个maxNumClonerCursors参数,如果是1将会调用find命令从源端进行拉取,如果是大于1,将会调用parallelCollectionScan进行拉取。那么这个参数传入到底是几?答案是初始加载的是1。而且这个parallelCollectionScan看起来从4.1开始就废弃了,而且要求存储引擎是MMAPv1。参考:https://jira.mongodb.org/browse/SERVER-33998
但实际上,wiredTiger引擎本身是支持表切片的,只是目前没有server层没有支持,这是一个可以优化的点。
索引是在表的数据同步前还是同步后创建的?是前台还是后台索引?
A: 表数据同步前创建后台索引。
说明
装载请注明出错:http://vinllen.com/mongodbde-zhu-cong-quan-liang-fu-zhi-guo-cheng/
参考
https://github.com/mongodb/mongo/blob/master/src/mongo/db/repl/README.md
http://vinllen.com/mongodbzhu-cong-fu-zhi-zhi-zeng-liang-tong-bu/
https://jira.mongodb.org/browse/SERVER-33998
https://jira.mongodb.org/browse/SERVER-43268