上文Elasticsearch——transport网络模块我们分析了Elasticsearch
中用户与节点、节点与节点之间如何通过网络来通信,本文我们来分析一下通过网络传输的数据是如何被读写的。涉及上文Elasticsearch——transport网络模块相关的内容,本文不再重复。
用户与节点间的数据
请求数据的读操作
用户与Elasticsearch
的节点通信使用的是http
协议,数据流转的初始点在Netty4HttpRequestHandler
的channelRead0
方法。
下面根据函数的调用流程来看看用户的请求数据经过了哪些处理:
Netty4HttpRequestHandler.channelRead0
:netty生成并传入的是FullHttpRequest
实例,使用这个实例包装出一个Netty4HttpRequest
实例。Netty4HttpRequest
类定义了http请求的所有内容,包括method
、url
、header
、cookie
、协议版本
、请求体内容
。Netty4HttpRequest
的构造函数中调用Netty4Utils.toBytesReference(final ByteBuf buffer)
方法将netty生成的由ByteBuf
实例表示的请求体包装成ByteBufBytesReference
实例。ByteBufBytesReference
是抽象类BytesReference
的实现,提供了数据在字节这个层面的读写方法。AbstractHttpServerTransport.handleIncomingRequest
:请求数据由Netty4HttpRequest
实例再进一步包装成RestRequest
实例。相比于Netty4HttpRequest
,RestRequest
进一步从请求中分离出params
、path
信息。
有了RestRequest
实例,处理程序可以方便地从中获取有关请求的所有信息,除了请求体信息。此时的请求体信息还是以BytesReference
实例的形式存在的,还需要将以字节形式存在的数据解析成拥有具体类型的数据。
这一步骤关键的接口是XContentParser
。这里先介绍几个相关的接口和类:
XContentType
:Elasticsearch
支持4种数据类型,分别是JSON
、SMILE
、YAML
、CBOR
。XContentType
是表示这四种数据类型的枚举类。XContent
:数据的抽象。因为支持4种数据类型,因此XContent
有4种实现,分别是JsonXContent
、SmileXContent
、YamlXContent
、CborXContent
。XContentParser
:数据的解析器,4种数据类型的解析器实现分别是JsonXContentParser
、SmileXContentParser
、YamlXContentParser
、CborXContentParser
。
请求数据的解析流程如下:
- 根据数据的类型创建解析器
XContentParser
- 将字节形式的数据转化成
StreamInput
流数据形式 - 使用解析器
XContentParser
将流数据解析成具体表示数据的类
响应数据的写操作
响应数据的发送初始点在RestResponseListener
的processResponse
方法。它调用由具体操作实现的抽象方法buildResponse
构造响应数据,然后调用channel.sendResponse
发送数据。
这里有一个相关的类XContentBuilder
,该类是XContent
数据的构造器。向XContentBuilder
中添加数据,XContentBuilder
可以根据指定的类型(4种)生成相应二进制格式的数据。
buildResponse
返回一个RestResponse
实例。RestResponse
是定义响应数据的抽象类,实现类是BytesRestResponse
。其中包含响应头、响应状态、数据内容、数据类型。其中数据内容以BytesReference
实例的形式存在。BytesRestResponse
构造的时候会调用BytesReference.bytes(XContentBuilder xContentBuilder)
方法将XContentBuilder
转化成字节形式的二进制格式。
最后根据BytesRestResponse
数据生成Netty4HttpResponse
实例,通过netty
提供的方法发送。
节点之间的数据
假设A节点向B节点发送数据,完成一次通信包含4个步骤:A向B写请求数据,B接收请求数据,B向A写响应数据,A接收响应数据。下面分析这4个操作中数据是如何被读写的。
A向B写请求数据
对节点的请求数据以TransportRequest
的实现类来表示。继承关系如下:
可以看到,TransportRequest
实现了Writeable
和Streamable
接口。
Writeable
接口定义了如下方法:
1 | void writeTo(StreamOutput out) throws IOException; |
这个接口的实现类具有向StreamOutput
写入流数据的能力。
Streamable
接口定义了如下方法:
1 | void readFrom(StreamInput in) throws IOException; |
这个接口的实现类既能向StreamOutput
写入流数据,也能从StreamInput
中读取流数据。
所以TransportRequest
类同时具有对二进制流的写出与读入功能。
TransportRequest
通过OutboundHandler
的sendRequest
方法被发送:
1 | void sendRequest(final DiscoveryNode node, final TcpChannel channel, final long requestId, final String action, |
TransportRequest
在sendRequest
中被包装成OutboundMessage.Request
类,然后调用sendMessage
方法:
1 | private void sendMessage(TcpChannel channel, OutboundMessage networkMessage, ActionListener<Void> listener) throws IOException { |
sendMessage
方法中,使用发送数据OutboundMessage
生成一个数据序列化器MessageSerializer
,将MessageSerializer
传递给SendContext
,然后调用internalSend
方法将数据发送出去。在internalSend
方法中获取到二进制数据然后发送。二进制数据的获取如下所示:
1 | public BytesReference get() throws IOException { |
分析OutboundMessage
的serialize
方法:
1 | BytesReference serialize(BytesStreamOutput bytesStream) throws IOException { |
可以看到,最终发送的数据分为两个部分:头部、内容。我们知道,netty
对于拆包粘包的问题设计了FrameDecoder
,这里的头部的用途之一就是数据包的解码。
首先将头部数据的位置先空出来,然后调用writeTo
写入action
名称(比如cluster:monitor/nodes/usage[n]
)。接着调用writeMessage
方法,writeMessage
方法内部调用Writeable.writeTo
方法写入具体的二进制数据。前面我们说过对节点的请求数据类都要实现TransportRequest
,因此这里实际调用的就是各个实现类定义的writeTo
方法。
接着将写入位置重新定位到开头,写入头部数据。头部数据包含数据标记(固定为ES
)、数据大小、请求id、状态、Elasticsearch
程序版本。详细内容参看TcpHeader
类。
B接收请求数据
前文我们分析过,请求数据到达Netty4MessageChannelHandler
的channelRead
方法。到达channelRead
方法后调用Netty4Utils.toBytesReference
将ByteBuf
表示的数据转换成BytesReference
表示的二进制数据。
数据经过流转到达InboundHandler
的messageReceived
方法,步骤如下:
- 调用
InboundMessage.Reader
的deserialize
方法将二进制数据初步解析成InboundMessage
,提取出其中的头部数据以及内容。 - 调用
handleRequest
方法处理请求。InboundMessage
中的请求内容数据解析成TransportRequest
。 - 新建线程执行
RequestHandler
,处理请求数据。
B向A写响应数据
请求经过处理之后生成以TransportResponse
表示的响应数据。继承关系如下:
TransportResponse
和TransportRequest
一样,同样继承了Writeable
和Streamable
接口。然后调用OutboundHandler
的sendResponse
方法发送响应数据:
1 | void sendResponse(final Version nodeVersion, final Set<String> features, final TcpChannel channel, |
可以看到sendResponse
方法和前面分析的sendRequest
方法大同小异,区别在于响应数据被包装成OutboundMessage.Response
。和数据请求一样,调用OutboundMessage
的serialize
来生成响应的二进制数据。
A接收响应数据
和接收请求数据一样,到达channelRead
方法后调用Netty4Utils.toBytesReference
将ByteBuf
表示的数据转换成BytesReference
表示的二进制数据。响应数据经过流转到达InboundHandler
的messageReceived
方法。
不一样的地方在于对于请求数据调用handleRequest
方法来处理,响应数据调用handleResponse
方法来处理。如前文所示,handleResponse
调用TransportResponseHandler.read
方法将二进制流数据转化成NodeResponse
数据,最后调用TransportResponseHandler.handleResponse
方法生成http响应数据并返回。
总结
综上所述,Elasticsearch
中围绕数据的读写有三个重要的类:BytesReference
、StreamInput
、StreamOutput
。
BytesReference
用于数据的二进制表示,StreamInput
和StreamOutput
用于数据流的读写。因为这样的设计,Elasticsearch
对数据才能高效地进行读写。