jasper的技术小窝

关注DevOps、运维监控、Python、Golang、开源、大数据、web开发、互联网

etcd-raft使用分析

作者:jasper | 分类:分布式 | 标签:     | 阅读 549 次 | 发布:2018-01-26 11:57 p.m.

最近在做一些分布式方面的项目,涉及到的数据一致性的问题当然没有造轮子,而是使用的开源的的实现,由于是Golang语言的,所以使用了hashcorp的raft库来实现,该库使用起来比较方便;在业界还有一个现成的raft实现就是etcd,使用会麻烦的多,这篇文章就来看看其具体的使用。

前言

现在已知的Golang版本的raft的开源实现主要有两个:一个是coreOS的etcd中raft的实现,使用的项目有比如tidb,cockroachdb等;另外一个是hashcorp的raft实现,使用的项目有比如consul,influxdb等。相比而言,hashcorp的raft实现比较完整,包括snapshot,wal,storage等等皆有实现,在使用的时候只需要关心业务代码,并apply需要同步的信息。而etcd的raft则是一个raft协议的轻量级实现,前面说的这些都需要调用方去实现,这无疑增加了复杂度,但是带来的好处则是使用更加灵活,所以tidb,cockroachdb都是基于etcd的raft来实现Multi Raft的。

那么要学习etcd的raft的用法,我们最好是找到一个最简单的例子来分析,好在在etcd中,作者写了个示例程序——一个内存型 KV Store,简单展示了应用程序该如何使用etcd-raft。该示例程序位于etcd/contrib/raftexample之中。

代码分析

程序入口

在初始化的时候,提供给我们一个最上层的raft的数据流向的思路,为了实现用户代码和raft的交互,我们需要4个channel:

  • proposeC := make(chan string)
  • confChangeC := make(chan raftpb.ConfChange)
  • commitC := make(chan *string)
  • errorC := make(chan error)

其中proposeC让用户代码可以向raft提交写请求(Propose)。 confChangeC是用户代码向raft提交配置变更请求(ProposeConfChange)。这两个channel直接在 main.go 这个文件里就创建好并传递给各个组件。它们是raft数据入口

而剩下的两个channel在raft.go:newRaftNode里边被创建. 它们是raft数据出口commitC把已经committed entriesraft中暴露出来给用户代码来进行写State MachineerrorC 让用户可以及时处理raft抛出的错误信息。

raftNode

raftNode是etcd中真正的执行者. 它主要是封装了一个Node的interface在里边,然后围绕这个interface做各种事情. 其中包括:

  • 维护几个主要的数据通道: proposeC, confChangeC, commitC, errorC 等
  • 节点的各种信息: id, peers, index, raftStorage, ...
  • WAL 的读写
  • snapshot 读写
  • 状态机的操作等等

其中Node主要用到的一些方法有:

  • Tick()
  • Stop()
  • Advance()
  • ProposeConfChange(ctx context.Context, cc pb.ConfChange) error
  • ApplyConfChange(cc pb.ConfChange) *pb.ConfState
  • Propose(ctx context.Context, data []byte) error
  • Ready() <-chan Ready
  • Step(ctx context.Context, msg pb.Message) error

这些函数看名字都大体知道其作用,在这里我们先不细说,下面流程中会依次碰到。

raft集群增加/删除节点

大致流程:

  1. 在从confChangeC中收到请求之后立刻调用raft.Node这个interface中的ProposeConfChange方法来让raft内部处理这个请求.
  2. raft处理完成之后,raft会通过Ready() <-chan Ready把请求内容再次吐出来. 用户代码需要消费并且处理这些请求.
func (rc *raftNode) serveChannels() 
//...
case cc, ok := <-rc.confChangeC:
    //...
    rc.node.ProposeConfChange(context.TODO(), cc)


func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool
//...
case raftpb.EntryConfChange:
    var cc raftpb.ConfChange
    cc.Unmarshal(ents[i].Data)
    rc.confState = *rc.node.ApplyConfChange(cc)
    switch cc.Type {
    case raftpb.ConfChangeAddNode:
        if len(cc.Context) > 0 {
            rc.transport.AddPeer(types.ID(cc.NodeID), []string{string(cc.Context)})
        }
    case raftpb.ConfChangeRemoveNode:
        if cc.NodeID == uint64(rc.id) {
            log.Println("I've been removed from the cluster! Shutting down.")
            return false
        }
        rc.transport.RemovePeer(types.ID(cc.NodeID))
    }
}

