MongoDB 变更流(Change Stream)介绍
阅读原文时间:2023年07月09日阅读:1

1. 什么是Change Stream
Change Stream 是MongoDB用于实现变更追踪的解决方案,类似于关系数据库的触发器,但原理不完全相同:

|              |  Change Stream  |  触发器        |  
|--------------|-----------------|---------------|  
|   触发方式    |  异步           |  同步(事务保证) |  
|   触发位置    |  应用回调事件    |  数据库触发器   |  
|   触发次数    |  每个订阅事件的客户端  |  1次(触发器) |  
|   故障恢复    |  从上次断点重新触发  |  事务回滚    |  

2. Change Stream 实现原理
Change Stream 是基于oplog实现的。它在oplog上开启一个tailable cursor 来追踪复制集上的变更操作,
最终调用应用中定义的回调函数。被追踪的变更事件主要包括:

· insert/update/delete:插入、更新、删除;  

. drop:集合被删除;  

. rename:集合被重命名;  

. dropDatabase:数据库被删除;  

. invalidate: drop/rename/dropDatabase 将导致invalidate被触发,并关闭 change stream;  

3. Change Stream 与可重复读
Change Stream 只推送已经在大多数节点上提交的变更操作。即“可重复读”的变更。
这个验证是通过{readConcern:"majority"}实现的。因此:

· 未开启majority readConcern的集群无法使用Change Stream;  

· 当集群无法满足{w:"majority"}时,不会触发Change Stream(例如PSA架构中的S因故宕机)。  

4. Change Stream 变更过滤
如果只对某些类型的变更事件感兴趣,可以使用聚合管道的过滤步骤过滤事件。
例如:
```
var cs = db.collection.watch([{
$match: {
operationType: {
$in: ['insert', 'delete']
}
}
}])
```
5. Change Stream 故障恢复
假设在一系列写入操作的过程中,订阅Change Stream的应用在接收到“写3”之后于t0时刻崩溃,重启后后续的变更怎么办?
[!qr](./images/026_t_1.png)
想要从上次中断的地方继续获取变更流,只需要保留上次变更通知中的_id即可。
右侧所示是一次Change Stream回调所返回的数据。每条这样的数据都带有一个_id,这个_id可以用于断点恢复。例如:

var cs = db.collection.watch(\[\],{resumeAfter:<\_id>})  
即可从上一条通知中断处继续获取后续的变更通知。  

6. Change Stream 使用场景
· 跨集群的变更复制——在源集群中订阅Change Stream, 一旦得到任何变更立即写入目标集群。

· 微服务联动——当一个微服务变更数据库时,其他微服务得到通知并做出响应的变更。  

· 其他任何需要联动的场景。

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器

你可能感兴趣的文章