经过前文canal——代码初读的分析,我们对canal的工作流程有了初步的了解。本文,我们来分析canal的monitor机制。
canal的流程描述中提及了两个monitor:InstanceConfigMonitor、ServerRunningMonitor。本文重点对这两个monitor进行分析。
InstanceConfigMonitor
从字面意思就可以得出,InstanceConfigMonitor是对canal实例配置的监控。实际上也确实如此,它的功能就是监控canal实例的配置文件——instance.properties,如果其中有增加、修改、删除,执行实例的启动、重启、停止操作。下面我们来分析InstanceConfigMonitor的代码。
InstanceConfigMonitor的创建
InstanceConfigMonitor在CanalController构造函数中被创建:
首先获取canal.auto.scan配置,该配置控制是否监听canal实例配置的变化。如果该配置为false,则不需要创建InstanceConfigMonitor。默认该配置为true,此时需要创建InstanceConfigMonitor。
在创建InstanceConfigMonitor之前先创建一个defaultAction(实际类为InstanceAction),InstanceAction是一个接口,其中定义了三个函数,分别对应启动实例、定制实例、重启实例:
1 | public interface InstanceAction { |
InstanceAction的具体操作我们在后面再分析。
接着创建一个instanceConfigMonitors,它是一个Map<InstanceMode, InstanceConfigMonitor>,当获取InstanceConfigMonitor是发现Map中不存在时(从调用流程上看,InstanceConfigMonitor其实是在其将要启动时获取的),调用apply方法创建一个InstanceConfigMonitor:
1 | int scanInterval = Integer |
创建流程如下:
- 获取配置
canal.auto.scan.interval,它指定了配置监听扫描的间隔时间,默认为5秒。 - 判断mode是否为
SPRING。mode在配置canal.instance.global.mode中指定,可以正对各个destination配置不同的mode。全局mode默认为spring - 新建
SpringInstanceConfigMonitor - 设置
scanInterval - 设置
defaultAction。defaultAction为刚刚创建的InstanceAction。 - 获取并设置配置
canal.conf.dir,这是监控进行的根目录。
InstanceConfigMonitor的启动
InstanceConfigMonitor的启动位于CanalController的start()方法中。
在启动之前,还有一步遍历destination并注册InstanceAction。
1 | if (autoScan) { |
其中执行instanceConfigMonitors.get(config.getMode())时发现instanceConfigMonitors不存在对应的InstanceConfigMonitor,先创建InstanceConfigMonitor。
接着调用start()方法启动。对于SpringInstanceConfigMonitor来说,start()方法中启动了一个定时任务,每隔scanIntervalInSecond时间(默认5秒),执行其中的scan()方法:
1 | private void scan() { |
scan()方法的作用就扫描instance的配置文件,如果有增加、修改、删除,则分别调用notifyStart、notifyReload、notifyStop操作。三个操作的代码如下:
1 | private void notifyStart(File instanceDir, String destination, File[] instanceConfigs) { |
可以看到notifyStart、notifyReload、notifyStop三个操作主要就是调用defaultAction中定义的Instance操作对Instance执行启动、重启、停止操作。
下面来看defaultAction中定义的操作:
1 | defaultAction = new InstanceAction() { |
可以看到,defaultAction中定义了三个操作,其中reload操作就是简单地停止并启动instance,stop操作删除instance的配置然后调用ServerRunningMonitor的stop()方法停止instance,start操作读取instance配置并调用调用ServerRunningMonitor的start()方法启动instance。
ServerRunningMonitor
ServerRunningMonitor是对canal实例进行控制并监控的类。
ServerRunningMonitor在ServerRunningMonitors中维护,ServerRunningMonitors中的runningMonitors变量保存各个destination对应的ServerRunningMonitor。
控制canal的实例的启停
ServerRunningMonitor第一个功能就是控制canal的实例的启停,由start()、stop()两个方法负责:
1 | public synchronized void start() { |
先介绍其中涉及到四个process**回调方法:processStart、processStop、processActiveEnter、processActiveExit。这个四个方法执行的功能非常简单,就是调用ServerRunningListener中相应的方法。ServerRunningListener是一个接口,其中定义了四个instance状态发生改变时回调的方法:
1 | public interface ServerRunningListener { |
canal在ServerRunningMonitor的新建过程中新建并设置了ServerRunningListener的实现,下面分别介绍这个四个方法的实现:
processStart:如果使用了zookeeper,在zookeeper中新建cluster的目录(目录名:/otter/canal/destinations/{destination}/cluster/{ip}:{port}),监控zookeeper状态改变processStop:如果使用了zookeeper,删除zookeeper中的cluster目录processActiveEnter:调用CanalServerWithEmbedded的start方法启动destination对应的实例processActiveExit:调用CanalServerWithEmbedded的stop方法停止destination对应的实例
现在回到ServerRunningMonitor的start和stop方法:
start():- 调用
processStart方法,作用如上文所示 - 如果使用了zookeeper,监听zookeeper中的
/otter/canal/destinations/{destination}/running节点。调用initRunning方法,运行canal实例 - 如果没有使用zookeeper,直接调用
processActiveEnter方法启动实例
- 调用
stop():- 如果使用了zookeeper,取消监听zookeeper中的
/otter/canal/destinations/{destination}/running节点。调用releaseRunning方法停止canal实例 - 如果没有使用zookeeper,直接调用
processActiveExit方法停止实例
- 如果使用了zookeeper,取消监听zookeeper中的
initRunning方法如下所示:
它的作用是在zookeeper的/otter/canal/destinations/{destination}/running节点中写入当前运行的canal服务器信息,并调用processActiveEnter方法启动实例。
如果该节点已经存在,则重新获取数据并将该数据作为正在运行的canal信息保存在activeData中。
1 | private void initRunning() { |
releaseRunning方法如下所示。它的作用是删除zookeeper的/otter/canal/destinations/{destination}/running节点,并调用processActiveExit方法停止实例。
1 | private boolean releaseRunning() { |
监听zookeeper,如果zookeeper中的数据发生改变采取措施
ServerRunningMonitor第二个功能就是监听zookeeper,如果zookeeper中的数据发生改变采取措施。
前面我们看到ServerRunningMonitor在执行start()方法时会监听zookeeper中的/otter/canal/destinations/{destination}/running节点,其响应方法在dataListener中定义:
1 | dataListener = new IZkDataListener() { |
当/otter/canal/destinations/{destination}/running节点的数据发生修改会调用handleDataChange方法:
- 获取该目录修改之后的数据
- 如果数据显示正在运行的canal的地址和本机不同,说明已经有其他机器上的canal正在运行
- 如果数据显示正在运行的canal就是本机,并且状态不是active,说明本机出现了主动释放的操作,此时调用
releaseRunning()方法删除zookeeper的/otter/canal/destinations/{destination}/running节点,并调用processActiveExit方法停止实例 - 将正在运行的canal信息保存在
activeData中
当/otter/canal/destinations/{destination}/running节点的数据被删除会调用handleDataDeleted方法:
- 如果上次运行canal的就是本机,则执行
initRunning重新运行本机的canal - 否则等待
delayTime(5秒)时间后再执行initRunning重新运行本机的canal
总结
本文分析了canal的两个monitor机制:InstanceConfigMonitor、ServerRunningMonitor。通过这两个监控机制,canal实现了实例配置的修改与生效,以及实例在不同机器上的高可用。