jasper的技术小窝

关注DevOps、运维监控、Python、Golang、开源、大数据、web开发、互联网

elasticsearch源码分析之Transport模块

作者:jasper | 分类:ElasticSearch | 标签:   | 阅读 3126 次 | 发布:2015-11-22 6:09 p.m.

感觉很有必要将transport模块早点看看,这个模块在elasticsearch中用的很广泛,比如之前说的集群node之间的通信、数据的传输、transport client方式的数据发送等等,只要数和通信、数据传输相关的都离不开transport模块的作用。

transport模块分为LocalTransport和NettyTransport两种,在TransportModule中注册中可以通过node是local还是network的来判别使用哪一种transport,可以通过配置node.mode来决定,bind逻辑如下:

    String defaultType = DiscoveryNode.localNode(settings) ? LOCAL_TRANSPORT : NETTY_TRANSPORT;
    String typeName = settings.get(TRANSPORT_TYPE_KEY, defaultType);
    Class<? extends Transport> clazz = transports.get(typeName);
    if (clazz == null) {
        throw new IllegalArgumentException("Unknown Transport [" + typeName + "]");
    }
    bind(Transport.class).to(clazz).asEagerSingleton();

默认而且通常是使用netty的,所以我们只来讨论NettyTransport。

NettyTransport分为四种类型的连接,分别是:

  • recovery:做数据恢复recovery,默认个数2个;
  • bulk:用于bulk请求,默认个数3个;
  • med/reg:典型的搜索和单doc索引,默认个数6个;
  • high:如集群state的发送等,默认个数1个;
  • ping:就是node之间的ping咯。默认个数1个;

其中recovery和bulk之前版本是同一个的,叫做low,表示大数据量的传输,它们可能会导致通常的请求(如search或是单数据索引)耗时加长;

NettyTransport的底层用的是java的一个nio框架Netty,对于Netty的了解不是很多,代码里面有很多用到的地方就暂且一笔带过了。

配置的加载

简单说一下需要加载的一些配置吧,捡几个重点的说啊:

workerCount

workerCount表示transport的总共的worker数目,由transport.netty.worker_count来配置,默认值是32和Runtime.getRuntime().availableProcessors()中的最小值,也就是不能超过32,为什么会有这个限制呢?是因为在elasticsearch的github上有人提了个issues/3478,当使用core很多的机器的时候(比如48core),会创建太多的内存从而导致OOM,所以设置了32的上限来避免太多线程给系统产生压力。

connection number

对于每种连接都可以设置连接数: 其中最后面的三个是必须大于0的。

ReceiverPredictor

再看看ReceiverPredictor参数,因为Netty是nio的,在Netty中通过ReceiveBufferSizePredictor根据上次消息的大小来决定预测本次消息所需的缓存大小。从channel读取数据到缓存到,并向上行流通知消息接收事件。在ES中的默认值是这么算出来的:

    long defaultReceiverPredictor = 512 * 1024;
    if (JvmInfo.jvmInfo().getMem().getDirectMemoryMax().bytes() > 0) {
        // we can guess a better default...
        long l = (long) ((0.3 * JvmInfo.jvmInfo().getMem().getDirectMemoryMax().bytes()) / workerCount);
        defaultReceiverPredictor = Math.min(defaultReceiverPredictor, Math.max(l, 64 * 1024));
    }

先解释一下JvmInfo.jvmInfo().getMem().getDirectMemoryMax(),其实最终调用的就是jdk中sun.misc.VM类里面的directMemory ,瞅了眼源代码,http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/tip/src/share/classes/sun/misc/VM.java,就是通过 -XX:MaxDirectMemorySize来配置的,默认是64M.
如果transport.netty.receive_predictor_mintransport.netty.receive_predictor_max没有配置的话就会使用上面的默认值,根据max、min是否相等,来初始化receiveBufferSizePredictorFactory,分别为FixedReceiveBufferSizePredictorFactoryAdaptiveReceiveBufferSizePredictorFactory官方也建议设置的时候两者相等。都是写Netty的东西,在这里就不深究了。

运行

在doStart中会根据配置启动一个client和一个server,分别是ClientBootstrapServerBootstrap(都是Netty中的),因为节点之间要相互通信的,所以client和sever都要启动。并分别注册各自的PipelineFactory,在PipelineFactory中创建各自的channelPipeline,其中注册了消息处理的方式。

创建连接

其实就是创建Netty中的Channel,一个连接就是第一个Channel,根据之前所说的会有多个类型的连接会创建:

nodeChannels = new NodeChannels(new Channel[connectionsPerNodeRecovery], new Channel[connectionsPerNodeBulk], new Channel[connectionsPerNodeReg], new Channel[connectionsPerNodeState], new Channel[connectionsPerNodePing]);

根据配置的连接数创建,具体代码如下:

    ChannelFuture[] connectRecovery = new ChannelFuture[nodeChannels.recovery.length];
    ChannelFuture[] connectBulk = new ChannelFuture[nodeChannels.bulk.length];
    ChannelFuture[] connectReg = new ChannelFuture[nodeChannels.reg.length];
    ChannelFuture[] connectState = new ChannelFuture[nodeChannels.state.length];
    ChannelFuture[] connectPing = new ChannelFuture[nodeChannels.ping.length];
    InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address();
    for (int i = 0; i < connectRecovery.length; i++) {
        connectRecovery[i] = clientBootstrap.connect(address);
    }
    for (int i = 0; i < connectBulk.length; i++) {
        connectBulk[i] = clientBootstrap.connect(address);
    }
    for (int i = 0; i < connectReg.length; i++) {
        connectReg[i] = clientBootstrap.connect(address);
    }
    for (int i = 0; i < connectState.length; i++) {
        connectState[i] = clientBootstrap.connect(address);
    }
    for (int i = 0; i < connectPing.length; i++) {
        connectPing[i] = clientBootstrap.connect(address);
    }

注意之前说过后面三个的连接不能为0,那么前面两个呢,逻辑是这样的,如果recovery的连接为0,使用bulk的连接作为recovery,如果bulk为0,就会用reg的连接,就是一级一级往下找。创建其实就是使用Netty中的ChannelFuture。注意:在创建连接的过程中会有一个连接锁,来保证不可能有一个已经存在的连接。

request发送

在方法sendRequest中,这个方法会传入一个参数options,是一个TransportRequestOptions的实例,包含三个属性:timeout(超时时间)、compress(是否压缩)、type(发送的类型,即上面说的五个之一)。根据targetnode和type来获取相应的连接。获取连接后,数据该压缩的压缩(压缩方法在CompressorFactory中实现),并写入version和action;通过ChannelBuffers创建出buffer;最后通过Netty中的targetChannel.write(buffer);将数据发送成功。

消息处理

消息处理在之前所说的channelPipeline中注册,MessageChannelHandler类负责消息接受及处理逻辑,在其他模块中会对不同的消息(Action)注册对应的处理程序(handler)。在对收到的内容进行解析的过程中获取到action,找到对应的handler进行处理;;消息处理则需要messageReceived,里面会有对于request和response分别有相应的handler来处理:handleRequest、handleResponse。

总结

transport大体就是这么写东西,主要就是消息是怎么发的,怎么接受的;怎么处理的话交给各自的模块来处理。


转载请注明出处:http://www.opscoder.info/es_transport.html

其他分类: