本文介绍canal的一个核心类CanalServerWithNetty以及与client之间的交互。
canal的架构如下图所示:

server代表一个canal运行实例,对应一个jvm。instance对应于一个数据队列。1个server可以对应多个instance。
server是最外层的服务类,它的核心接口为CanalServer,其中有2个实现类:CanalServerWithNetty、CanalServerWithEmbedded。
CanalServerWithEmbedded是直接与CanalInstance打交道的server。可以直接在应用中嵌入CanalServerWithEmbedded,这样我们就可以直接读取binlog信息,如此一来同步binlog信息的效率会更高。但是对于使用者的技术要求比较高。所以一般我们不会直接使用CanalServerWithEmbedded。我们在前文canal——代码初读说到过canal 1.1.x版本之后,canal原生支持kafka消息投递,原理就是canal加入了一个CanalMQStarter,在CanalMQStarter中直接使用CanalServerWithEmbedded读取binlog信息,然后发送到mq中。
在之前的版本中(1.1.x版本之后也可以指定canal.serverMode为tcp)canal会启动一个CanalServerWithNetty来与canal client进行通信,所有的canal client的请求统一由CanalServerWithNetty接受,之后CanalServerWithNetty会将客户端请求派给CanalServerWithEmbedded进行真正的处理。CanalServerWithEmbedded内部维护了多个canal instance,每个canal instance伪装成不同的mysql实例的slave,而CanalServerWithEmbedded会根据客户端请求携带的destination参数确定要由哪一个canal instance为期提供服务。
CanalServerWithNetty
新建
CanalServerWithNetty类中包含以下变量:
1 | private CanalServerWithEmbedded embeddedServer; |
其中embeddedServer是内嵌的server,最终的请求将委派给embeddedServer进行处理。ip和port是netty监听的网络ip和端口。Channel、ServerBootstrap、ChannelGroup是netty的配置。
CanalServerWithNetty是单例,调用instance()方法获取实例。在构造函数中设置embeddedServer、childGroups。
启动
CanalServerWithNetty在start()方法中启动netty服务:
- 如果
embeddedServer没有启动则先调用start()方法启动CanalServerWithEmbedded - 新建
ServerBootstrap - 设置netty的参数
child.keepAlive为true,child.tcpNoDelay为true - 加入pipeline:
FixedHeaderFrameDecoder、HandshakeInitializationHandler、ClientAuthenticationHandler、SessionHandler
其中FixedHeaderFrameDecoder用于解析header头信息。
HandshakeInitializationHandler用于向client发送握手信息。
ClientAuthenticationHandler用户认证client身份,认证成功后向pipeline加入IdleStateHandler和IdleStateAwareChannelHandler用于发送心跳保持连接。
SessionHandler用于真正的处理客户端请求,下面我们来分析SessionHandler
SessionHandler
netty接收到请求后,核心的处理逻辑都在SessionHandler中,我们来看它的messageReceived方法。
首先判断消息类型,将请求委派给embeddedServer处理。一共有5中类型:
- SUBSCRIPTION:客户端订阅请求,调用
embeddedServer.subscribe方法 - UNSUBSCRIPTION:客户端取消订阅请求,调用
embeddedServer.unsubscribe方法 - GET:获取binlog请求,调用
embeddedServer.getWithoutAck方法 - CLIENTACK:客户端消费成功ack请求,调用
embeddedServer.ack方法 - CLIENTROLLBACK:客户端消费失败回滚请求,调用
embeddedServer.rollback方法
embeddedServer的相关方法在canal——binlog消费位点的控制中已经有所介绍,这里不再赘述。
client
前面我们介绍了CanalServerWithNetty。它使用netty启动一个服务端,接收并在SessionHandler类中处理客户端的请求,根据不同的消息类型将请求委派给CanalServerWithEmbedded来处理。
接下来我们来介绍与CanalServerWithNetty交互的客户端。
首先介绍client的运行步骤:
- 新建
CanalConnector - 调用
CanalConnector.connect方法连接服务端 - 调用
CanalConnector.subscribe方法向服务端注册客户端信息 - 接着循环调用
CanalConnector.getWithoutAck方法获取binlog数据。如果消费成功调用CanalConnector.ack方法确认数据,否则调用CanalConnector.rollback方法回滚数据
CanalConnector有两个实现类:SimpleCanalConnector、ClusterCanalConnector。
SimpleCanalConnector
SimpleCanalConnector是执行与server交互的最终类,ClusterCanalConnector的功能其实也是委托给SimpleCanalConnector完成,因此我们首先介绍SimpleCanalConnector。
connect方法调用doConnect方法执行连接操作。客户端对网络的操作没有使用netty,而是直接使用了jdk nio方法。其步骤如下:
- 与服务端建立连接
- 接收并验证服务端返回的握手信息
- 向服务端发送客户端的验证信息
- 接收并验证服务端返回的应答信息
如果前面的步骤完成并验证成功,则客户端与服务端的连接建立成功。
subscribe方法向服务端发送SUBSCRIPTION类型的数据包,其中包含destination、clientId、filter信息。接收并验证服务端返回的应答信息
getWithoutAck方法向服务端发送GET类型的数据包,其中包含autoAck、destination、clientId、fetchSize、timeout、unit信息。接收服务端的信息,将其转换成Message并返回。
ack方法向服务端发送CLIENTACK类型的数据包,其中包含destination、clientId、batchId信息
rollback方法向服务端发送CLIENTROLLBACK类型的数据包,其中包含destination、clientId、batchId信息
ClusterCanalConnector
ClusterCanalConnector是集群版本connector实现,自带故障转移功能。它会监听zookeeper,如果server发生了故障,可以自动重新连接新的server。
CanalNodeAccessStrategy
首先介绍一下CanalNodeAccessStrategy,这是一个集群节点访问控制接口:
1 | public interface CanalNodeAccessStrategy { |
它有了两个实现类:SimpleNodeAccessStrategy、ClusterNodeAccessStrategy。
如果我们在创建CanalConnector时指定了所有的canal server,则使用SimpleNodeAccessStrategy。SimpleNodeAccessStrategy比较简单,就是从新建时指定的server列表中挑选server。
如果我们在创建CanalConnector时指定了zookeeper的地址,则使用ClusterNodeAccessStrategy。ClusterNodeAccessStrategy会从zookeeper中读取server列表,然后从server列表中挑选server。
ClusterNodeAccessStrategy在新建时会创建两个zookeeper的监听:
1 | childListener = new IZkChildListener() { |
其中childListener监听zookeeper的/otter/canal/destinations/{destination}/cluster目录,如果目录下的数据有变动说明canal server有变动,这个时候调用initClusters方法重新读取server列表。
dataListener监听zookeeper的/otter/canal/destinations/{destination}/running节点,如果这个节点的数据有变动说明正在运行的server发生了切换,这个时候调用initRunning方法获取正在运行的server节点。
ClusterCanalConnector建立连接
回到ClusterCanalConnector,来看新建ClusterCanalConnector的连接建立过程:
- 新建
SimpleCanalConnector - 设置超时时间
- 设置filter
- 如果
accessStrategy是ClusterNodeAccessStrategy类,则调用setZkClientx方法设置zookeeper地址 - 调用
connect()方法与服务端建立连接
唯一需要关注的是第4步,setZkClientx方法:
1 | public void setZkClientx(ZkClientx zkClientx) { |
setZkClientx方法处理保存zookeeper的地址,还调用initClientRunningMonitor方法初始化client运行时监视器ClientRunningMonitor。
ClientRunningMonitor
ClientRunningMonitor的作用是监听客户端的运行。其在SimpleCanalConnector的initClientRunningMonitor被创建:
1 | private synchronized void initClientRunningMonitor(ClientIdentity clientIdentity) { |
新建ClientRunningMonitor时,分别设置了clientData、destination、zkClientx以及一个回调方法。
SimpleCanalConnector在建立连接时调用ClientRunningMonitor的start()方法来启动。
ClientRunningMonitor的启动过程分为两步:
第一步:在zookeeper的/otter/canal/destinations/{destination}/{clientId}/running节点上设置监听:
1 | dataListener = new IZkDataListener() { |
如果/otter/canal/destinations/{destination}/{clientId}/running节点数据被删除:
- 调用
processActiveExit()断开当前client的连接 - 如果上一次active的client就是本机,调用
initRunning方法开始运行本机的client。initRunning方法会调用processActiveEnter()方法建立与server的连接 - 否则等待一定时间再调用
initRunning方法开始运行本机的client。
如果/otter/canal/destinations/{destination}/{clientId}/running节点数据发生了变化:
- 读取节点的数据
- 如果数据显示本机的client状态变成了非active,说明出现了主动释放的操作。调用
releaseRunning方法释放本机的连接。releaseRunning方法会调用processActiveExit()断开当前client的连接
第二步:调用initRunning方法开始运行本机的client。
总结
本文我们介绍了CanalServerWithNetty以及server与从client的交互。
对于CanalServerWithNetty来说,它的功能是使用netty启动一个server,接收请求,通过判断请求消息的类型委托给CanalServerWithEmbedded来处理,并将处理完的数据返回给client。
对于client来说,我们介绍了两个CanalConnector的实现类:SimpleCanalConnector、ClusterCanalConnector。
CanalConnector承担了建立连接、订阅客户端、获取binlog数据、确认batchId、回滚batchId的功能。
其中SimpleCanalConnector实现类完成了这一系列的功能,而ClusterCanalConnector在SimpleCanalConnector的基础上通过监听zookeeper增加了故障转移的功能。
http://www.tianshouzhi.com/api/tutorials/canal/382
https://github.com/alibaba/canal/wiki/Introduction