canal是阿里巴巴开发的,用于解析数据库日志的工具,提供增量数据订阅和消费。目前主要支持mysql。
canal的原理并不复杂:模拟mysql slave协议,将自己伪装为slave,收到log之后进行解析然后发送给应用程序。参考:canal产品介绍
canal在我们公司的主要用途是同步mysql数据到ElasticSearch。目前需要基于canal做二次开发,因此需要熟悉canal的源码。本文是对canal源码的初读。
采用的源码版本是canal 1.1.x,它与之前版本最大的不同就是原生支持kafka消息投递。这样我们就不再需要启动canal client来读取数据并投递到kafka中。
启动流程
canal的入口函数是CanalLauncher的main方法,我们来跟踪代码的执行流程。
- 加载
canal.properties。如果指定了canal.conf则使用指定的配置,否则使用默认的canal.properties文件。 新建
CanalStater并启动- 判断
canal.serverMode,如果为kafka则新建CanalKafkaProducer。设置canal.withoutNetty为true,以及用户定义的canal.destinations 新建
CanalController调用
initGlobalConfig方法初始化全局参数设置- 获取并设置
mode,默认为SPRING - 获取并设置
lazy,默认为false - 获取并设置
managerAddress,默认为null - 获取并设置
springXml,默认为classpath:spring/file-instance.xml - 创建
instanceGenerator,实例生成器。用于根据destination生成实例
- 获取并设置
调用
initInstanceConfig初始化实例配置- 获取
canal.destinations配置 - 将
canal.destinations以,分割 针对每个destination:
- 调用
parseInstanceConfig方法解析destination的配置。与初始化全局参数设置类似,这里根据具体的destination配置mode、lazy、managerAddress、springXml - 将解析得到的destination配置保存在
instanceConfigs
- 调用
- 获取
如果配置了
canal.socketChannel,设置canal.socketChannel属性- 如果存在的话,分别设置
canal.instance.rds.accesskey和canal.instance.rds.secretkey属性 - 获取
cid、ip、port属性 - 获取
embededCanalServer,并设置instanceGenerator。embededCanalServer的类型为CanalServerWithEmbedded - 获取并设置
embededCanalServer的metricsPort - 如果
canal.withoutNetty为null或者false,创建canalServer并配置ip和port。 - 如果
ip属性为空,配置本机ip 获取
canal.zkServers,zookeeper的地址- 如果
canal.zkServers不为空,在zookeeper中创建/otter/canal/destinations和/otter/canal/cluster目录
- 如果
创建服务器运行信息
ServerRunningData- 将
ServerRunningData设置在服务器运行监控ServerRunningMonitors中。在ServerRunningMonitors中设置每个destination的运行监控器ServerRunningMonitor 获取
canal.auto.scan属性,默认为true- 创建
InstanceAction,实例执行器。其中定义了实例启动、停止、重启3个操作 - 创建
InstanceConfigMonitor,实例配置监视器。
- 创建
调用
start()方法启动CanalController- 在zookeeper中创建canal服务器的path,path为
/otter/canal/cluster/{ip}:{port} - 在zookeeper中创建状态变化的监听器
调用
start()方法启动embededCanalServer- 加载并初始化
CanalMetricsService - 创建
canalInstances
- 加载并初始化
遍历各个instance
调用
ServerRunningMonitor.start()方法启动每个destination的ServerRunningMonitor- 调用
processStart()方法。在zookeeper中新建/otter/canal/destinations/{name}/cluster/{ip}:{port}目录,并监听zookeeper状态的修改 - 监听zookeeper中
/otter/canal/destinations/{name}/running目录的变动 调用
initRunning()方法- 在zookeeper的
/otter/canal/destinations/{name}/running目录中增加正在运行的canal服务器信息 - 调用
processActiveEnter方法触发destination对应的canal实例(CanalInstance)开始执行
- 在zookeeper的
- 调用
为每个destination注册
InstanceAction
启动实例配置的监听器
InstanceConfigMonitor- 如果
canalServer不为null,则调用start()方法启动canalServer。如果没有指定mq模式,则会启动canalServer。canalServer是使用Netty写的服务端,接收用户连接,发送数据。
- 在zookeeper中创建canal服务器的path,path为
设置设置退出时执行的钩子线程
shutdownThread如果
canalMQProducer不为null,新建并启动CanalMQStarter- 设置mq的属性
- 为每个destination新建一个
CanalMQRunnable并启动
- 判断
启动流程总结
canal的简易时序图如下所示(第一次画时序图,欢迎指正):

