jasper的技术小窝

关注DevOps、运维监控、Python、Golang、开源、大数据、web开发、互联网

elasticsearch源码分析之索引过程(二)

作者:jasper | 分类:ElasticSearch | 标签:   | 阅读 3385 次 | 发布:2015-12-05 6:06 p.m.

接着上一篇的,在写入了buffer和translog之后,数据仍然不能被搜到,必须还得执行一个操作---refresh,而将segment刷到磁盘还需要一个操作---flush;为了保证数据的一致性,还需要一个之前提到的translog,那么我们这篇就来研究他们是怎么工作的。

refresh

Lucene 把每次生成的倒排索引,叫做一个segment。然后另外使用一个 commit 文件,记录索引内所有的 segment。而生成 segment 的数据来源,则是内存中的 buffer,就是是上篇说道的数据最后写入的地方,要想数据被搜索就需要做refresh,盗来官方图来说明:

初始:

结束:

其实就是将buffer中刷到文件系统缓存中,那接下来从代码层面来看看:

每个shard上都会有一个schedule在定时地做refresh任务,其中refresh的频率由配置index.refresh_interval来决定,默认为1s。

一、首先得判断是否需要refresh,判断条件:
1、针对每一个shard都有相应的store实例,需增加store实例的引用计数成功。该store提供给了es shard的写入文件的权限,每个shard都有一个专门的store实例用于访问Lucene目录,该目录用于读写lucene索引。该store也提供了操作metadata信息的权限,例如已经commit过的文件的checksums,已经commit文件属于一个segment,在Lucene commit的时候写入。这里的引用计数用于决定该store何时被安全地关闭,例如只要没有引用了就需要关闭,其实相应地,在最后引用完成后要减少引用计数,不然的话该store永远不会被关闭了,当然计数减为0也就自动关闭了。使用如下:

   store.incRef();
   try {
     // use the store...

   } finally {
       store.decRef();
   }

2、当前的Searcher已经发生过改变了。

二、现在可以开始refresh了。

1、 在核实当前shard没有关闭之后开始refresh,因为在fresh的时候同时发生flush,所以开始时会获取一个读的lock,并保证只有一个线程在refresh当前shard。

2、refresh在Lucene中的searcherManager(管理IndexReader的重建和关闭,保证了线程安全,封装了IndexSearcher的生成)完成,其实就是尝试打开新的IndexReader,本质调用的是DirectoryReader.openIfChanged,来更新searcher引用。

至此,refresh完成,由于更新完searcher的引用,所以现在数据才可以被搜索到。

flush

上面的refresh已经说了将数据写入到了文件缓存,为了防止由于机器故障等原因产生的数据丢失,最后还是要写入到磁盘上才是上策,那么这一步是怎么做的呢?其实这就是flush,而且上一篇文章中说的写入数据的同时写入的translog 也会在其中扮演重要的角色。

同样盗来官方两张图来说明:

开始状态:

中间状态:

最后状态:

OK,继续来对代码解读分析:

一、判断执行条件是否满足:

Flush的频率控制是由配置index.translog.interval来控制,默认是5s。但是是否执行flush还需要一系列的条件满足:

  1. 当前索引不是关闭状态;
  2. flush不是disable的,因为配置index.translog.disable_flush可以关闭flush,其实在短时导入大量数据时,建议可以disable掉flush;
  3. translog实例不为空;
  4. 当前shard的operations大于配置index.translog.flush_threshold_ops,默认为Integer.MAX_VALUE,即2^31-1;
  5. 当前translog的size大于配置index.translog.flush_threshold_size,默认是512mb;
  6. 距离上一次flush的时间间隔大于配置index.translog.flush_threshold_period,默认是30min;

上面六条中前三是必须条件,后三条是任意满足就执行;初始化一个FlushRequest后就开始flush的执行。

二、执行flush:

1、flush的执行前要判断当前shard的状态,只有是STARTED或是RECOVERING或是POST_RECOVERY才允许操作;

2、这里会有两把锁,一是read lock,另一个是flush lock,而且锁的顺序是很重要的,不然会造成死锁: 线程1:通过api flush,获得了flush lock,但是read lock block了,因为线程2拥有了write lock; 线程2:在recovery末尾的flush拥有write lock,但是线程1的flush lock block了。

3、如果indexWriter拥有还没有commit的changes,或是api要求强制flush,那么进入执行程序;

4、首先执行prepareCommit:

    final TranslogWriter oldCurrent = current;
    oldCurrent.sync();
    currentCommittingTranslog = current.immutableReader();
    Path checkpoint = location.resolve(CHECKPOINT_FILE_NAME);
    assert Checkpoint.read(checkpoint).generation == currentCommittingTranslog.getGeneration();
    Path commitCheckpoint = location.resolve(getCommitCheckpointFileName(currentCommittingTranslog.getGeneration()));
    Files.copy(checkpoint, commitCheckpoint);
    IOUtils.fsync(commitCheckpoint, false);
    IOUtils.fsync(commitCheckpoint.getParent(), true);

    current = createWriter(current.getGeneration() + 1);
    for (View view : outstandingViews) {
        view.onNewTranslog(currentCommittingTranslog.clone(), current.newReaderFromWriter());
    }
    IOUtils.close(oldCurrent);
    logger.trace("current translog set to [{}]", current.getGeneration());
    assert oldCurrent.syncNeeded() == false : "old translog oldCurrent must not need a sync";

大体逻辑是读取checkpoint(存在文件translog.ckp中),如果checkpoint和当前tanslog的checkpoint不一样,就用文件translog.ckp拷贝到文件translog-xxx.ckp中(xxx可以看做是checkpoint的id),然后分别fsync commitCheckpoint和其同目录下的所有文件。然后创建一个新的tanslog文件,checkpoint id加1,并将读指向新的文件,关闭老的tanslog的写入。

5、执行commitIndexWriter:

    Translog.TranslogGeneration translogGeneration = translog.getGeneration();
    logger.trace("committing writer with translog id [{}]  and sync id [{}] ", translogGeneration.translogFileGeneration, syncId);
    Map<String, String> commitData = new HashMap<>(2);
    commitData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGeneration.translogFileGeneration));
    commitData.put(Translog.TRANSLOG_UUID_KEY, translogGeneration.translogUUID);
    if (syncId != null) {
        commitData.put(Engine.SYNC_COMMIT_ID, syncId);
    }
    indexWriter.setCommitData(commitData);
    writer.commit();

6、执行refresh,对,你没有看错,就是上面的那个refresh,目的是清除老的Version values;

7、执行translog的commit,逻辑简单讲就三步:

   IOUtils.close(recoveredTranslogs);
   recoveredTranslogs.clear();

   toClose = this.currentCommittingTranslog;
   this.currentCommittingTranslog = null;

    IOUtils.close(toClose);

关闭recoveredTranslogs,并将列表清空,初始化当前committingTanslog。并关闭旧的committingTanslog;

到此为止,整个flush就介绍了啦。

总结

其实说了这么多,大家只需要知道些:ES新插入的数据必须refresh之后才能被搜索,最后写入到磁盘需要执行flush操作,为了避免flush前数据丢失,ES还记录了translog日志用以replay。这里的refresh和flush都是针对单个shard的。


转载请注明出处:http://www.opscoder.info/es_indexprocess2.html

其他分类: