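After reading the Raft paper, I have a rough understanding of some of its details and of the overall implementation. In summary, Raft is a consensus protocol that guarantees the log is consistent across a cluster. If the logs are identical, then applying them to the state machine yields the same final result on every node. Other kinds of consistency, such as the consistency of data reads and writes, need additional mechanisms. For example, here is a case that I had long believed to be correct but that turns out to be wrong:
I used to think that in a 5-node cluster, once a piece of data had been written to three nodes it was committed, so to read it I only needed to find the latest copy on three of the five nodes to know it was committed. That is wrong. Consider this scenario: in some term, the leader replicates an entry to only one other node and then crashes. A node that did not receive the entry is elected leader, appends an entry of its own in term+1, and crashes before replicating it to any other node. The original leader is then re-elected and continues replicating its entry to a third node, so the entry from the original term is now on a majority of nodes. If this leader crashes at that point, the node that appended the entry in term+1 can be elected leader again, and the entry that exists in three copies will then be truncated.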
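This is why Raft specifies that only when an entry from the current term has been accepted by a majority of nodes are that entry and all entries before it known to be committed and safe from being overwritten. In practice, a newly elected leader therefore appends a no-op entry in its current term, so that even if no proposals arrive in this term, the entries before the no-op can be confirmed as committed as soon as possible.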
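The commit rule can be sketched in a few lines of Go. This is a minimal illustration and not etcd code: the entry type and the canCommit function are hypothetical names introduced here, and the replica count is passed in by hand rather than tracked per follower.

```go
package main

import "fmt"

// entry is a hypothetical, simplified log entry: only the term matters here.
type entry struct {
	Term uint64
}

// canCommit reports whether a leader in currentTerm may mark index as
// committed, given how many nodes (itself included) hold that entry.
// The entry with index i is stored at log[i-1].
func canCommit(log []entry, index uint64, replicas, clusterSize int, currentTerm uint64) bool {
	if index == 0 || index > uint64(len(log)) {
		return false
	}
	quorum := clusterSize/2 + 1
	// Both conditions are required: being on a quorum alone is not enough.
	return replicas >= quorum && log[index-1].Term == currentTerm
}

func main() {
	// Leader's log in term 4: index 1 (term 1), index 2 (term 2), index 3 (no-op, term 4).
	log := []entry{{Term: 1}, {Term: 2}, {Term: 4}}

	// Index 2 is on 3 of 5 nodes, but it comes from an older term.
	fmt.Println(canCommit(log, 2, 3, 5, 4)) // false
	// The term-4 no-op reaches 3 of 5 nodes: it commits, and with it everything before it.
	fmt.Println(canCommit(log, 3, 3, 5, 4)) // true
}
```

The second call is the reason for the no-op: once it is on a quorum, index 2 becomes committed as a side effect.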
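The Raft implementation in etcd is one of the more reliable implementations in engineering practice. It leaves out the transport layer, log storage, and similar pieces, so the library is somewhat hard to approach for someone who has not used it before. Difficult as it is, there is a lot to learn from it. Below, taking the raft example as a starting point, we walk step by step through the implementation and usage of etcd's raft library.
In Raft, committing something means submitting a proposal to the Raft protocol. The proposal must be acknowledged by a majority of nodes before it counts as committed; commitIndex is then advanced, after which appliedIndex can be updated asynchronously.
In this example, assuming a single node, a proposal we submit is eventually handed to the Raft protocol for processing. The corresponding call is: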
// blocks until accepted by raft state machine
rc.node.Propose(context.TODO(), []byte(prop))
As the comment says, this call blocks until the raft state machine accepts the proposal. Let's look further at the implementation of Propose:
func (n *node) Propose(ctx context.Context, data []byte) error {
	return n.stepWait(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
}
...
func (n *node) stepWithWaitOption(ctx context.Context, m pb.Message, wait bool) error {
	ch := n.propc
	pm := msgWithResult{m: m}
	if wait {
		pm.result = make(chan error, 1)
	}
	select {
	case ch <- pm:
		if !wait {
			return nil
		}
	case <-ctx.Done():
		return ctx.Err()
	case <-n.done:
		return ErrStopped
	}
	select {
	case rsp := <-pm.result:
		if rsp != nil {
			return rsp
		}
	case <-ctx.Done():
		return ctx.Err()
	case <-n.done:
		return ErrStopped
	}
	return nil
}
This code builds a msgWithResult, sends it into the propc channel, and then waits for the response. Let's look at the code that handles propc:
func (n *node) run(r *raft) {
	...
	select {
	// TODO: maybe buffer the config propose if there exists one (the way
	// described in raft dissertation)
	// Currently it is dropped in Step silently.
	case pm := <-propc:
		m := pm.m
		m.From = r.id
		err := r.Step(m)
		if pm.result != nil {
			pm.result <- err
			close(pm.result)
		}
	...
}
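The hand-off between Propose and the run loop is a common Go pattern: a request carries its own buffered reply channel, and a single goroutine owns the state. The following is a minimal sketch of that pattern in isolation. The names request, loop, propose and errEmpty are hypothetical, and context cancellation is left out for brevity.

```go
package main

import (
	"errors"
	"fmt"
)

// request pairs a payload with a channel for the result, mirroring the
// shape of msgWithResult in the excerpt.
type request struct {
	data   string
	result chan error
}

// errEmpty is a hypothetical error used only in this sketch.
var errEmpty = errors.New("empty proposal")

// loop plays the role of node.run: it is the only goroutine that touches
// the state while requests are in flight.
func loop(reqc <-chan request, done <-chan struct{}, accepted *[]string) {
	for {
		select {
		case req := <-reqc:
			var err error
			if req.data == "" {
				err = errEmpty
			} else {
				*accepted = append(*accepted, req.data)
			}
			if req.result != nil {
				req.result <- err // buffered with capacity 1, so this never blocks
				close(req.result)
			}
		case <-done:
			return
		}
	}
}

// propose blocks until the loop has processed the request.
func propose(reqc chan<- request, data string) error {
	req := request{data: data, result: make(chan error, 1)}
	reqc <- req
	return <-req.result
}

func main() {
	reqc := make(chan request)
	done := make(chan struct{})
	var accepted []string
	go loop(reqc, done, &accepted)

	fmt.Println(propose(reqc, "x=1")) // <nil>
	fmt.Println(propose(reqc, ""))    // empty proposal
	close(done)
}
```

Giving the result channel a buffer of one means the loop can reply and move on even if the caller has already given up waiting.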
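Only the core of run is shown here. It takes the previously enqueued message out of the channel, processes it, and then puts the result into the result channel to unblock Propose. The raft state machine is driven forward mainly inside the Step function. Skipping the other handling, processing ultimately reaches raft.step, a function pointer that implements different logic depending on the node's role. Here we are the leader, so let's look at stepLeader: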
func stepLeader(r *raft, m pb.Message) error {
	switch m.Type {
	case pb.MsgProp:
		if len(m.Entries) == 0 {
			r.logger.Panicf("%x stepped empty MsgProp", r.id)
		}
		if _, ok := r.prs[r.id]; !ok {
			// If we are not currently a member of the range (i.e. this node
			// was removed from the configuration while serving as leader),
			// drop any new proposals.
			return ErrProposalDropped
		}
		if r.leadTransferee != None {
			r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)
			return ErrProposalDropped
		}
		for i, e := range m.Entries {
			if e.Type == pb.EntryConfChange {
				if r.pendingConfIndex > r.raftLog.applied {
					r.logger.Infof("propose conf %s ignored since pending unapplied configuration [index %d, applied %d]",
						e.String(), r.pendingConfIndex, r.raftLog.applied)
					m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
				} else {
					r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1
				}
			}
		}
		if !r.appendEntry(m.Entries...) {
			return ErrProposalDropped
		}
		r.bcastAppend()
		return nil
}
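Only the core of stepLeader is shown here. From it we can sort out the processing flow for a proposal: drop it if this node is no longer a member of the configuration or if a leadership transfer is in progress; replace a configuration-change entry with an empty normal entry if an earlier configuration change is still unapplied; append the entries to the leader's own log; and finally broadcast them to the other nodes.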
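To make the guard on configuration changes easier to follow, here is a toy version of the proposal path. It is a sketch under simplifying assumptions: the leader and entry types and the propose method are hypothetical, the log is a plain slice, and the broadcast step is omitted.

```go
package main

import (
	"errors"
	"fmt"
)

var errProposalDropped = errors.New("proposal dropped")

// entry is a toy log entry; ConfChange marks a membership change.
type entry struct {
	ConfChange bool
	Data       string
}

// leader holds only the fields the proposal path needs in this sketch.
type leader struct {
	isMember         bool
	transferring     bool
	log              []entry // log[i] has index i+1
	applied          uint64
	pendingConfIndex uint64
}

func (l *leader) lastIndex() uint64 { return uint64(len(l.log)) }

// propose mirrors the order of checks in the stepLeader excerpt.
func (l *leader) propose(es ...entry) error {
	if !l.isMember || l.transferring {
		return errProposalDropped
	}
	for i, e := range es {
		if !e.ConfChange {
			continue
		}
		if l.pendingConfIndex > l.applied {
			// An earlier conf change is not applied yet: neutralise this one.
			es[i] = entry{}
		} else {
			l.pendingConfIndex = l.lastIndex() + uint64(i) + 1
		}
	}
	l.log = append(l.log, es...)
	return nil // the excerpt broadcasts to followers at this point
}

func main() {
	l := &leader{isMember: true}
	l.propose(entry{ConfChange: true, Data: "add node 4"})
	l.propose(entry{ConfChange: true, Data: "add node 5"}) // first one still pending
	fmt.Println(l.log[0].ConfChange, l.log[1].ConfChange)  // true false

	l.transferring = true
	fmt.Println(l.propose(entry{Data: "x=1"})) // proposal dropped
}
```

Note that the second configuration change still occupies a log slot; it is only turned into an empty entry, which keeps the index assignment simple.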
First, let's see how entries are appended to the leader's own log. For that we need to look at how the raft protocol layer defines the log:
type raftLog struct {
	// storage contains all stable entries since the last snapshot.
	storage Storage
	// unstable contains all unstable entries and snapshot.
	// they will be saved into storage.
	unstable unstable
	// committed is the highest log position that is known to be in
	// stable storage on a quorum of nodes.
	committed uint64
	// applied is the highest log position that the application has
	// been instructed to apply to its state machine.
	// Invariant: applied <= committed
	applied uint64
	logger Logger
	// maxNextEntsSize is the maximum number aggregate byte size of the messages
	// returned from calls to nextEnts.
	maxNextEntsSize uint64
}
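etcd's raft implementation abstracts the log into a stable part and an unstable part. As the comments above indicate, stable entries are those already persisted in Storage, while unstable entries have not been persisted yet and still need to be saved into storage. Whether an entry is committed is tracked separately, by the committed index.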
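The split between the two parts can be illustrated with a toy log. This sketch is an assumption-laden simplification: toyLog, append and stableTo are hypothetical names, both parts are in-memory slices, and stableTo moves entries itself, whereas in the real library the application does the persisting.

```go
package main

import "fmt"

type entry struct {
	Index uint64
	Data  string
}

// toyLog keeps persisted entries in storage and not-yet-persisted ones in
// unstable, following the comments on the raftLog struct.
type toyLog struct {
	storage  []entry
	unstable []entry
}

// lastIndex looks at the unstable part first, since it holds the newest entries.
func (l *toyLog) lastIndex() uint64 {
	if n := len(l.unstable); n > 0 {
		return l.unstable[n-1].Index
	}
	if n := len(l.storage); n > 0 {
		return l.storage[n-1].Index
	}
	return 0
}

// append adds new entries to the unstable part and returns the new last index.
func (l *toyLog) append(data ...string) uint64 {
	for _, d := range data {
		l.unstable = append(l.unstable, entry{Index: l.lastIndex() + 1, Data: d})
	}
	return l.lastIndex()
}

// stableTo moves every unstable entry up to and including index into storage,
// standing in for "these entries have now been persisted".
func (l *toyLog) stableTo(index uint64) {
	n := 0
	for n < len(l.unstable) && l.unstable[n].Index <= index {
		n++
	}
	l.storage = append(l.storage, l.unstable[:n]...)
	l.unstable = l.unstable[n:]
}

func main() {
	l := &toyLog{}
	l.append("a", "b", "c")
	fmt.Println(len(l.storage), len(l.unstable)) // 0 3
	l.stableTo(2)
	fmt.Println(len(l.storage), len(l.unstable), l.lastIndex()) // 2 1 3
}
```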
Next, let's look at the implementation of appendEntry:
func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {
	li := r.raftLog.lastIndex()
	for i := range es {
		es[i].Term = r.Term
		es[i].Index = li + 1 + uint64(i)
	}
	// Track the size of this uncommitted proposal.
	if !r.increaseUncommittedSize(es) {
		r.logger.Debugf(
			"%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal",
			r.id,
		)
		// Drop the proposal.
		return false
	}
	// use latest "last" index after truncate/append
	li = r.raftLog.append(es...)
	r.getProgress(r.id).maybeUpdate(li)
	// Regardless of maybeCommit's return, our caller will call bcastAppend.
	r.maybeCommit()
	return true
}
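The steps are fairly clear: assign the current term and consecutive indexes to the new entries, drop the proposal if it would exceed the uncommitted-size limit, append the entries to the raft log, update this node's own progress, and finally try to advance the commit index.
Next, let's look at the implementation of maybeCommit: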
// maybeCommit attempts to advance the commit index. Returns true if
// the commit index changed (in which case the caller should call
// r.bcastAppend).
func (r *raft) maybeCommit() bool {
	// Preserving matchBuf across calls is an optimization
	// used to avoid allocating a new slice on each call.
	if cap(r.matchBuf) < len(r.prs) {
		r.matchBuf = make(uint64Slice, len(r.prs))
	}
	mis := r.matchBuf[:len(r.prs)]
	idx := 0
	for _, p := range r.prs {
		mis[idx] = p.Match
		idx++
	}
	sort.Sort(mis)
	mci := mis[len(mis)-r.quorum()]
	return r.raftLog.maybeCommit(mci, r.Term)
}
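This code finds the highest match index that a quorum of nodes has reached (after sorting in ascending order, the element at position len(mis)-quorum) and tries to commit it. The commit index moves forward only if this index is greater than the raft log's current committed value; r.Term is also passed to raftLog.maybeCommit, which is how the current-term rule described earlier is enforced. Because we are running a single node here, this node's progress was already updated when the entry was appended to the unstable log, so maybeCommit advances the commitIndex right away. In a cluster with more than one node, nothing is committed at this point: only the local progress has been updated, and the entry is committed only after replication has advanced the progress of enough other nodes.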
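The index arithmetic is easy to check by hand with a standalone version. The function below is a sketch with a hypothetical name, quorumIndex; it assumes a non-empty slice holding one match index per node, leader included, and it leaves out the term check.

```go
package main

import (
	"fmt"
	"sort"
)

// quorumIndex returns the highest log index that at least a majority of the
// nodes have matched. match must be non-empty.
func quorumIndex(match []uint64) uint64 {
	mis := append([]uint64(nil), match...) // copy so the caller's slice is untouched
	sort.Slice(mis, func(i, j int) bool { return mis[i] < mis[j] })
	quorum := len(mis)/2 + 1
	// In ascending order, the last `quorum` elements are all >= this one.
	return mis[len(mis)-quorum]
}

func main() {
	fmt.Println(quorumIndex([]uint64{7}))             // 7: a single node is its own quorum
	fmt.Println(quorumIndex([]uint64{7, 0, 0}))       // 0: only the leader has the entries
	fmt.Println(quorumIndex([]uint64{7, 7, 0}))       // 7: one follower caught up
	fmt.Println(quorumIndex([]uint64{5, 3, 3, 1, 1})) // 3
}
```

The first two calls correspond to the single-node and multi-node cases discussed above.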
Once the local processing is done, the entries are broadcast to the other nodes. Let's look at the broadcast implementation:
// bcastAppend sends RPC, with entries to all peers that are not up-to-date
// according to the progress recorded in r.prs.
func (r *raft) bcastAppend() {
	r.forEachProgress(func(id uint64, _ *Progress) {
		if id == r.id {
			return
		}
		r.sendAppend(id)
	})
}
// sendAppend sends an append RPC with new entries (if any) and the
// current commit index to the given peer.
func (r *raft) sendAppend(to uint64) {
	r.maybeSendAppend(to, true)
}
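The rest of the sending logic lives in func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool. The code is fairly long; its main job is to replicate the log entries that have not yet been replicated to the target node.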
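Since maybeSendAppend itself is not reproduced here, the following is only a rough sketch of the idea, not the library's code. It assumes the leader tracks, per follower, the next index to send; appendMsg and buildAppend are hypothetical names, and snapshots, message size limits and flow control are left out entirely.

```go
package main

import "fmt"

type entry struct {
	Index, Term uint64
}

// appendMsg is a toy stand-in for an append message.
type appendMsg struct {
	To        uint64
	PrevIndex uint64 // index of the entry just before Entries
	PrevTerm  uint64 // term of that entry, 0 if there is none
	Entries   []entry
	Commit    uint64
}

// buildAppend assembles the message for one follower. log[i] has index i+1,
// and next is the first index the follower is believed to be missing.
func buildAppend(log []entry, to, next, commit uint64, sendIfEmpty bool) (appendMsg, bool) {
	last := uint64(len(log))
	if next == 0 {
		next = 1
	}
	if next > last+1 {
		next = last + 1
	}
	ents := log[next-1:]
	if len(ents) == 0 && !sendIfEmpty {
		return appendMsg{}, false // nothing new, and empty messages were not requested
	}
	m := appendMsg{To: to, PrevIndex: next - 1, Entries: ents, Commit: commit}
	if next > 1 {
		m.PrevTerm = log[next-2].Term
	}
	return m, true
}

func main() {
	log := []entry{{1, 1}, {2, 1}, {3, 2}}

	m, ok := buildAppend(log, 2, 2, 1, true)
	fmt.Println(ok, m.PrevIndex, m.PrevTerm, len(m.Entries)) // true 1 1 2

	_, ok = buildAppend(log, 3, 4, 1, false)
	fmt.Println(ok) // false: follower 3 is already up to date
}
```

Sending the previous index and term along with the entries lets the follower check that its log matches the leader's up to that point before appending.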
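So how do we, as users of the library, learn that the raft protocol layer wants us to send messages to the followers? Communication between the raft protocol layer and its user goes mainly through the Ready() function. We will leave the core driver function of the raft protocol layer for last; here, let's see how to handle the events that Ready() returns to us. First, the definition of Ready: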
// Ready encapsulates the entries and messages that are ready to read,
// be saved to stable storage, committed or sent to other peers.
// All fields in Ready are read-only.
type Ready struct {
	// The current volatile state of a Node.
	// SoftState will be nil if there is no update.
	// It is not required to consume or store SoftState.
	*SoftState
	// The current state of a Node to be saved to stable storage BEFORE
	// Messages are sent.
	// HardState will be equal to empty state if there is no update.
	pb.HardState
	// ReadStates can be used for node to serve linearizable read requests locally
	// when its applied index is greater than the index in ReadState.
	// Note that the readState will be returned when raft receives msgReadIndex.
	// The returned is only valid for the request that requested to read.
	ReadStates []ReadState
	// Entries specifies entries to be saved to stable storage BEFORE
	// Messages are sent.
	Entries []pb.Entry
	// Snapshot specifies the snapshot to be saved to stable storage.
	Snapshot pb.Snapshot
	// CommittedEntries specifies entries to be committed to a
	// store/state-machine. These have previously been committed to stable
	// store.
	CommittedEntries []pb.Entry
	// Messages specifies outbound messages to be sent AFTER Entries are
	// committed to stable storage.
	// If it contains a MsgSnap message, the application MUST report back to raft
	// when the snapshot has been received or has failed by calling ReportSnapshot.
	Messages []pb.Message
	// MustSync indicates whether the HardState and Entries must be synchronously
	// written to disk or if an asynchronous write is permissible.
	MustSync bool
}
Now let's look at the function that initializes Ready:
func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
	rd := Ready{
		Entries:          r.raftLog.unstableEntries(),
		CommittedEntries: r.raftLog.nextEnts(),
		Messages:         r.msgs,
	}
	if softSt := r.softState(); !softSt.equal(prevSoftSt) {
		rd.SoftState = softSt
	}
	if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) {
		rd.HardState = hardSt
	}
	if r.raftLog.unstable.snapshot != nil {
		rd.Snapshot = *r.raftLog.unstable.snapshot
	}
	if len(r.readStates) != 0 {
		rd.ReadStates = r.readStates
	}
	rd.MustSync = MustSync(r.hardState(), prevHardSt, len(rd.Entries))
	return rd
}
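From this we can work out what each field in Ready means. Entries are the unstable log entries that must be saved to stable storage. CommittedEntries are the committed entries that can now be applied to the state machine. Messages are the outbound messages for other nodes, to be sent after Entries have been persisted. SoftState and HardState are set only when they differ from the previous values. Snapshot is set when there is an unstable snapshot to save. ReadStates carry the results used to serve linearizable reads. MustSync says whether HardState and Entries must be written to disk synchronously.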
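After receiving a Ready, we only need to process its fields: persist the corresponding data and send the corresponding messages. This keeps the raft protocol layer isolated from the application logic.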
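The ordering constraints in the Ready comments (persist before sending, then apply) can be sketched with toy types. Nothing below uses the real raft package: ready, app and handleReady are hypothetical, strings stand in for entries and messages, and the final advance step is modelled on the Advance method of the library's Node interface, which tells raft that the batch has been handled.

```go
package main

import "fmt"

// ready is a toy stand-in for raft's Ready with only four of its fields.
type ready struct {
	HardState        string
	Entries          []string // must be persisted
	CommittedEntries []string // may be applied to the state machine
	Messages         []string // must be sent after persisting
}

// app is a hypothetical application that records what it was asked to do.
type app struct {
	ops []string
}

func (a *app) persist(hs string, ents []string) {
	a.ops = append(a.ops, fmt.Sprintf("persist %d entries", len(ents)))
}

func (a *app) send(msgs []string) {
	a.ops = append(a.ops, fmt.Sprintf("send %d messages", len(msgs)))
}

func (a *app) apply(ents []string) {
	a.ops = append(a.ops, fmt.Sprintf("apply %d entries", len(ents)))
}

func (a *app) advance() {
	a.ops = append(a.ops, "advance")
}

// handleReady processes one batch in the order the Ready comments require.
func (a *app) handleReady(rd ready) {
	a.persist(rd.HardState, rd.Entries) // 1. HardState and Entries to stable storage
	a.send(rd.Messages)                 // 2. only then send outbound messages
	a.apply(rd.CommittedEntries)        // 3. apply what is already committed
	a.advance()                         // 4. signal that this batch is done
}

func main() {
	a := &app{}
	a.handleReady(ready{
		HardState:        "term=2 commit=1",
		Entries:          []string{"x=2"},
		CommittedEntries: []string{"x=1"},
		Messages:         []string{"append to 2", "append to 3"},
	})
	for _, op := range a.ops {
		fmt.Println(op)
	}
}
```

A real application would run this inside a loop that receives from the Ready channel, and would also handle Snapshot and MustSync.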