Elasticsearch——transport网络模块

Elasticsearch作为一个的分布式搜索引擎,能迅速发展并且流行,一个很大的原因是它大大降低了搜索引擎的使用门槛。想象一下,如果要为我们的后台系统开发一个搜索服务,需要从lucene开始开发,那将是一项巨大的工程。之所以Elasticsearch能达到很高的易用性,就在于它将底层的lucene进行了包装,与用户通过http请求来交互,屏蔽了诸如数据分片、副本、分布式存储检索、缓存、日志等一系列细节,基本能做到Elasticsearch宣称的开箱即用的易用程度。

transport是其中比较重要的一个模块,用户通过transport模块和Elasticsearch进行交互,Elasticsearch多个节点之间也通过transport模块传输信息。

基于faiss的分布式特征向量搜索系统一文中,我提到了整个系统是基于Elasticsearch的框架完成的。本文是对transport网络模块学习的一个总结。

HttpServerTransport

HttpServerTransport定义了用户请求处理的接口,具体的实现是Netty4HttpServerTransport

为了灵活性,Elasticsearch设计了一套简单的插件系统,同一个功能模块可以开发不同的实现,方便地进行替换。具体到网络模块,可以继承NetworkPlugin类,在继承类中定义HttpServerTransportTransport的实现。其中Transport是定义Elasticsearch节点之间通信的接口。目前Elasticsearchtransport-netty4中实现了整个网络模块。

Netty4HttpServerTransport

顾名思义,目前Elasticsearch是基于netty4来构建整个网络模块的。Netty4HttpServerTransport用于构建一个处理用户http请求的网络服务,继承关系如图所示:

Netty4HttpServerTransport

其主要的构建过程在doStart()方法中,熟悉netty使用的话可以很容易地理解其中的步骤:对网络参数进行一系列配置,然后设置一个childHandlerHttpChannelHandler

HttpChannelHandler中设置针对http的编解码器、http消息的压缩器、跨域请求的处理器、pipeline处理器(用于请求的按序返回响应),以及最重要的请求处理器Netty4HttpRequestHandler

http请求的处理

http请求处理方法经过了如下调用:

1
2
3
4
5
Netty4HttpRequestHandler.channelRead0
AbstractHttpServerTransport.incomingRequest
AbstractHttpServerTransport.handleIncomingRequest
AbstractHttpServerTransport.dispatchRequest
Dispatcher.dispatchRequest

其中Dispatcher是http请求的”调度器”,其实现类是RestControllerdispatchRequest方法如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {
if (request.rawPath().equals("/favicon.ico")) {
handleFavicon(request, channel);
return;
}
try {
tryAllHandlers(request, channel, threadContext);
} catch (Exception e) {
try {
channel.sendResponse(new BytesRestResponse(channel, e));
} catch (Exception inner) {
inner.addSuppressed(e);
logger.error(() ->
new ParameterizedMessage("failed to send failure response for uri [{}]", request.uri()), inner);
}
}
}

首先对图标进行了一个特殊处理。接着调用tryAllHandlers尝试所有的http请求处理器(handler):

1
2
3
4
5
6
7
8
9
10
11
12
13
Iterator<MethodHandlers> allHandlers = getAllHandlers(request);
for (Iterator<MethodHandlers> it = allHandlers; it.hasNext(); ) {
final Optional<RestHandler> mHandler = Optional.ofNullable(it.next()).flatMap(mh -> mh.getHandler(request.method()));
requestHandled = dispatchRequest(request, channel, client, mHandler);
if (requestHandled) {
break;
}
}

// If request has not been handled, fallback to a bad request error.
if (requestHandled == false) {
handleBadRequest(request, channel);
}

RestController中维护了所有的http请求处理器(handler)。以搜索请求的处理器RestSearchAction为例:

1
2
3
4
5
6
7
8
9
10
11
public RestSearchAction(Settings settings, RestController controller) {
super(settings);
controller.registerHandler(GET, "/_search", this);
controller.registerHandler(POST, "/_search", this);
controller.registerHandler(GET, "/{index}/_search", this);
controller.registerHandler(POST, "/{index}/_search", this);

// Deprecated typed endpoints.
controller.registerHandler(GET, "/{index}/{type}/_search", this);
controller.registerHandler(POST, "/{index}/{type}/_search", this);
}

它在函数构造的时候会调用registerHandler函数向RestController注册处理器(handler)。

