0. Introduction

Before reading this article, you should have read Diego Ongaro's paper In Search of an Understandable Consensus Algorithm (Extended Version), or a Chinese translation of it.

This article analyzes the implementation of, and optimizations to, the Raft election algorithm in etcd/raft. Material already covered in the Raft paper is not explained in detail here.

1. Optimizations to the Raft Election Algorithm

For leader election, etcd/raft makes three optimizations on top of the basic Raft algorithm in the paper. All three are described in Diego Ongaro's doctoral dissertation CONSENSUS: BRIDGING THEORY AND PRACTICE, in section 6.4 "Processing read-only queries more efficiently" and section 9.6 "Preventing disruptions when a server rejoins the cluster".

The election-related optimizations implemented in etcd/raft are:

  1. Pre-Vote;
  2. Check Quorum;
  3. Leader Lease

Of these three, only Pre-Vote and Leader Lease were originally optimizations to the election process itself; Check Quorum was introduced to implement linearizable reads more efficiently. However, because Leader Lease depends on Check Quorum, it is covered here as well. The optimizations etcd/raft makes for linearizable reads are left to a later article in this series; this article only discusses the election-side changes needed to support them.

In addition, etcd/raft implements Leader Transfer, i.e. actively handing leadership over to another node. Its implementation is fairly simple: the node that should become the new leader is told to start an election itself. Note that Leader Transfer is not guaranteed to succeed: the target only becomes leader if it collects a quorum of votes. Votes cast for a Leader Transfer are not constrained by the Pre-Vote, Check Quorum, or Leader Lease mechanisms.
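
As a usage-level illustration (not part of the etcd/raft source analysis below), the following sketch assumes the go.etcd.io/etcd/raft/v3 module (older releases use the go.etcd.io/etcd/raft import path); the node IDs and tick values are arbitrary, and the Ready()/Advance() processing loop a real application needs is omitted:

package main

import (
    "context"

    "go.etcd.io/etcd/raft/v3"
)

func main() {
    storage := raft.NewMemoryStorage()
    cfg := &raft.Config{
        ID:              1,
        ElectionTick:    10, // election timeout: 10 ticks
        HeartbeatTick:   1,  // heartbeat interval: 1 tick
        Storage:         storage,
        MaxSizePerMsg:   1024 * 1024,
        MaxInflightMsgs: 256,
        PreVote:         true, // enable the Pre-Vote optimization
        CheckQuorum:     true, // enable Check Quorum (which also enables Leader Lease)
    }
    n := raft.StartNode(cfg, []raft.Peer{{ID: 1}, {ID: 2}, {ID: 3}})
    defer n.Stop()

    // Ask the state machine to hand leadership from node 1 over to node 2.
    // This injects a MsgTransferLeader; as discussed above, success is not
    // guaranteed, since node 2 still has to win its election.
    n.TransferLeadership(context.TODO(), 1, 2)
}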

1.1 Pre-Vote

As shown in the figure below, when a Raft cluster's network becomes partitioned, there may be a partition whose node count cannot reach quorum (the minimum number of nodes needed to reach consensus), such as Partition 1 in the figure.


In the partition that can still reach quorum, elections proceed normally, and the terms of all nodes in that partition eventually settle on the term of the newly elected leader. Unfortunately, in the partition that cannot reach quorum, if it contains no leader, its nodes can never collect a quorum of votes and therefore never elect one; after each election timeout they increment their term and start another election, so their terms keep growing without bound.

As long as the network stays partitioned this causes no harm. But once the partition heals, the terms of the nodes in the minority partition are far larger than those in the majority partition, which forces the majority partition's leader to step down and adopt the larger term, causing an unnecessary round of elections in the cluster.

The Pre-Vote mechanism was designed to solve this problem. The idea is to keep a partition that cannot reach quorum from entering the normal voting flow at all, so its terms never grow. To that end, Pre-Vote introduces a "pre-vote" phase: when a node's election timeout fires, it does not immediately increment its term and request votes; it first starts a round of pre-voting. A node that receives a pre-vote request does not step down. Only after a node receives a quorum of positive pre-vote responses may it increment its term and send real vote requests. This way, nodes in a partition without quorum can never raise their term and therefore cannot trigger an unnecessary election after the partition heals.
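
A simplified sketch of the pre-vote decision described above (illustrative logic only, not etcd/raft's actual code; the real checks appear in Step and stepCandidate later in this article):

package main

import "fmt"

// prevoteGranted sketches the rule a node applies to an incoming pre-vote
// request: grant it only if the candidate's proposed term is not behind our
// own and the candidate's log is at least as up-to-date as ours. Crucially,
// the receiver neither steps down nor changes its own term when answering.
func prevoteGranted(myTerm, myLastLogTerm, myLastLogIndex,
    candTerm, candLastLogTerm, candLastLogIndex uint64) bool {
    if candTerm < myTerm {
        return false
    }
    return candLastLogTerm > myLastLogTerm ||
        (candLastLogTerm == myLastLogTerm && candLastLogIndex >= myLastLogIndex)
}

func main() {
    // A candidate cut off from the leader campaigns for term 4 (its term 3 + 1),
    // but its log (term 2, index 10) is older than ours (term 3, index 50),
    // so the pre-vote is rejected and our own term is left untouched.
    fmt.Println(prevoteGranted(3, 3, 50, 4, 2, 10)) // false
}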

1.2 Check Quorum

In Raft, the simplest way to guarantee linearizable reads is to treat a read request as just another Raft proposal and run it through the log like any other entry; this approach is therefore called Log Read. Obviously, Log Read performs poorly, and read-heavy, write-light workloads are very common in practice. To improve read performance, the log has to be bypassed.

However, reading directly from the leader while bypassing the log may return stale data, i.e. there is a stale read problem. In the scenario shown below, suppose that before the network partition Node 5 was the leader of the whole cluster. After the partition, Partition 0 elects a new leader, Node 1 in the figure.


Because of the partition, however, Node 5 cannot receive messages from the nodes in Partition 0 and will never realize that a new leader exists. Although it can no longer commit log entries, it can still serve reads if they bypass the log, so clients connected to Node 5 may read stale data.

Check Quorum mitigates the impact of this problem, and its mechanism is very simple: the leader periodically checks whether its followers are active. If the number of active followers no longer reaches quorum, the leader is probably a stale leader from before a partition, so it voluntarily steps down to follower.

Note that Check Quorum cannot completely prevent stale reads; it only shortens the window in which they can happen and reduces their impact. Strict linearizability requires additional mechanisms.
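
A minimal sketch of the leader-side check with illustrative names (etcd/raft implements this through its ProgressTracker when the leader handles a local MsgCheckQuorum message, shown in section 2.4.2):

package main

import "fmt"

// checkQuorum counts the followers observed as active since the last check
// (plus the leader itself) and compares the total against quorum.
func checkQuorum(recentActive map[uint64]bool, leaderID uint64, clusterSize int) (stayLeader bool) {
    active := 1 // the leader always counts itself as active
    for id, ok := range recentActive {
        if id != leaderID && ok {
            active++
        }
    }
    quorum := clusterSize/2 + 1
    return active >= quorum
}

func main() {
    // A 5-node cluster whose leader can only reach node 2: 2 active nodes is
    // below the quorum of 3, so the leader should step down to follower.
    active := map[uint64]bool{2: true, 3: false, 4: false, 5: false}
    fmt.Println(checkQuorum(active, 1, 5)) // false
}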

1.3 Leader Lease

Networks in distributed systems are complex, and sometimes a partial network partition occurs: the overall topology remains a connected graph, yet not every pair of nodes can reach each other.


This can happen not only because of network failures but also during membership changes: when a node is removed via a ConfChange, different nodes may apply that ConfChange at different times, which can produce the same situation.

In the scenario of the figure above, Node 1 and Node 2 cannot communicate. If Node 1 was the cluster's leader before that link broke, Node 2 will no longer receive Node 1's heartbeats afterwards and will start an election. If neither Node 1 nor Node 3 has appended new log entries before Node 2 campaigns, Node 2 can still collect a quorum of votes (its own plus Node 3's) and become the new leader.

The Leader Lease mechanism adds a new constraint on voting to solve this problem: if a node has heard from the leader within the last election timeout, it does not grant its vote to any node that requests a vote or a pre-vote. In other words, Leader Lease prevents the nodes of a normally working cluster from voting for other nodes.
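
A compact sketch of this constraint with illustrative names; the real check is the inLease condition in Step, shown in section 2.3.2:

package main

import "fmt"

// grantVote sketches the Leader Lease rule: a node that has heard from a
// leader within the last electionTimeout ticks refuses to vote, unless the
// request is part of a Leader Transfer.
func grantVote(haveLeader bool, ticksSinceLeaderMsg, electionTimeout int, isTransfer bool) bool {
    inLease := haveLeader && ticksSinceLeaderMsg < electionTimeout
    if inLease && !isTransfer {
        return false // drop the (pre-)vote request without answering
    }
    return true // fall through to the normal vote checks (term, log up-to-date, ...)
}

func main() {
    // A node that heard from its leader two ticks ago (timeout 10 ticks)
    // ignores another node's vote request, even one carrying a higher term.
    fmt.Println(grantVote(true, 2, 10, false)) // false
}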

Leader Lease depends on Check Quorum to work correctly. The following example explains why.

Suppose a 5-node Raft cluster ends up partitioned as shown below: Node 1 and Node 2 can reach each other, Node 2, Node 3, and Node 4 can all reach one another, and Node 5 cannot reach any other node. Before the partition, Node 1 was the cluster's leader.


With neither Leader Lease nor Check Quorum, Node 3 and Node 4 stop receiving the leader's heartbeats and start elections; since Node 2, Node 3, and Node 4 can all reach one another, that group can reach quorum and elect a new leader.

With Leader Lease enabled but Check Quorum disabled, Node 2 still receives heartbeats from the original leader Node 1, so the Leader Lease constraint keeps it from voting for any other node. As a result, even though the cluster contains a partition with a quorum of mutually reachable nodes, it cannot make progress.

With both Leader Lease and Check Quorum enabled, Node 1 steps down to follower after an election timeout because it cannot detect a quorum of active nodes. The election among Node 2, Node 3, and Node 4 can then proceed normally.

1.4 New Problems Introduced and Their Solutions

Introducing Pre-Vote and Check Quorum (in the etcd/raft implementation, enabling Check Quorum automatically enables Leader Lease) brings some new problems to the Raft algorithm.

When a node receives a message whose term is lower than its own, the original logic is simply to ignore it, since a lower-term message can only be an old message delayed by the network. With these mechanisms enabled, however, the following scenarios become problematic.

(1) Scenario 1


As shown in the figure above, with Check Quorum / Leader Lease enabled (assume Pre-Vote is off; its problem is discussed in the next scenario), the leader of the partition that cannot reach quorum steps down, and the nodes in that partition can never elect a leader, so their terms keep increasing. When that partition's connection to the rest of the cluster is restored, the nodes of the original partition are still working normally thanks to Check Quorum / Leader Lease, so even though the rejoining node has a larger term, its vote requests are dropped. At the same time, because its term is larger than the original leader's, it drops the original leader's messages. As a result, the node can never rejoin the cluster and can never be elected leader. (See issue #5451 and issue #5468.)

(2) Scenario 2

Pre-Vote has a similar problem. As shown in the figure above, suppose a node passes its pre-vote but is partitioned away just as it is about to send its real vote requests. Its term is now higher than the original cluster's, while the cluster, never having received a real vote request, does not update its term and keeps running normally. After the partition heals, the cluster's term is lower than the node's, but its log is newer. The node's pre-vote requests are then rejected because its log is behind, while the messages the original leader sends to it are dropped because their term is smaller than its own. Again, the node can never rejoin the cluster and can never be elected leader. (See issue #8501 and issue #8525.)

(3) Scenario 3

In more complex situations, for example when Pre-Vote is enabled through a configuration change on a cluster that previously ran without it, something similar to the previous scenario can happen: a node with a higher term but an older log can deadlock the entire cluster, leaving every node unable to win a pre-vote. This is more dangerous than Scenario 2: there, only the previously partitioned node could not rejoin the cluster; here, the whole cluster becomes unavailable. (See issue #8501 and issue #8525.)

To solve these problems, a node that receives a request with a term lower than its own must handle it specially. The logic is simple:

  1. If the message comes from a leader with a lower term and the cluster has Check Quorum / Leader Lease or Pre-Vote enabled, reply with a message carrying the current term so that the lower-term node steps down to follower. (Addresses Scenarios 1 and 2.)
  2. For a pre-vote request with a lower term, regardless of whether Check Quorum / Leader Lease or Pre-Vote is enabled, reply with a message carrying the current term to force the sender to become a follower and update its term. (Addresses Scenario 3.)

2. Implementation of Raft Elections in etcd/raft

In this section, I analyze the election-related implementation in etcd/raft.

2.1 MsgHup and hup

In the etcd/raft implementation, elections are triggered by MsgHup messages, whether an election is started explicitly or caused by an election timeout:

// *** node.go ***

func (n *node) Campaign(ctx context.Context) error { return n.step(ctx, pb.Message{Type: pb.MsgHup}) }

// *** rawnode.go ***

// Campaign causes this RawNode to transition to candidate state.
func (rn *RawNode) Campaign() error {
return rn.raft.Step(pb.Message{
Type: pb.MsgHup,
})
}

// *** raft.go ***

// tickElection is run by followers and candidates after r.electionTimeout.
func (r *raft) tickElection() {
r.electionElapsed++

if r.promotable() && r.pastElectionTimeout() {
r.electionElapsed = 0
if err := r.Step(pb.Message{From: r.id, Type: pb.MsgHup}); err != nil {
r.logger.Debugf("error occurred during election: %v", err)
}
}
}

So we can follow the processing of MsgHup to analyze how elections are implemented in etcd/raft. etcd/raft drives the Raft state machine's state transitions through the Step method of the raft struct.

func (r *raft) Step(m pb.Message) error {
// ...
switch m.Type {
case pb.MsgHup:
if r.preVote {
r.hup(campaignPreElection)
} else {
r.hup(campaignElection)
}
// ...
}
return nil
}

When Step handles a MsgHup message, it calls the hup method with a different CampaignType depending on whether Pre-Vote is enabled in the current configuration. CampaignType is an enum whose possible values are shown in the table below:

Enum value            Description
campaignPreElection   The pre-election phase of Pre-Vote.
campaignElection      A normal election (timeout-triggered only, not including Leader Transfer).
campaignTransfer      A Leader Transfer election.
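
For reference, these values are declared in raft.go roughly as follows (comments abridged; the string form lets campaign embed the value in a message's Context field, as we will see below):

package raft

// CampaignType represents the type of campaigning.
type CampaignType string

const (
    // campaignPreElection represents the first phase of a normal election
    // when Config.PreVote is true.
    campaignPreElection CampaignType = "CampaignPreElection"
    // campaignElection represents a normal (time-based) election.
    campaignElection CampaignType = "CampaignElection"
    // campaignTransfer represents the type of leader transfer.
    campaignTransfer CampaignType = "CampaignTransfer"
)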

Next, let's look at the implementation of hup.

func (r *raft) hup(t CampaignType) {
if r.state == StateLeader {
r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
return
}

if !r.promotable() {
r.logger.Warningf("%x is unpromotable and can not campaign", r.id)
return
}
ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
if err != nil {
r.logger.Panicf("unexpected error getting unapplied entries (%v)", err)
}
if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied {
r.logger.Warningf("%x cannot campaign at term %d since there are still %d pending configuration changes to apply", r.id, r.Term, n)
return
}

r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
r.campaign(t)
}

hup performs several checks on the node's current state and only tries to start a vote or pre-vote if they pass. First, hup checks whether the node is already the leader; if so, it returns immediately. It then uses promotable() to decide whether the node can be promoted to leader.

// promotable indicates whether state machine can be promoted to leader,
// which is true when its own id is in progress list.
func (r *raft) promotable() bool {
pr := r.prs.Progress[r.id]
return pr != nil && !pr.IsLearner && !r.raftLog.hasPendingSnapshot()
}

promotable checks three conditions:

  1. The node has been removed from the cluster. (This is determined by whether the ProgressTracker's progress map still contains the node's id; when a node is removed from the cluster, its id is deleted from that map.)
  2. The node is a learner.
  3. The node still has a pending snapshot that has not been saved to stable storage.

If any of these three conditions is true, the node cannot become leader. In hup, besides requiring promotable() to be true, one more rule is checked:

  • Whether the node's committed log still contains configuration change (ConfChange) entries that have not yet been applied. If such committed-but-unapplied ConfChange entries exist, the node cannot be promoted to leader either.

Only when all of these checks pass does hup call the campaign method to start a vote or pre-vote according to the configuration.

2.2 campaign

campaign is the central method for starting a vote or pre-vote.

// campaign transitions the raft instance to candidate state. This must only be
// called after verifying that this is a legitimate transition.
func (r *raft) campaign(t CampaignType) {
if !r.promotable() {
// This path should not be hit (callers are supposed to check), but
// better safe than sorry.
r.logger.Warningf("%x is unpromotable; campaign() should have been called", r.id)
}
var term uint64
var voteMsg pb.MessageType
if t == campaignPreElection {
r.becomePreCandidate()
voteMsg = pb.MsgPreVote
// PreVote RPCs are sent for the next term before we've incremented r.Term.
term = r.Term + 1
} else {
r.becomeCandidate()
voteMsg = pb.MsgVote
term = r.Term
}
if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon {
// We won the election after voting for ourselves (which must mean that
// this is a single-node cluster). Advance to the next state.
if t == campaignPreElection {
r.campaign(campaignElection)
} else {
r.becomeLeader()
}
return
}
var ids []uint64
{
idMap := r.prs.Voters.IDs()
ids = make([]uint64, 0, len(idMap))
for id := range idMap {
ids = append(ids, id)
}
sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
}
for _, id := range ids {
if id == r.id {
continue
}
r.logger.Infof("%x [logterm: %d, index: %d] sent %s request to %x at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), voteMsg, id, r.Term)

var ctx []byte
if t == campaignTransfer {
ctx = []byte(t)
}
r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
}
}

Because hup is not the only caller of campaign, campaign itself still checks first whether promotable() is true.

if t == campaignPreElection {
r.becomePreCandidate()
voteMsg = pb.MsgPreVote
// PreVote RPCs are sent for the next term before we've incremented r.Term.
term = r.Term + 1
} else {
r.becomeCandidate()
voteMsg = pb.MsgVote
term = r.Term
}

When Pre-Vote is enabled, campaign is first called with campaignPreElection. In that case becomePreCandidate() is called; this method does not modify the node's Term, so the MsgPreVote messages are sent with Term set to the current Term + 1. If Pre-Vote is not enabled, or the pre-vote has passed and the real vote is starting, or this is a Leader Transfer (even with Pre-Vote enabled, a Leader Transfer never pre-votes), becomeCandidate is called instead; it increments the node's Term, so the MsgVote messages simply carry the current Term. For example, a node whose Term is 5 sends MsgPreVote with Term 6 while its own Term stays at 5; only after the pre-vote succeeds does becomeCandidate raise its Term to 6 for the real MsgVote. The becomeXXX methods switch the state machine's state and behavior to the corresponding role; their implementation and the behavior after the switch are analyzed in detail later in this article.

Next, campaign starts sending vote requests. Before sending requests to other nodes, the node first votes for itself:

if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon {
// We won the election after voting for ourselves (which must mean that
// this is a single-node cluster). Advance to the next state.
if t == campaignPreElection {
r.campaign(campaignElection)
} else {
r.becomeLeader()
}
return
}

poll records the vote locally and returns the current tally. If the node wins the election just by voting for itself, the cluster must be running in single-node mode; in that case, if this is the pre-vote phase the node can immediately start the real vote, and if it is already in the real vote (or in a Leader Transfer) it simply becomes leader.
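
As a rough illustration of the tally that poll obtains from the vote tracker (names and logic here are simplified, not etcd/raft's API), note how a single-voter configuration wins immediately with its own vote, which is exactly the fast path above:

package main

import "fmt"

type voteResult int

const (
    votePending voteResult = iota
    voteWon
    voteLost
)

// tally decides whether the candidate has won, lost, or must keep waiting,
// given the votes recorded so far and the number of voters.
func tally(votes map[uint64]bool, voters int) voteResult {
    q := voters/2 + 1
    granted, rejected := 0, 0
    for _, g := range votes {
        if g {
            granted++
        } else {
            rejected++
        }
    }
    switch {
    case granted >= q:
        return voteWon
    case voters-rejected < q: // quorum is no longer reachable
        return voteLost
    default:
        return votePending
    }
}

func main() {
    // Three voters: the candidate's own vote alone is not yet a result.
    fmt.Println(tally(map[uint64]bool{1: true}, 3) == votePending) // true
    // A single-node cluster: voting for yourself wins the election outright.
    fmt.Println(tally(map[uint64]bool{1: true}, 1) == voteWon) // true
}

Otherwise, campaign then sends vote or pre-vote requests to all other voters in the configuration: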

var ids []uint64
{
idMap := r.prs.Voters.IDs()
ids = make([]uint64, 0, len(idMap))
for id := range idMap {
ids = append(ids, id)
}
sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
}
for _, id := range ids {
if id == r.id {
continue
}
r.logger.Infof("%x [logterm: %d, index: %d] sent %s request to %x at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), voteMsg, id, r.Term)

var ctx []byte
if t == campaignTransfer {
ctx = []byte(t)
}
r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
}

The Term field of the request is the term recorded earlier: the current Term + 1 during the pre-vote phase, and the current Term during the real vote.

2.3 The Step Method and step

As mentioned earlier, Step is the entry point for the Raft state machine's state transitions, and its argument is a Raft message. Step inspects the message's Term field and handles the different cases differently, and it also gives special treatment to some election-related messages. Finally, Step calls the function recorded in the step field of the raft struct. The step field is defined as follows:

// Definition of `stepFunc`
type stepFunc func(r *raft, m pb.Message) error

// step field in struct `raft`
step stepFunc

The becomeXXX methods mentioned in the previous section switch the state machine to the corresponding role and also swap the function recorded in the raft struct's step field, so that nodes in different roles handle Raft messages with different logic.
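
A minimal, self-contained sketch of this pattern with illustrative types (not etcd/raft's actual definitions): becomeXXX swaps the step function value, and the shared Step entry point delegates to it:

package main

import "fmt"

type message struct{ kind string }

type node struct {
    role string
    step func(n *node, m message)
}

func stepFollower(n *node, m message)  { fmt.Println("follower handling", m.kind) }
func stepCandidate(n *node, m message) { fmt.Println("candidate handling", m.kind) }

func (n *node) becomeFollower()  { n.role, n.step = "follower", stepFollower }
func (n *node) becomeCandidate() { n.role, n.step = "candidate", stepCandidate }

// Step performs shared preprocessing (term checks in the real code) and then
// delegates to whatever the current role's step function is.
func (n *node) Step(m message) { n.step(n, m) }

func main() {
    n := &node{}
    n.becomeFollower()
    n.Step(message{kind: "MsgHeartbeat"})
    n.becomeCandidate()
    n.Step(message{kind: "MsgVoteResp"})
}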

Before calling the function recorded in step to handle a message, Step performs some preprocessing based on the message's Term field.

2.3.1 Preprocessing messages with Term 0

// Handle the message term, which may result in our stepping down to a follower.
switch {
case m.Term == 0:
// local message
// ...
}

etcd/raft uses messages with Term 0 as local messages. Step does nothing special with them and falls through to the logic that follows.

2.3.2 Preprocessing messages with a Term greater than the node's

case m.Term > r.Term:
if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote {
force := bytes.Equal(m.Context, []byte(campaignTransfer))
inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeout
if !force && inLease {
// If a server receives a RequestVote request within the minimum election timeout
// of hearing from a current leader, it does not update its term or grant its vote
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: lease is not expired (remaining ticks: %d)",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term, r.electionTimeout-r.electionElapsed)
return nil
}
}
switch {
case m.Type == pb.MsgPreVote:
// Never change our term in response to a PreVote
case m.Type == pb.MsgPreVoteResp && !m.Reject:
// We send pre-vote requests with a term in our future. If the
// pre-vote is granted, we will increment our term when we get a
// quorum. If it is not, the term comes from the node that
// rejected our vote so we should become a follower at the new
// term.
default:
r.logger.Infof("%x [term: %d] received a %s message with higher term from %x [term: %d]",
r.id, r.Term, m.Type, m.From, m.Term)
if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
r.becomeFollower(m.Term, m.From)
} else {
r.becomeFollower(m.Term, None)
}
}

For a message whose Term is greater than the node's own, if its type is MsgVote or MsgPreVote, Step first decides whether the message needs to be processed at all, according to the following rules:

  1. force: if the message's campaign type is campaignTransfer, force is true, meaning the message must be processed;
  2. inLease: if Check Quorum is enabled (which automatically enables Leader Lease) and the node has heard from a leader within the election timeout, inLease is true, meaning the current Leader Lease has not yet expired;
  3. if !force && inLease holds, the message does not need to be processed and is dropped immediately.

For a message whose Term is greater than the node's own, Step also has to decide whether to step down to follower, according to the following rules:

  1. If the message is a MsgPreVote pre-vote request, do not become a follower.
  2. If the message is a MsgPreVoteResp whose Reject field is not set, do not become a follower.
  3. Otherwise, become a follower.

When stepping down, the new Term is the message's Term. If the message type is MsgApp, MsgHeartbeat, or MsgSnap, the message comes from a leader, so the lead field is set directly to the sender's id; otherwise the current leader is unknown and lead is set to None.

2.3.3 Preprocessing messages with a Term less than the node's

Finally, if the message's Term is smaller than the current Term, then because of the problems described in section 1.4, simply ignoring the message is not enough and extra handling is needed:

case m.Term < r.Term:
if (r.checkQuorum || r.preVote) && (m.Type == pb.MsgHeartbeat || m.Type == pb.MsgApp) {
// We have received messages from a leader at a lower term. It is possible
// that these messages were simply delayed in the network, but this could
// also mean that this node has advanced its term number during a network
// partition, and it is now unable to either win an election or to rejoin
// the majority on the old term. If checkQuorum is false, this will be
// handled by incrementing term numbers in response to MsgVote with a
// higher term, but if checkQuorum is true we may not advance the term on
// MsgVote and must generate other messages to advance the term. The net
// result of these two features is to minimize the disruption caused by
// nodes that have been removed from the cluster's configuration: a
// removed node will send MsgVotes (or MsgPreVotes) which will be ignored,
// but it will not receive MsgApp or MsgHeartbeat, so it will not create
// disruptive term increases, by notifying leader of this node's activeness.
// The above comments also true for Pre-Vote
//
// When follower gets isolated, it soon starts an election ending
// up with a higher term than leader, although it won't receive enough
// votes to win the election. When it regains connectivity, this response
// with "pb.MsgAppResp" of higher term would force leader to step down.
// However, this disruption is inevitable to free this stuck node with
// fresh election. This can be prevented with Pre-Vote phase.
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp})
} else if m.Type == pb.MsgPreVote {
// Before Pre-Vote enable, there may have candidate with higher term,
// but less log. After update to Pre-Vote, the cluster may deadlock if
// we drop messages with a lower term.
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
r.send(pb.Message{To: m.From, Term: r.Term, Type: pb.MsgPreVoteResp, Reject: true})
} else {
// ignore other cases
r.logger.Infof("%x [term: %d] ignored a %s message with lower term from %x [term: %d]",
r.id, r.Term, m.Type, m.From, m.Term)
}
return nil
}

This corresponds to the problems raised in section 1.4:

  1. If the cluster has Check Quorum / Leader Lease or Pre-Vote enabled, send a message carrying the current term (a MsgAppResp here) to make the lower-term node become a follower;
  2. If it is a pre-vote request with a lower term, reply with a message carrying the current term (a rejecting MsgPreVoteResp) to force the candidate to become a follower and update its term.

2.4 becomeXXX and stepXXX

As introduced above, the becomeXXX methods switch the Raft state machine's role, and the stepXXX functions implement the message-handling behavior for the corresponding role.

There are four becomeXXX methods in etcd/raft:

  • becomeFollower
  • becomeCandidate
  • becomePreCandidate
  • becomeLeader

There are three stepXXX functions:

  • stepLeader
  • stepCandidate
  • stepFollower

Both becomeCandidate and becomePreCandidate use stepCandidate as their step behavior.

In this subsection, I walk through the election-related logic in becomeXXX and stepXXX.

2.4.1 Candidate and PreCandidate

Candidate and PreCandidate behave very similarly; this section analyzes both and compares where they agree and where they differ.

func (r *raft) becomeCandidate() {
// TODO(xiangli) remove the panic when the raft implementation is stable
if r.state == StateLeader {
panic("invalid transition [leader -> candidate]")
}
r.step = stepCandidate
r.reset(r.Term + 1)
r.tick = r.tickElection
r.Vote = r.id
r.state = StateCandidate
r.logger.Infof("%x became candidate at term %d", r.id, r.Term)
}

func (r *raft) becomePreCandidate() {
// TODO(xiangli) remove the panic when the raft implementation is stable
if r.state == StateLeader {
panic("invalid transition [leader -> pre-candidate]")
}
// Becoming a pre-candidate changes our step functions and state,
// but doesn't change anything else. In particular it does not increase
// r.Term or change r.Vote.
r.step = stepCandidate
r.prs.ResetVotes()
r.tick = r.tickElection
r.lead = None
r.state = StatePreCandidate
r.logger.Infof("%x became pre-candidate at term %d", r.id, r.Term)
}

func (r *raft) reset(term uint64) {
if r.Term != term {
r.Term = term
r.Vote = None
}
r.lead = None

r.electionElapsed = 0
r.heartbeatElapsed = 0
r.resetRandomizedElectionTimeout()

r.abortLeaderTransfer()

r.prs.ResetVotes()

// ...
}

The main difference between a pre-election and a real election is that a pre-election changes neither the state machine's term nor the vote the node has cast in the current term. The table below lists the election-related fields that becomePreCandidate and becomeCandidate modify (or leave untouched):

Field       becomePreCandidate   becomeCandidate   Description
step        stepCandidate        stepCandidate     step behavior
tick        tickElection         tickElection      tick behavior
Vote        not modified         own id            which node this term's vote is cast for
state       StatePreCandidate    StateCandidate    state machine role
lead        None                 None              leader of the current term
prs.Votes   reset                reset             votes received

Whether the node is a PreCandidate or a Candidate, its step behavior is stepCandidate. Some of these fields are modified through the reset method, which initializes the relevant fields when the state machine switches roles. Because switching to PreCandidate is, strictly speaking, not a real role change, becomePreCandidate does not call reset, whereas becomeCandidate, becomeLeader, and becomeFollower all do. This article only looks at the election-related parts of reset; its log-replication-related logic will be analyzed in later articles.

Next, the election-related logic in stepCandidate:

// stepCandidate is shared by StateCandidate and StatePreCandidate; the difference is
// whether they respond to MsgVoteResp or MsgPreVoteResp.
func stepCandidate(r *raft, m pb.Message) error {
// Only handle vote responses corresponding to our candidacy (while in
// StateCandidate, we may get stale MsgPreVoteResp messages in this term from
// our pre-candidate state).
var myVoteRespType pb.MessageType
if r.state == StatePreCandidate {
myVoteRespType = pb.MsgPreVoteResp
} else {
myVoteRespType = pb.MsgVoteResp
}
switch m.Type {
case pb.MsgProp:
r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
return ErrProposalDropped
case pb.MsgApp:
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleAppendEntries(m)
case pb.MsgHeartbeat:
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleHeartbeat(m)
case pb.MsgSnap:
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleSnapshot(m)
case myVoteRespType:
gr, rj, res := r.poll(m.From, m.Type, !m.Reject)
r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, gr, m.Type, rj)
switch res {
case quorum.VoteWon:
if r.state == StatePreCandidate {
r.campaign(campaignElection)
} else {
r.becomeLeader()
r.bcastAppend()
}
case quorum.VoteLost:
// pb.MsgPreVoteResp contains future term of pre-candidate
// m.Term > r.Term; reuse r.Term
r.becomeFollower(r.Term, None)
}
case pb.MsgTimeoutNow:
r.logger.Debugf("%x [term %d state %v] ignored MsgTimeoutNow from %x", r.id, r.Term, r.state, m.From)
}
return nil
}

From the code above we can see how PreCandidate and Candidate handle the various messages:

  1. MsgProp and MsgTimeoutNow: dropped.
  2. MsgApp, MsgHeartbeat, MsgSnap: a message from the leader has arrived, so the node becomes a follower.
  3. The matching MsgPreVoteResp or MsgVoteResp: the vote is recorded via poll, which also returns the current election result.

In case 3, after obtaining the election result, the node acts according to the outcome:

  1. VotePending: no result yet; do nothing.
  2. VoteWon: the election is won. If the current role is PreCandidate, call campaign to start the real election; if it is Candidate, become leader and broadcast append (MsgApp) messages to tell the cluster that a leader has been elected.
  3. VoteLost: the election is lost; become a follower.

2.4.2 Leader

Only a small share of the leader's logic is related to elections, so here is a brief overview.

First, becomeLeader and the election-related fields it modifies:

func (r *raft) becomeLeader() {
// TODO(xiangli) remove the panic when the raft implementation is stable
if r.state == StateFollower {
panic("invalid transition [follower -> leader]")
}
r.step = stepLeader
r.reset(r.Term)
r.tick = r.tickHeartbeat
r.lead = r.id
r.state = StateLeader
// ...
}
Field       becomeLeader           Description
step        stepLeader             step behavior
tick        tickHeartbeat          tick behavior
Vote        None or not modified   which node this term's vote is cast for; the field is not modified if the term does not change
state       StateLeader            state machine role
lead        own id                 leader of the current term
prs.Votes   reset                  votes received
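
For context, the MsgBeat and MsgCheckQuorum local messages handled below are produced by the leader's tick function, tickHeartbeat. Simplified, and consistent with the tickElection shown in section 2.1, it looks roughly like this (exact details may differ between versions):

// tickHeartbeat is run by leaders to send a MsgBeat after r.heartbeatTimeout.
func (r *raft) tickHeartbeat() {
    r.heartbeatElapsed++
    r.electionElapsed++

    if r.electionElapsed >= r.electionTimeout {
        r.electionElapsed = 0
        if r.checkQuorum {
            // Ask ourselves to verify that a quorum of followers is still active.
            r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum})
        }
        // If the leader cannot finish a leadership transfer within one
        // electionTimeout, it abandons the transfer.
        if r.state == StateLeader && r.leadTransferee != None {
            r.abortLeaderTransfer()
        }
    }

    if r.state != StateLeader {
        return
    }

    if r.heartbeatElapsed >= r.heartbeatTimeout {
        r.heartbeatElapsed = 0
        // Trigger a round of heartbeats to all followers.
        r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
    }
}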

Next, the election-related logic in stepLeader:


func stepLeader(r *raft, m pb.Message) error {
// These message types do not require any progress for m.From.
switch m.Type {
case pb.MsgBeat:
r.bcastHeartbeat()
return nil
case pb.MsgCheckQuorum:
// The leader should always see itself as active. As a precaution, handle
// the case in which the leader isn't in the configuration any more (for
// example if it just removed itself).
//
// TODO(tbg): I added a TODO in removeNode, it doesn't seem that the
// leader steps down when removing itself. I might be missing something.
if pr := r.prs.Progress[r.id]; pr != nil {
pr.RecentActive = true
}
if !r.prs.QuorumActive() {
r.logger.Warningf("%x stepped down to follower since quorum is not active", r.id)
r.becomeFollower(r.Term, None)
}
// Mark everyone (but ourselves) as inactive in preparation for the next
// CheckQuorum.
r.prs.Visit(func(id uint64, pr *tracker.Progress) {
if id != r.id {
pr.RecentActive = false
}
})
return nil
case pb.MsgProp:
if len(m.Entries) == 0 {
r.logger.Panicf("%x stepped empty MsgProp", r.id)
}
if r.prs.Progress[r.id] == nil {
// If we are not currently a member of the range (i.e. this node
// was removed from the configuration while serving as leader),
// drop any new proposals.
return ErrProposalDropped
}
if r.leadTransferee != None {
r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)
return ErrProposalDropped
}

for i := range m.Entries {
e := &m.Entries[i]
var cc pb.ConfChangeI
if e.Type == pb.EntryConfChange {
var ccc pb.ConfChange
if err := ccc.Unmarshal(e.Data); err != nil {
panic(err)
}
cc = ccc
} else if e.Type == pb.EntryConfChangeV2 {
var ccc pb.ConfChangeV2
if err := ccc.Unmarshal(e.Data); err != nil {
panic(err)
}
cc = ccc
}
if cc != nil {
alreadyPending := r.pendingConfIndex > r.raftLog.applied
alreadyJoint := len(r.prs.Config.Voters[1]) > 0
wantsLeaveJoint := len(cc.AsV2().Changes) == 0

var refused string
if alreadyPending {
refused = fmt.Sprintf("possible unapplied conf change at index %d (applied to %d)", r.pendingConfIndex, r.raftLog.applied)
} else if alreadyJoint && !wantsLeaveJoint {
refused = "must transition out of joint config first"
} else if !alreadyJoint && wantsLeaveJoint {
refused = "not in joint state; refusing empty conf change"
}

if refused != "" {
r.logger.Infof("%x ignoring conf change %v at config %s: %s", r.id, cc, r.prs.Config, refused)
m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
} else {
r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1
}
}
}

if !r.appendEntry(m.Entries...) {
return ErrProposalDropped
}
r.bcastAppend()
return nil
case pb.MsgReadIndex:
// only one voting member (the leader) in the cluster
if r.prs.IsSingleton() {
if resp := r.responseToReadIndexReq(m, r.raftLog.committed); resp.To != None {
r.send(resp)
}
return nil
}

// Postpone read only request when this leader has not committed
// any log entry at its term.
if !r.committedEntryInCurrentTerm() {
r.pendingReadIndexMessages = append(r.pendingReadIndexMessages, m)
return nil
}

sendMsgReadIndexResponse(r, m)

return nil
}

// All other message types require a progress for m.From (pr).
pr := r.prs.Progress[m.From]
if pr == nil {
r.logger.Debugf("%x no progress available for %x", r.id, m.From)
return nil
}
switch m.Type {
case pb.MsgAppResp:
pr.RecentActive = true

if m.Reject {
// RejectHint is the suggested next base entry for appending (i.e.
// we try to append entry RejectHint+1 next), and LogTerm is the
// term that the follower has at index RejectHint. Older versions
// of this library did not populate LogTerm for rejections and it
// is zero for followers with an empty log.
//
// Under normal circumstances, the leader's log is longer than the
// follower's and the follower's log is a prefix of the leader's
// (i.e. there is no divergent uncommitted suffix of the log on the
// follower). In that case, the first probe reveals where the
// follower's log ends (RejectHint=follower's last index) and the
// subsequent probe succeeds.
//
// However, when networks are partitioned or systems overloaded,
// large divergent log tails can occur. The naive attempt, probing
// entry by entry in decreasing order, will be the product of the
// length of the diverging tails and the network round-trip latency,
// which can easily result in hours of time spent probing and can
// even cause outright outages. The probes are thus optimized as
// described below.
r.logger.Debugf("%x received MsgAppResp(rejected, hint: (index %d, term %d)) from %x for index %d",
r.id, m.RejectHint, m.LogTerm, m.From, m.Index)
nextProbeIdx := m.RejectHint
if m.LogTerm > 0 {
// If the follower has an uncommitted log tail, we would end up
// probing one by one until we hit the common prefix.
//
// For example, if the leader has:
//
// idx 1 2 3 4 5 6 7 8 9
// -----------------
// term (L) 1 3 3 3 5 5 5 5 5
// term (F) 1 1 1 1 2 2
//
// Then, after sending an append anchored at (idx=9,term=5) we
// would receive a RejectHint of 6 and LogTerm of 2. Without the
// code below, we would try an append at index 6, which would
// fail again.
//
// However, looking only at what the leader knows about its own
// log and the rejection hint, it is clear that a probe at index
// 6, 5, 4, 3, and 2 must fail as well:
//
// For all of these indexes, the leader's log term is larger than
// the rejection's log term. If a probe at one of these indexes
// succeeded, its log term at that index would match the leader's,
// i.e. 3 or 5 in this example. But the follower already told the
// leader that it is still at term 2 at index 6, and since the
// log term only ever goes up (within a log), this is a contradiction.
//
// At index 1, however, the leader can draw no such conclusion,
// as its term 1 is not larger than the term 2 from the
// follower's rejection. We thus probe at 1, which will succeed
// in this example. In general, with this approach we probe at
// most once per term found in the leader's log.
//
// There is a similar mechanism on the follower (implemented in
// handleAppendEntries via a call to findConflictByTerm) that is
// useful if the follower has a large divergent uncommitted log
// tail[1], as in this example:
//
// idx 1 2 3 4 5 6 7 8 9
// -----------------
// term (L) 1 3 3 3 3 3 3 3 7
// term (F) 1 3 3 4 4 5 5 5 6
//
// Naively, the leader would probe at idx=9, receive a rejection
// revealing the log term of 6 at the follower. Since the leader's
// term at the previous index is already smaller than 6, the leader-
// side optimization discussed above is ineffective. The leader thus
// probes at index 8 and, naively, receives a rejection for the same
// index and log term 5. Again, the leader optimization does not improve
// over linear probing as term 5 is above the leader's term 3 for that
// and many preceding indexes; the leader would have to probe linearly
// until it would finally hit index 3, where the probe would succeed.
//
// Instead, we apply a similar optimization on the follower. When the
// follower receives the probe at index 8 (log term 3), it concludes
// that all of the leader's log preceding that index has log terms of
// 3 or below. The largest index in the follower's log with a log term
// of 3 or below is index 3. The follower will thus return a rejection
// for index=3, log term=3 instead. The leader's next probe will then
// succeed at that index.
//
// [1]: more precisely, if the log terms in the large uncommitted
// tail on the follower are larger than the leader's. At first,
// it may seem unintuitive that a follower could even have such
// a large tail, but it can happen:
//
// 1. Leader appends (but does not commit) entries 2 and 3, crashes.
// idx 1 2 3 4 5 6 7 8 9
// -----------------
// term (L) 1 2 2 [crashes]
// term (F) 1
// term (F) 1
//
// 2. a follower becomes leader and appends entries at term 3.
// -----------------
// term (x) 1 2 2 [down]
// term (F) 1 3 3 3 3
// term (F) 1
//
// 3. term 3 leader goes down, term 2 leader returns as term 4
// leader. It commits the log & entries at term 4.
//
// -----------------
// term (L) 1 2 2 2
// term (x) 1 3 3 3 3 [down]
// term (F) 1
// -----------------
// term (L) 1 2 2 2 4 4 4
// term (F) 1 3 3 3 3 [gets probed]
// term (F) 1 2 2 2 4 4 4
//
// 4. the leader will now probe the returning follower at index
// 7, the rejection points it at the end of the follower's log
// which is at a higher log term than the actually committed
// log.
nextProbeIdx = r.raftLog.findConflictByTerm(m.RejectHint, m.LogTerm)
}
if pr.MaybeDecrTo(m.Index, nextProbeIdx) {
r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
if pr.State == tracker.StateReplicate {
pr.BecomeProbe()
}
r.sendAppend(m.From)
}
} else {
oldPaused := pr.IsPaused()
if pr.MaybeUpdate(m.Index) {
switch {
case pr.State == tracker.StateProbe:
pr.BecomeReplicate()
case pr.State == tracker.StateSnapshot && pr.Match >= pr.PendingSnapshot:
// TODO(tbg): we should also enter this branch if a snapshot is
// received that is below pr.PendingSnapshot but which makes it
// possible to use the log again.
r.logger.Debugf("%x recovered from needing snapshot, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
// Transition back to replicating state via probing state
// (which takes the snapshot into account). If we didn't
// move to replicating state, that would only happen with
// the next round of appends (but there may not be a next
// round for a while, exposing an inconsistent RaftStatus).
pr.BecomeProbe()
pr.BecomeReplicate()
case pr.State == tracker.StateReplicate:
pr.Inflights.FreeLE(m.Index)
}

if r.maybeCommit() {
// committed index has progressed for the term, so it is safe
// to respond to pending read index requests
releasePendingReadIndexMessages(r)
r.bcastAppend()
} else if oldPaused {
// If we were paused before, this node may be missing the
// latest commit index, so send it.
r.sendAppend(m.From)
}
// We've updated flow control information above, which may
// allow us to send multiple (size-limited) in-flight messages
// at once (such as when transitioning from probe to
// replicate, or when freeTo() covers multiple messages). If
// we have more entries to send, send as many messages as we
// can (without sending empty messages for the commit index)
for r.maybeSendAppend(m.From, false) {
}
// Transfer leadership is in progress.
if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() {
r.logger.Infof("%x sent MsgTimeoutNow to %x after received MsgAppResp", r.id, m.From)
r.sendTimeoutNow(m.From)
}
}
}
case pb.MsgHeartbeatResp:
pr.RecentActive = true
pr.ProbeSent = false

// free one slot for the full inflights window to allow progress.
if pr.State == tracker.StateReplicate && pr.Inflights.Full() {
pr.Inflights.FreeFirstOne()
}
if pr.Match < r.raftLog.lastIndex() {
r.sendAppend(m.From)
}

if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
return nil
}

if r.prs.Voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon {
return nil
}

rss := r.readOnly.advance(m)
for _, rs := range rss {
if resp := r.responseToReadIndexReq(rs.req, rs.index); resp.To != None {
r.send(resp)
}
}
case pb.MsgSnapStatus:
if pr.State != tracker.StateSnapshot {
return nil
}
// TODO(tbg): this code is very similar to the snapshot handling in
// MsgAppResp above. In fact, the code there is more correct than the
// code here and should likely be updated to match (or even better, the
// logic pulled into a newly created Progress state machine handler).
if !m.Reject {
pr.BecomeProbe()
r.logger.Debugf("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
} else {
// NB: the order here matters or we'll be probing erroneously from
// the snapshot index, but the snapshot never applied.
pr.PendingSnapshot = 0
pr.BecomeProbe()
r.logger.Debugf("%x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
}
// If snapshot finish, wait for the MsgAppResp from the remote node before sending
// out the next MsgApp.
// If snapshot failure, wait for a heartbeat interval before next try
pr.ProbeSent = true
case pb.MsgUnreachable:
// During optimistic replication, if the remote becomes unreachable,
// there is huge probability that a MsgApp is lost.
if pr.State == tracker.StateReplicate {
pr.BecomeProbe()
}
r.logger.Debugf("%x failed to send message to %x because it is unreachable [%s]", r.id, m.From, pr)
case pb.MsgTransferLeader:
if pr.IsLearner {
r.logger.Debugf("%x is learner. Ignored transferring leadership", r.id)
return nil
}
leadTransferee := m.From
lastLeadTransferee := r.leadTransferee
if lastLeadTransferee != None {
if lastLeadTransferee == leadTransferee {
r.logger.Infof("%x [term %d] transfer leadership to %x is in progress, ignores request to same node %x",
r.id, r.Term, leadTransferee, leadTransferee)
return nil
}
r.abortLeaderTransfer()
r.logger.Infof("%x [term %d] abort previous transferring leadership to %x", r.id, r.Term, lastLeadTransferee)
}
if leadTransferee == r.id {
r.logger.Debugf("%x is already leader. Ignored transferring leadership to self", r.id)
return nil
}
// Transfer leadership to third party.
r.logger.Infof("%x [term %d] starts to transfer leadership to %x", r.id, r.Term, leadTransferee)
// Transfer leadership should be finished in one electionTimeout, so reset r.electionElapsed.
r.electionElapsed = 0
r.leadTransferee = leadTransferee
if pr.Match == r.raftLog.lastIndex() {
r.sendTimeoutNow(leadTransferee)
r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee)
} else {
r.sendAppend(leadTransferee)
}
}
return nil
}

The messages handled in stepLeader fall into two classes: those for which the sender does not matter (mostly local messages) and those for which the leader needs to know who the sender is (mostly messages from other nodes).

stepLeader handles the first class of messages as follows:

  1. MsgBeat: a local message telling the leader, after a heartbeat timeout, to broadcast heartbeats. On receiving it, the leader broadcasts heartbeat messages.
  2. MsgCheckQuorum: a local message telling the leader, after an election timeout with Check Quorum enabled, to run the quorum check. The leader checks whether the number of active nodes reaches quorum and, if it does not, steps down to follower. (The details involve the ProgressTracker, which will be analyzed in a later article; knowing its purpose is enough here.)

In the second class, the message relevant to elections is MsgTransferLeader. As the code above shows, the leader:

  1. ignores the request if the transfer target is a learner;
  2. if a transfer is already in progress, ignores a duplicate request for the same target and otherwise aborts the previous transfer;
  3. does nothing if the transfer target is the current node, which is already the leader;
  4. records the transfer target, which is what step 2 uses to decide whether a later request interrupts this transfer;
  5. checks whether the target's log has caught up with the leader's. If it has, the leader sends the target a MsgTimeoutNow so that it times out immediately and starts a new election; otherwise it keeps replicating log entries to the target as usual. In the latter case, when the leader later processes the target's MsgAppResp and finds that it has caught up, it sends the MsgTimeoutNow at that point. That code can be found in the MsgAppResp handler:
// Transfer leadership is in progress.
if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() {
r.logger.Infof("%x sent MsgTimeoutNow to %x after received MsgAppResp", r.id, m.From)
r.sendTimeoutNow(m.From)
}

This code only runs once the follower's match index has caught up with the leader's last log index; see later articles in this series for details.

2.4.3 Follower

There is not much election-related logic in the follower.

First, the election-related logic in becomeFollower:

func (r *raft) becomeFollower(term uint64, lead uint64) {
r.step = stepFollower
r.reset(term)
r.tick = r.tickElection
r.lead = lead
r.state = StateFollower
r.logger.Infof("%x became follower at term %d", r.id, r.Term)
}
Field   becomeFollower      Description
step    stepFollower        step behavior
tick    tickElection        tick behavior
state   StateFollower       state machine role
lead    the lead argument   leader of the current term

Next, how stepFollower handles the election-related messages:

func stepFollower(r *raft, m pb.Message) error {
switch m.Type {
// ... ...
case pb.MsgApp:
r.electionElapsed = 0
r.lead = m.From
// ... ...
case pb.MsgHeartbeat:
r.electionElapsed = 0
r.lead = m.From
// ... ...
case pb.MsgSnap:
r.electionElapsed = 0
r.lead = m.From
// ... ...
case pb.MsgTransferLeader:
if r.lead == None {
r.logger.Infof("%x no leader at term %d; dropping leader transfer msg", r.id, r.Term)
return nil
}
m.To = r.lead
r.send(m)
case pb.MsgTimeoutNow:
r.logger.Infof("%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership.", r.id, r.Term, m.From)
// Leadership transfers never use pre-vote even if r.preVote is true; we
// know we are not recovering from a partition so there is no need for the
// extra round trip.
r.hup(campaignTransfer)
// ... ...
return nil
}

As the code shows, when a follower receives MsgApp, MsgHeartbeat, or MsgSnap from the leader, it updates its recorded leader and resets the election timeout timer. Messages that should be handled by the leader are forwarded to it (such as MsgTransferLeader; the code shown here also omits some other messages). So the only election-related message that really needs attention here is MsgTimeoutNow.

As seen in section 2.4.2, MsgTimeoutNow is the message a leader sends during a Leader Transfer to tell the target node to time out immediately and start an election, so the follower calls hup with campaignTransfer here. As shown in sections 2.1 and 2.2, hup and campaign handle campaignTransfer almost exactly like a normal election (campaignElection); the only difference is that for a Leader Transfer, campaignTransfer is written into the message's Context field. When the receiver processes such a vote request and its Context equals campaignTransfer, it does not simply ignore it, as we saw in section 2.3.2.