canal——代码初读

canal是阿里巴巴开发的,用于解析数据库日志的工具,提供增量数据订阅和消费。目前主要支持mysql。

canal的原理并不复杂:模拟mysql slave协议,将自己伪装为slave,收到log之后进行解析然后发送给应用程序。参考:canal产品介绍

canal在我们公司的主要用途是同步mysql数据到ElasticSearch。目前需要基于canal做二次开发,因此需要熟悉canal的源码。本文是对canal源码的初读。

采用的源码版本是canal 1.1.x,它与之前版本最大的不同就是原生支持kafka消息投递。这样我们就不再需要启动canal client来读取数据并投递到kafka中。

启动流程

canal的入口函数是CanalLaunchermain方法,我们来跟踪代码的执行流程。

  1. 加载canal.properties。如果指定了canal.conf则使用指定的配置,否则使用默认的canal.properties文件。
  2. 新建CanalStater并启动

    1. 判断canal.serverMode,如果为kafka则新建CanalKafkaProducer。设置canal.withoutNetty为true,以及用户定义的canal.destinations
    2. 新建CanalController

      1. 调用initGlobalConfig方法初始化全局参数设置

        1. 获取并设置mode,默认为SPRING
        2. 获取并设置lazy,默认为false
        3. 获取并设置managerAddress,默认为null
        4. 获取并设置springXml,默认为classpath:spring/file-instance.xml
        5. 创建instanceGenerator,实例生成器。用于根据destination生成实例
      2. 调用initInstanceConfig初始化实例配置

        1. 获取canal.destinations配置
        2. canal.destinations,分割
        3. 针对每个destination:

          1. 调用parseInstanceConfig方法解析destination的配置。与初始化全局参数设置类似,这里根据具体的destination配置modelazymanagerAddressspringXml
          2. 将解析得到的destination配置保存在instanceConfigs
      3. 如果配置了canal.socketChannel,设置canal.socketChannel属性

      4. 如果存在的话,分别设置canal.instance.rds.accesskeycanal.instance.rds.secretkey属性
      5. 获取cidipport属性
      6. 获取embededCanalServer,并设置instanceGeneratorembededCanalServer的类型为CanalServerWithEmbedded
      7. 获取并设置embededCanalServermetricsPort
      8. 如果canal.withoutNetty为null或者false,创建canalServer并配置ipport
      9. 如果ip属性为空,配置本机ip
      10. 获取canal.zkServers,zookeeper的地址

        1. 如果canal.zkServers不为空,在zookeeper中创建/otter/canal/destinations/otter/canal/cluster目录
      11. 创建服务器运行信息ServerRunningData

      12. ServerRunningData设置在服务器运行监控ServerRunningMonitors中。在ServerRunningMonitors中设置每个destination的运行监控器ServerRunningMonitor
      13. 获取canal.auto.scan属性,默认为true

        1. 创建InstanceAction,实例执行器。其中定义了实例启动、停止、重启3个操作
        2. 创建InstanceConfigMonitor,实例配置监视器。
    3. 调用start()方法启动CanalController

      1. 在zookeeper中创建canal服务器的path,path为/otter/canal/cluster/{ip}:{port}
      2. 在zookeeper中创建状态变化的监听器
      3. 调用start()方法启动embededCanalServer

        1. 加载并初始化CanalMetricsService
        2. 创建canalInstances
      4. 遍历各个instance

        1. 调用ServerRunningMonitor.start()方法启动每个destination的ServerRunningMonitor

          1. 调用processStart()方法。在zookeeper中新建/otter/canal/destinations/{name}/cluster/{ip}:{port}目录,并监听zookeeper状态的修改
          2. 监听zookeeper中/otter/canal/destinations/{name}/running目录的变动
          3. 调用initRunning()方法

            1. 在zookeeper的/otter/canal/destinations/{name}/running目录中增加正在运行的canal服务器信息
            2. 调用processActiveEnter方法触发destination对应的canal实例(CanalInstance)开始执行
        2. 为每个destination注册InstanceAction

      5. 启动实例配置的监听器InstanceConfigMonitor

      6. 如果canalServer不为null,则调用start()方法启动canalServer。如果没有指定mq模式,则会启动canalServercanalServer是使用Netty写的服务端,接收用户连接,发送数据。
    4. 设置设置退出时执行的钩子线程shutdownThread

    5. 如果canalMQProducer不为null,新建并启动CanalMQStarter

      1. 设置mq的属性
      2. 为每个destination新建一个CanalMQRunnable并启动

启动流程总结

canal的简易时序图如下所示(第一次画时序图,欢迎指正):

cana

从时序图可以看出CanalController是canal启动过程中处于中心调用位置的类,负责初始化各种配置并启动CanalServerWithEmbedded

CanalServerWithEmbedded可以看成是一个canal实例的管理容器,其中有一个Map<String, CanalInstance> canalInstances变量保存所有的canal实例,负责各个canal实例(CanalInstance)的启动。

CanalInstance是真正执行mysql日志解析的类。用户配置了多少个destinations,就会启动多少个CanalInstance。每个CanalInstance会连接mysql,dump binlog,然后将数据交给parser解析,sink过滤,store存储。接下来,我们来分析CanalInstance的执行。

CanalInstance

通过前面的启动流程知道,CanalInstance的启动流程如下:

  1. CanalLauncher.main()
  2. CanalStater.start()
  3. CanalController.start()
  4. ServerRunningMonitor.start()
  5. ServerRunningMonitor.initRunning()
  6. ServerRunningMonitor.processActiveEnter()
  7. CanalServerWithEmbedded.start(final String destination)

