上文Elasticsearch——transport网络模块我们分析了Elasticsearch中用户与节点、节点与节点之间如何通过网络来通信,本文我们来分析一下通过网络传输的数据是如何被读写的。涉及上文Elasticsearch——transport网络模块相关的内容,本文不再重复。
用户与节点间的数据
请求数据的读操作
用户与Elasticsearch的节点通信使用的是http协议,数据流转的初始点在Netty4HttpRequestHandler的channelRead0方法。
下面根据函数的调用流程来看看用户的请求数据经过了哪些处理:
Netty4HttpRequestHandler.channelRead0:netty生成并传入的是FullHttpRequest实例,使用这个实例包装出一个Netty4HttpRequest实例。Netty4HttpRequest类定义了http请求的所有内容,包括method、url、header、cookie、协议版本、请求体内容。Netty4HttpRequest的构造函数中调用Netty4Utils.toBytesReference(final ByteBuf buffer)方法将netty生成的由ByteBuf实例表示的请求体包装成ByteBufBytesReference实例。ByteBufBytesReference是抽象类BytesReference的实现,提供了数据在字节这个层面的读写方法。AbstractHttpServerTransport.handleIncomingRequest:请求数据由Netty4HttpRequest实例再进一步包装成RestRequest实例。相比于Netty4HttpRequest,RestRequest进一步从请求中分离出params、path信息。
有了RestRequest实例,处理程序可以方便地从中获取有关请求的所有信息,除了请求体信息。此时的请求体信息还是以BytesReference实例的形式存在的,还需要将以字节形式存在的数据解析成拥有具体类型的数据。
这一步骤关键的接口是XContentParser。这里先介绍几个相关的接口和类:
XContentType:Elasticsearch支持4种数据类型,分别是JSON、SMILE、YAML、CBOR。XContentType是表示这四种数据类型的枚举类。XContent:数据的抽象。因为支持4种数据类型,因此XContent有4种实现,分别是JsonXContent、SmileXContent、YamlXContent、CborXContent。XContentParser:数据的解析器,4种数据类型的解析器实现分别是JsonXContentParser、SmileXContentParser、YamlXContentParser、CborXContentParser。
请求数据的解析流程如下:
- 根据数据的类型创建解析器
XContentParser - 将字节形式的数据转化成
StreamInput流数据形式 - 使用解析器
XContentParser将流数据解析成具体表示数据的类
响应数据的写操作
响应数据的发送初始点在RestResponseListener的processResponse方法。它调用由具体操作实现的抽象方法buildResponse构造响应数据,然后调用channel.sendResponse发送数据。
这里有一个相关的类XContentBuilder,该类是XContent数据的构造器。向XContentBuilder中添加数据,XContentBuilder可以根据指定的类型(4种)生成相应二进制格式的数据。
buildResponse返回一个RestResponse实例。RestResponse是定义响应数据的抽象类,实现类是BytesRestResponse。其中包含响应头、响应状态、数据内容、数据类型。其中数据内容以BytesReference实例的形式存在。BytesRestResponse构造的时候会调用BytesReference.bytes(XContentBuilder xContentBuilder)方法将XContentBuilder转化成字节形式的二进制格式。
最后根据BytesRestResponse数据生成Netty4HttpResponse实例,通过netty提供的方法发送。
节点之间的数据
假设A节点向B节点发送数据,完成一次通信包含4个步骤:A向B写请求数据,B接收请求数据,B向A写响应数据,A接收响应数据。下面分析这4个操作中数据是如何被读写的。
A向B写请求数据
对节点的请求数据以TransportRequest的实现类来表示。继承关系如下:

可以看到,TransportRequest实现了Writeable和Streamable接口。
Writeable接口定义了如下方法:
1 | void writeTo(StreamOutput out) throws IOException; |
这个接口的实现类具有向StreamOutput写入流数据的能力。
Streamable接口定义了如下方法:
1 | void readFrom(StreamInput in) throws IOException; |
这个接口的实现类既能向StreamOutput写入流数据,也能从StreamInput中读取流数据。
所以TransportRequest类同时具有对二进制流的写出与读入功能。
TransportRequest通过OutboundHandler的sendRequest方法被发送:
1 | void sendRequest(final DiscoveryNode node, final TcpChannel channel, final long requestId, final String action, |
TransportRequest在sendRequest中被包装成OutboundMessage.Request类,然后调用sendMessage方法:
1 | private void sendMessage(TcpChannel channel, OutboundMessage networkMessage, ActionListener<Void> listener) throws IOException { |
sendMessage方法中,使用发送数据OutboundMessage生成一个数据序列化器MessageSerializer,将MessageSerializer传递给SendContext,然后调用internalSend方法将数据发送出去。在internalSend方法中获取到二进制数据然后发送。二进制数据的获取如下所示:
1 | public BytesReference get() throws IOException { |
分析OutboundMessage的serialize方法:
1 | BytesReference serialize(BytesStreamOutput bytesStream) throws IOException { |
可以看到,最终发送的数据分为两个部分:头部、内容。我们知道,netty对于拆包粘包的问题设计了FrameDecoder,这里的头部的用途之一就是数据包的解码。
首先将头部数据的位置先空出来,然后调用writeTo写入action名称(比如cluster:monitor/nodes/usage[n])。接着调用writeMessage方法,writeMessage方法内部调用Writeable.writeTo方法写入具体的二进制数据。前面我们说过对节点的请求数据类都要实现TransportRequest,因此这里实际调用的就是各个实现类定义的writeTo方法。
接着将写入位置重新定位到开头,写入头部数据。头部数据包含数据标记(固定为ES)、数据大小、请求id、状态、Elasticsearch程序版本。详细内容参看TcpHeader类。
B接收请求数据
前文我们分析过,请求数据到达Netty4MessageChannelHandler的channelRead方法。到达channelRead方法后调用Netty4Utils.toBytesReference将ByteBuf表示的数据转换成BytesReference表示的二进制数据。
数据经过流转到达InboundHandler的messageReceived方法,步骤如下:
- 调用
InboundMessage.Reader的deserialize方法将二进制数据初步解析成InboundMessage,提取出其中的头部数据以及内容。 - 调用
handleRequest方法处理请求。InboundMessage中的请求内容数据解析成TransportRequest。 - 新建线程执行
RequestHandler,处理请求数据。
B向A写响应数据
请求经过处理之后生成以TransportResponse表示的响应数据。继承关系如下:

TransportResponse和TransportRequest一样,同样继承了Writeable和Streamable接口。然后调用OutboundHandler的sendResponse方法发送响应数据:
1 | void sendResponse(final Version nodeVersion, final Set<String> features, final TcpChannel channel, |
可以看到sendResponse方法和前面分析的sendRequest方法大同小异,区别在于响应数据被包装成OutboundMessage.Response。和数据请求一样,调用OutboundMessage的serialize来生成响应的二进制数据。
A接收响应数据
和接收请求数据一样,到达channelRead方法后调用Netty4Utils.toBytesReference将ByteBuf表示的数据转换成BytesReference表示的二进制数据。响应数据经过流转到达InboundHandler的messageReceived方法。
不一样的地方在于对于请求数据调用handleRequest方法来处理,响应数据调用handleResponse方法来处理。如前文所示,handleResponse调用TransportResponseHandler.read方法将二进制流数据转化成NodeResponse数据,最后调用TransportResponseHandler.handleResponse方法生成http响应数据并返回。
总结
综上所述,Elasticsearch中围绕数据的读写有三个重要的类:BytesReference、StreamInput、StreamOutput。
BytesReference用于数据的二进制表示,StreamInput和StreamOutput用于数据流的读写。因为这样的设计,Elasticsearch对数据才能高效地进行读写。