RestControllertryAllHandlers方法处理请求时,根据请求的路径和method在所有的handler中筛选匹配的handler。然后调用dispatchRequest方法依次使用匹配的handler来处理请求,直到请求处理成功。如果所有的handler都处理失败则调用handleBadRequest方法返回400错误信息。

dispatchRequest方法首先检查http的一些请求参数,比如ContentType、请求体的格式等。最后调用handlerhandleRequest方法来处理请求。

handleRequest方法位于抽象类BaseRestHandler中,它主要的功能是检查请求参数、增加统计数据、调用方法prepareRequest获取请求的具体处理方法,然后调用这个方法。

prepareRequest是一个抽象方法,具体的实现在各个具体的handler中。还是以RestSearchAction为例,它在prepareRequest中返回的方法如下:

1
return channel -> client.search(searchRequest, new RestStatusToXContentListener<>(channel));

即调用NodeClientsearch方法:

1
2
3
public void search(final SearchRequest request, final ActionListener<SearchResponse> listener) {
execute(SearchAction.INSTANCE, request, listener);
}

分析调用关系可以知道execute方法最终调用的NodeClient中的executeLocally方法:

1
2
3
4
5
public <    Request extends ActionRequest,
Response extends ActionResponse
> Task executeLocally(Action<Response> action, Request request, ActionListener<Response> listener) {
return transportAction(action).execute(request, listener);
}

Elasticsearch在这里有一个设计,它将不同类型的请求抽象成Action,请求的处理抽象成TransportAction。以搜索请求为例,它Action的实现类是SearchActionTransportAction的实现类是TransportSearchAction。其他的请求也以这样的设计来分别实现不同的ActionTransportActionActionTransportAction的绑定操作在ActionModulesetupActions方法中,在程序启动时会调用该方法完成所有请求Action的绑定。

所以上面的executeLocally方法中就容易理解了。transportAction方法根据Action找到对应的TransportAction,这里就是TransportSearchAction,然后调用execute方法执行处理操作。

execute方法中新建一个RequestFilterChain,然后调用其中的proceed方法。RequestFilterChain的目的是为了在请求的处理之前先执行过滤器操作(比如安全性验证等),最后执行doExecute

doExecute是一个抽象函数,由各个具体的实现类(比如TransportSearchAction)来实现。

返回Response

前面我们分析了用户的http请求是如何经过调度一步一步的到达相应的处理类,现在我们来看看请求处理完之后是如何返回处理结果的。

还是以搜索请求为例,我们在前面看到它prepareRequest函数中传入了一个RestStatusToXContentListener实例,来看一下它的继承关系:

RestStatusToXContentListene

上溯RestStatusToXContentListener的继承关系,它最初的源头是ActionListener

ActionListener定义了处理response和failure的方法。请求处理结束之后会生成一个结果,调用ActionListeneronResponse方法返回结果。

对于不同的请求,响应的返回过程会有一些相同和不同点。下面来分析一下响应是如何被返回的。

首先onResponse方法的实现在RestActionListener中,它调用了抽象方法processResponseprocessResponse的实现在RestResponseListener中:

1
2
3
protected final void processResponse(Response response) throws Exception {
channel.sendResponse(buildResponse(response));
}

它调用抽象方法buildResponse构造返回数据,然后调用channel.sendResponse发送数据。

对于buildResponse,不同的请求会有不同的实现,还是以RestStatusToXContentListener为例,它的实现在RestToXContentListener中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public RestResponse buildResponse(Response response) throws Exception {
return buildResponse(response, channel.newBuilder());
}

public RestResponse buildResponse(Response response, XContentBuilder builder) throws Exception {
assert response.isFragment() == false;
response.toXContent(builder, channel.request());
return new BytesRestResponse(getStatus(response), builder);
}

protected RestStatus getStatus(Response response) {
return RestStatus.OK;
}

可以看到,这里返回数据按格式解析成二进制数据。

Transport

Transport是网络模块中第二个重要的接口,它定义了Elasticsearch节点之间的通信。

Elasticsearch一个重要的特征就是它是一个分布式的搜索引擎,它可以将索引和搜索等操作分配到多个节点中进行,这样就可以充分利用多个节点的存储与计算资源。

节点之间的请求

在分析Transport之前,先来分析一下Elasticsearch是如何向各个节点发送请求的。为了简单起见,我们以节点使用统计接口为例(url:/_nodes/usage),它所实现的TransportActionTransportNodesUsageAction,继承关系如下:

TransportNodesUsageAction

节点使用统计接口需要向所有节点发送请求,针对这种情况Elasticsearch开发了一个抽象类TransportNodesActionTransportNodesAction有四个抽象方法:

1
2
3
4
5
6
7
protected abstract NodesResponse newResponse(NodesRequest request, List<NodeResponse> responses, List<FailedNodeException> failures);

protected abstract NodeRequest newNodeRequest(String nodeId, NodesRequest request);

protected abstract NodeResponse newNodeResponse();

protected abstract NodeResponse nodeOperation(NodeRequest request);

这里涉及4种泛型:

  • NodesRequest:用户给出的请求
  • NodesResponse:返回给用户的响应
  • NodeRequest:对各个节点的请求
  • NodeResponse:各个节点返回的响应

理清了这四种泛型,我们就能明白TransportNodesAction四个抽象方法的用途了:

  • newResponse:生成返回给用户的响应
  • newNodeRequest:生成对各个节点的请求
  • newNodeResponse:生成节点的响应实例,将各个节点的响应填充到生成的这个实例中
  • nodeOperation:定义节点收到请求后的操作

这四个抽象类的具体使用在下文分析。

AsyncAction

上文我们分析过,请求最后的处理在TransportAction的抽象方法doExecute,这个doExecute方法是由各个处理类自己实现的。TransportNodesUsageActiondoExecute方法在父类TransportNodesAction中定义:新建AsyncAction示例,并调用其start()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
AsyncAction(Task task, NodesRequest request, ActionListener<NodesResponse> listener) {
this.task = task;
this.request = request;
this.listener = listener;
if (request.concreteNodes() == null) {
resolveRequest(request, clusterService.state());
assert request.concreteNodes() != null;
}
this.responses = new AtomicReferenceArray<>(request.concreteNodes().length);
}

void start() {
final DiscoveryNode[] nodes = request.concreteNodes();
if (nodes.length == 0) {
// nothing to notify
threadPool.generic().execute(() -> listener.onResponse(newResponse(request, responses)));
return;
}
TransportRequestOptions.Builder builder = TransportRequestOptions.builder();
if (request.timeout() != null) {
builder.withTimeout(request.timeout());
}
for (int i = 0; i < nodes.length; i++) {
final int idx = i;
final DiscoveryNode node = nodes[i];
final String nodeId = node.getId();
try {
TransportRequest nodeRequest = newNodeRequest(nodeId, request);
if (task != null) {
nodeRequest.setParentTask(clusterService.localNode().getId(), task.getId());
}

transportService.sendRequest(node, transportNodeAction, nodeRequest, builder.build(),
new TransportResponseHandler<NodeResponse>() {
@Override
public NodeResponse read(StreamInput in) throws IOException {
NodeResponse nodeResponse = newNodeResponse();
nodeResponse.readFrom(in);
return nodeResponse;
}

@Override
public void handleResponse(NodeResponse response) {
onOperation(idx, response);
}

@Override
public void handleException(TransportException exp) {
onFailure(idx, node.getId(), exp);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
} catch (Exception e) {
onFailure(idx, nodeId, e);
}
}
}

我们来分析这个方法,可以达到管中窥豹的效果。

首先调用concreteNodes方法获取请求涉及的节点。涉及的节点是在构造AsyncAction时调用resolveRequest方法解析并设置的。如果节点为空,则直接返回空响应。

接着遍历涉及的节点,向每个节点发送请求:调用newNodeRequest方法生成对节点的请求,然后调用TransportServicesendRequest方法向节点发送请求。

sendRequest方法的调用关系如下:

1
2
3
4
5
TransportServivce.sendRequest
TransportServivce.getConnection
TransportServivce.sendRequest
TransportServivce.sendRequestInternal
Transport.Connection.sendRequest

主要的流程就是根据节点获取与节点的连接,然后调用sendRequest方法向节点发送数据。

发送数据的过程根据节点类型的不同有所不同。

本地节点的发送流程如下:

1
2
3
TransportService.sendLocalRequest
RequestHandlerRegistry.processMessageReceived
NodeTransportHandler.messageReceived

NodeTransportHandler.messageReceived中调用nodeOperation处理请求、生成节点响应,然后返回响应。

远程节点的发送流程如下:

1
2
3
4
5
NodeChannels.sendRequest
OutboundHandler.sendRequest
OutboundHandler.sendMessage
OutboundHandler.internalSend
Netty4TcpChannel.sendMessage

可以看到,对远程节点的请求最终通过netty的方法发送给相应的节点。

Netty4Transport

通过前面对节点的请求分析,我们对Elasticsearch的分布式请求有了初步的认识。下面我们来看看响应这个请求的网络服务是如何构建的。

Netty4TransportTransport的实现类,其继承关系如下:

Netty4Transport

先看看它的初始化过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
 protected void doStart() {
boolean success = false;
try {
ThreadFactory threadFactory = daemonThreadFactory(settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX);
eventLoopGroup = new NioEventLoopGroup(workerCount, threadFactory);
clientBootstrap = createClientBootstrap(eventLoopGroup);
if (NetworkService.NETWORK_SERVER.get(settings)) {
for (ProfileSettings profileSettings : profileSettings) {
createServerBootstrap(profileSettings, eventLoopGroup);
bindServer(profileSettings);
}
}
super.doStart();
success = true;
} finally {
if (success == false) {
doStop();
}
}
}

其初始化过程并不复杂:

  1. 创建一个客户端
  2. 根据配置创建服务端并绑定ip和端口

服务端的创建和上文提到的Netty4HttpServerTransport类似,配置相关的参数并设置请求处理的handlerNetty4MessageChannelHandler

节点请求的处理

节点请求最初到达Netty4MessageChannelHandlerchannelRead方法,这是由netty调用的。请求数据经过如下的调用:

1
2
3
4
5
Netty4MessageChannelHandler.channelRead
TcpTransport.inboundMessage
InboundHandler.inboundMessage
InboundHandler.messageReceived
InboundHandler.handleRequest

InboundHandler.handleRequest方法首先调用getRequestHandler获取请求的handler。这个handlerTransportNodesAction构造的时候注册的:

1
2
this.transportNodeAction = actionName + "[n]";
transportService.registerRequestHandler(transportNodeAction, nodeRequest, nodeExecutor, new NodeTransportHandler());

它在原来的actionName后加上[n]以此来表示这是对节点的请求。

然后启动新线程执行RequestHandler线程。调用关系如下:

1
2
RequestHandlerRegistry.processMessageReceived
NodeTransportHandler.messageReceived

processMessageReceived方法调用之前注册的NodeTransportHandler中的messageReceived方法,messageReceived方法调用nodeOperation处理请求、生成节点响应,然后返回响应。

节点响应的处理

节点响应最初到达Netty4MessageChannelHandlerchannelRead方法,这是由netty调用的。响应数据经过如下的调用:

1
2
3
4
5
6
7
8
9
Netty4MessageChannelHandler.channelRead
TcpTransport.inboundMessage
InboundHandler.inboundMessage
InboundHandler.messageReceived
InboundHandler.handleResponse
ContextRestoreResponseHandler.read
TransportResponseHandler.read
ContextRestoreResponseHandler.handleResponse
TransportResponseHandler.handleResponse

首先经过TransportResponseHandler.read方法,在前面的AsyncAction.start方法中,我们看到read方法以匿名类的方式定义:首先调用newNodeResponse生成节点响应实例,然后调用readFrom将二进制数据转化并填充到响应中。

得到响应之后,接着调用TransportResponseHandler.handleResponse方法,它调用onOperation来处理生成的响应数据:

1
2
3
4
5
6
private void onOperation(int idx, NodeResponse nodeResponse) {
responses.set(idx, nodeResponse);
if (counter.incrementAndGet() == responses.length()) {
finishHim();
}
}

将响应数据保存到responses中,responses是一个数据对象。

如果所有节点的响应都返回了,则调用finishHim方法来处理各个节点的响应数据:

1
2
3
4
5
6
7
8
9
10
11
private void finishHim() {
NodesResponse finalResponse;
try {
finalResponse = newResponse(request, responses);
} catch (Exception e) {
logger.debug("failed to combine responses from nodes", e);
listener.onFailure(e);
return;
}
listener.onResponse(finalResponse);
}

finishHim方法调用newResponse将各个节点的响应数据汇总成一个最终的响应。然后调用listener.onResponse方法将响应返回给用户。

总结

transport网络模块是Elasticsearch分布式系统的基础,了解了它的工作原理,对于Elasticsearch整体架构会有进一步的理解。同时对于其他工具的开发也会有很大的借鉴意义。