jasper的技术小窝

关注DevOps、运维监控、Python、Golang、开源、大数据、web开发、互联网

elasticsearch源码分析之Rest模块

作者:jasper | 分类:ElasticSearch | 标签:   | 阅读 2661 次 | 发布:2016-03-06 10:58 p.m.

看了看上一篇博客的发表时间,发现自己已经有两个月没有更新过了,过年前后的确是有些忙。OK,抽空继续来Elasticsearch的源码研究之路,我们来开启一个全新的部分,关于Elasticsearch的restful api的,本篇文章我会首先大体介绍一下,restful模块的构建,后面会有博文来具体介绍几个api的处理过程。

启动httpserver

首先要做的当然是在http node上面启动一个http server,之前也说了,Elasticsearch中的数据信息传输底层都是用的Netty,在这里也不例外,由NettyHttpServerTransport来负责充当http server的角色,在启动之前呢,会有一大批的设置需要加载,包括端口啊,绑定的host啊,compression啊等等,有个特点是这些设置都是以http.来开头的,在这里就不做多的阐述了。需要注意的几个设置:

http.netty.max_content_length:最大的长度,默认是100MB,如果你设置的的byte数大于int最大值时,会强制被置为100MB;

http.cors.allow-origin:在用kibana4访问的时候有时会出现跨域的问题,所以需要将这个配置设置为"*";

http.netty.worker_count:用于http处理的线程数,默认为Math.min(32, Runtime.getRuntime().availableProcessors())的两倍;

下面开始启动这个server,新建一个Netty中的ServerBootstrap实例,并将上面的配置都加载上,再将address bind上去。

对于http请求的处理,则交给RestController来处理,在RestController中有方法dispatchRequest用于分发不同的请求到相应的handler,并有handler执行handler.handleRequest(request, channel);

restful api的注册过程

1、 首先会将RestController绑定在RestModule模块上,那么RestController其实就是控制整个模块的核心了,在其中会初始化几个RestHandler用以处理不同类型的request请求,它们包括:

private final PathTrie<RestHandler> getHandlers = new PathTrie<>(RestUtils.REST_DECODER);
private final PathTrie<RestHandler> postHandlers = new PathTrie<>(RestUtils.REST_DECODER);
private final PathTrie<RestHandler> putHandlers = new PathTrie<>(RestUtils.REST_DECODER);
private final PathTrie<RestHandler> deleteHandlers = new PathTrie<>(RestUtils.REST_DECODER);
private final PathTrie<RestHandler> headHandlers = new PathTrie<>(RestUtils.REST_DECODER);
private final PathTrie<RestHandler> optionsHandlers = new PathTrie<>(RestUtils.REST_DECODER);

get、post、put、delete、head和option,当每次有新的接口需要注册时,就会将相应的url的path以及对此接口的处理,加入到handler中:

public void registerHandler(RestRequest.Method method, String path, RestHandler handler) {
    switch (method) {
        case GET:
            getHandlers.insert(path, handler);
            break;
        case DELETE:
            deleteHandlers.insert(path, handler);
            break;
        case POST:
            postHandlers.insert(path, handler);
            break;
        case PUT:
            putHandlers.insert(path, handler);
            break;
        case OPTIONS:
            optionsHandlers.insert(path, handler);
            break;
        case HEAD:
            headHandlers.insert(path, handler);
            break;
        default:
            throw new IllegalArgumentException("Can't handle [" + method + "] for path [" + path + "]");
    }
}

2、 接下来需要将restPluginsActions注册进去,其中restPluginsActions是一个包含一些通过plugin添加的restHandler的列表,此时会依次将他们注册进来:

for (Class<? extends BaseRestHandler> restAction : restPluginsActions) {
    bind(restAction).asEagerSingleton();
}

3、 Elasticsearch里面包含了大量的restful api,在这里需要将它们都注册进来,看了一下,大约有一百多个这样的api接口,通过如下方式注册进来:

bind(RestMainAction.class).asEagerSingleton();

bind(RestNodesInfoAction.class).asEagerSingleton();
bind(RestNodesStatsAction.class).asEagerSingleton();
bind(RestNodesHotThreadsAction.class).asEagerSingleton();
bind(RestClusterStatsAction.class).asEagerSingleton();
bind(RestClusterStateAction.class).asEagerSingleton();
bind(RestClusterHealthAction.class).asEagerSingleton();

// ………………(太多,剩余的省略掉了)

值得注意的是对于cat api,有特殊的处理方式,因为其第一级的path是一样的,通过Multibinder来注册:

Multibinder<AbstractCatAction> catActionMultibinder = Multibinder.newSetBinder(binder(), AbstractCatAction.class);
catActionMultibinder.addBinding().to(RestAllocationAction.class).asEagerSingleton();
catActionMultibinder.addBinding().to(RestShardsAction.class).asEagerSingleton();
catActionMultibinder.addBinding().to(RestMasterAction.class).asEagerSingleton();
catActionMultibinder.addBinding().to(RestNodesAction.class).asEagerSingleton();

以RestMainAction为例来说说处理过程

我们以最简单的/来看看具体Elasticsearch是怎么处理请求的。

1、 就像之前所说的,首先要将其添加到RestController中:

controller.registerHandler(GET, "/", this);
controller.registerHandler(HEAD, "/", this);

2、 override方法handleRequest,其实真正的处理逻辑在这里:

protected abstract void handleRequest(RestRequest request, RestChannel channel, Client client) throws Exception;

接收三个参数,分别是request(包括请求的method、uri、path、header、param以及body),channel(用于构造输出并发送responses),和client(Elasticsearch的client实例,其包含一些index、get、update等等各种操作)。

具体到这个例子中通过XContentBuilder builder = channel.newBuilder();channel来创建一个构造返回结果的build,构造过程:

builder.startObject();
if (settings.get("name") != null) {
    builder.field("name", settings.get("name"));
}
builder.field("cluster_name", clusterName.value());
builder.startObject("version")
        .field("number", version.number())
        .field("build_hash", Build.CURRENT.hash())
        .field("build_timestamp", Build.CURRENT.timestamp())
        .field("build_snapshot", version.snapshot)
        .field("lucene_version", version.luceneVersion.toString())
        .endObject();
builder.field("tagline", "You Know, for Search");
builder.endObject();

对滴,这就是我们访问rootpath所看到的结果,然后通过channel把结果返回:

channel.sendResponse(new BytesRestResponse(status, builder));

这里的status在Elasticsearch中的返回状态码都有不同的含义,比如:CONTINUE(100),SWITCHING_PROTOCOLS(101),OK(200),CREATED(201),ACCEPTED(202), NON_AUTHORITATIVE_INFORMATION(203),BAD_REQUEST(400)……太多太多了,不一一列举了。

总结

大体的内容就是这些了,这里只是举了简单的例子,后面我会就一些常用的看看其处理的过程,比如查询啊、一些plugin啥的。


转载请注明出处:http://www.opscoder.info/es_rest.html

其他分类: