经过前文canal——代码初读的分析,我们对canal的工作流程有了初步的了解。本文,我们来分析canal的monitor机制。
canal的流程描述中提及了两个monitor:InstanceConfigMonitor
、ServerRunningMonitor
。本文重点对这两个monitor进行分析。
InstanceConfigMonitor
从字面意思就可以得出,InstanceConfigMonitor
是对canal实例配置的监控。实际上也确实如此,它的功能就是监控canal实例的配置文件——instance.properties
,如果其中有增加、修改、删除,执行实例的启动、重启、停止操作。下面我们来分析InstanceConfigMonitor
的代码。
InstanceConfigMonitor的创建
InstanceConfigMonitor
在CanalController
构造函数中被创建:
首先获取canal.auto.scan
配置,该配置控制是否监听canal实例配置的变化。如果该配置为false
,则不需要创建InstanceConfigMonitor
。默认该配置为true
,此时需要创建InstanceConfigMonitor
。
在创建InstanceConfigMonitor
之前先创建一个defaultAction
(实际类为InstanceAction
),InstanceAction
是一个接口,其中定义了三个函数,分别对应启动实例、定制实例、重启实例:
1 | public interface InstanceAction { |
InstanceAction
的具体操作我们在后面再分析。
接着创建一个instanceConfigMonitors
,它是一个Map<InstanceMode, InstanceConfigMonitor>
,当获取InstanceConfigMonitor
是发现Map中不存在时(从调用流程上看,InstanceConfigMonitor
其实是在其将要启动时获取的),调用apply
方法创建一个InstanceConfigMonitor
:
1 | int scanInterval = Integer |
创建流程如下:
- 获取配置
canal.auto.scan.interval
,它指定了配置监听扫描的间隔时间,默认为5秒。 - 判断mode是否为
SPRING
。mode在配置canal.instance.global.mode
中指定,可以正对各个destination配置不同的mode。全局mode默认为spring
- 新建
SpringInstanceConfigMonitor
- 设置
scanInterval
- 设置
defaultAction
。defaultAction
为刚刚创建的InstanceAction
。 - 获取并设置配置
canal.conf.dir
,这是监控进行的根目录。
InstanceConfigMonitor的启动
InstanceConfigMonitor
的启动位于CanalController
的start()
方法中。
在启动之前,还有一步遍历destination并注册InstanceAction
。
1 | if (autoScan) { |
其中执行instanceConfigMonitors.get(config.getMode())
时发现instanceConfigMonitors
不存在对应的InstanceConfigMonitor
,先创建InstanceConfigMonitor
。
接着调用start()
方法启动。对于SpringInstanceConfigMonitor
来说,start()
方法中启动了一个定时任务,每隔scanIntervalInSecond
时间(默认5秒),执行其中的scan()
方法:
1 | private void scan() { |
scan()
方法的作用就扫描instance的配置文件,如果有增加、修改、删除,则分别调用notifyStart
、notifyReload
、notifyStop
操作。三个操作的代码如下:
1 | private void notifyStart(File instanceDir, String destination, File[] instanceConfigs) { |
可以看到notifyStart
、notifyReload
、notifyStop
三个操作主要就是调用defaultAction
中定义的Instance操作对Instance执行启动、重启、停止操作。
下面来看defaultAction
中定义的操作:
1 | defaultAction = new InstanceAction() { |
可以看到,defaultAction
中定义了三个操作,其中reload
操作就是简单地停止并启动instance,stop
操作删除instance的配置然后调用ServerRunningMonitor
的stop()
方法停止instance,start
操作读取instance配置并调用调用ServerRunningMonitor
的start()
方法启动instance。
ServerRunningMonitor
ServerRunningMonitor
是对canal实例进行控制并监控的类。
ServerRunningMonitor
在ServerRunningMonitors
中维护,ServerRunningMonitors
中的runningMonitors
变量保存各个destination对应的ServerRunningMonitor
。
控制canal的实例的启停
ServerRunningMonitor
第一个功能就是控制canal的实例的启停,由start()
、stop()
两个方法负责:
1 | public synchronized void start() { |
先介绍其中涉及到四个process**
回调方法:processStart
、processStop
、processActiveEnter
、processActiveExit
。这个四个方法执行的功能非常简单,就是调用ServerRunningListener
中相应的方法。ServerRunningListener
是一个接口,其中定义了四个instance状态发生改变时回调的方法:
1 | public interface ServerRunningListener { |
canal在ServerRunningMonitor
的新建过程中新建并设置了ServerRunningListener
的实现,下面分别介绍这个四个方法的实现:
processStart
:如果使用了zookeeper,在zookeeper中新建cluster的目录(目录名:/otter/canal/destinations/{destination}/cluster/{ip}:{port}
),监控zookeeper状态改变processStop
:如果使用了zookeeper,删除zookeeper中的cluster目录processActiveEnter
:调用CanalServerWithEmbedded
的start
方法启动destination对应的实例processActiveExit
:调用CanalServerWithEmbedded
的stop
方法停止destination对应的实例
现在回到ServerRunningMonitor
的start
和stop
方法:
start()
:- 调用
processStart
方法,作用如上文所示 - 如果使用了zookeeper,监听zookeeper中的
/otter/canal/destinations/{destination}/running
节点。调用initRunning
方法,运行canal实例 - 如果没有使用zookeeper,直接调用
processActiveEnter
方法启动实例
- 调用
stop()
:- 如果使用了zookeeper,取消监听zookeeper中的
/otter/canal/destinations/{destination}/running
节点。调用releaseRunning
方法停止canal实例 - 如果没有使用zookeeper,直接调用
processActiveExit
方法停止实例
- 如果使用了zookeeper,取消监听zookeeper中的
initRunning
方法如下所示:
它的作用是在zookeeper的/otter/canal/destinations/{destination}/running
节点中写入当前运行的canal服务器信息,并调用processActiveEnter
方法启动实例。
如果该节点已经存在,则重新获取数据并将该数据作为正在运行的canal信息保存在activeData
中。
1 | private void initRunning() { |
releaseRunning
方法如下所示。它的作用是删除zookeeper的/otter/canal/destinations/{destination}/running
节点,并调用processActiveExit
方法停止实例。
1 | private boolean releaseRunning() { |
监听zookeeper,如果zookeeper中的数据发生改变采取措施
ServerRunningMonitor
第二个功能就是监听zookeeper,如果zookeeper中的数据发生改变采取措施。
前面我们看到ServerRunningMonitor
在执行start()
方法时会监听zookeeper中的/otter/canal/destinations/{destination}/running
节点,其响应方法在dataListener中定义:
1 | dataListener = new IZkDataListener() { |
当/otter/canal/destinations/{destination}/running
节点的数据发生修改会调用handleDataChange
方法:
- 获取该目录修改之后的数据
- 如果数据显示正在运行的canal的地址和本机不同,说明已经有其他机器上的canal正在运行
- 如果数据显示正在运行的canal就是本机,并且状态不是active,说明本机出现了主动释放的操作,此时调用
releaseRunning()
方法删除zookeeper的/otter/canal/destinations/{destination}/running
节点,并调用processActiveExit
方法停止实例 - 将正在运行的canal信息保存在
activeData
中
当/otter/canal/destinations/{destination}/running
节点的数据被删除会调用handleDataDeleted
方法:
- 如果上次运行canal的就是本机,则执行
initRunning
重新运行本机的canal - 否则等待
delayTime
(5秒)时间后再执行initRunning
重新运行本机的canal
总结
本文分析了canal的两个monitor机制:InstanceConfigMonitor
、ServerRunningMonitor
。通过这两个监控机制,canal实现了实例配置的修改与生效,以及实例在不同机器上的高可用。