Etcd-Raft Source Code Analysis
Etcd是Go语言实现的基于Raft协议的kv存储,是目前市面上比较成熟的Raft实现。代码层面上,Etcd-Raft模块的封装跟逻辑都是非常优秀的。这里希望学习Etcd 能够更加深入的了解Raft。
Etcd-Raft 模块是一个独立的模块,它按照Raft论文实现了各种协议类型的处理,但对于Entries如何落盘,如何传输等问题都没有涉及。Etcd默认这些逻辑的实现由调Raft模块的调用者来实现。所以,Etcd在Raft模块之上实现了RaftNode,并给出了一个RaftExample。RaftNode封装了一系列的管道跟Etcd-Raft通信,封装了WAL日志管理的接口,快照的接口。RaftExample 又集成了RaftNode模块,Http服务端模块,kvstore模块形成了一个可以对外提供服务的完整实现。RaftExample的结构如下图所示。
这里会着重于从主要从代码层面来学习Etcd-Raft的实现。关于HttpServer,RaftNode等模块的实现读者可以自己学习一下Etcd的代码,这里不做过多介绍。
0, Etcd-Raft
这里默认读者比较熟悉Raft算法的原理,一些基本的Raft概念可以参考RAFT论文 Leader Election & Log Replication。整个的Etcd-Raft的实现是基本上没有锁的,也就是说只有一个线程在操作这个模块,为了保证效率,Etcd-Raft内部是没有磁盘的操作的,尤其是对于Entry的操作,Etcd-Raft完全是纯内存的操作。下面来看一下Etcd-Raft关于Entry的主要数据结构。
Entry的操作分为两个数据结构,分别是MemoryStorage和Unstable,为了实现Entry的快速访问和修改,Etcd会将没有compact到snapshot的数据保留一份在内存当中,这部分数据分别存在MemstoryStoreage跟unstable中。
MemoryStorage继承于Storage,提供一份可以快速访问的盘上数据的内存拷贝。个人理解应该只是落盘的数据中没有compact到snapshot的entry的内存拷贝,并不是一份完整的库的拷贝。上层模块的逻辑会定时的compact,以至于这个MemoryStorage不会特别大。
需要说明的是这两个数据结构都用到了Term 和Index,对于Term跟Index熟悉Raft的读者肯定不会陌生,这里Etcd-Raft的Term是任期的概念,Index是entries的全局唯一编号,是自增的正整数。下面分别介绍这两个数据结构。
1.1, Storage
Storage 提供了以下接口:
type Storage interface {
// InitialState returns the saved HardState and ConfState information.
InitialState() (pb.HardState, pb.ConfState, error)
// Entries returns a slice of log entries in the range [lo,hi).
// MaxSize limits the total size of the log entries returned, but
// Entries returns at least one entry if any.
Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)
// Term returns the term of entry i, which must be in the range
// [FirstIndex()-1, LastIndex()]. The term of the entry before
// FirstIndex is retained for matching purposes even though the
// rest of that entry may not be available.
Term(i uint64) (uint64, error)
// LastIndex returns the index of the last entry in the log.
LastIndex() (uint64, error)
// FirstIndex returns the index of the first log entry that is
// possibly available via Entries (older entries have been incorporated
// into the latest Snapshot; if storage only contains the dummy entry the
// first log entry is not available).
FirstIndex() (uint64, error)
// Snapshot returns the most recent snapshot.
// If snapshot is temporarily unavailable, it should return
// ErrSnapshotTemporarilyUnavailable,
// so raft state machine could know that Storage needs some time to prepare
// snapshot and call Snapshot later.
Snapshot() (pb.Snapshot, error)
}
具体关注Append方法:
// Append the new entries to storage.
// TODO (xiangli): ensure the entries are continuous and
// entries[0].Index > ms.entries[0].Index
func (ms *MemoryStorage) Append(entries []pb.Entry) error {
if len(entries) == 0 {
return nil
}
ms.Lock()
defer ms.Unlock()
first := ms.firstIndex()
last := entries[0].Index + uint64(len(entries)) - 1
// shortcut if there is no new entry.
if last < first {
return nil
}
// truncate compacted entries
if first > entries[0].Index {
entries = entries[first-entries[0].Index:]
}
offset := entries[0].Index - ms.ents[0].Index
switch {
case uint64(len(ms.ents)) > offset:
ms.ents = append([]pb.Entry{}, ms.ents[:offset]...)
ms.ents = append(ms.ents, entries...)
case uint64(len(ms.ents)) == offset:
ms.ents = append(ms.ents, entries...)
default:
raftLogger.Panicf("missing log entry [last: %d, append at: %d]",
ms.lastIndex(), entries[0].Index)
}
return nil
1,如果需要append的entries,有一部分在compact内部,则这部分entries不需要再append
2,计算待append的entries和MemoryStorage的entries的差值,得出offset
3,根据offset进行append
对于Append的两种方式如下图所示:
1.2, Unstable
Unstable 对于主维护了客户端请求对应的Entry记录,对于从来说维护了从主复制来的Entry记录。
// unstable.entries[i] has raft log position i+unstable.offset.
// Note that unstable.offset may be less than the highest log
// position in storage; this means that the next write to storage
// might need to truncate the log before persisting unstable.entries.
type unstable struct {
// the incoming unstable snapshot, if any.
snapshot *pb.Snapshot
// all entries that have not yet been written to storage.
entries []pb.Entry
offset uint64
// record error msg
logger Logger
}
具体关注Append方法:
func (u *unstable) truncateAndAppend(ents []pb.Entry) {
after := ents[0].Index
switch {
case after == u.offset+uint64(len(u.entries)):
// after is the next index in the u.entries
// directly append
u.entries = append(u.entries, ents...)
case after <= u.offset:
u.logger.Infof("replace the unstable entries from index %d", after)
// The log is being truncated to before our current offset
// portion, so set the offset and replace the entries
u.offset = after
u.entries = ents
default:
// truncate to after and copy to u.entries
// then append
u.logger.Infof("truncate the unstable entries before index %d", after)
u.entries = append([]pb.Entry{}, u.slice(u.offset, after)...)
u.entries = append(u.entries, ents...)
}
}
1,获得待添加的entries的起始位置
2,分情况append entries
对于Unstable的Append方式如下图所示:
1.3, RaftLog
RafLogt是SnapShot + MemoryStorage + Unstable的集合,也是真正Raft协议模块调用的模块,他们的组合逻辑如下图所示。
type raftLog struct {
// storage contains all stable entries since the last snapshot.
storage Storage
// unstable contains all unstable entries and snapshot.
// they will be saved into storage.
unstable unstable
// committed is the highest log position that is known to be in
// stable storage on a quorum of nodes.
committed uint64
// applied is the highest log position that the application has
// been instructed to apply to its state machine.
// Invariant: applied <= committed
applied uint64
logger Logger
// maxNextEntsSize is the maximum number aggregate
// byte size of the messages
// returned from calls to nextEnts.
maxNextEntsSize uint64
}
重点关注raftLog的Append操作:
// maybeAppend returns (0, false) if the entries cannot be appended.
// Otherwise, it returns (last index of new entries, true).
func (l *raftLog)
maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry)
(lastnewi uint64, ok bool) {
if l.matchTerm(index, logTerm) {
lastnewi = index + uint64(len(ents))
ci := l.findConflict(ents)
switch {
case ci == 0:
case ci <= l.committed:
l.logger.Panicf("entry %d conflict with committed entry
[committed(%d)]", ci, l.committed)
default:
offset := index + 1
l.append(ents[ci-offset:]...)
}
// AZ: follower behavior
l.commitTo(min(committed, lastnewi))
return lastnewi, true
}
return 0, false
}
func (l *raftLog) append(ents ...pb.Entry) uint64 {
if len(ents) == 0 {
return l.lastIndex()
}
if after := ents[0].Index - 1; after < l.committed {
l.logger.Panicf("after(%d) is out of range [committed(%d)]",
after, l.committed)
}
// AZ: append in unstable only
l.unstable.truncateAndAppend(ents)
return l.lastIndex()
}
1,查看输入参数index和logTerm跟raftLog下的Entry是否一致
2,找到待添加的entries与raftLog 冲突的位置
3,按照冲突位置进行合并
4,append到unstable当中
Append如下图所示:
// findConflict finds the index of the conflict.
// It returns the first pair of conflicting entries between the existing
// entries and the given entries, if there are any.
// If there is no conflicting entries, and the existing entries contains
// all the given entries, zero will be returned.
// If there is no conflicting entries, but the given entries contains new
// entries, the index of the first new entry will be returned.
// An entry is considered to be conflicting if it has the same index but
// a different term.
// The first entry MUST have an index equal to the argument 'from'.
// The index of the given entries MUST be continuously increasing.
func (l *raftLog) findConflict(ents []pb.Entry) uint64 {
// AZ: check every ents in raftLog, if theie term match
for _, ne := range ents {
if !l.matchTerm(ne.Index, ne.Term) {
if ne.Index <= l.lastIndex() {
l.logger.Infof("found conflict")
}
return ne.Index
}
}
return 0
}
1,遍历待添加的Entry,寻找第一个待添加的Entry 与raftLog 不一致的位置。
以上就是RaftLog Append的实现方法,主要是调用了Unstable的append方法进行追加。介绍完了RaftLog 之后,接下来具体介绍Etcd当中Raft部分的逻辑是如何实现的。
2, Etcd-Raft 算法实现
type raft struct {
id uint64
Term uint64
Vote uint64
prs tracker.ProgressTracker
state StateType
// AZ: step 是一个函数指针,具体的实现由其角色决定
//(stepLeader stepCandidate stepFollower)
step stepFunc
}
对于Raft算法的主要实现是在Raft模块内部实现的。整个Raft模块可以看作是一个消息处理机,处理上层模块的各种消息,包括内部消息和来自follower,leader的外部消息。Etcd-Raft实现的消息类型非常之多,本次就先关注Entries复制相关的消息类型。Raft的消息处理入口函数如下所示。
func (r *raft) Step(m pb.Message) error {
// Handle the message term
// which may result in our stepping down to a follower.
switch {
case m.Term == 0:
// local message
case m.Term > r.Term:
// If a server receives a RequestVote request within the minimum
// election timeout of hearing from a current leader, it does not
// update its term or grant its vote
switch {
// Process Prevote Logic
...
default:
if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat
|| m.Type == pb.MsgSnap {
r.becomeFollower(m.Term, m.From)
} else {
r.becomeFollower(m.Term, None)
}
}
case m.Term < r.Term:
if (r.checkQuorum) && (m.Type == pb.MsgHeartbeat ||m.Type == pb.MsgApp) {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp}) // AZ: with term 0
} else {
// ignore other cases
}
return nil
}
switch m.Type{
// process pb.MsgHup and pb.MsgVote, pb.MsgPreVote
default:
err := r.step(r, m)
if err != nil {
return err
}
}
}
}
Step是Raft模块消息处理的入口参数,根据不同的Message类型进行相应的处理。
1,收到消息Term比当前Term大,则自己变为Follower
2,收到消息Term比当前Term小,则发送MsgAppResp消息
3,通过step挂载的函数(小写的step)对于不同的消息进行下一步的处理
2.1, Leader处理客户端请求
如果节点角色是Leader,step(小写的)挂载stepLeader函数。收到客户端的请求之后,上层模块会封装成MsgProp消息传到step函数当中。处理流程如下:
func stepLeader(r *raft, m pb.Message) error {
switch m.Type {
case pb.MsgProp:
// AZ: check if there is config change pending. If there is one pending,
// drop this proposal.
// AZ: go over m.Entries, to check is there is config change request.
// Merge config changes to be one config change request.
r.appendEntry(m.Entries...)
r.bcastAppend()
}
通过step指针链接到了stepLeader函数内部
1,如果是config change的请求,做特殊处理。
2,调用appendEntry
3,将Entries同步到从
func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {
li := r.raftLog.lastIndex()
for i := range es {
es[i].Term = r.Term
es[i].Index = li + 1 + uint64(i)
}
// Track the size of this uncommitted proposal.
if !r.increaseUncommittedSize(es) {
// Drop the proposal.
return false
}
// use latest "last" index after truncate/append
li = r.raftLog.append(es...)
r.prs.Progress[r.id].MaybeUpdate(li)
// Regardless of maybeCommit's return, our caller will call bcastAppend.
r.maybeCommit()
return true
}
1,为pb当中的entry添加term 和 index
2,更新uncommitted 数据大小
3,调用raftLog的append
4,调用MaybeUpdate 和maybeCommit更新match index,next index 和 committed index
// bcastAppend sends RPC, with entries to all peers that are not up-to-date
// according to the progress recorded in r.prs.
func (r *raft) bcastAppend() {
r.prs.Visit(func(id uint64, _ *tracker.Progress) {
if id == r.id {
return
}
r.sendAppend(id)
})
}
这里的MsgApp消息等同于Raft论文中的AppendEntriesRPC。
Send MsgApp(Append Entries)
// sendAppend sends an append RPC with new entries (if any) and the
// current commit index to the given peer.
func (r *raft) sendAppend(to uint64) {
term, errt := r.raftLog.term(pr.Next - 1)
ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
// send snapshot if we failed to get term or entries
if errt != nil || erre != nil {
pr.BecomeSnapshot(sindex)
} else {
m.Type = pb.MsgApp
m.Index = pr.Next - 1 // AZ: last_index
m.LogTerm = term // AZ: last_term
m.Entries = ents
m.Commit = r.raftLog.committed
}
r.send(m)
}
2.2, 处理MsgApp消息:
Recv MsgApp && Send MsgAppResp
func stepFollower(r *raft, m pb.Message) error {
switch m.Type {
...
case pb.MsgProp:
if r.lead == None {
r.logger.Infof("%x no leader at term %d; dropping proposal")
return ErrProposalDropped
}
m.To = r.lead
r.send(m)
case pb.MsgApp:
r.electionElapsed = 0
r.lead = m.From
r.handleAppendEntries(m)
...
}
func (r *raft) handleAppendEntries(m pb.Message) {
if m.Index < r.raftLog.committed {
r.send(pb.Message
{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
return
}
if mlastIndex, ok := r.raftLog.maybeAppend
(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
} else {
// AZ:maybeAppend failed, last_index last_term not match
r.send(pb.Message
{To: m.From, Type: pb.MsgAppResp, Index: m.Index,
Reject: true, RejectHint: r.raftLog.lastIndex()})
}
}
stepFollower
1,如果是客户端请求,转发到对应的主上。
2,如果是主发来的MsgApp
2.1,如果index 小于 committed 返回 当前的committed index
2.2 调用raftLog maybeAppend
a) 如果成功,返回当前log的last_index
b) 如果失败,发送当前log的last_index
Recv MsgAppRecv
func stepLeader(r *raft, m pb.Message) error {
switch m.Type {
case pb.MsgAppResp:
pr.RecentActive = true
if m.Reject {
if pr.MaybeDecrTo(m.Index, m.RejectHint) {
if pr.State == tracker.StateReplicate {
pr.BecomeProbe()
}
r.sendAppend(m.From)
}
} else {
// AZ: progress update match and next
if pr.MaybeUpdate(m.Index) {
switch {
case pr.State == tracker.StateProbe:
pr.BecomeReplicate()
case pr.State == tracker.StateSnapshot
&& pr.Match >= pr.PendingSnapshot:
// Transition back to replicating state via probing state
// (which takes the snapshot into account). If we didn't
// move to replicating state, that would only happen with
// the next round of appends (but there may not be a next
// round for a while, exposing an inconsistent RaftStatus).
pr.BecomeProbe()
pr.BecomeReplicate()
case pr.State == tracker.StateReplicate:
pr.Inflights.FreeLE(m.Index)
}
// AZ: Update leader committed id
if r.maybeCommit() {
// AZ: if update leader commit success, broadcast to peers
r.bcastAppend()
}
// AZ: send next round of MsgApp
r.maybeSendAppend(m.From, false)
}
}
}
}
func (pr *Progress) MaybeDecrTo(rejected, last uint64) bool {
if pr.State == StateReplicate {
// The rejection must be stale if the progress has matched and "rejected"
// is smaller than "match".
if rejected <= pr.Match {
return false
}
// Directly decrease next to match + 1.
pr.Next = pr.Match + 1
return true
}
return true
}
stepLeader
1,如果Reject,尝试调用MaybeDecrTo回退。MaybeDecrTo(m.Index, m.RejectHint),m.Index 为发送报文的last_index, m.RejectHint 为从的last_index
2,如果接受,更新对应progress中的match 和 next
3,状态转换
4,更新自己的committed id
5,继续发送entries
在Etcd-Raft的Entries复制机制中,提供了StateProbe这样一个状态。对于刚开始日志复制的时候,需要协商主从entries一致的位置,这一状态Etcd-Raft称之为StateProbe。StateProbe阶段,每次MsgApp消息只携带一条entry,直到主从建立了一致的同步点,之后会进入StateReplicate阶段用batch的方式发送Entries。
3, 总结
Etcd-Raft的优化非常之多,本文只是对Etcd-Raft的源码做一个大概的了解。更多的相关逻辑,读者可以自行阅读源码。