0. 序言

在深入学习etcd中raft的源码之前,首先应该学会使用etcd的raft模块。幸运的是,etcd官方提供了一个基于etcd/raft的简单kvstore的实现,该实现在etcd/contrib/raftexample下。raftexample实现了一个基于http的分布式存储服务,下面将其称为raft-kv-http-server。

该项目的根目录下还提供了该分布式kv存储示例的使用方法和基本设计思路,这里建议读者先按照其README.md完整运行一遍示例,再继续接下来的学习。

1. 架构

raft-kv-http-server由三个组件构成:

  1. HTTP Server提供REST API接口
  2. KV Store提供map的键值对存储
  3. RAFT Server提供分布式共识服务

由RAFT支持的KV存储是一个持有所有已提交的键值对的map。该存储建立了RAFT服务器和REST服务器间的通信桥梁。键值对的更新通过该存储提交该RAFT服务器。当RAFT服务器报告有更新提交后,该存储便会更新其map。

REST服务器通过访问由RAFT支持的KV存储的方式暴露出当前RAFT达成的共识。

GET命令会在存储中查找键,如果键存在则会返回该键的值。

一个带键值的PUT命令会想存储中提出一个更新提议。

RAFT服务器和其集群中的对等节点(PEER)会参与共识的达成。

当REST服务器提交提议时,RAFT服务器会将该提议发送给其对等节点。

当RAFT达成共识时,服务器会通过一个提交信道来发布所有已提交的更新。

raft-kv-http-server的设计可以用一张图来表示,但是HTTP Server主要处理KV相关消息,集群配置比变更的消息会直接交给RAFT Server处理。


接下来,我们自下而上地学习raft-kv-http-server的实现。

2. httpapi

httpapi.go是REST服务器的实现,这并不是我们关注的重点,我们需要关注的只是其通过kvstore中的哪些方法来提供服务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
key := r.RequestURI
defer r.Body.Close()
switch r.Method {
case http.MethodPut:
val, err := io.ReadAll(r.Body)
// ...
h.store.Propose(key, string(val))
w.WriteHeader(http.StatusNoContent)
case http.MethodGet:
if val, ok := h.store.Lookup(key); ok {
w.Write([]byte(val))
} else {
http.Error(w, "Failed to GET", http.StatusNotFound)
}
case http.MethodPost:
url, err := io.ReadAll(r.Body)
// ...
nodeId, err := strconv.ParseUint(key[1:], 0, 64)
// ...
cc := raftpb.ConfChange{
Type: raftpb.ConfChangeAddNode,
NodeID: nodeId,
Context: url,
}
h.confChangeC <- cc
w.WriteHeader(http.StatusNoContent)
case http.MethodDelete:
nodeId, err := strconv.ParseUint(key[1:], 0, 64)
// ...
cc := raftpb.ConfChange{
Type: raftpb.ConfChangeRemoveNode,
NodeID: nodeId,
}
h.confChangeC <- cc
w.WriteHeader(http.StatusNoContent)
default:
// ...
}
}

上面的代码中,省略的一些对消息的序列化和反序列化及对性能的小优化,我们主要关注其通过哪些方法为请求提供服务。

请求方法 处理方式 功能
PUT kvstore.Propose(k,v) 添加|更新键值对
GET kvstore.Lookup(k) 查找键对应的值
POST confChangeC <- cc 向集群中添加节点
DELETE confChangeC <- cc 从集群中移除节点

从表中我们可以看出,与键值对相关的请求都会通过kvstore提供的方法处理,而有关集群配置的请求则是会编码为etcd/raft/v3/raftpb中proto定义的消息格式,直接传入confChangeC信道。从main.go中可以看出,该信道的消费者是raft模块。

3. kvstore

kvstore是连接RAFT Sever和REST Server的桥梁,是实现键值存储功能的重要组件,但是其实现很简单。

kvstore.gokvstore结构体非常简单,只有四个字段:

1
2
3
4
5
6
7
// a key-value store backed by raft
type kvstore struct {
proposeC chan<- string // channel for proposing updates
mu sync.RWMutex
kvStore map[string]string // current committed key-value pairs
snapshotter *snap.Snapshotter
}

其中,proposeC信道我们将在之后讲解。除此之外,该结构体中只有一个读写锁mu、一个由map实现的键值存储kvStore,和一个etcd提供的默认快照管理模块实现的snapshotter

目前,从kvstore中,我们看不到多少有用的信息。接下来,我们关注一下创建KV存储的函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func newKVStore(snapshotter *snap.Snapshotter, proposeC chan<- string, commitC <-chan *commit, errorC <-chan error) *kvstore {
s := &kvstore{proposeC: proposeC, kvStore: make(map[string]string), snapshotter: snapshotter}
snapshot, err := s.loadSnapshot()
if err != nil {
logger.Panicf("[store] %v", err)
}
if snapshot != nil {
logger.Infof("[store] loading snapshot at term %d and index %d", snapshot.Metadata.Term, snapshot.Metadata.Index)
if err := s.recoverFromSnapshot(snapshot.Data); err != nil {
logger.Panicf("[store] %v", err)
}
}
// read commits from raft into kvStore map until error
go s.readCommits(commitC, errorC)
return s
}

newKVStore函数的参数除了snapshotter外,proposeCcommitCerrorC均为信道。其中proposeC为输入信道,commitCerrorC为输出信道。我们可以推断出,kvStore会通过proposeC与raft模块交互,并通过commitCerrorC接收来自raft模块的消息。snapshotter来源于raft模块,启动时raft先夹在wal日志和wal快照,然后kvStore才能加载快照。

newKVStore中的逻辑也非常简单,将传入的参数写入kvStore结构体相应的字段中。先调用loadSnapshot方法从磁盘中导入快照,然后启动一个goroutine调用readCommits方法,来循环处理来自raft模块发送过来的消息。

readCommits方法稍有些复杂,我们先看看其比较简单的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func (s *kvstore) Lookup(key string) (string, bool) {
s.mu.RLock()
defer s.mu.RUnlock()
v, ok := s.kvStore[key]
return v, ok
}

func (s *kvstore) Propose(k string, v string) {
var buf bytes.Buffer
if err := gob.NewEncoder(&buf).Encode(kv{k, v}); err != nil {
logger.Panicf("[store] %v", err)
}
s.proposeC <- buf.String()
}

func (s *kvstore) loadSnapshot() (*raftpb.Snapshot, error) {
snapshot, err := s.snapshotter.Load()
if err == snap.ErrNoSnapshot {
return nil, nil
}
if err != nil {
return nil, err
}
return snapshot, nil
}

func (s *kvstore) getSnapshot() ([]byte, error) {
s.mu.RLock()
defer s.mu.RUnlock()
return json.Marshal(s.kvStore)
}

func (s *kvstore) recoverFromSnapshot(snapshot []byte) error {
var store map[string]string
if err := json.Unmarshal(snapshot, &store); err != nil {
return err
}
s.mu.Lock()
defer s.mu.Unlock()
s.kvStore = store
return nil
}

Lookup方法会通过读锁来访问其用来记录键值的map,防止查找时数据被修改返回错误的结果。Propose方法将要更新的键值对编码为string,并传入proposeC信道,交给raft模块处理。loadSnapshot方法从快照指针snapshotter中加载快照,getSnapshotrecoverFromSnapshot方法分别将记录键值的map序列化与反序列化,并加锁防止争用。

接下来,我们深入readCommits的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func (s *kvstore) readCommits(commitC <-chan *commit, errorC <-chan error) {
for commit := range commitC {
if commit == nil {
// signaled to load snapshot
snapshot, err := s.loadSnapshot()
if err != nil {
logger.Panicf("[store] %v", err)
}
if snapshot != nil {
logger.Infof("[store] loading snapshot at term %d and index %d", snapshot.Metadata.Term, snapshot.Metadata.Index)
if err := s.recoverFromSnapshot(snapshot.Data); err != nil {
logger.Panicf("[store] %v", err)
}
}
continue
}

for _, data := range commit.data {
var dataKv kv
dec := gob.NewDecoder(bytes.NewBufferString(data))
if err := dec.Decode(&dataKv); err != nil {
logger.Fatalf("[store] could not decode message: %v", err)
}
s.mu.Lock()
s.kvStore[dataKv.Key] = dataKv.Val
s.mu.Unlock()
}
close(commit.applyDoneC)
}
if err, ok := <-errorC; ok {
logger.Panicf("[store] %v", err)
}
}

