Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

Vinllen Chen


但行好事,莫问前程

mongodb change stream流程

  本文介绍mongodb内部对于change stream的实现逻辑,以4.0版本为准。
  change stream走的是aggregate的pipeline模式,其加了一个特殊的$changestream字段,客户端发送的change stream会被翻译成对应的$changestream stage:

"pipeline" : [
  {
    "$changeStream" : {
      "fullDocument" : "default",
      "startAtOperationTime" : Timestamp(1580867312, 3)
    }
  }
],

其本质就是一个带$match的在oplog表上的$cursor,数据拉取后经过transform转换把oplog转成change stream的event。所以整个$changestream可以分成2部分,第一部分是$match,第二部分是transform。如果是sharding,则还有一个mongos上的merge操作。

1. $match

DocumentSourceChangeStream::buildMatchFilter里面或者直接通过db.watch或aggregate的explain可以看到,change stream是直接构建如下的aggregate进行拉取oplog的。为了精简,我贴的是简化的shard上$match的基本流程:

    $and:[
        {"ts": {$gte: startFrom}}, // 如果指定了resumeAfter则从这个startFrom开始,否则是$gt lastAppliedTs
        {$or: 
            [
                // opMatch
                {
                    "ns": inputNamespace,
                    $or: [ 
                        {"op": {$ne: "n"}}, // 不为noop, 表示是普通的CRUD操作
                        {"op": "n", "o2.type": "migrateChunkToNewShard"}, // chunk mirgrate到新的shard上
                    ],
                } ,
                // commandMatch:
                {
                    "op": "c",
                    $or: [
                        // commandsOnTargetDb
                        $and: [
                            {"ns": inputDb.$cmd}, //如果不是指定全局则用这个,否则是正则匹配所有除local, admin, config之外的db
                            $or: [
                                {o.drop: kRegexAllCollections}, // kRegexAllCollections表示匹配所有的collection,
                                {o.dropDatabase: {$exist: true}}, //drop database操作
                                {o.renameCollection: inputNamespace},
                            ]
                        ]
                        // renameDropTarget
                        {o.to: inputNamespace},
                    ]
                }
                // applyOps事务
                {
                    "op": "c",
                    "lsid": {$exists: true},
                    "txnNumber": {$exists: true},
                    "o.applyOps.ns": inputNamespace,
                }
            ]
        },
        {"fromMigrate": {$ne: true}}
    ]

2. transform

transform转换阶段,用于把oplog转成event(在createTransformationStage创建了DocumentSourceChangeStreamTransform)。DocumentSourceChangeStreamTransform::getNext调用applyTransformation,前者就是获取下一个change stream的event了。applyTransformation内部将会申请新的内存,然后把oplog转为change stream event。这个过程是单线程操作的。而且整个流程需要经历oplog的bson解序列化,event的序列化,还是需要消耗不少资源的。

3. 在mongos上的合并逻辑

mongos收到aggregate命令后,发现第一个stage是$changestream,会异步并发发给所有的shard进行处理,然后mongos接受cursor回复并进行merge的操作。代码的入口是ClusterAggregate::runAggregate
merge的具体处理是在establishMergingMongosCursor里面,是按照raw resumeToken和postBatchResumeToken合并的。resumeToken有2种封装方式,BinData和Hex-encoded string,无论是哪种格式,都包含了可比较的_data字段。对于BinData来说,_data字段包含clusterTime, documentKey, UUID;对于Hex-encoded string来说,_data字段就是clusterTime。所以可以用_data进行排序,因为clusterTime就是混合逻辑时钟表示的ts。从下面Explain的结果也可以看到,sort key是ts,uuid,和documentKye三者的组合。

mongos> db.watch([], {maxAwaitTimeMS: 1000000, fullDocument: "updateLookup", explain: true})  
{
    "mergeType" : "mongos",
    "splitPipeline" : {
        "shardsPart" : [
            {
                "$changeStream" : {
                    "fullDocument" : "updateLookup",
                    "startAtOperationTime" : Timestamp(1588752756, 2)
                }
            }
        ],
        "mergerPart" : [
            {
                "$sort" : {
                    "sortKey" : {
                        "_id.clusterTime.ts" : 1,
                        "_id.uuid" : 1,
                        "_id.documentKey" : 1
                    },
                    "mergePresorted" : true
                }
            },
            {
                "$_internalLookupChangePostImage" : {

                }
            }
        ]
    },
    "shards" : {
        "d-uf6be18f6f6b7e74" : {
            "host" : "11.218.89.101:3129",
            "stages" : [
                {
                    "$cursor" : {
                        "query" : {
                            // 这里就是第1节里面介绍的$match
                        },
                        "fields" : {
                            "ns" : 1,
                            "o" : 1,
                            "o2" : 1,
                            "op" : 1,
                            "ts" : 1,
                            "ui" : 1,
                            "_id" : 0
                        },
                        "queryPlanner" : {
                            "plannerVersion" : 1,
                            "namespace" : "local.oplog.rs",
                            "indexFilterSet" : false,
                            "parsedQuery" : {
                                "$and" : [
                                    // 参考前面的query请求
                                ]
                            },
                            "winningPlan" : {
                                // 省略了
                            },
                            "rejectedPlans" : [ ]
                        }
                    }
                },
                {
                    "$changeStream" : {
                        "fullDocument" : "updateLookup",
                        "startAtOperationTime" : Timestamp(1588752756, 2)
                    }
                }
            ]
        },
        "d-uf6de02da7923da4" : {
            // 同上面shard
        }
    },
    "ok" : 1,
    "operationTime" : Timestamp(1588752756, 1),
    "$clusterTime" : {
        "clusterTime" : Timestamp(1588752756, 1),
        "signature" : {
            "hash" : BinData(0,"UHPQX1iqGr3joKrhaMYGcvOcXnk="),
            "keyId" : NumberLong("6791488152868487169")
        }
    }
}

不同的shard也是直接过滤的fromMigrate的消息处理的。另外,在query在找到后,通过$project直接提取出来感兴趣的列:

"fields" : {
  "ns" : 1,
  "o" : 1,
  "o2" : 1,
  "op" : 1,
  "ts" : 1,
  "ui" : 1,
  "_id" : 0
},

4. Q&A

4.1 对于固定collection的watch,是否即使某个shard没有对应的表,也会发到这个shard上?

A: 对的。从watch的explain可以看到,不仅仅发到primary shard,而是发到所有的shard。

4.2 如果没有指定起始时间startAtOperationTime,那么是从什么时间开始的?

A: 从mongos的当前时间开始的,而不是真正意义上的当前时间戳。mongos的请求会带有clusterTime时间戳,shard收到以后只会返回大于等于这个clusterTime时间戳的oplog,之前的都会过滤掉。

4.3 MongoS上是否有等待的策略?shard返回的oplog就立刻返回吗?

A: 有等待策略。shard返回的oplog将会携带postBatchResumeToken,以告知mongos,当前shard下一个返回的时间肯定不会早于这个时间戳。这样,在mongos上,就可以对不同的oplog进行排序聚合,并返回早于所有shard中最小的postBatchResumeToken。

5. 总结

  目前change stream的主要瓶颈在于第二个transform转换阶段。所以有2个优化方向,但是官方计划都比较长期。

  1. 第一个阶段$match和第二个阶段transform可以合并成一个阶段。
  2. 一个change stream可以分发成多个streams进行并行处理,通过并发解析解决单线程性能不足的问题。

此外,给大家遗留一个问题,为什么不同shard的oplog拉过来直接过滤fromMigrate就可以合并,这里没有因果一致性的问题吗?答案请见我的之前博客:混合逻辑时钟文章中的“MongoShake解决sharding的move chunk问题”小节。

说明

转载请注明出错:http://vinllen.com/mongodb-change-streamliu-cheng/

参考

https://jira.mongodb.org/browse/SERVER-46979
https://developer.mongodb.com/community/forums/t/the-dispatching-and-merging-policy-about-change-stream-in-mongos/2711/2


About the author

vinllen chen

Beijing, China

格物致知


Discussions

comments powered by Disqus