CanalServerWithEmbedded.start(final String destination)方法负责具体destination的启动:

  1. canalInstances中获取destination对应的CanalInstance

    canalInstances是一个Map,如果其中不存在对应destination的CanalInstance,调用CanalInstanceGenerator.generate(String destination)生成CanalInstance

    1. instanceConfigs中获取相应destination的配置
    2. 默认通过Spring生成Instance。

      1. 创建SpringCanalInstanceGenerator
      2. 调用getBeanFactory(String springXml)根据spring配置文件生成Spring Context。Spring Context中生成了几个重要的Bean:instance(Canal实例)、eventParser(解析)、eventSink(过滤)、eventStore(存储)、metaManager(元数据管理)、alarmHandler(报警)
      3. 调用generate(String destination)方法从Spring Context中获取destination对应的CanalInstanceCanalInstance的实际类为CanalInstanceWithSpring
  2. 调用CanalInstance.start()方法启动Instance

    按先后顺序分别启动metaManagerFileMixedMetaManager)、alarmHandlerLogAlarmHandler)、eventStoreMemoryEventStoreWithBuffer)、eventSinkEntryEventSink)、eventParserRdsBinlogEventParserProxy)。

CanalEventParser

CanalEventParserCanalInstance启动时被启动。CanalEventParser的实际类是RdsBinlogEventParserProxy,其真正的start()方法处于父类AbstractEventParser中。启动过程完成以下三件事:

  1. 配置EventTransactionBuffer
  2. 构建BinlogParser
  3. 新建ParseThread(binlog解析线程)并启动

binlog解析线程的执行

  1. 创建Mysql连接
  2. 为Mysql连接启动一个心跳
  3. 调用preDump方法执行dump前的准备工作

    1. 调用connect()方法连接mysql
    2. 验证Mysql中配置的binlog-format是否能被支持
    3. 验证Mysql中配置的binlog-image是否能被支持
  4. 调用connect()方法连接mysql

  5. 获取serverId
  6. 调用findStartPosition方法获取binlog的开始位置
  7. 调用processTableMeta方法回滚到指定位点
  8. 调用reconnect()方法重新链接,因为在找position过程中可能有状态,需要断开后重建
  9. 调用MysqlConnection.dump方法dump数据

    1. 向mysql发送更新设置的请求
    2. 获取binlog的checksum信息
    3. 向mysql注册slave信息
    4. 向mysql发送dump binlog的请求
    5. 接下去循环读取binlog,存储在LogBuffer

      调用MultiStageCoprocessor.publish投递数据

      MultiStageCoprocessor的实际类为MysqlMultiStageCoprocessor

      MysqlMultiStageCoprocessor中维护着一个disruptorMsgBufferdisruptorMsgBuffer的类是RingBuffer,这是一个无锁队列。存储在LogBuffer中的binlog数据被投递到disruptorMsgBuffer中。

      MysqlMultiStageCoprocessor针对解析器提供一个多阶段协同的处理。LogBuffer被投递到disruptorMsgBuffer之后分为3个阶段被处理:

      1. 事件基本解析 (单线程,事件类型、DDL解析构造TableMeta、维护位点信息),调用SimpleParserStage.onEvent处理
      2. 事件深度解析 (多线程, DML事件数据的完整解析),调用DmlParserStage.onEvent处理
      3. 投递到store (单线程),调用SinkStoreStage.onEvent处理

        SinkStoreStage.onEvent中如果event的Entry不为null,则将其添加到EventTransactionBuffer中。EventTransactionBuffer缓冲event队列,提供按事务刷新数据的机制。

        EventTransactionBuffer根据event调用EntryEventSinksink方法,sink处理之后保存在MemoryEventStoreWithBuffer之中。

CanalMQProducer

经过前面的分析,我们知道了binlog经过解析、过滤等步骤之后最终被保存在MemoryEventStoreWithBuffer之中。下面我们来分析CanalMQProducer的执行。

CanalStater的启动过程的最后,判断canalMQProducer是否为null。

如果我们设置了serverMode为kafka或者rocketmqcanalMQProducer的对象分别为CanalKafkaProducerCanalRocketMQProducer,此时canalMQProducer不为null。于是新建CanalMQStarter,将canalMQProducer作为参数传入,然后启动CanalMQStarter

CanalMQStarter的启动过程会为每个destination都新建一个CanalMQRunnable,每个destination都在单独的线程中执行。

CanalMQRunnable执行流程如下:

  1. 根据destination创建ClientIdentity
  2. 调用canalServer.subscribe(clientIdentity)订阅client信息
  3. 循环调用canalServer.getWithoutAck从canal中获取消息

    1. 获取最后获取到的数据的位置
    2. 调用getEvents方法获取数据。调用MemoryEventStoreWithBuffer.get,最终调用MemoryEventStoreWithBuffer.doGet方法获取保存的数据
  4. 调用canalMQProducer.send向mq发送消息

总结

经过上面的分析,对canal的工作流程有了一个初步的印象。canal的代码模块、流程等比较清晰,可以比较方便地在其上进行定制开发。

canal的1.1.x版本之后我们不再需要使用client来获取canal的数据,而是可以通过配置将数据发送到mq中,简化了我们的开发工作。

https://juejin.im/post/5c74f96a6fb9a049e4135b85