介绍
论文原名: DBLog: A Watermark Based Change-Data-Capture Framework
, 基于 Watermark
的 Change-Data-Capture
(数据库实时捕获已提交的变更记录) 框架, 本质上是解决数据库同步(全量+增量)的框架, Watermark
是框架使用的一种手段, 在源表中创建表,生成唯一 uuid 并更新表数据, 在源表中就会生成一条变更记录,记作 Watermark
的变更记录, 通过 High Watermark
和 Low Watermark
将变更记录分割, 保证 select chunk 数据包含了增量的变更记录.
框架整体架构如下:
框架特点:
- 按顺序处理捕获到的
changelog
; - 转储可以随时进行,跨所有表,针对一个特定的表或者针对一个表的具体主键;
- 以块(chunk)的形式获取转储,日志与转储事件交错。通过这种方式,
changelog
可以与转储处理一起进行。如果进程终止,它可以在最后一个完成的块之后恢复,而不需要从头开始。这还允许在需要时对转储进行调整和暂停; - 不会获取表级锁,这可以防止影响源数据库上的写流量;
- 支持任何类型的输出,因此,输出可以是流、数据存储甚或是 API;
- 设计充分考虑了高可用性。因此,下游的消费者可以放心,只要源端发生变化,它们就可以收到变化事件。
注意, 本文并非详细介绍 DBLog
框架本身, 而是分析其框架背后的设计思路.
算法流程
chunk 划分
对于源表数据, 全量数据使用分块读取, 基于 primary key
顺序排序, 将全量数据划分为 N 个 chunk;
watermark
基于 chunk 划分, 然后 chunk 数据全量写入下游之后, 再将源表的变更记录 changelog
增量同步到下游, 整体思路就是这样, 但是划分 chunk 有个问题需要解决, 就是先同步到下游的数据不一定的最终的数据, 例如上图 chunk1 中的数据在同步到下游之后可能会删除, 那chunk1 的数据写到下游之后, 下游就会出现脏数据; 如何解决 chunk 和 changelog
之间不会相互覆盖的问题?
为了解决这一问题, DBLog 的解决办法是引入 watermark
的机制, 在查询 chunk 期间, 对 changelog
进行标记 , 然后去移除 select chunk 期间, chunk 数据中对应的 changelog
数据, 这样就解决了 select chunk 数据和期间对应的 changelog
数据的顺序问题 , 这也就是论文的精妙之处!
watermark
是通过源数据库中的一个表实现的, 表存储在专用的命名空间中,因此不会与应用程序表发生冲突。改表只包含一行,存储 UUID
字段; 通过将这一行更新为特定的 UUID
来生成watermark
, 行更新生成一个更改事件。
算法伪码
算法伪代码如下:
// step1 暂停处理 changelog
pause log event processing
lw := uuid()
hw := uuid()
// step2 通过更新watermark表生成 low watermark
update watermark table set value = lw
// step3 为下一个块运行 SELECT 语句,并将结果集存储在内存中,按主键索引
chunk := select next chunk from table
// step4 通过更新watermark表生成 high watermark
update watermark table set value = hw
// step5 恢复处理 changelog, 监听 high watermark
resume log event processing
inwindow := false
// other steps of event prosessing loop
while true do
e := next event from changelog
if not inwindow then{
if e is not watermark then
append e to outputbuffer
else if e is watermark with value lw then
inwindow := true
}else{
if e is not watermark then{
// step 6 接收到lw事件后,开始从结果集中删除所有在lw之后接收到的changelog主键的条目
if chunk contains e.key then{
remove e.key from chunk
}
append e to outputbuffer
}else if e is watermark with value hw then {
// step7 接收到hw事件后,将所有剩余的结果集条目发送到输出
for each row in chunk do{
append row to outputbuffer
}
}
}
算法步骤分析
示例表,k1 到 k6 为主键。每个更改日志条目表示主键的创建、更新或删除事件。在下图展示了watermark
的生成和chunk的选择(step 1 到 step 4)。重点看从位于watermark
之间的主键结果: 删除选定chunk的行(step 5 到 step 7)。
如果一个或多个事务在 lw
和 hw
之间提交了大量的行更改,则可能会出现大量的 changelog
。这就是为什么论文在 step 2-4 期间会短暂地暂停日志处理,从而保证不会遗漏watermark
, 这样changelog
处理就可以在以后逐个事件地恢复。日志处理暂停的时间很短,因为 step 2-4 预计会比较快:水印更新是单个的写操作,而 SELECT
操作有一定的范围, 可能耗时较长。
在第 7 步接收到hw
后,非冲突的chunk块将被提交写入,以便按顺序发送到下游。这是一个非阻塞操作,因为写入在单独的线程中运行,允许在step 7 之后快速恢复日志处理。然后,changelog
处理将继续处理hw
之后发生的事件。
在下图中,使用上图相同的示例来描述整个同步数据的写顺序, 出现在hw
之前的changelog
首先被写入, 然后是chunk结果(被修正后), 最后是在hw
之后发生的changelog
。
问题
当chunk划分完之后, 进入增量的 binlog offset 应该如何选取?
chunk 划分过程中,实际上已经把 lw
到 hw
范围中的增量数据, 合并到 chunk 的全量数据中(只合并chunk范围内的数据), 因此, 增量阶段的 binlog offset 应该要从chuck中最小的 hw
开始, 一直到最大的 hw
为止, 这个范围内的数据, 如果 binlog 已经包含在 chunk 中, 就无需处理, 否则往下游发送;
增量阶段开始的 offset 为什么不是最小的 lw
? 如果从chuck中最小的 hw
开始, 那从 lw - hw
这一段的 binlog中, 没被合并到 chunk 中的 binlog不就漏掉了?
如下图, [lw1, hw1] 这个范围中, 如果在 chunk 中, 会合并, 如果不在 chunk 中, 这个范围的数据在下一个 chunk 的全量阶段能查询到, 所以从 min(hw)
的 offset 开启增量阶段即可.
多个 chunk 读取是并行的, 如果两个 chunk 的 lw
到 hw
binlog 数据有交叉, 或者 chunk 乱序, 是否会影响结果?
在增量阶段开始时, 会从 min(hw)
的 offset 开始消费 binlog, 直到 max(hw)
, 此范围中的数据, 如果是不在对应的 chunk 中(全量阶段已经合并), 则会向下游发送;
当 watermark 有交叉时, 交叉部分的 binlog 会依次在不同的 chunk 中进行判断, 因此不会重复; 当 watermark 乱序时, 全量阶段,每个 chunk 划分的范围不同, 乱序没有影响, 至于 binlog 消费, 也是顺序的, 不会影响增量阶段的顺序;
参考
https://arxiv.org/pdf/2010.12597.pdf
https://netflixtechblog.com/dblog-a-generic-change-data-capture-framework-69351fb9099b