etcd-1-raftexample
0. 序言
在深入学习etcd中raft的源码之前,首先应该学会使用etcd的raft模块。幸运的是,etcd官方提供了一个基于etcd/raft的简单kvstore的实现,该实现在etcd/contrib/raftexample下。raftexample实现了一个基于http的分布式存储服务,下面将其称为raft-kv-http-server。
该项目的根目录下还提供了该分布式kv存储示例的使用方法和基本设计思路,这里建议读者先按照其README.md
完整运行一遍示例,再继续接下来的学习。
1. 架构
raft-kv-http-server由三个组件构成:
- HTTP Server提供REST API接口
- KV Store提供map的键值对存储
- RAFT Server提供分布式共识服务
由RAFT支持的KV存储是一个持有所有已提交的键值对的map。该存储建立了RAFT服务器和REST服务器间的通信桥梁。键值对的更新通过该存储提交该RAFT服务器。当RAFT服务器报告有更新提交后,该存储便会更新其map。
REST服务器通过访问由RAFT支持的KV存储的方式暴露出当前RAFT达成的共识。
GET命令会在存储中查找键,如果键存在则会返回该键的值。
一个带键值的PUT命令会想存储中提出一个更新提议。
RAFT服务器和其集群中的对等节点(PEER)会参与共识的达成。
当REST服务器提交提议时,RAFT服务器会将该提议发送给其对等节点。
当RAFT达成共识时,服务器会通过一个提交信道来发布所有已提交的更新。
raft-kv-http-server的设计可以用一张图来表示,但是HTTP Server主要处理KV相关消息,集群配置比变更的消息会直接交给RAFT Server处理。

接下来,我们自下而上地学习raft-kv-http-server的实现。
2. httpapi
httpapi.go
是REST服务器的实现,这并不是我们关注的重点,我们需要关注的只是其通过kvstore
中的哪些方法来提供服务。
1 | func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) { |
上面的代码中,省略的一些对消息的序列化和反序列化及对性能的小优化,我们主要关注其通过哪些方法为请求提供服务。
请求方法 | 处理方式 | 功能 |
---|---|---|
PUT | kvstore.Propose(k,v) | 添加|更新键值对 |
GET | kvstore.Lookup(k) | 查找键对应的值 |
POST | confChangeC <- cc | 向集群中添加节点 |
DELETE | confChangeC <- cc | 从集群中移除节点 |
从表中我们可以看出,与键值对相关的请求都会通过kvstore
提供的方法处理,而有关集群配置的请求则是会编码为etcd/raft/v3/raftpb
中proto定义的消息格式,直接传入confChangeC
信道。从main.go
中可以看出,该信道的消费者是raft模块。
3. kvstore
kvstore
是连接RAFT Sever和REST Server的桥梁,是实现键值存储功能的重要组件,但是其实现很简单。
kvstore.go
中kvstore
结构体非常简单,只有四个字段:
1 | // a key-value store backed by raft |
其中,proposeC
信道我们将在之后讲解。除此之外,该结构体中只有一个读写锁mu
、一个由map实现的键值存储kvStore
,和一个etcd提供的默认快照管理模块实现的snapshotter
。
目前,从kvstore
中,我们看不到多少有用的信息。接下来,我们关注一下创建KV存储的函数:
1 | func newKVStore(snapshotter *snap.Snapshotter, proposeC chan<- string, commitC <-chan *commit, errorC <-chan error) *kvstore { |
newKVStore
函数的参数除了snapshotter
外,proposeC
、commitC
和errorC
均为信道。其中proposeC
为输入信道,commitC
和errorC
为输出信道。我们可以推断出,kvStore
会通过proposeC
与raft模块交互,并通过commitC
和errorC
接收来自raft模块的消息。snapshotter
来源于raft模块,启动时raft先夹在wal日志和wal快照,然后kvStore
才能加载快照。
newKVStore
中的逻辑也非常简单,将传入的参数写入kvStore
结构体相应的字段中。先调用loadSnapshot
方法从磁盘中导入快照,然后启动一个goroutine调用readCommits
方法,来循环处理来自raft模块发送过来的消息。
readCommits
方法稍有些复杂,我们先看看其比较简单的方法:
1 | func (s *kvstore) Lookup(key string) (string, bool) { |
Lookup
方法会通过读锁来访问其用来记录键值的map,防止查找时数据被修改返回错误的结果。Propose
方法将要更新的键值对编码为string,并传入proposeC
信道,交给raft模块处理。loadSnapshot
方法从快照指针snapshotter
中加载快照,getSnapshot
和recoverFromSnapshot
方法分别将记录键值的map序列化与反序列化,并加锁防止争用。
接下来,我们深入readCommits
的实现:
1 | func (s *kvstore) readCommits(commitC <-chan *commit, errorC <-chan error) { |
该方法会循环遍历commitC
信道中raft模块传下来的消息。因为raft-kv-http-server功能简单,因此其通过nil表示raft完成重放日志的信号或用来通知kvstore
从上一个快照恢复的信号。
当commit
为nil时,该方法会通过kvstore
的快照管理模块snapshotter
尝试加载上一个快照;当commit
不为nil时,说明这是raft模块发布的已经通过共识提交了的键值对,此时从commit
的data
中反序列化出键值对,并加锁修改map中的键值对。
我们可以看到,kvstore
中基本上没有多少与raft相关的处理逻辑,大部分代码时对键值对存储抽象本身的实现。
4. raftnode
raft模块的实现在raftnode.go
文件中,我们将其拆解成四个部分进行解读:
- 4.1 struct: raftnode结构体
- 4.2 startRaft: raft的创建与启动
- 4.3 serveRaft: raft的http接口
- 4.4 serveChannels: raft的信道处理
4.1 struct
1 | // A key-value stream backed by raft |
在结构体中,有四个用于与其他组件交互的信道:
信道 | 描述 |
---|---|
proposeC <-chan string | 接收来自其他组件传入的需要通过raft达成共识的普通提议。 |
confChangeC <-chan raftpb.ConfChange | 接收来自其他组件的需要通过raft达成共识的集群变更提议。 |
commitC chan<- *string | 用来已通过raft达成共识的已提交的提议通知给其他组件的信道。 |
errorC chan<- error | 用来将错误报告给其他组件的信道。 |
结构体中还有一些用来记录节点信息的字段:
字段 | 描述 |
---|---|
id int | 节点id,同样也作为raft回话中的client id。 |
peer []string | 对等raft节点的url。 |
join string | 如果该节点是以加入已有集群的方式启动,那么该值为true,否则是false。 |
walDir string | 预写日志的目录路径。 |
snapDir string | 保存快照的目录路径。 |
getSnapshot func()([]byte, error) | 获取快照的方法签名。 |
confState raftpb.ConfState | 集群配置状态(详见其声明)。 |
snapshotIndex uint64 | 快照中的状态下最后一条日志的索引。 |
appliedIndex uint64 | 已应用的最后一条日志的索引。 |
结构体中还保存了etcd/raft提供的接口与其所需的相关组件:
字段 | 描述 |
---|---|
node raft.Node | Etc/raft的核心接口,对于一个最简单的实现来说,开发者只需要与该接口打交道即可实现机遇raft的服务。 |
raftStorage *raftMemoryStorage | 用来保存raft状态的接口,etcd/raft/storage.go中定义了etc/raft模块所需的稳定存储接口,并提供了一个实现该接口的内存存储MemoryStorage [^1],raft-kv-http-server使用了该实现。 |
wal *wal.WAL | 预写日志实现,raft-kv-http-server直接使用了etc/wal模块中的实现。 |
snapshotter *snap.Snapshotter | 快照管理器的指针。 |
snapshotterReady chan *snap.Snapshoptter | 一个用来发送snapshotter加载完毕的信号的“一次性”信道。因为snapshotter |
snapCount uint64 | 当WAL中的日志超过该值时,触发快照操作并压缩日志。 |
transport *rafthttp.Transport | etcd/raft模块通信时使用的接口。同样,这里使用了基于http的默认实现。 |
etcd/raft的
Storage
接口和Transport
接口让用户能够根据需求自定义稳定存储模块和通信模块。使用Storage
存储的数据需要被稳定存储,也就是说,即使服务器因断电等问题关机,在服务器重启后也能够恢复到断电前的最终状态。有些读者可能会疑惑,这里的raft-kv-http-server使用的是MemoryStorage
,而内存是易失存储,为什么可以当作稳定存储使用?这是因为在raft-kv-http-server的实现中,每次重启时都会通过快照和预写日志恢复MemoryStorage
,而快照和预写日志是保存在稳定存储上的。这样,通过快照、预写日志、MemoryStorage
的组合,可以实现稳定存储。这样做的好处之一是,预写日志是仅追加(Append-Only)的且快照写入的是连续的空间,这样可以减少对稳定存储的随机写入,提供系统吞吐量。
此外,在结构体中,还有一些通过chan struct{}
信道实现的信号量:
信号 | 描述 |
---|---|
stopC | 提议信道关闭信号 |
httpStopC | 通知用于raft通信的http服务器关闭的信号 |
httpDoneC | 用于raft通信的http服务器关闭的信号 |
4.2 startRaft
在创建raftNode时,需要提供节点id
、对等节点urlpeers
、是否要加入已存在的集群join
、获取快照的函数签名getSnapshot
、提议信道proposeC
、配置变更提议信道confChangeC
这些参数:
1 | // newRaftNode initiates a raft instance and returns a committed log entry |
在newRaftNode
函数中,仅初始化了raftNode
的部分参数,其余的参数会在导入预写日志后后配置。随后,该函数启动了一个协程,该写协程调用了raftNode
的startRaft()
方法来启动raft节点。当前函数会将raft模块用来通知已提交的提议的信道、报错信道、和快照管理器加载完成信号的信道回给调用者。
接下来,我们跟随该方法进入raftNode.startRaft()
方法中。
1 |
|
startRaft
方法虽然看上去很长,但是实现的功能很简单。
首先,startRaft
方法检查快照目录是否存在,如果不存在则为其创建目录。然后创建基于该目录的快照管理器。创建完成后,向snapshotterReady
信道写入该快照管理器,通知其快照管理器已经创建完成。
接着,程序检查是否有旧的预写日志存在,并加载旧的预写日志,重放预写日志的代码在下文中会进一步分析。
在加载完成后,程序设置了etcd/raft模块所需的配置,并从该配置上启动或者重启节点(取决于有没有旧的预写日志文件)。etcd/raft在的raft.StartNode
和raft.RestartNode
函数分别会根据配置启动或者重启raft服务器节点,并返回一个Node
接口的实例。正如前文中提到的,Node
接口是开发者依赖etcd/raft实现时唯一需要与其打交道的接口。程序将Node
接口的实例记录在了raftNode
的node
字段中。
在node
创建完成后,程序配置并开启了通信模块,开始与集群中的其他raft节点通信。
在一切接续后,程序启动了两个goroutine,分别是raftNode.serveRaft()
和raftNode.serveChannels
。其中raftNode.serveRaft()
用来监听其他raft节点的消息,消息的处理主要在Transport
接口的实现中编写,因此在这里不对其进行详细的分析,感兴趣的读者可以自行参考源码中的实现;raftNode.serveChannels()
用来处理raftNode
中各种信道,后文会对其进行详细分析。
下面,我们先来分许重放预写日志的逻辑:
1 | // replayWAL replays WAL entries into the raft instance. |
replayWAL
方法为raftNode
重放其预写日志并返回日志文件。首先该方法会通过raftNode.loadSnapshot()
方法加载快照,如果快照不存在该方法会返回nil
。接着,通过raftNode.openWAL(snapshot)
方法打开预写日志。该方法会根据快照中的日志元数据(这里的元数据与论文中的一样,记录了快照覆盖的最后一个日志条目的index和term)打开相应的预写日志,如果快照不存在,则会打开或者创建一个从初始状态开始的预写日志(当节点第一次启动时,即没有快照文件又没有预写日志文件,此时会为其创建预写日志文件;而节点是重启但重启前没有记录或日志,则会为其打开已有的从初始状态开始的预写日志)。之后,程序将快照应用到raft的存储MemoryStorage
中,并将预写日志中记录的硬状态HardState
应用到存储中(硬状态是会被持久化的状态,etcd/raft对论文中的实现进行了优化,因此保存的状态稍有不同。)除了快照之外,重放时还需要将预写日志中的日志条目应用到存储中(快照之后的持久化状态)。
4.3 serveRaft
raftnode.serveRaft()
是RAFT Server用来接收与其他节点消息的方法。
1 | func (r *raftNode) serveRaft() { |
serveRaft()
中创建了一个自定义TCP连接监听器stoppableListener
,这个监听器主要通过httpStopC
这个信道实现暂停服务,即返回一个服务器暂停的错误,这里不再详细说明源码。然后基于该监听器开启了http服务,接口的逻辑由Transport.Handler()
实现:
1 | func (t *Transport) Handler() http.Handler { |
pipelineHandler
、streamHandler
、snapshotHandler
这三者都实现了Handler
接口。
pipelineHandler
读取从其他节点发送过来的请求数据,然后转发给给定的状态机处理。streamHandler
用于将连接进行封装,通过心跳来维护长连接。snapshotHandler
接收并处理快照消息。
4.4 serveChannels
raftNode.serveChannels()
是RAFT Server用来处理各种信道的输入输出的方法,也是与etcd/raft模块中Node
接口的实现交互的方法。
serveChannels()
方法可以分为两个部分,该方法本身会循环处理raft有关的逻辑,如处理定时器信号驱动Node
、处理Node
传入的Ready
结构体、处理通信模块报告的错误或者停止信号等;该方法还启动了一个goroutine,该goroutine中循环处理来自proposeC
和confChangeC
两个信道的消息。
在这两部分开始前,该方法做了一些初始化:
1 | func (r *raftNode) serveChannels() { |
首先,该方法从当前的快照的元数据设置raftNode
的相关字段,并设置一个每100毫秒产生一个信号的循环定时器。serveChannels
的循环会根据这个信号调用Node
接口的Tick()
方法,驱动Node
执行。
接下来,我们先来看看serveChannels
中启动的用来处理来自proposeC
和confChangeC
两个信道的消息的goroutine。
1 | func (r *raftNode) serveChannels() { |
这部分逻辑很简单。因为在循环中,如果proposeC
或confChangeC
中的一个被关闭,程序会将其置为nil
,所以只有二者均不是nil
时才执行循环。每次循环会通过select选取一个有消息的信道,通过Node
接口提交给raft服务器。当循环结束时,关闭stopC
信道,即发送关闭信号。
serveChannels()
中的循环是与Node
接口交互的重要逻辑。
1 | func (r *raftNode) serveChannels() { |
该循环同时监听4个信道:
- 循环定时器的信道,每次收到信号后,调用
Node
接口的Tick
函数驱动Node
。 Node.Ready()
返回的信道,每当Node
准备好一批数据后,会将数据通过该信道发布。开发者需要对该信道收到的Ready
结构体中的各字段进行处理。在处理完成一批数据后,开发者还需要调用Node.Advance()
告知Node
这批数据已处理完成,可以继续传入下一批数据。- 通信模块报错信道,收到来自该信道的错误后
raftNode
会继续上报该错误,并关闭节点。 - 用来表示停止信号的信道,当该信道被关闭时,阻塞的逻辑会从该分支运行,关闭节点。
其中,Node.Ready()
返回的信道逻辑最为复杂。因为其需要处理raft状态及传入的各种数据,并交付给相应的模块处理。etcd/raft的Ready
结构体中包含如下数据:
1 | // Ready encapsulates the entries and messages that are ready to read, |
字段 | 描述 |
---|---|
*SoftState | 当前节点的软状态(易失状态)。如果该状态没有任何更新则该字段为nil 。软状态不需要被处理或存储,仅当用户需要其中信息时才需要使用该字段。 |
pb.HardState | raft的硬状态,在节点向其他节点发送消息前,需要先存储硬状态。同样,如果其没有任何更新,该字段为一个空的硬状态。 |
ReadStates []ReadState | 与线性一致性读(lineraizable read)相关状态,raft-kv-http-server中没有相关的处理逻辑,在后续的对etcd/raft分析的文章中,会详细介绍这一字段。 |
Entries []pb.Entry | 需要保存到稳定存储的日志条目,其需要在向其他节点发送消息前存储。 |
Snapshot pb.Snapshot | 需要被保存到稳定存储的快照。 |
CommittedEntries []pb.Entry | 已通过raft算法提交的日志条目,开发者需要将这些条目应用到自己的状态机中(在raft-kv-http-server中即为kvstore )。这些条目在之前已经被应用到了稳定存储中。 |
Messages []pb.Message | 需要发送给其他节点的消息。在发送这些消息前,需要先将HardState 和Entries 保存到稳定存储中。如果这些消息中有MsgSnap 消息(用于传输快照的消息),开发者必须在节点收到快照会这接收快照失败后通过调用ReportSnapshot 方法通知Node 。(因为leader向某follower发送快照时会暂停向该folower发送raft日志的操作,因此其需要报告快照发送完成或失败以让leader继续对其进行操作。) |
MustSync bool | 该字段表示HardState 和Entries 是否必须同步写入磁盘,如果该字段为false ,则可以异步写入。 |
Ready
结构体中各个字段的注释已经很好地说明了其处理方式,这很有助于我们理解raft-kv-http-server中对Ready
信道对处理方式:
- 将
HardState
和Entries
写入预写日志,将其保存在稳定存储上; - 如果有快照,先将快照保存在稳定存储中,然后应用快照,最后通过
commitC
写入nil
值通知kvstore
加载快照; - 将
Entries
追加到MemoryStorage
中(第一步仅写入到了预写日志中); - 通过信道模块将
Messages
中的消息分发给其它raft节点; - 通过
publishEntries
方法发布新增的日志条目; - 通过
maybeTriggerSnapshot
方法检查memoryStorage
中日志条目长度,如果超过设定的最大长度,则触发快照机制并压缩日志;
虽然看上去步骤较多,但是处理逻辑都很简单。这里我们仅看一下第5步的逻辑。
在第5步中,首先通过entriesToApply
方法,从Ready
结构体的Entries
字段中找到还没有应用到本地状态机中的日志起点即后续日志条目,然后通过publishEntries
方法发布这些日志条目。publishEntries
方法实现方式如下:
1 | // publishEntries writes committed log entries to commit channel and |
publishEntries
会遍历传入的日志列表,对于普通的日志条目,先将其反序列化通过commitC
信道传给kvstore
处理;对于用于变更集群配置的日志,则根据变更的内容(如增加或删除集群中的某个节点),修改通信模块中的相关记录。然后修改appliedIndex
为当前日志的Index
。除此之外,publishEntries
还判断了日志Index
是否为前文中提到的lastIndex
。如果当前Index
等于lastIndex
,则说明之前的操作是在重放日志,且此时日志重放完成,因此需要向commitC
信道写入nil
以通知kvstore
日志重放完成。
5. 总结
至此,我们已经分析了raftexample大部分的主要逻辑。在main.go
中有raftexample中各模块的创建于初始化逻辑、raftnode.go
中还有一些如关闭服务器的逻辑。
raftexample是官方提供的使用了etcd/raft的最基本的功能的简单的kv存储的示例。通过分析学习这段代码,可以简单了解etcd/raft的基本使用方式。当然,etcd/raft还提供了很多高级功能,我们会在后续的文章中介绍与分析。