canal是一个订阅消费模式的服务,于是其消费位点的控制就非常重要,在正常情况或者重启canal之后都要保证不重复消费同一个位点,也不能漏掉某一个位点。本文我们来看看canal是如何控制消费位点的。
首先我们来分析客户端是如何工作,前文canal——代码初读我们分析过CanalMQStarter
的启动过程会为每个destination都新建一个CanalMQRunnable
,在不同的线程中来执行CanalMQRunnable
。CanalMQRunnable
的执行调用CanalMQStarter
的worker
方法。数据的消费与传输都在这个worker
方法中执行。
CanalMQStarter.work
首先是一个准备过程:
- 新建ClientIdentity,作为client的标识
- 从
canalServer
(CanalServerWithEmbedded
)中获取instance - 新建
CanalDestination
,这个类保存了mq的配置信息 调用
canalServer.subscribe
向CanalServerWithEmbedded
中注册client的信息- 调用
checkStart
确保destination对应的instance已经启动 - 获取instance中保存的
CanalMetaManager
,CanalMetaManager
是meta信息的管理器,其中保存着关键的position信息 - 判断
CanalMetaManager
是否已经启动,如果没有启动则调用start()
方法启动CanalMetaManager。
正常情况下CanalMetaManager
在instance启动时已经被启动了。 - 调用
CanalMetaManager
的subscribe
方法在CanalMetaManager
中增加订阅信息 - 调用
CanalMetaManager
的getCursor
方法获取当前订阅binlog的位置信息 如果position为null,说明之前没有记录position信息:
- 从
CanalEventStore
中获取保存的第一条数据的position信息 - 如果
CanalEventStore
中保存的position信息不为null,调用CanalMetaManager
的updateCursor
方法更新CanalMetaManager
中保存的position信息
- 从
调用
CanalInstance
的subscribeChange
方法,通知CanalInstance
订阅关系变化- 如果
identity
中设置的filter
过滤条件不为空,则将其设置到eventParser
中
- 如果
- 调用
前面的准备过程基本就是一个向CanalMetaManager
注册客户端并处理消费位点的工作。
接下来是循环读取数据的过程:
调用
CanalServerWithEmbedded
的getWithoutAck
方法获取binlog消息。getWithoutAck
方法一次获取一批(batch)的binlog,canal会为这批binlog生成一个唯一的batchId。客户端如果消费成功,则调用ack方法对这个批次的数据进行确认。如果失败的话,可以调用rollback方法进行回滚。客户端可以连续多次调用getWithoutAck
方法来获取binlog,在ack的时候,需要按照获取到binlog的先后顺序进行ack。如果后面获取的binlog被ack了,那么之前没有ack的binlog消息也会自动被ack。注意
getWithoutAck
方法中有一个synchronized (canalInstance)
代码,即虽然destination
可以对应多个客户端,但是每个客户端不能并发获取binlog消息。- 调用
CanalMetaManager
的getLastestBatch
方法获取到该clientIdentity
消费的最新的位置,保存在PositionRange
中。这是为了确定从哪个位置开始继续消费binlog。 从
CanalEventStore
中读取数据,保存在events
中:- 如果
PositionRange
不为null,从CanalEventStore
中获取PositionRange
开始位置之后的数据 如果
PositionRange
为null,说明是ack后第一次获取数据:- 调用
CanalMetaManager
的getCursor(ClientIdentity clientIdentity)
方法获取cursor游标start
。如果start
为null,调用CanalEventStore
的getFirstPosition
方法获取CanalEventStore
中保存的第一条数据 - 从
CanalEventStore
中获取start
之后的数据
- 调用
- 如果
将
events
包装成Message
返回:- 如果
events
为空,返回空的Message
,batchId设置为-1
。 - 如果
events
不为空,调用CanalMetaManager
的addBatch
方法记录PositionRange
,返回一个唯一的batchId。将events
做相应转换之后包装成Message
返回。
根据位置从
CanalEventStore
中获取binlog数据,为了尽量提高效率,一般一次获取一批binlog,而不是获取一条。这个批次的大小(batchSize)由客户端指定。同时客户端可以指定超时时间,在超时时间内,如果获取到了batchSize的binlog,会立即返回。如果超时了还没有获取到batchSize指定的binlog个数,也会立即返回。特别的,如果没有设置超时时间,没有获取到binlog也会立即返回。在
CanalMetaManager
中记录这个批次的binlog消息。CanalMetaManager
会为获取到的这个批次的binlog生成一个唯一的batchId。batchId是递增的,如果binlog信息为空,则直接把batchId设置为-1
。- 如果
- 调用
如果
message
不为空,将数据发送到mq中如果发送成功,调用
CanalServerWithEmbedded
的ack
方法确认小于batchId的消息已经被消费。- 调用
CanalMetaManager
的removeBatch
删除batchId表示的PositionRange
数据 - 调用
CanalMetaManager
的updateCursor
方法更新cursor - 调用
CanalEventStore
的ack
方法删除经过确认的数据
- 调用
如果发送失败,调用
CanalServerWithEmbedded
的rollback
方法回滚batchId表示的数据- 调用
CanalMetaManager
的removeBatch
删除batchId表示的PositionRange
数据 - 调用
CanalEventStore
的rollback
方法回滚到指定的位置
- 调用
下面我们来分析其中涉及到的细节。
CanalMetaManager
从前面的分析中,消费位点控制最关键的是CanalMetaManager
类。CanalMetaManager
主要用于记录客户端获取的未ack的PositionRange
日志信息(开始位置、结束位置、ack位置以及对应的batchId),实现重试功能,保证数据传输的可靠性。提供如下功能:
- 订阅行为处理:记录
destination
和ClientIdentity
的对应关系 - 未ack日志记录行为处理:通过
MemoryClientIdentityBatch
来实现获取指定batchId、最新或者第一个未ack日志的PositionRange
。 - 添加、获取未ack的日志记录:通过从eventstore中获取指定数量的event的
PositionRange
后(并不保存数据信息),添加到CanalMetaManager
中,并通过唯一batchId进行绑定,支持通过batchId获取未ack日志记录的功能。 - 删除已经ack日志记录的行为:通过batchId删除已经ack过的日志记录。注意:ack和rollback必须按照分发处理的顺序处理,即只能ack当前最小的batchId。不然容易出现丢数据的问题。
- 获取、清空所有未处理ack日志:获取和清空
MemoryClientIdentityBatch
中的记录 - 更新最近被ack的日志文件位置:从
PositionRange
中获取应该ack的Position
位置,进行更新到cursor游标中
下面我们来分析CanalMetaManager
类。
如上图所示,CanalMetaManager
是一个接口,其中有5个实现类。canal是如何来选择实现类的呢?前文canal——代码初读我们提到过canal的实例是通过Spring来生成的,生成实例配置的时候会指定xml
文件,Spring就是通过这个xml
文件来生成实例。
默认可以选择以下几种xml
文件,后面写上了它选择的CanalMetaManager
实现类:
- spring/default-instance.xml:PeriodMixedMetaManager
- spring/file-instance.xml:FileMixedMetaManager
- spring/group-instance.xml:MemoryMetaManager
- spring/memory-instance.xml:MemoryMetaManager
可以看到不同的xml
配置文件,选择的CanalMetaManager
实现类是不同的。其中最为重要的是MemoryMetaManager
,其中几个实现类也是基于它实现的。
下面我们来分析MemoryMetaManager
类。
MemoryMetaManager
如名称所示,MemoryMetaManager
将日志消费位点信息记录在内存中。
它有3个变量:
1 | protected Map<String, List<ClientIdentity>> destinations; |
destinations
中保存的是每个destination对应的客户端(从中可以看出针对一个destination可以启动多个客户端)
batches
中保存的是客户端消费的log位置范围
cursors
中保存的是客户端应答后的log位置
下面说明一下其中涉及的类:
ClientIdentity
:客户端的标识。其中保存着以下3个字段:
1
2
3private String destination;
private short clientId;
private String filter;
`clientId`默认为`1001`
MemoryClientIdentityBatch
:客户端消费的批量数据。其中保存着以下3个字段:
1
2
3private ClientIdentity clientIdentity;
private Map<Long, PositionRange> batches = new MapMaker().makeMap();
private AtomicLong atomicMaxBatchId = new AtomicLong(1);
`batches`中保存着batchId和`PositionRange`的对应关系。`PositionRange`中保存日志的开始位置、结束位置和ack位置。
`atomicMaxBatchId`记录最大的batchId。
Position
:Position
是一个抽象类,其实现类LogPosition
用于标识数据库日志位置。其中保存着一下2个字段:
1
2private LogIdentity identity;
private EntryPosition postion;
`LogIdentity`是数据库日志标识。保存着canal server的`slaveId`和`sourceAddress`信息
`EntryPosition`是数据库对象标识。保存着`included`、`journalName`、`position`、`serverId`、`gtid`、`timestamp`信息
下面来分析MemoryMetaManager
的几个方法:
subscribe相关的方法
subscribe相关的方法有如下4个,具体代码略
1 | /** |
主要的功能就是围绕destinations
变量,在ClientIdentity
列表中增减数据。本质上就是在destinations
中记录ClientIdentity
数据,来表示客户端正在订阅数据。删除订阅则是删除destinations
中的ClientIdentity
数据。
cursor相关的方法
cursor相关的方法有如下2个,具体代码略
1 | /** |
cursor相关的方法也是简单地围绕cursors
记录ClientIdentity
对应的Position
信息。
batch相关的方法
batch相关的方法有如下8个,具体代码略
1 | /** |
这些方法获取batches
中保存的MemoryClientIdentityBatch
,调用MemoryClientIdentityBatch
类中的相应方法:
1 | public synchronized void addPositionRange(PositionRange positionRange, Long batchId) { |
可以看到,MemoryClientIdentityBatch
中方法就是围绕着batches
变量对PositionRange
进行增删改查。
PeriodMixedMetaManager
我们再来看看PeriodMixedMetaManager
,它是default-instance.xml
中定义的CanalMetaManager
实现。
它和MemoryMetaManager
的主要区别就是增加了一个定时任务:
1 | updateCursorTasks = Collections.synchronizedSet(new HashSet<ClientIdentity>()); |
该定时任务的主要功能就是获取内存中保存的cursor
数据,将其保存到zookeeper中。当canal重启之后能获得之前已经处理过的cursor
数据,canal就可以从这个位置接下去获取新的数据。
更新cursor
的操作在updateCursor
方法中触发:
1 | public void updateCursor(ClientIdentity clientIdentity, Position position) throws CanalMetaManagerException { |
FileMixedMetaManager
再来看看FileMixedMetaManager
,它是file-instance.xml
中定义的CanalMetaManager
实现。
它和PeriodMixedMetaManager
的主要区别就是PeriodMixedMetaManager
是将cursor
数据写到zookeeper,而FileMixedMetaManager
是将cursor
数据写到文件中,文件名为meta.dat
。这种机制主要适合单机部署canal的时候使用。
1 | updateCursorTasks = Collections.synchronizedSet(new HashSet<ClientIdentity>()); |
总结
canal在CanalMetaManager
中管理binlog消费位点。它的消费流程可以总结为以下几个步骤:
- 获取读取binlog的开始位置。这个开始位置依次从3个地方获取:最近的batch数据、cursor游标、
CanalEventStore
中保存的第一条数据 - 从
CanalEventStore
中获取这个上面开始位置之后的一批binlog数据,保存到CanalMetaManager
的batch中,返回一个唯一的batchId - 数据消费成功后,调用ack方法确认batchId表示的一批数据。否则调用rollback回滚batchId表示的数据
https://github.com/alibaba/canal/wiki/AdminGuide
https://blog.csdn.net/CWeeYii/article/details/78536374
http://www.tianshouzhi.com/api/tutorials/canal/382