本文介绍canal的一个核心类CanalServerWithNetty
以及与client之间的交互。
canal的架构如下图所示:
server代表一个canal运行实例,对应一个jvm。instance对应于一个数据队列。1个server可以对应多个instance。
server是最外层的服务类,它的核心接口为CanalServer
,其中有2个实现类:CanalServerWithNetty
、CanalServerWithEmbedded
。
CanalServerWithEmbedded
是直接与CanalInstance
打交道的server。可以直接在应用中嵌入CanalServerWithEmbedded
,这样我们就可以直接读取binlog信息,如此一来同步binlog信息的效率会更高。但是对于使用者的技术要求比较高。所以一般我们不会直接使用CanalServerWithEmbedded
。我们在前文canal——代码初读说到过canal 1.1.x
版本之后,canal原生支持kafka消息投递,原理就是canal加入了一个CanalMQStarter
,在CanalMQStarter
中直接使用CanalServerWithEmbedded
读取binlog信息,然后发送到mq中。
在之前的版本中(1.1.x版本之后也可以指定canal.serverMode
为tcp
)canal会启动一个CanalServerWithNetty
来与canal client进行通信,所有的canal client的请求统一由CanalServerWithNetty
接受,之后CanalServerWithNetty
会将客户端请求派给CanalServerWithEmbedded
进行真正的处理。CanalServerWithEmbedded
内部维护了多个canal instance,每个canal instance伪装成不同的mysql实例的slave,而CanalServerWithEmbedded
会根据客户端请求携带的destination参数确定要由哪一个canal instance为期提供服务。
CanalServerWithNetty
新建
CanalServerWithNetty
类中包含以下变量:
1 | private CanalServerWithEmbedded embeddedServer; |
其中embeddedServer
是内嵌的server,最终的请求将委派给embeddedServer
进行处理。ip
和port
是netty监听的网络ip和端口。Channel
、ServerBootstrap
、ChannelGroup
是netty的配置。
CanalServerWithNetty
是单例,调用instance()
方法获取实例。在构造函数中设置embeddedServer
、childGroups
。
启动
CanalServerWithNetty
在start()
方法中启动netty服务:
- 如果
embeddedServer
没有启动则先调用start()
方法启动CanalServerWithEmbedded
- 新建
ServerBootstrap
- 设置netty的参数
child.keepAlive
为true
,child.tcpNoDelay
为true
- 加入pipeline:
FixedHeaderFrameDecoder
、HandshakeInitializationHandler
、ClientAuthenticationHandler
、SessionHandler
其中FixedHeaderFrameDecoder
用于解析header头信息。
HandshakeInitializationHandler
用于向client发送握手信息。
ClientAuthenticationHandler
用户认证client身份,认证成功后向pipeline加入IdleStateHandler
和IdleStateAwareChannelHandler
用于发送心跳保持连接。
SessionHandler
用于真正的处理客户端请求,下面我们来分析SessionHandler
SessionHandler
netty接收到请求后,核心的处理逻辑都在SessionHandler
中,我们来看它的messageReceived
方法。
首先判断消息类型,将请求委派给embeddedServer
处理。一共有5中类型:
- SUBSCRIPTION:客户端订阅请求,调用
embeddedServer.subscribe
方法 - UNSUBSCRIPTION:客户端取消订阅请求,调用
embeddedServer.unsubscribe
方法 - GET:获取binlog请求,调用
embeddedServer.getWithoutAck
方法 - CLIENTACK:客户端消费成功ack请求,调用
embeddedServer.ack
方法 - CLIENTROLLBACK:客户端消费失败回滚请求,调用
embeddedServer.rollback
方法
embeddedServer的相关方法在canal——binlog消费位点的控制中已经有所介绍,这里不再赘述。
client
前面我们介绍了CanalServerWithNetty
。它使用netty启动一个服务端,接收并在SessionHandler
类中处理客户端的请求,根据不同的消息类型将请求委派给CanalServerWithEmbedded
来处理。
接下来我们来介绍与CanalServerWithNetty
交互的客户端。
首先介绍client的运行步骤:
- 新建
CanalConnector
- 调用
CanalConnector.connect
方法连接服务端 - 调用
CanalConnector.subscribe
方法向服务端注册客户端信息 - 接着循环调用
CanalConnector.getWithoutAck
方法获取binlog数据。如果消费成功调用CanalConnector.ack
方法确认数据,否则调用CanalConnector.rollback
方法回滚数据
CanalConnector
有两个实现类:SimpleCanalConnector
、ClusterCanalConnector
。
SimpleCanalConnector
SimpleCanalConnector
是执行与server交互的最终类,ClusterCanalConnector
的功能其实也是委托给SimpleCanalConnector
完成,因此我们首先介绍SimpleCanalConnector
。
connect
方法调用doConnect
方法执行连接操作。客户端对网络的操作没有使用netty,而是直接使用了jdk nio方法。其步骤如下:
- 与服务端建立连接
- 接收并验证服务端返回的握手信息
- 向服务端发送客户端的验证信息
- 接收并验证服务端返回的应答信息
如果前面的步骤完成并验证成功,则客户端与服务端的连接建立成功。
subscribe
方法向服务端发送SUBSCRIPTION
类型的数据包,其中包含destination
、clientId
、filter
信息。接收并验证服务端返回的应答信息
getWithoutAck
方法向服务端发送GET
类型的数据包,其中包含autoAck
、destination
、clientId
、fetchSize
、timeout
、unit
信息。接收服务端的信息,将其转换成Message
并返回。
ack
方法向服务端发送CLIENTACK
类型的数据包,其中包含destination
、clientId
、batchId
信息
rollback
方法向服务端发送CLIENTROLLBACK
类型的数据包,其中包含destination
、clientId
、batchId
信息
ClusterCanalConnector
ClusterCanalConnector
是集群版本connector实现,自带故障转移功能。它会监听zookeeper,如果server发生了故障,可以自动重新连接新的server。
CanalNodeAccessStrategy
首先介绍一下CanalNodeAccessStrategy
,这是一个集群节点访问控制接口:
1 | public interface CanalNodeAccessStrategy { |
它有了两个实现类:SimpleNodeAccessStrategy
、ClusterNodeAccessStrategy
。
如果我们在创建CanalConnector
时指定了所有的canal server,则使用SimpleNodeAccessStrategy
。SimpleNodeAccessStrategy
比较简单,就是从新建时指定的server列表中挑选server。
如果我们在创建CanalConnector
时指定了zookeeper的地址,则使用ClusterNodeAccessStrategy
。ClusterNodeAccessStrategy
会从zookeeper中读取server列表,然后从server列表中挑选server。
ClusterNodeAccessStrategy
在新建时会创建两个zookeeper的监听:
1 | childListener = new IZkChildListener() { |
其中childListener
监听zookeeper的/otter/canal/destinations/{destination}/cluster
目录,如果目录下的数据有变动说明canal server有变动,这个时候调用initClusters
方法重新读取server列表。
dataListener
监听zookeeper的/otter/canal/destinations/{destination}/running
节点,如果这个节点的数据有变动说明正在运行的server发生了切换,这个时候调用initRunning
方法获取正在运行的server节点。
ClusterCanalConnector建立连接
回到ClusterCanalConnector
,来看新建ClusterCanalConnector
的连接建立过程:
- 新建
SimpleCanalConnector
- 设置超时时间
- 设置filter
- 如果
accessStrategy
是ClusterNodeAccessStrategy
类,则调用setZkClientx
方法设置zookeeper地址 - 调用
connect()
方法与服务端建立连接
唯一需要关注的是第4步,setZkClientx
方法:
1 | public void setZkClientx(ZkClientx zkClientx) { |
setZkClientx
方法处理保存zookeeper的地址,还调用initClientRunningMonitor
方法初始化client运行时监视器ClientRunningMonitor
。
ClientRunningMonitor
ClientRunningMonitor
的作用是监听客户端的运行。其在SimpleCanalConnector
的initClientRunningMonitor
被创建:
1 | private synchronized void initClientRunningMonitor(ClientIdentity clientIdentity) { |
新建ClientRunningMonitor
时,分别设置了clientData
、destination
、zkClientx
以及一个回调方法。
SimpleCanalConnector
在建立连接时调用ClientRunningMonitor
的start()
方法来启动。
ClientRunningMonitor
的启动过程分为两步:
第一步:在zookeeper的/otter/canal/destinations/{destination}/{clientId}/running
节点上设置监听:
1 | dataListener = new IZkDataListener() { |
如果/otter/canal/destinations/{destination}/{clientId}/running
节点数据被删除:
- 调用
processActiveExit()
断开当前client的连接 - 如果上一次active的client就是本机,调用
initRunning
方法开始运行本机的client。initRunning
方法会调用processActiveEnter()
方法建立与server的连接 - 否则等待一定时间再调用
initRunning
方法开始运行本机的client。
如果/otter/canal/destinations/{destination}/{clientId}/running
节点数据发生了变化:
- 读取节点的数据
- 如果数据显示本机的client状态变成了非active,说明出现了主动释放的操作。调用
releaseRunning
方法释放本机的连接。releaseRunning
方法会调用processActiveExit()
断开当前client的连接
第二步:调用initRunning
方法开始运行本机的client。
总结
本文我们介绍了CanalServerWithNetty
以及server与从client的交互。
对于CanalServerWithNetty
来说,它的功能是使用netty启动一个server,接收请求,通过判断请求消息的类型委托给CanalServerWithEmbedded
来处理,并将处理完的数据返回给client。
对于client来说,我们介绍了两个CanalConnector
的实现类:SimpleCanalConnector
、ClusterCanalConnector
。
CanalConnector
承担了建立连接、订阅客户端、获取binlog数据、确认batchId、回滚batchId的功能。
其中SimpleCanalConnector
实现类完成了这一系列的功能,而ClusterCanalConnector
在SimpleCanalConnector
的基础上通过监听zookeeper增加了故障转移的功能。
http://www.tianshouzhi.com/api/tutorials/canal/382
https://github.com/alibaba/canal/wiki/Introduction