Elasticsearch——网络数据的读写

上文Elasticsearch——transport网络模块我们分析了Elasticsearch中用户与节点、节点与节点之间如何通过网络来通信,本文我们来分析一下通过网络传输的数据是如何被读写的。涉及上文Elasticsearch——transport网络模块相关的内容,本文不再重复。

用户与节点间的数据

请求数据的读操作

用户与Elasticsearch的节点通信使用的是http协议,数据流转的初始点在Netty4HttpRequestHandlerchannelRead0方法。

下面根据函数的调用流程来看看用户的请求数据经过了哪些处理:

  1. Netty4HttpRequestHandler.channelRead0:netty生成并传入的是FullHttpRequest实例,使用这个实例包装出一个Netty4HttpRequest实例。Netty4HttpRequest类定义了http请求的所有内容,包括methodurlheadercookie协议版本请求体内容Netty4HttpRequest的构造函数中调用Netty4Utils.toBytesReference(final ByteBuf buffer)方法将netty生成的由ByteBuf实例表示的请求体包装成ByteBufBytesReference实例。ByteBufBytesReference是抽象类BytesReference的实现,提供了数据在字节这个层面的读写方法。
  2. AbstractHttpServerTransport.handleIncomingRequest:请求数据由Netty4HttpRequest实例再进一步包装成RestRequest实例。相比于Netty4HttpRequestRestRequest进一步从请求中分离出paramspath信息。

有了RestRequest实例,处理程序可以方便地从中获取有关请求的所有信息,除了请求体信息。此时的请求体信息还是以BytesReference实例的形式存在的,还需要将以字节形式存在的数据解析成拥有具体类型的数据。

这一步骤关键的接口是XContentParser。这里先介绍几个相关的接口和类:

  • XContentTypeElasticsearch支持4种数据类型,分别是JSONSMILEYAMLCBORXContentType是表示这四种数据类型的枚举类。
  • XContent:数据的抽象。因为支持4种数据类型,因此XContent有4种实现,分别是JsonXContentSmileXContentYamlXContentCborXContent
  • XContentParser:数据的解析器,4种数据类型的解析器实现分别是JsonXContentParserSmileXContentParserYamlXContentParserCborXContentParser

请求数据的解析流程如下:

  1. 根据数据的类型创建解析器XContentParser
  2. 将字节形式的数据转化成StreamInput流数据形式
  3. 使用解析器XContentParser将流数据解析成具体表示数据的类

响应数据的写操作

响应数据的发送初始点在RestResponseListenerprocessResponse方法。它调用由具体操作实现的抽象方法buildResponse构造响应数据,然后调用channel.sendResponse发送数据。

这里有一个相关的类XContentBuilder,该类是XContent数据的构造器。向XContentBuilder中添加数据,XContentBuilder可以根据指定的类型(4种)生成相应二进制格式的数据。

buildResponse返回一个RestResponse实例。RestResponse是定义响应数据的抽象类,实现类是BytesRestResponse。其中包含响应头、响应状态、数据内容、数据类型。其中数据内容以BytesReference实例的形式存在。BytesRestResponse构造的时候会调用BytesReference.bytes(XContentBuilder xContentBuilder)方法将XContentBuilder转化成字节形式的二进制格式。

最后根据BytesRestResponse数据生成Netty4HttpResponse实例,通过netty提供的方法发送。

节点之间的数据

假设A节点向B节点发送数据,完成一次通信包含4个步骤:A向B写请求数据,B接收请求数据,B向A写响应数据,A接收响应数据。下面分析这4个操作中数据是如何被读写的。

A向B写请求数据

对节点的请求数据以TransportRequest的实现类来表示。继承关系如下:

TransportRequest

可以看到,TransportRequest实现了WriteableStreamable接口。

Writeable接口定义了如下方法:

1
void writeTo(StreamOutput out) throws IOException;

这个接口的实现类具有向StreamOutput写入流数据的能力。

Streamable接口定义了如下方法:

1
2
void readFrom(StreamInput in) throws IOException;
void writeTo(StreamOutput out) throws IOException;

这个接口的实现类既能向StreamOutput写入流数据,也能从StreamInput中读取流数据。

所以TransportRequest类同时具有对二进制流的写出与读入功能。

TransportRequest通过OutboundHandlersendRequest方法被发送:

1
2
3
4
5
6
7
8
9
10
void sendRequest(final DiscoveryNode node, final TcpChannel channel, final long requestId, final String action,
final TransportRequest request, final TransportRequestOptions options, final Version channelVersion,
final boolean compressRequest, final boolean isHandshake) throws IOException, TransportException {
Version version = Version.min(this.version, channelVersion);
OutboundMessage.Request message = new OutboundMessage.Request(threadPool.getThreadContext(), features, request, version, action,
requestId, isHandshake, compressRequest);
ActionListener<Void> listener = ActionListener.wrap(()
-> messageListener.onRequestSent(node, requestId, action, request, options));
sendMessage(channel, message, listener);
}