该方法会循环遍历commitC信道中raft模块传下来的消息。因为raft-kv-http-server功能简单,因此其通过nil表示raft完成重放日志的信号或用来通知kvstore从上一个快照恢复的信号。

commit为nil时,该方法会通过kvstore的快照管理模块snapshotter尝试加载上一个快照;当commit不为nil时,说明这是raft模块发布的已经通过共识提交了的键值对,此时从commitdata中反序列化出键值对,并加锁修改map中的键值对。

我们可以看到,kvstore中基本上没有多少与raft相关的处理逻辑,大部分代码时对键值对存储抽象本身的实现。

4. raftnode

raft模块的实现在raftnode.go文件中,我们将其拆解成四个部分进行解读:

4.1 struct

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// A key-value stream backed by raft
type raftNode struct {
proposeC <-chan string // proposed messages (k, v)
confChangeC <-chan raftpb.ConfChange // proposed cluster config changes
commitC chan<- *commit // entries committed to log (k, v)
errorC chan<- error // errors from raft session

id int // client ID for raft session
peers []string // raft peer URLs
join bool // node is joining an existing cluster
walDir string // path to WAL directory
snapDir string // path to snapshot directory
getSnapshot func() ([]byte, error)

confState raftpb.ConfState
snapshotIndex uint64
appliedIndex uint64

node raft.Node
raftStorage *raft.MemoryStorage
wal *wal.WAL

snapshotter *snap.Snapshotter
snapshotterReady chan *snap.Snapshotter // // signals when snapshotter is ready

snapCount uint64
transport *rafthttp.Transport
stopC chan struct{} // signals proposal channel closed
httpStopC chan struct{} // signals http server to shutdown
httpDoneC chan struct{} // signals http server shutdown complete

logger *zap.Logger
}

在结构体中,有四个用于与其他组件交互的信道:

信道 描述
proposeC <-chan string 接收来自其他组件传入的需要通过raft达成共识的普通提议。
confChangeC <-chan raftpb.ConfChange 接收来自其他组件的需要通过raft达成共识的集群变更提议。
commitC chan<- *string 用来已通过raft达成共识的已提交的提议通知给其他组件的信道。
errorC chan<- error 用来将错误报告给其他组件的信道。

结构体中还有一些用来记录节点信息的字段:

字段 描述
id int 节点id,同样也作为raft回话中的client id。
peer []string 对等raft节点的url。
join string 如果该节点是以加入已有集群的方式启动,那么该值为true,否则是false。
walDir string 预写日志的目录路径。
snapDir string 保存快照的目录路径。
getSnapshot func()([]byte, error) 获取快照的方法签名。
confState raftpb.ConfState 集群配置状态(详见其声明)。
snapshotIndex uint64 快照中的状态下最后一条日志的索引。
appliedIndex uint64 已应用的最后一条日志的索引。

结构体中还保存了etcd/raft提供的接口与其所需的相关组件:

字段 描述
node raft.Node Etc/raft的核心接口,对于一个最简单的实现来说,开发者只需要与该接口打交道即可实现机遇raft的服务。
raftStorage *raftMemoryStorage 用来保存raft状态的接口,etcd/raft/storage.go中定义了etc/raft模块所需的稳定存储接口,并提供了一个实现该接口的内存存储MemoryStorage[^1],raft-kv-http-server使用了该实现。
wal *wal.WAL 预写日志实现,raft-kv-http-server直接使用了etc/wal模块中的实现。
snapshotter *snap.Snapshotter 快照管理器的指针。
snapshotterReady chan *snap.Snapshoptter 一个用来发送snapshotter加载完毕的信号的“一次性”信道。因为snapshotter
snapCount uint64 当WAL中的日志超过该值时,触发快照操作并压缩日志。
transport *rafthttp.Transport etcd/raft模块通信时使用的接口。同样,这里使用了基于http的默认实现。

etcd/raft的Storage接口和Transport接口让用户能够根据需求自定义稳定存储模块和通信模块。使用Storage存储的数据需要被稳定存储,也就是说,即使服务器因断电等问题关机,在服务器重启后也能够恢复到断电前的最终状态。有些读者可能会疑惑,这里的raft-kv-http-server使用的是MemoryStorage,而内存是易失存储,为什么可以当作稳定存储使用?这是因为在raft-kv-http-server的实现中,每次重启时都会通过快照和预写日志恢复MemoryStorage,而快照和预写日志是保存在稳定存储上的。这样,通过快照、预写日志、MemoryStorage的组合,可以实现稳定存储。这样做的好处之一是,预写日志是仅追加(Append-Only)的且快照写入的是连续的空间,这样可以减少对稳定存储的随机写入,提供系统吞吐量。

此外,在结构体中,还有一些通过chan struct{}信道实现的信号量:

信号 描述
stopC 提议信道关闭信号
httpStopC 通知用于raft通信的http服务器关闭的信号
httpDoneC 用于raft通信的http服务器关闭的信号

4.2 startRaft

在创建raftNode时,需要提供节点id、对等节点urlpeers、是否要加入已存在的集群join、获取快照的函数签名getSnapshot、提议信道proposeC、配置变更提议信道confChangeC这些参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// newRaftNode initiates a raft instance and returns a committed log entry
// channel and error channel. Proposals for log updates are sent over the
// provided the proposal channel. All log entries are replayed over the
// commit channel, followed by a nil message (to indicate the channel is
// current), then new log entries. To shutdown, close proposeC and read errorC.
func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte, error),
proposeC <-chan string, confChangeC <-chan raftpb.ConfChange) (<-chan *commit, <-chan error, <-chan *snap.Snapshotter) {

commitC := make(chan *commit)
errorC := make(chan error)

node := &raftNode{
proposeC: proposeC,
confChangeC: confChangeC,
commitC: commitC,
errorC: errorC,
id: id,
peers: peers,
join: join,
walDir: fmt.Sprintf("raft-kv-svc-%d", id),
snapDir: fmt.Sprintf("raft-kv-svc-snaoshot%d", id),
getSnapshot: getSnapshot,
confState: raftpb.ConfState{},

snapshotterReady: make(chan *snap.Snapshotter, 1),

snapCount: defaultSnapshotCount,
stopC: make(chan struct{}),
httpStopC: make(chan struct{}),
httpDoneC: make(chan struct{}),
logger: zap.NewExample(),
}

go node.startRaft()
return commitC, errorC, node.snapshotterReady
}

newRaftNode函数中,仅初始化了raftNode的部分参数,其余的参数会在导入预写日志后后配置。随后,该函数启动了一个协程,该写协程调用了raftNodestartRaft()方法来启动raft节点。当前函数会将raft模块用来通知已提交的提议的信道、报错信道、和快照管理器加载完成信号的信道回给调用者。

接下来,我们跟随该方法进入raftNode.startRaft()方法中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56

func (r *raftNode) startRaft() {
if !fileutil.Exist(r.snapDir) {
if err := os.Mkdir(r.snapDir, 0750); err != nil {
logger.Fatalf("[raft] failed to create dir for dnapshot: %v", err)
}
}
r.snapshotter = snap.New(zap.NewExample(), r.snapDir)

oldWal := wal.Exist(r.walDir)
r.wal = r.replayWAL()

// signal replay has finished
r.snapshotterReady <- r.snapshotter

rpeers := make([]raft.Peer, len(r.peers))
for i := range rpeers {
rpeers[i] = raft.Peer{ID: uint64(i + 1)}
}
c := &raft.Config{
ID: uint64(r.id),
ElectionTick: 10,
HeartbeatTick: 1,
Storage: r.raftStorage,
MaxSizePerMsg: 1024 * 1024,
MaxUncommittedEntriesSize: 256,
MaxInflightMsgs: 1 << 30,
}

if oldWal || r.join {
r.node = raft.RestartNode(c)
} else {
r.node = raft.StartNode(c, rpeers)
}

r.transport = &rafthttp.Transport{
Logger: r.logger,
TLSInfo: transport.TLSInfo{},
ID: types.ID(r.id),
ClusterID: 0x1000,
Raft: r,
ServerStats: stats.NewServerStats("", ""),
LeaderStats: stats.NewLeaderStats(zap.NewExample(), strconv.Itoa(r.id)),
ErrorC: make(chan error),
}

r.transport.Start()
for i := range r.peers {
if i+1 != r.id {
r.transport.AddPeer(types.ID(i+1), []string{r.peers[i]})
}
}

go r.serveRaft()
go r.serveChannels()
}

startRaft方法虽然看上去很长,但是实现的功能很简单。

首先,startRaft方法检查快照目录是否存在,如果不存在则为其创建目录。然后创建基于该目录的快照管理器。创建完成后,向snapshotterReady信道写入该快照管理器,通知其快照管理器已经创建完成。

接着,程序检查是否有旧的预写日志存在,并加载旧的预写日志,重放预写日志的代码在下文中会进一步分析。

在加载完成后,程序设置了etcd/raft模块所需的配置,并从该配置上启动或者重启节点(取决于有没有旧的预写日志文件)。etcd/raft在的raft.StartNoderaft.RestartNode函数分别会根据配置启动或者重启raft服务器节点,并返回一个Node接口的实例。正如前文中提到的,Node接口是开发者依赖etcd/raft实现时唯一需要与其打交道的接口。程序将Node接口的实例记录在了raftNodenode字段中。

node创建完成后,程序配置并开启了通信模块,开始与集群中的其他raft节点通信。

在一切接续后,程序启动了两个goroutine,分别是raftNode.serveRaft()raftNode.serveChannels。其中raftNode.serveRaft()用来监听其他raft节点的消息,消息的处理主要在Transport接口的实现中编写,因此在这里不对其进行详细的分析,感兴趣的读者可以自行参考源码中的实现;raftNode.serveChannels()用来处理raftNode中各种信道,后文会对其进行详细分析。

下面,我们先来分许重放预写日志的逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// replayWAL replays WAL entries into the raft instance.
func (rc *raftNode) replayWAL() *wal.WAL {
log.Printf("replaying WAL of member %d", rc.id)
snapshot := rc.loadSnapshot()
w := rc.openWAL(snapshot)
_, st, ents, err := w.ReadAll()
if err != nil {
log.Fatalf("raftexample: failed to read WAL (%v)", err)
}
rc.raftStorage = raft.NewMemoryStorage()
if snapshot != nil {
rc.raftStorage.ApplySnapshot(*snapshot)
}
rc.raftStorage.SetHardState(st)

// append to storage so raft starts at the right place in log
rc.raftStorage.Append(ents)

return w
}

replayWAL方法为raftNode重放其预写日志并返回日志文件。首先该方法会通过raftNode.loadSnapshot()方法加载快照,如果快照不存在该方法会返回nil。接着,通过raftNode.openWAL(snapshot)方法打开预写日志。该方法会根据快照中的日志元数据(这里的元数据与论文中的一样,记录了快照覆盖的最后一个日志条目的index和term)打开相应的预写日志,如果快照不存在,则会打开或者创建一个从初始状态开始的预写日志(当节点第一次启动时,即没有快照文件又没有预写日志文件,此时会为其创建预写日志文件;而节点是重启但重启前没有记录或日志,则会为其打开已有的从初始状态开始的预写日志)。之后,程序将快照应用到raft的存储MemoryStorage中,并将预写日志中记录的硬状态HardState应用到存储中(硬状态是会被持久化的状态,etcd/raft对论文中的实现进行了优化,因此保存的状态稍有不同。)除了快照之外,重放时还需要将预写日志中的日志条目应用到存储中(快照之后的持久化状态)。

4.3 serveRaft

raftnode.serveRaft()是RAFT Server用来接收与其他节点消息的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (r *raftNode) serveRaft() {
url, err := url.Parse(r.peers[r.id-1])
if err != nil {
logger.Fatalf("[raft] failed to parse url: %v", err)
}

listener, err := newStoppableListener(url.Host, r.httpStopC)
if err != nil {
logger.Fatalf("[raft] failed ti listen rafthttp: %v", err)
}

logger.Infof("[raft] http is listeing at %s", url.Host)
err = (&http.Server{Handler: r.transport.Handler()}).Serve(listener)
select {
case <-r.httpStopC:
default:
logger.Fatalf("[raft] failed to serve rafthttp: %v", err)
}
close(r.httpStopC)
}

serveRaft()中创建了一个自定义TCP连接监听器stoppableListener,这个监听器主要通过httpStopC这个信道实现暂停服务,即返回一个服务器暂停的错误,这里不再详细说明源码。然后基于该监听器开启了http服务,接口的逻辑由Transport.Handler()实现:

1
2
3
4
5
6
7
8
9
10
11
func (t *Transport) Handler() http.Handler {
pipelineHandler := newPipelineHandler(t, t.Raft, t.ClusterID)
streamHandler := newStreamHandler(t, t, t.Raft, t.ID, t.ClusterID)
snapHandler := newSnapshotHandler(t, t.Raft, t.Snapshotter, t.ClusterID)
mux := http.NewServeMux()
mux.Handle(RaftPrefix, pipelineHandler) // /raft
mux.Handle(RaftStreamPrefix+"/", streamHandler) // /raft/stream/
mux.Handle(RaftSnapshotPrefix, snapHandler) // /raft/snapshot
mux.Handle(ProbingPrefix, probing.NewHandler()) // /raft/probing
return mux
}

pipelineHandlerstreamHandlersnapshotHandler这三者都实现了Handler接口。

  • pipelineHandler读取从其他节点发送过来的请求数据,然后转发给给定的状态机处理。

  • streamHandler用于将连接进行封装,通过心跳来维护长连接。

  • snapshotHandler接收并处理快照消息。

4.4 serveChannels

raftNode.serveChannels()是RAFT Server用来处理各种信道的输入输出的方法,也是与etcd/raft模块中Node接口的实现交互的方法。

serveChannels()方法可以分为两个部分,该方法本身会循环处理raft有关的逻辑,如处理定时器信号驱动Node、处理Node传入的Ready结构体、处理通信模块报告的错误或者停止信号等;该方法还启动了一个goroutine,该goroutine中循环处理来自proposeCconfChangeC两个信道的消息。

在这两部分开始前,该方法做了一些初始化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (r *raftNode) serveChannels() {
snapshot, err := r.raftStorage.Snapshot()
if err != nil {
logger.Panicf("[raft] %v", err)
}
r.confState = snapshot.Metadata.ConfState
r.snapshotIndex = snapshot.Metadata.Index
r.appliedIndex = snapshot.Metadata.Index

defer r.wal.Close()

ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()

// ...
// ...
}

首先,该方法从当前的快照的元数据设置raftNode的相关字段,并设置一个每100毫秒产生一个信号的循环定时器。serveChannels的循环会根据这个信号调用Node接口的Tick()方法,驱动Node执行。

接下来,我们先来看看serveChannels中启动的用来处理来自proposeCconfChangeC两个信道的消息的goroutine。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (r *raftNode) serveChannels() {

// send proposals over raft
go func() {
confChangeCount := uint64(0)

for r.proposeC != nil && r.confChangeC != nil {
select {
case prop, ok := <-r.proposeC:
if !ok {
r.proposeC = nil
} else {
r.node.Propose(context.TODO(), []byte(prop))
}

case cc, ok := <-r.confChangeC:
if !ok {
r.confChangeC = nil
} else {
confChangeCount++
cc.ID = confChangeCount
r.node.ProposeConfChange(context.TODO(), cc)
}
}
}
// client closed channel; shutdown raft if not ready
close(r.stopC)
}()

// ...
}

这部分逻辑很简单。因为在循环中,如果proposeCconfChangeC中的一个被关闭,程序会将其置为nil,所以只有二者均不是nil时才执行循环。每次循环会通过select选取一个有消息的信道,通过Node接口提交给raft服务器。当循环结束时,关闭stopC信道,即发送关闭信号。

serveChannels()中的循环是与Node接口交互的重要逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func (r *raftNode) serveChannels() {
// ...

// event loop on raft state machine updates
for {
select {
case <-ticker.C:
r.node.Tick()

// store raft entries to wal, then publish over commit channel
case rd := <-r.node.Ready():
// must save the snapshot file and WAL snapshot entry before saving any other entries
// or hard state to ensure that recovery after a snapshot restore is possible.

if !raft.IsEmptySnap(rd.Snapshot) {
r.saveSnap(rd.Snapshot)
}
r.wal.Save(rd.HardState, rd.Entries)
if !raft.IsEmptySnap(rd.Snapshot) {
r.raftStorage.ApplySnapshot(rd.Snapshot)
r.publishSnapshot(rd.Snapshot)
}
r.raftStorage.Append(rd.Entries)
r.transport.Send(r.processMessages(rd.Messages))
applyDoneC, ok := r.publishEntries(r.entriesToApply(rd.CommittedEntries))
if !ok {
r.stop()
return
}
r.maybeTriggerSnapshot(applyDoneC)
r.node.Advance()

case err = <-r.transport.ErrorC:
r.writeError(err)
return

case <-r.stopC:
r.stop()
return
}
}
}

该循环同时监听4个信道:

  1. 循环定时器的信道,每次收到信号后,调用Node接口的Tick函数驱动Node
  2. Node.Ready()返回的信道,每当Node准备好一批数据后,会将数据通过该信道发布。开发者需要对该信道收到的Ready结构体中的各字段进行处理。在处理完成一批数据后,开发者还需要调用Node.Advance()告知Node这批数据已处理完成,可以继续传入下一批数据。
  3. 通信模块报错信道,收到来自该信道的错误后raftNode会继续上报该错误,并关闭节点。
  4. 用来表示停止信号的信道,当该信道被关闭时,阻塞的逻辑会从该分支运行,关闭节点。

其中,Node.Ready()返回的信道逻辑最为复杂。因为其需要处理raft状态及传入的各种数据,并交付给相应的模块处理。etcd/raft的Ready结构体中包含如下数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// Ready encapsulates the entries and messages that are ready to read,
// be saved to stable storage, committed or sent to other peers.
// All fields in Ready are read-only.
type Ready struct {
// The current volatile state of a Node.
// SoftState will be nil if there is no update.
// It is not required to consume or store SoftState.
*SoftState

// The current state of a Node to be saved to stable storage BEFORE
// Messages are sent.
// HardState will be equal to empty state if there is no update.
pb.HardState

// ReadStates can be used for node to serve linearizable read requests locally
// when its applied index is greater than the index in ReadState.
// Note that the readState will be returned when raft receives msgReadIndex.
// The returned is only valid for the request that requested to read.
ReadStates []ReadState

// Entries specifies entries to be saved to stable storage BEFORE
// Messages are sent.
Entries []pb.Entry

// Snapshot specifies the snapshot to be saved to stable storage.
Snapshot pb.Snapshot

// CommittedEntries specifies entries to be committed to a
// store/state-machine. These have previously been committed to stable
// store.
CommittedEntries []pb.Entry

// Messages specifies outbound messages to be sent AFTER Entries are
// committed to stable storage.
// If it contains a MsgSnap message, the application MUST report back to raft
// when the snapshot has been received or has failed by calling ReportSnapshot.
Messages []pb.Message

// MustSync indicates whether the HardState and Entries must be synchronously
// written to disk or if an asynchronous write is permissible.
MustSync bool
}
字段 描述
*SoftState 当前节点的软状态(易失状态)。如果该状态没有任何更新则该字段为nil。软状态不需要被处理或存储,仅当用户需要其中信息时才需要使用该字段。
pb.HardState raft的硬状态,在节点向其他节点发送消息前,需要先存储硬状态。同样,如果其没有任何更新,该字段为一个空的硬状态。
ReadStates []ReadState 与线性一致性读(lineraizable read)相关状态,raft-kv-http-server中没有相关的处理逻辑,在后续的对etcd/raft分析的文章中,会详细介绍这一字段。
Entries []pb.Entry 需要保存到稳定存储的日志条目,其需要在向其他节点发送消息前存储。
Snapshot pb.Snapshot 需要被保存到稳定存储的快照。
CommittedEntries []pb.Entry 已通过raft算法提交的日志条目,开发者需要将这些条目应用到自己的状态机中(在raft-kv-http-server中即为kvstore)。这些条目在之前已经被应用到了稳定存储中。
Messages []pb.Message 需要发送给其他节点的消息。在发送这些消息前,需要先将HardStateEntries保存到稳定存储中。如果这些消息中有MsgSnap消息(用于传输快照的消息),开发者必须在节点收到快照会这接收快照失败后通过调用ReportSnapshot方法通知Node。(因为leader向某follower发送快照时会暂停向该folower发送raft日志的操作,因此其需要报告快照发送完成或失败以让leader继续对其进行操作。)
MustSync bool 该字段表示HardStateEntries是否必须同步写入磁盘,如果该字段为false,则可以异步写入。

Ready结构体中各个字段的注释已经很好地说明了其处理方式,这很有助于我们理解raft-kv-http-server中对Ready信道对处理方式:

  1. HardStateEntries写入预写日志,将其保存在稳定存储上;
  2. 如果有快照,先将快照保存在稳定存储中,然后应用快照,最后通过commitC写入nil值通知kvstore加载快照;
  3. Entries追加到MemoryStorage中(第一步仅写入到了预写日志中);
  4. 通过信道模块将Messages中的消息分发给其它raft节点;
  5. 通过publishEntries方法发布新增的日志条目;
  6. 通过maybeTriggerSnapshot方法检查memoryStorage中日志条目长度,如果超过设定的最大长度,则触发快照机制并压缩日志;

虽然看上去步骤较多,但是处理逻辑都很简单。这里我们仅看一下第5步的逻辑。

在第5步中,首先通过entriesToApply方法,从Ready结构体的Entries字段中找到还没有应用到本地状态机中的日志起点即后续日志条目,然后通过publishEntries方法发布这些日志条目。publishEntries方法实现方式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// publishEntries writes committed log entries to commit channel and
// returns all entries could be published.
func (r *raftNode) publishEntries(ents []raftpb.Entry) (<-chan struct{}, bool) {
if len(ents) == 0 {
return nil, true
}

data := make([]string, 0, len(ents))
for i := range ents {
switch ents[i].Type {
case raftpb.EntryNormal:
if len(ents[i].Data) == 0 {
// ignore empty messages
break
}
s := string(ents[i].Data)
data = append(data, s)
case raftpb.EntryConfChange:
var cc raftpb.ConfChange
cc.Unmarshal(ents[i].Data)
r.confState = *r.node.ApplyConfChange(cc)
switch cc.Type {
case raftpb.ConfChangeAddNode:
if len(cc.Context) > 0 {
r.transport.AddPeer(types.ID(cc.NodeID), []string{string(cc.Context)})
logger.Infof("[raft] node <%d> is added to the cluster", r.id)
}
case raftpb.ConfChangeRemoveNode:
if cc.NodeID == uint64(r.id) {
logger.Infof("[raft] node <%d> is removed from the cluster. shutting down.", r.id)
return nil, false
}
r.transport.RemovePeer(types.ID(cc.NodeID))
}
}
}

var applyDoneC chan struct{}

if len(data) > 0 {
applyDoneC = make(chan struct{}, 1)
select {
case r.commitC <- &commit{data, applyDoneC}:
case <-r.stopC:
return nil, false
}
}

// after commit, update appliedIndex
r.appliedIndex = ents[len(ents)-1].Index

return applyDoneC, true
}

publishEntries会遍历传入的日志列表,对于普通的日志条目,先将其反序列化通过commitC信道传给kvstore处理;对于用于变更集群配置的日志,则根据变更的内容(如增加或删除集群中的某个节点),修改通信模块中的相关记录。然后修改appliedIndex为当前日志的Index。除此之外,publishEntries还判断了日志Index是否为前文中提到的lastIndex。如果当前Index等于lastIndex,则说明之前的操作是在重放日志,且此时日志重放完成,因此需要向commitC信道写入nil以通知kvstore日志重放完成。

5. 总结

至此,我们已经分析了raftexample大部分的主要逻辑。在main.go中有raftexample中各模块的创建于初始化逻辑、raftnode.go中还有一些如关闭服务器的逻辑。

raftexample是官方提供的使用了etcd/raft的最基本的功能的简单的kv存储的示例。通过分析学习这段代码,可以简单了解etcd/raft的基本使用方式。当然,etcd/raft还提供了很多高级功能,我们会在后续的文章中介绍与分析。