这其实就是raft的最核心思想了. raft本身就是实现了一个Replicated State Machine. State Machine 具有这么一个特性: 按照相同的顺序来执行一组相同的命令,最终产生完全相同的结果Replicated State Machine 就是多个完全一样的State Machine按照相同的顺序来执行一组相同的命令,最终所有的State Machine都产生相同的结果. 为了做到这一点,就需要把所有的命令全部都复制到所有的State Machine里去。这就需要序列化/反序列化操作. 这也是为什么接口Propose接收的是[]byte而不是interface. 因为只有用户才知道你的命令需要如何序列化/反序列化. 而ProposeConfChange因为是固定的几个命令,所以省去了用户代码的这个操作。

由此可以推断出. raft 的基本工作流程是

这里的raft 把cmd []byte 复制到所有的节点其实也是一个迷。我们之后会慢慢解开.

最后用户代码通过调用rc.transport.AddPeerrc.transport.RemovePeer来执行添加/删除节点的命令。这就是一个最简单的raft的完整工作流。管理集群节点这种操作数据量实在太小了。所以我们用这样的简单工作流基本上是没什么问题的。但操作KV数据就要开始复杂一些了。

写数据流程

写数据的大致流程如下:

简单来说就是: 用户发送一个http的PUT请求,用来写入KV数据。这个http请求会调用kvStore.Propose方法把请求数据通过proposeC这个数据入口发给raftraft经过一系列的操作之后会把数据通过commitC这个数据出口暴露出来。kvStore会消费这个channel,把消费到的结果最终写入到内存中的map[string]stringe里边去。

需要指出的是用户代码在消费commitC的数据之前,还需要处理raftsnapshot数据. 例子中用的是etcd已经实现好的github.com/coreos/etcd/snap这个包来处理的. 在本例中做的事情其实非常简单,snapshot有正反两个相对的操作: 序列化和反序列化. 例子中直接对内存中的map做json.Marshal(s.kvStore)json.Unmarshal(snapshot, &store)

整个流程中真正能让我们感兴趣的应该在4. raft, 5. commitC, 以及5->6这几个部分。从这里开始复杂起来了,我们也不得不一步一步在代码中挖下去.

3 -> 4 就是raftNode调用raftNode.node.Propose方法把数据交给raft去处理。

// etcd/raft/node.go
func (n *node) Propose(ctx context.Context, data []byte) error {
    return n.step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
}

一步一步追踪代码会把我们带到func (n *node) run(r *raft)里边去. 这个方法是一个long running的方法. 所有调用的地方都是go note.run(r). 这是raft最主要的一个大loop。

// etcd/raft/node.go
func (n *node) run(r *raft) {
    //...

    propc = n.propc

    select {
    case m := <-propc:
        m.From = r.id
        r.Step(m)
    }
    //...
}
// etcd/raft/raft.go
func (r *raft) Step(m pb.Message) error {
    //...
    switch m.Type {
        default:
            r.step(r, m)
    }
}

Note: 只是pb.MsgProp类型的pb.Message是这个样子。我们省略了太多细节。因为那些细节和我们目前想看的数据处理流程完全没有关系.

看来看去最终所有的m pb.Message都最终被一个叫r.step的方法给取走了.

这里需要说一下r.step这个方法。r.step有三个不同的实现。stepLeader, stepCandidate, stepFollower. 具体用哪个就要根据当前raft节点的状态来决定了. 如果当前节点是StateLeaderr.step就执行stepLeader. 类似的对应其他两种状态. (看becomeLeader, becomeFollower, ...等几个方法)

stepCandidate我们就不去看了。先看看相对简单的stepFollower吧。因为raft单leader的设计,所有的写操作(Append)都需要由leader来代劳。所以stepFollower可以看到所有的MsgProp(Propose Message)都被直接转发给leader了。多么简单直接.

// etcd/raft/raft.go
func stepFollower(r *raft, m pb.Message) {
    switch m.Type {
        //...
        case pb.MsgProp:
            //...
            m.To = r.lead
            r.send(m)
        //...

其实stepLeader也很简单:

// etcd/raft/raft.go
func stepLeader(r *raft, m pb.Message) {
    switch m.Type {
        //...
        case pb.MsgProp:
            r.appendEntry(m.Entries...)
            r.bcastAppend()
            return
        //...
    }
    //...

leader只做了两件事情.

  1. leader 把消息中的Entries Append到自己本地的raftLog里边去了。至于raftLog是什么我们之后再去挖掘。
  2. 接着调用bcastAppend 方法把这一些Entries挨个发给所有的peers. (是的没看错是所有的peers. 这也是为什么我来看etcd和cockroachdb的raft使用的出发点. 我们更期望能够把raft集群分成多个group.也就是cockroachdb的multiraft做的事情.)

当然,bcastAppend并不会无脑的把所有的数据立刻发给所有的peers. etcd在这里实现了一个分批的机制.这样才可以支撑起更大的量。

// raft.bcastAppend -> raft.sendAppend -> raft.send

// etcd/raft/raft.go
func (r *raft)  send(m pb.Message) {
    // ...
    r.msgs = append(r.msgs, m)
    // ...

这个分批机制的核心就是Readystruct. 我们终于来到了一开始不理解的那个type Ready structraft.noderun方法里取走了r.msgs封装到一个Ready里,然后把这个ready发到readyc里。最后用户代码通过Ready() <-chan Ready 把它拿走处理掉。

// etcd/raft/node.go

func (n *node) run(r *raft) {
    // ...
    rd = newReady(r, prevSoftSt, prevHardSt)
    // ...

    case readyc <- rd:
        // ...
        r.msgs = nil
}

func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
    rd := Ready{
        Entries:          r.raftLog.unstableEntries(),
        CommittedEntries: r.raftLog.nextEnts(),
        Messages:         r.msgs,
    }
    //...
}

func (n *node) Ready() <-chan Ready { return n.readyc }

到这里为止,4. raft 已经走完了大部分的路。但是我们的数据还没有走到最后的commitC里边去呢。这到底是为什么呢?

现在让我先看看type Ready struct吧. 这个东西太重要了。但是却又不是那么好理解。

type Ready struct {
    *SoftState

    pb.HardState

    ReadStates []ReadState

    Entries []pb.Entry

    Snapshot pb.Snapshot

    CommittedEntries []pb.Entry

    Messages []pb.Message

    MustSync bool
}

SoftState 我们直接忽略就好。

HardState 保存了当前节点最大的Term uint64, 当前节点在这个Term里投票给谁的Vote uint64, 以及当前节点已知的最大的已经被持久化起来的日志位置Commit uint64. 为什么用这三个数字就需要去看raft的基本理论了。

etcd/raft/raftpb/raftpb.go:HardState.Commit -> r.raftLog.committed

// etcd/raft/log.go
type raftLog struct {
    // ...
    // committed is the highest log position that is known to be in
    // stable storage on a quorum of nodes.
    committed uint64
    // ...
}

Messages 这个是节点之间通信的消息。Entries, CommittedEntries等信息一般都是被封装成不同的Messages发送给集群中所有节点的。所以Ready中只有这个东西需要参与网络传出。另外,一般Leader有多少个Follower, 消息就会被自动创建多少个副本,每个副本对应一个Follower(每个副本就是To这个字段不同).

CommittedEntries 简单来讲就是已经commit但是还没有apply到state machine中的日志。然而这里有几个需要澄清的问题。什么是日志(log)? 谁来负责commit? 怎么才算commit?

下面我们来一一回答这几个问题:

  • 什么是日志(log)?

这里说的日志,和我们平时程序中记录程序运行状态写的文本日志不是一个东西。这里说的log其实来源于WAL这个技术. 全称是: Write Ahead Log。简单的理解就是: 我们把一条序列化成二进制形式的的命令/指令叫做一条log entry。 我们把这些一条一条的log entries按顺序写入文件中,这就是WAL最最基本的工作内容之一。其他细节我们不深究了。大家自己google好了。

  • 谁来负责commit?怎么才算commit

按照etcd raft的设计思路,所有的IO操作基本都是交给用户代码去实现的。其中包括: WAL, Snapshot, Transport, Commit等,当然用户的State Machine也需要自己去写代码。 严格来讲commit不仅仅是一个IO操作。commitraft集群层面是一个过程,表示绝大多数节点都把同一条日志写入到了持久化的存储中。这个过程持续进行,节点只保留已知最大的commit。 更为详细的commit过程解析,还是看raft consensus algorithum简介

commit分为两步:

首先, leader把某个command 先保存到本地持久化存储中,发起一个request把这个消息广播下去. follower 收到请求之后,会把请求中带过来的entry也保存到本地持久化存储中,接着发送一个消息说消息已经收到且保存成功. 回复的消息中会附带上term,matchIndex等信息. 当绝大多数follower发回来的这个请求被leader收到之后,leader才会认为当前处理的entry已经被commit了。这是第一步.

接着leadercommitted真正apply到state machine中,并且发送一个新的消息说之前的entry idcommit了,并且附带上prevTerm, prevIndex 这几个信息用在match机制中。follower收到这个消息之后也会开始把之前的entry真正apply到state machine中。这是第二步。

需要注意的是,写状态机的代码:ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)) entriesToApply是获取从lastApply到commit index之间的日志,只有在接收到超过半数的回复才会更新commit index(比如committed idx = 3, 当 idx: 5的日志有半数的节点成功添加了,这时才会将 committed idx 改为 5),这部分Raft论文有详细介绍。因此这里提交是之前已经接收到的、超过半数人同意的日志,而刚通过send发送的,此刻永远不会被提交。

现在我们回到流程的解析。消息进入了r.msgsnewReady取走,用户代码通过消费Ready() <-chan Ready来处理各种消息。之所有消息没有立刻通过commitC暴露给用户的state machine, 就是因为上边我们了解掉的commit的过程。当raft说这个消息已经被commit掉了,它就会以committedEntries身份出现。这时候用户代码需要负责自己把这些committedEntries通过commitC抛给State Machine.

// etcd/contrib/raftexample/raft.go

func (rc *raftNode) serveChannels() {
    // ...
    case rd := <-rc.node.Ready():
        // 保存到持久化的存储中
        rc.wal.Save(rd.HardState, rd.Entries)
        //...

        // 通过commitC告诉给下游的用户代码
        if ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)); !ok {
                rc.stop()
                return
        }

        // 处理完成了需要主动告诉raft
        rc.node.Advance()
    // ...
}

