读完raft的论文,大致对于一些细节有所了解,整体的实现也有所了解,总结起来,raft是一个共识协议,保证了一个集群内log的一致性。当log一致,则把log apply到state machine后所得到的最终结果也是一致的。当然至于其它的一致性,比如数据读写的一致性,还需要别的协议来保证,比如以下的一种情况是自己一向认为是正确的,然而却是错误的:

以前一致认为,假设有5节点的集群,假设一份数据写入了三个节点,则该份数据也就被提交了,所以要去读这个数据的话,也只需要从5个节点中读到了三份最新的,则该数据就是被提交了的。然而这却是错误的,有一种场景,比如在某个term中,节点的数据只复制到了其余一个节点,然后该节点崩溃了,然后其余没有复制到数据的当选为leader,在term+1中提交了一份数据,这份数据还没开始等到复制到其余节点的时候,该节点也挂了。当原先的leader节点成功成为leader后,继续复制数据至第三个节点,这样term中被提交的数据已经被复制成大多数了,然而假设这个时候这个leader挂了,term+1中提交数据的leader又可以成为leader,此时被复制了3份的数据其实也是要被truncate掉的。

所以raft规定了只有当前任期中的提交被大多数节点所接受的时候,该commit以及之前的commit才确定已经被提交,不会被覆盖了,所以在实践的实现中,当一个leader当选后,需要发一个当前term的no op来保证即使当前任期中没有提交,也要尽量的把no op之前的数据确认commit。

etcd中的raft实现属于工程实践中比较可靠的一种实现,它的实现刨除了传输、log等的实现,所以没有接触过的人接触该库的时候是有点困难的。但是困难归困难,总是可以学到不少东西的,下面开始以raft example为例,开始慢慢的解读etcd中raft库的实现以及用法。

Propose的提交

在raft中,做一次提交,则就是向raft协议中提交一次propose,该propose需要得到大多数节点的认同才能确定提交,然后更新commitIndex,之后可以异步的更新appliedIndex。

在该例子中,假设是单节点,我们提交了一个propose,最终会被投入到raft协议处理中,对应的接口为:

// blocks until accepted by raft state machine
rc.node.Propose(context.TODO(), []byte(prop))

我们可以看到注释,这个propose会一直堵塞知道raft状态机接受了这个propose。我们来进一步看看Propose的实现:

func (n *node) Propose(ctx context.Context, data []byte) error {
	return n.stepWait(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
}

...

func (n *node) stepWithWaitOption(ctx context.Context, m pb.Message, wait bool) error {
	ch := n.propc
	pm := msgWithResult{m: m}
	if wait {
		pm.result = make(chan error, 1)
	}
	select {
	case ch <- pm:
		if !wait {
			return nil
		}
	case <-ctx.Done():
		return ctx.Err()
	case <-n.done:
		return ErrStopped
	}
	select {
	case rsp := <-pm.result:
		if rsp != nil {
			return rsp
		}
	case <-ctx.Done():
		return ctx.Err()
	case <-n.done:
		return ErrStopped
	}
	return nil
}

这段的实现,主要就是生成一个msgWithResult,并且将m写入propc这个chan中,然后等待回应。我们来看一下关于propc的处理代码:

func (n *node) run(r *raft) {
	...
	select {
		// TODO: maybe buffer the config propose if there exists one (the way
		// described in raft dissertation)
		// Currently it is dropped in Step silently.
		case pm := <-propc:
			m := pm.m
			m.From = r.id
			err := r.Step(m)
			if pm.result != nil {
				pm.result <- err
				close(pm.result)
			}
			...
}

这里只是把核心的部分给拿了出来,我们可以看出实现主要就是去除之前塞入的m,然后进行处理,得到结果后把消息放入result chan来解除Propose的堵塞。raft这个状态机的推进,主要是在Step函数的实现中,我们跳过其余的处理,最终的处理是走到了raft.step函数中,该函数是一个函数指针,通过不同的角色来实现不同的逻辑,在这里我们是leader,我们来看一下stepLeader的实现:

func stepLeader(r *raft, m pb.Message) error {
	switch m.Type {
	case pb.MsgProp:
		if len(m.Entries) == 0 {
			r.logger.Panicf("%x stepped empty MsgProp", r.id)
		}
		if _, ok := r.prs[r.id]; !ok {
			// If we are not currently a member of the range (i.e. this node
			// was removed from the configuration while serving as leader),
			// drop any new proposals.
			return ErrProposalDropped
		}
		if r.leadTransferee != None {
			r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)
			return ErrProposalDropped
		}

		for i, e := range m.Entries {
			if e.Type == pb.EntryConfChange {
				if r.pendingConfIndex > r.raftLog.applied {
					r.logger.Infof("propose conf %s ignored since pending unapplied configuration [index %d, applied %d]",
						e.String(), r.pendingConfIndex, r.raftLog.applied)
					m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
				} else {
					r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1
				}
			}
		}

		if !r.appendEntry(m.Entries...) {
			return ErrProposalDropped
		}
		r.bcastAppend()
		return nil
}

这里只是调出最核心的实现,在这个实现中,我们可以整理出一些处理的流程:

  1. 检查自己的进度,这个进度也就是raft实现中leader保存了一些状态信息,也就是matchIndex和nextIndex,分别表示了Follower复制的进度和要发向Follower的下一个发送log位置
  2. 追加进自身的日志
  3. 给所有未追上日志的Follower节点广播追加日志消息

我们首先看一下追加进自身的日志是怎么处理的,这里首先得看一下raft协议层对于log的定义:

type raftLog struct {
	// storage contains all stable entries since the last snapshot.
	storage Storage

	// unstable contains all unstable entries and snapshot.
	// they will be saved into storage.
	unstable unstable

	// committed is the highest log position that is known to be in
	// stable storage on a quorum of nodes.
	committed uint64
	// applied is the highest log position that the application has
	// been instructed to apply to its state machine.
	// Invariant: applied <= committed
	applied uint64

	logger Logger

	// maxNextEntsSize is the maximum number aggregate byte size of the messages
	// returned from calls to nextEnts.
	maxNextEntsSize uint64
}
  1. storage。该存储是一个稳定存储,也就是需要落盘,包含着从最新的snapshot点后所有的已确认提交的日志
  2. unstable。不稳定的存储,包含正在接收的快照以及还未确认提交的日志。这里注释里也特别说明了,假设日志确认被提交了,那么不稳定的log也会被提交入storage。
  3. committed。已被提交的日志中最高的日志索引
  4. applied。被提交的日志中已经被应用至状态机的最高的日志索引,它一直是小于等于committed的

etcd的raft实现,将日志进行抽象,分为稳定的与不稳定的日志,分别代表已经被确认提交的日志和还未确认被提交的日志。

接下来,我们来看一下appendEntry的实现:

func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {
	li := r.raftLog.lastIndex()
	for i := range es {
		es[i].Term = r.Term
		es[i].Index = li + 1 + uint64(i)
	}
	// Track the size of this uncommitted proposal.
	if !r.increaseUncommittedSize(es) {
		r.logger.Debugf(
			"%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal",
			r.id,
		)
		// Drop the proposal.
		return false
	}
	// use latest "last" index after truncate/append
	li = r.raftLog.append(es...)
	r.getProgress(r.id).maybeUpdate(li)
	// Regardless of maybeCommit's return, our caller will call bcastAppend.
	r.maybeCommit()
	return true
}

步骤比较清晰:

  1. 取出log中最新的日志索引号和,它会依次从不稳定和稳定的存储中去获取,然后将自身的任期号和相应的索引进行赋值
  2. 检测未提交的log是否超过预设的尺寸限制了
  3. 将日志追加进raft的不稳定日志中,同时更新自身的进度,也就是本身的matchIndex和nextIndex都会增加
  4. 试着去推进提交索引,假设成功,则调用者会调用bcastAppend来广播追加消息

接下来主要就是看一下maybeCommit的实现了:

// maybeCommit attempts to advance the commit index. Returns true if
// the commit index changed (in which case the caller should call
// r.bcastAppend).
func (r *raft) maybeCommit() bool {
	// Preserving matchBuf across calls is an optimization
	// used to avoid allocating a new slice on each call.
	if cap(r.matchBuf) < len(r.prs) {
		r.matchBuf = make(uint64Slice, len(r.prs))
	}
	mis := r.matchBuf[:len(r.prs)]
	idx := 0
	for _, p := range r.prs {
		mis[idx] = p.Match
		idx++
	}
	sort.Sort(mis)
	mci := mis[len(mis)-r.quorum()]
	return r.raftLog.maybeCommit(mci, r.Term)
}

这段代码的主要作用是将大部分节点的最小的matchIndex做commit操作。只要commit大于raft log的committed值,则commitIndex将会向前进行推进。由于我们当前是单节点环境,在我们把日志追加进不稳定日志的时候,已经更新了本节点的进度了,则maybeCommit就会更新本节点的commitIndex;不过当是大于1个节点的集群的情况,这里是不会被提交的,这里只是更新了本节点的进度,其余节点的进度是要等到复制完成才会被提交的。

在本节点的处理完毕后,则会把该消息做一次广播。我们来看一下广播的实现:

// bcastAppend sends RPC, with entries to all peers that are not up-to-date
// according to the progress recorded in r.prs.
func (r *raft) bcastAppend() {
	r.forEachProgress(func(id uint64, _ *Progress) {
		if id == r.id {
			return
		}

		r.sendAppend(id)
	})
}

// sendAppend sends an append RPC with new entries (if any) and the
// current commit index to the given peer.
func (r *raft) sendAppend(to uint64) {
	r.maybeSendAppend(to, true)
}

剩下的发送代码主要在func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool中,代码比较长,它的主要功能是将未复制的日志复制到目标节点,我们梳理一下看看。

  1. 获取目标节点的进度
  2. 假设无法获取到日志的任期号和索引号,则发送快照,对应Follower的状态转为快照发送状态
  3. 否则批量发送一批数据,这批数据必须要带上前一个log的任期和索引,为何需要,请看论文
  4. 乐观的更新对应Follower的进度信息,也就是把nextIndex做对应的更新,同时增加inflight的值
  5. 将对应发送的消息放入raft实现的msgs中,供使用者发送

那我们如何从使用者的角度知道,raft协议层需要让我们把一些消息发往Follower呢?raft协议层和使用者之间的通讯,主要是通过Ready()这个函数,我们在最后再看一下raft协议层最核心的驱动函数,这里看看我们要怎么处理Ready()返回给我们的事件。首先我们来看看Ready的定义:

// Ready encapsulates the entries and messages that are ready to read,
// be saved to stable storage, committed or sent to other peers.
// All fields in Ready are read-only.
type Ready struct {
	// The current volatile state of a Node.
	// SoftState will be nil if there is no update.
	// It is not required to consume or store SoftState.
	*SoftState

	// The current state of a Node to be saved to stable storage BEFORE
	// Messages are sent.
	// HardState will be equal to empty state if there is no update.
	pb.HardState

	// ReadStates can be used for node to serve linearizable read requests locally
	// when its applied index is greater than the index in ReadState.
	// Note that the readState will be returned when raft receives msgReadIndex.
	// The returned is only valid for the request that requested to read.
	ReadStates []ReadState

	// Entries specifies entries to be saved to stable storage BEFORE
	// Messages are sent.
	Entries []pb.Entry

	// Snapshot specifies the snapshot to be saved to stable storage.
	Snapshot pb.Snapshot

	// CommittedEntries specifies entries to be committed to a
	// store/state-machine. These have previously been committed to stable
	// store.
	CommittedEntries []pb.Entry

	// Messages specifies outbound messages to be sent AFTER Entries are
	// committed to stable storage.
	// If it contains a MsgSnap message, the application MUST report back to raft
	// when the snapshot has been received or has failed by calling ReportSnapshot.
	Messages []pb.Message

	// MustSync indicates whether the HardState and Entries must be synchronously
	// written to disk or if an asynchronous write is permissible.
	MustSync bool
}

我们再看一下Ready的初始化函数:

func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
	rd := Ready{
		Entries:          r.raftLog.unstableEntries(),
		CommittedEntries: r.raftLog.nextEnts(),
		Messages:         r.msgs,
	}
	if softSt := r.softState(); !softSt.equal(prevSoftSt) {
		rd.SoftState = softSt
	}
	if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) {
		rd.HardState = hardSt
	}
	if r.raftLog.unstable.snapshot != nil {
		rd.Snapshot = *r.raftLog.unstable.snapshot
	}
	if len(r.readStates) != 0 {
		rd.ReadStates = r.readStates
	}
	rd.MustSync = MustSync(r.hardState(), prevHardSt, len(rd.Entries))
	return rd
}

然后我们就能分析一下Ready中各个成员变量的含义了:

  1. SoftState。表示的是对应的各个节点的可变状态,与raft论文中相对应。
  2. HardState。表示的是当前节点的Term/Vote/Commit信息,这些需要被持久化。
  3. ReadStates。主要用于实现线性一致性读,这里先不去深究。
  4. Entries。需要持久化保存的日志,主要是raft协议层中不稳定的日志,也就是没有提交的日志。
  5. Snapshot。需要持久化保存的快照。
  6. CommittedEntries。被提交的日志,但是还没有提交到状态机。
  7. Messages。在Entries被持久化入日志后,需要被发送到Follower节点的消息

我们在收到了Ready消息之后,只需要处理其中的变量,对对应的数据进行记录存储以及发送,将raft协议层与逻辑层相隔离。

共 0 条回复
暂时没有人回复哦,赶紧抢沙发
发表新回复

作者

sryan
today is a good day