从时序图可以看出CanalController是canal启动过程中处于中心调用位置的类,负责初始化各种配置并启动CanalServerWithEmbedded。
CanalServerWithEmbedded可以看成是一个canal实例的管理容器,其中有一个Map<String, CanalInstance> canalInstances变量保存所有的canal实例,负责各个canal实例(CanalInstance)的启动。
CanalInstance是真正执行mysql日志解析的类。用户配置了多少个destinations,就会启动多少个CanalInstance。每个CanalInstance会连接mysql,dump binlog,然后将数据交给parser解析,sink过滤,store存储。接下来,我们来分析CanalInstance的执行。
CanalInstance
通过前面的启动流程知道,CanalInstance的启动流程如下:
- CanalLauncher.main()
- CanalStater.start()
- CanalController.start()
- ServerRunningMonitor.start()
- ServerRunningMonitor.initRunning()
- ServerRunningMonitor.processActiveEnter()
- CanalServerWithEmbedded.start(final String destination)
CanalServerWithEmbedded.start(final String destination)方法负责具体destination的启动:
从
canalInstances中获取destination对应的CanalInstancecanalInstances是一个Map,如果其中不存在对应destination的CanalInstance,调用CanalInstanceGenerator.generate(String destination)生成CanalInstance- 从
instanceConfigs中获取相应destination的配置 默认通过Spring生成Instance。
- 创建
SpringCanalInstanceGenerator - 调用
getBeanFactory(String springXml)根据spring配置文件生成Spring Context。Spring Context中生成了几个重要的Bean:instance(Canal实例)、eventParser(解析)、eventSink(过滤)、eventStore(存储)、metaManager(元数据管理)、alarmHandler(报警) - 调用
generate(String destination)方法从Spring Context中获取destination对应的CanalInstance。CanalInstance的实际类为CanalInstanceWithSpring。
- 创建
- 从
调用
CanalInstance.start()方法启动Instance按先后顺序分别启动
metaManager(FileMixedMetaManager)、alarmHandler(LogAlarmHandler)、eventStore(MemoryEventStoreWithBuffer)、eventSink(EntryEventSink)、eventParser(RdsBinlogEventParserProxy)。
CanalEventParser
CanalEventParser在CanalInstance启动时被启动。CanalEventParser的实际类是RdsBinlogEventParserProxy,其真正的start()方法处于父类AbstractEventParser中。启动过程完成以下三件事:
- 配置
EventTransactionBuffer - 构建
BinlogParser - 新建
ParseThread(binlog解析线程)并启动
binlog解析线程的执行
- 创建Mysql连接
- 为Mysql连接启动一个心跳
调用
preDump方法执行dump前的准备工作- 调用
connect()方法连接mysql - 验证Mysql中配置的binlog-format是否能被支持
- 验证Mysql中配置的binlog-image是否能被支持
- 调用
调用
connect()方法连接mysql- 获取
serverId - 调用
findStartPosition方法获取binlog的开始位置 - 调用
processTableMeta方法回滚到指定位点 - 调用
reconnect()方法重新链接,因为在找position过程中可能有状态,需要断开后重建 调用
MysqlConnection.dump方法dump数据- 向mysql发送更新设置的请求
- 获取binlog的checksum信息
- 向mysql注册slave信息
- 向mysql发送dump binlog的请求
接下去循环读取binlog,存储在
LogBuffer中调用
MultiStageCoprocessor.publish投递数据MultiStageCoprocessor的实际类为MysqlMultiStageCoprocessor。MysqlMultiStageCoprocessor中维护着一个disruptorMsgBuffer。disruptorMsgBuffer的类是RingBuffer,这是一个无锁队列。存储在LogBuffer中的binlog数据被投递到disruptorMsgBuffer中。MysqlMultiStageCoprocessor针对解析器提供一个多阶段协同的处理。LogBuffer被投递到disruptorMsgBuffer之后分为3个阶段被处理:- 事件基本解析 (单线程,事件类型、DDL解析构造TableMeta、维护位点信息),调用
SimpleParserStage.onEvent处理 - 事件深度解析 (多线程, DML事件数据的完整解析),调用
DmlParserStage.onEvent处理 投递到store (单线程),调用
SinkStoreStage.onEvent处理SinkStoreStage.onEvent中如果event的Entry不为null,则将其添加到EventTransactionBuffer中。EventTransactionBuffer缓冲event队列,提供按事务刷新数据的机制。EventTransactionBuffer根据event调用EntryEventSink的sink方法,sink处理之后保存在MemoryEventStoreWithBuffer之中。
- 事件基本解析 (单线程,事件类型、DDL解析构造TableMeta、维护位点信息),调用
CanalMQProducer
经过前面的分析,我们知道了binlog经过解析、过滤等步骤之后最终被保存在MemoryEventStoreWithBuffer之中。下面我们来分析CanalMQProducer的执行。
在CanalStater的启动过程的最后,判断canalMQProducer是否为null。
如果我们设置了serverMode为kafka或者rocketmq,canalMQProducer的对象分别为CanalKafkaProducer和CanalRocketMQProducer,此时canalMQProducer不为null。于是新建CanalMQStarter,将canalMQProducer作为参数传入,然后启动CanalMQStarter。
CanalMQStarter的启动过程会为每个destination都新建一个CanalMQRunnable,每个destination都在单独的线程中执行。
CanalMQRunnable执行流程如下:
- 根据destination创建
ClientIdentity - 调用
canalServer.subscribe(clientIdentity)订阅client信息 循环调用
canalServer.getWithoutAck从canal中获取消息- 获取最后获取到的数据的位置
- 调用
getEvents方法获取数据。调用MemoryEventStoreWithBuffer.get,最终调用MemoryEventStoreWithBuffer.doGet方法获取保存的数据
调用
canalMQProducer.send向mq发送消息
总结
经过上面的分析,对canal的工作流程有了一个初步的印象。canal的代码模块、流程等比较清晰,可以比较方便地在其上进行定制开发。
canal的1.1.x版本之后我们不再需要使用client来获取canal的数据,而是可以通过配置将数据发送到mq中,简化了我们的开发工作。