func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool {
    for i := range ents {
        switch ents[i].Type {

        // 我们的HTTP PUT应该触发一个`EntryNormal`请求.
        case raftpb.EntryNormal:
            if len(ents[i].Data) == 0 {
                // ignore empty messages
                break
            }
            s := string(ents[i].Data)
            select {
            // 终于看到`commitC`了
            case rc.commitC <- &s:

            case <-rc.stopc:
                return false
            }

        // ...
        }

        // after commit, update appliedIndex
        rc.appliedIndex = ents[i].Index
    }
    // ...
}

到这里写数据的流程主体已经基本完成了。让我们再做一个简单的回顾吧:

  • 用户请求数据(我们序列化成binary的log entry)通过proposeC发送进raft.

leader上, raft通过stepLeader方法把entry 放到自己本地raftLog中,然后分发给所有的follower. 同时一批缓存起来的entry会在r.run中被打包成一个Ready通过readyC暴露给用户代码。接下来用户代码需要做的事情:

  • 保存HardState, Entries到持久化存储中(wal)
  • 处理snapshot
  • 添加Entries到本地raftStorage中 (需要区分raft内部的raftLog)
  • 发送MessagesFollower
  • committedEntries通过commitC暴露给用户用于应用到State Machine
  • 看看当前状态是否需要触发snapshot操作
  • 告诉raft我们处理完了一批Ready

至此,数据的写入流程就算完成了。

总结

对于etcd-raft的整个流程在这里就算解读完成了,是不是感觉很复杂,然而这里也只是冰山一角,还没有深入其raft协议的实现上去,对于snapshot,WAL等等也是一笔带过;后面有时间我们再来续集。

转载请注明出处:http://www.opscoder.info/ectd-raft-example.html

【上一篇】 深入Golang之CGO
【下一篇】 深入Golang之context
其他分类: