本文介绍mongodb内部对于change stream的实现逻辑,以4.0版本为准。
change stream走的是aggregate的pipeline模式,其加了一个特殊的$changestream字段,客户端发送的change stream会被翻译成对应的$changestream stage:
"pipeline" : [
{
"$changeStream" : {
"fullDocument" : "default",
"startAtOperationTime" : Timestamp(1580867312, 3)
}
}
],
其本质就是一个带$match的在oplog表上的$cursor,数据拉取后经过transform转换把oplog转成change stream的event。所以整个$changestream可以分成2部分,第一部分是$match,第二部分是transform。如果是sharding,则还有一个mongos上的merge操作。
1. $match
在DocumentSourceChangeStream::buildMatchFilter
里面或者直接通过db.watch或aggregate的explain可以看到,change
stream是直接构建如下的aggregate进行拉取oplog的。为了精简,我贴的是简化的shard上$match的基本流程:
$and:[
{"ts": {$gte: startFrom}}, // 如果指定了resumeAfter则从这个startFrom开始,否则是$gt lastAppliedTs
{$or:
[
// opMatch
{
"ns": inputNamespace,
$or: [
{"op": {$ne: "n"}}, // 不为noop, 表示是普通的CRUD操作
{"op": "n", "o2.type": "migrateChunkToNewShard"}, // chunk mirgrate到新的shard上
],
} ,
// commandMatch:
{
"op": "c",
$or: [
// commandsOnTargetDb
$and: [
{"ns": inputDb.$cmd}, //如果不是指定全局则用这个,否则是正则匹配所有除local, admin, config之外的db
$or: [
{o.drop: kRegexAllCollections}, // kRegexAllCollections表示匹配所有的collection,
{o.dropDatabase: {$exist: true}}, //drop database操作
{o.renameCollection: inputNamespace},
]
]
// renameDropTarget
{o.to: inputNamespace},
]
}
// applyOps事务
{
"op": "c",
"lsid": {$exists: true},
"txnNumber": {$exists: true},
"o.applyOps.ns": inputNamespace,
}
]
},
{"fromMigrate": {$ne: true}}
]
2. transform
transform转换阶段,用于把oplog转成event(在createTransformationStage
创建了DocumentSourceChangeStreamTransform
)。DocumentSourceChangeStreamTransform::getNext
调用applyTransformation,前者就是获取下一个change stream的event了。applyTransformation内部将会申请新的内存,然后把oplog转为change stream event。这个过程是单线程操作的。而且整个流程需要经历oplog的bson解序列化,event的序列化,还是需要消耗不少资源的。
3. 在mongos上的合并逻辑
mongos收到aggregate命令后,发现第一个stage是$changestream,会异步并发发给所有的shard进行处理,然后mongos接受cursor回复并进行merge的操作。代码的入口是ClusterAggregate::runAggregate
。
merge的具体处理是在establishMergingMongosCursor
里面,是按照raw resumeToken和postBatchResumeToken合并的。resumeToken有2种封装方式,BinData和Hex-encoded string,无论是哪种格式,都包含了可比较的_data
字段。对于BinData来说,_data
字段包含clusterTime, documentKey, UUID;对于Hex-encoded string来说,_data
字段就是clusterTime。所以可以用_data
进行排序,因为clusterTime就是混合逻辑时钟表示的ts。从下面Explain的结果也可以看到,sort key是ts,uuid,和documentKye三者的组合。
mongos> db.watch([], {maxAwaitTimeMS: 1000000, fullDocument: "updateLookup", explain: true})
{
"mergeType" : "mongos",
"splitPipeline" : {
"shardsPart" : [
{
"$changeStream" : {
"fullDocument" : "updateLookup",
"startAtOperationTime" : Timestamp(1588752756, 2)
}
}
],
"mergerPart" : [
{
"$sort" : {
"sortKey" : {
"_id.clusterTime.ts" : 1,
"_id.uuid" : 1,
"_id.documentKey" : 1
},
"mergePresorted" : true
}
},
{
"$_internalLookupChangePostImage" : {
}
}
]
},
"shards" : {
"d-uf6be18f6f6b7e74" : {
"host" : "11.218.89.101:3129",
"stages" : [
{
"$cursor" : {
"query" : {
// 这里就是第1节里面介绍的$match
},
"fields" : {
"ns" : 1,
"o" : 1,
"o2" : 1,
"op" : 1,
"ts" : 1,
"ui" : 1,
"_id" : 0
},
"queryPlanner" : {
"plannerVersion" : 1,
"namespace" : "local.oplog.rs",
"indexFilterSet" : false,
"parsedQuery" : {
"$and" : [
// 参考前面的query请求
]
},
"winningPlan" : {
// 省略了
},
"rejectedPlans" : [ ]
}
}
},
{
"$changeStream" : {
"fullDocument" : "updateLookup",
"startAtOperationTime" : Timestamp(1588752756, 2)
}
}
]
},
"d-uf6de02da7923da4" : {
// 同上面shard
}
},
"ok" : 1,
"operationTime" : Timestamp(1588752756, 1),
"$clusterTime" : {
"clusterTime" : Timestamp(1588752756, 1),
"signature" : {
"hash" : BinData(0,"UHPQX1iqGr3joKrhaMYGcvOcXnk="),
"keyId" : NumberLong("6791488152868487169")
}
}
}
不同的shard也是直接过滤的fromMigrate的消息处理的。另外,在query在找到后,通过$project直接提取出来感兴趣的列:
"fields" : {
"ns" : 1,
"o" : 1,
"o2" : 1,
"op" : 1,
"ts" : 1,
"ui" : 1,
"_id" : 0
},
4. Q&A
4.1 对于固定collection的watch,是否即使某个shard没有对应的表,也会发到这个shard上?
A: 对的。从watch的explain可以看到,不仅仅发到primary shard,而是发到所有的shard。
4.2 如果没有指定起始时间startAtOperationTime,那么是从什么时间开始的?
A: 从mongos的当前时间开始的,而不是真正意义上的当前时间戳。mongos的请求会带有clusterTime时间戳,shard收到以后只会返回大于等于这个clusterTime时间戳的oplog,之前的都会过滤掉。
4.3 MongoS上是否有等待的策略?shard返回的oplog就立刻返回吗?
A: 有等待策略。shard返回的oplog将会携带postBatchResumeToken,以告知mongos,当前shard下一个返回的时间肯定不会早于这个时间戳。这样,在mongos上,就可以对不同的oplog进行排序聚合,并返回早于所有shard中最小的postBatchResumeToken。
5. 总结
目前change stream的主要瓶颈在于第二个transform转换阶段。所以有2个优化方向,但是官方计划都比较长期。
- 第一个阶段$match和第二个阶段transform可以合并成一个阶段。
- 一个change stream可以分发成多个streams进行并行处理,通过并发解析解决单线程性能不足的问题。
此外,给大家遗留一个问题,为什么不同shard的oplog拉过来直接过滤fromMigrate就可以合并,这里没有因果一致性的问题吗?答案请见我的之前博客:混合逻辑时钟文章中的“MongoShake解决sharding的move chunk问题”小节。
说明
转载请注明出错:http://vinllen.com/mongodb-change-streamliu-cheng/
参考
https://jira.mongodb.org/browse/SERVER-46979
https://developer.mongodb.com/community/forums/t/the-dispatching-and-merging-policy-about-change-stream-in-mongos/2711/2