TransportRequestsendRequest中被包装成OutboundMessage.Request类,然后调用sendMessage方法:

1
2
3
4
5
private void sendMessage(TcpChannel channel, OutboundMessage networkMessage, ActionListener<Void> listener) throws IOException {
MessageSerializer serializer = new MessageSerializer(networkMessage, bigArrays);
SendContext sendContext = new SendContext(channel, serializer, listener, serializer);
internalSend(channel, sendContext);
}

sendMessage方法中,使用发送数据OutboundMessage生成一个数据序列化器MessageSerializer,将MessageSerializer传递给SendContext,然后调用internalSend方法将数据发送出去。在internalSend方法中获取到二进制数据然后发送。二进制数据的获取如下所示:

1
2
3
4
public BytesReference get() throws IOException {
bytesStreamOutput = new ReleasableBytesStreamOutput(bigArrays);
return message.serialize(bytesStreamOutput);
}

分析OutboundMessageserialize方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
BytesReference serialize(BytesStreamOutput bytesStream) throws IOException {
storedContext.restore();
bytesStream.setVersion(version);
bytesStream.skip(TcpHeader.HEADER_SIZE);

BytesReference reference;
try(CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bytesStream, TransportStatus.isCompress(status))) {
stream.setVersion(version);
threadContext.writeTo(stream);
writeTo(stream);
reference = writeMessage(stream);
}
bytesStream.seek(0);
TcpHeader.writeHeader(bytesStream, requestId, status, version, reference.length() - TcpHeader.HEADER_SIZE);
return reference;
}

可以看到,最终发送的数据分为两个部分:头部、内容。我们知道,netty对于拆包粘包的问题设计了FrameDecoder,这里的头部的用途之一就是数据包的解码。

首先将头部数据的位置先空出来,然后调用writeTo写入action名称(比如cluster:monitor/nodes/usage[n])。接着调用writeMessage方法,writeMessage方法内部调用Writeable.writeTo方法写入具体的二进制数据。前面我们说过对节点的请求数据类都要实现TransportRequest,因此这里实际调用的就是各个实现类定义的writeTo方法。

接着将写入位置重新定位到开头,写入头部数据。头部数据包含数据标记(固定为ES)、数据大小、请求id、状态、Elasticsearch程序版本。详细内容参看TcpHeader类。

B接收请求数据

前文我们分析过,请求数据到达Netty4MessageChannelHandlerchannelRead方法。到达channelRead方法后调用Netty4Utils.toBytesReferenceByteBuf表示的数据转换成BytesReference表示的二进制数据。

数据经过流转到达InboundHandlermessageReceived方法,步骤如下:

  1. 调用InboundMessage.Readerdeserialize方法将二进制数据初步解析成InboundMessage,提取出其中的头部数据以及内容。
  2. 调用handleRequest方法处理请求。InboundMessage中的请求内容数据解析成TransportRequest
  3. 新建线程执行RequestHandler,处理请求数据。

B向A写响应数据

请求经过处理之后生成以TransportResponse表示的响应数据。继承关系如下:

TransportResponse

TransportResponseTransportRequest一样,同样继承了WriteableStreamable接口。然后调用OutboundHandlersendResponse方法发送响应数据:

1
2
3
4
5
6
7
8
9
void sendResponse(final Version nodeVersion, final Set<String> features, final TcpChannel channel,
final long requestId, final String action, final TransportResponse response,
final boolean compress, final boolean isHandshake) throws IOException {
Version version = Version.min(this.version, nodeVersion);
OutboundMessage.Response message = new OutboundMessage.Response(threadPool.getThreadContext(), features, response, version,
requestId, isHandshake, compress);
ActionListener<Void> listener = ActionListener.wrap(() -> messageListener.onResponseSent(requestId, action, response));
sendMessage(channel, message, listener);
}

可以看到sendResponse方法和前面分析的sendRequest方法大同小异,区别在于响应数据被包装成OutboundMessage.Response。和数据请求一样,调用OutboundMessageserialize来生成响应的二进制数据。

A接收响应数据

和接收请求数据一样,到达channelRead方法后调用Netty4Utils.toBytesReferenceByteBuf表示的数据转换成BytesReference表示的二进制数据。响应数据经过流转到达InboundHandlermessageReceived方法。

不一样的地方在于对于请求数据调用handleRequest方法来处理,响应数据调用handleResponse方法来处理。如前文所示,handleResponse调用TransportResponseHandler.read方法将二进制流数据转化成NodeResponse数据,最后调用TransportResponseHandler.handleResponse方法生成http响应数据并返回。

总结

综上所述,Elasticsearch中围绕数据的读写有三个重要的类:BytesReferenceStreamInputStreamOutput

BytesReference用于数据的二进制表示,StreamInputStreamOutput用于数据流的读写。因为这样的设计,Elasticsearch对数据才能高效地进行读写。