源碼分析 RocketMQ 多副本之 Leader 選主
本文將按照 《RocketMQ 多副本前置篇:初探 raft 協議》 的思路來學習 RocketMQ 選主邏輯。首先先回顧一下關於 Leader 的一些思考:
-
節點狀態
需要引入 3 種節點狀態:Follower(跟隨者)、Candidate(候選者),該狀態下的節點總是會嘗試發起投票,Leader(主節點)。 -
選舉計時器
Follower、Candidate 兩個狀態時,需要維護一個定時器,每次定時時間從 150ms-300ms 之間進行隨機,即每個節點的定時過期不一樣,Follower 狀態時,定時器到點後,觸發一輪投票。節點在收到投票請求、Leader 的心跳請求並作出響應後,需要重置定時器。 -
投票輪次 Team
Candidate 狀態的節點,每發起一輪投票,Team 加一。 -
投票機制
每一輪一個節點只能爲一個節點投贊成票,例如節點 A 中維護的輪次爲 3,並且已經爲節點 B 投了贊成票,如果收到其他節點,投票輪次爲 3,則會投反對票,如果收到輪次爲 4 的節點,是又可以投贊成票的。 -
成爲 Leader 的條件
必須得到集羣中初始數量的大多數,例如如果集羣中有 3 臺集羣,則必須得到兩票,如果其中一臺服務器宕機,剩下的兩個節點,還能進行選主嗎?答案是可以的,因爲可以得到 2 票,超過初始集羣中 3 的一半,所以通常集羣中的機器各位儘量爲奇數,因爲 4 臺的可用性與 3 臺一樣。
溫馨提示:本文是從源碼的角度分析 DLedger 選主實現原理,可能比較枯燥,文末給出了選主流程圖。
DLedger 關於選主的核心類圖
1.1 DLedgerConfig
多副本模塊相關的配置信息,例如集羣節點信息。
1.2 MemberState
節點狀態機,即 raft 協議中的 follower、candidate、leader 三種狀態的狀態機實現。
1.3 raft 協議相關
1.3.1 DLedgerClientProtocol
DLedger 客戶端協議,主要定義如下三個方法,在後面的日誌複製部分會重點闡述。
-
CompletableFuture get(GetEntriesRequest request)
客戶端從服務器獲取日誌條目(獲取數據) -
CompletableFuture append(AppendEntryRequest request)
客戶端向服務器追加日誌(存儲數據) -
CompletableFuture metadata(MetadataRequest request)
獲取元數據。
1.3.2 DLedgerProtocol
DLedger 服務端協議,主要定義如下三個方法。
-
CompletableFuture vote(VoteRequest request)
發起投票請求。 -
CompletableFuture heartBeat(HeartBeatRequest request)
Leader 向從節點發送心跳包。 -
CompletableFuture pull(PullEntriesRequest request)
拉取日誌條目,在日誌複製部分會詳細介紹。 -
CompletableFuture push(PushEntryRequest request)
推送日誌條件,在日誌複製部分會詳細介紹。
1.3.3 協議處理 Handler
DLedgerClientProtocolHandler、DLedgerProtocolHander 協議處理器。
1.4 DLedgerRpcService
DLedger Server(節點) 之間的網絡通信,默認基於 Netty 實現,其實現類爲:DLedgerRpcNettyService。
1.5 DLedgerLeaderElector
Leader 選舉實現器。
1.6 DLedgerServer
Dledger Server,Dledger 節點的封裝類。
接下來將從 DLedgerLeaderElector 開始剖析 DLedger 是如何實現 Leader 選舉的。(基於 raft 協議)。
源碼分析 Leader 選舉
2.1 DLedgerLeaderElector 類圖
我們先一一來介紹其屬性的含義:
-
Random random
隨機數生成器,對應 raft 協議中選舉超時時間是一隨機數。 -
DLedgerConfig dLedgerConfig
配置參數。 -
MemberState memberState
節點狀態機。 -
DLedgerRpcService dLedgerRpcService
rpc 服務,實現向集羣內的節點發送心跳包、投票的 RPC 實現。 -
long lastLeaderHeartBeatTime
上次收到心跳包的時間戳。 -
long lastSendHeartBeatTime
上次發送心跳包的時間戳。 -
long lastSuccHeartBeatTime
上次成功收到心跳包的時間戳。 -
int heartBeatTimeIntervalMs
一個心跳包的週期,默認爲 2s。 -
int maxHeartBeatLeak
允許最大的 N 個心跳週期內未收到心跳包,狀態爲 Follower 的節點只有超過 maxHeartBeatLeak * heartBeatTimeIntervalMs 的時間內未收到主節點的心跳包,纔會重新進入 Candidate 狀態,重新下一輪的選舉。 -
long nextTimeToRequestVote
發送下一個心跳包的時間戳。 -
boolean needIncreaseTermImmediately
是否應該立即發起投票。 -
int minVoteIntervalMs
最小的發送投票間隔時間,默認爲 300ms。 -
int maxVoteIntervalMs
最大的發送投票的間隔,默認爲 1000ms。 -
List roleChangeHandlers
註冊的節點狀態處理器,通過 addRoleChangeHandler 方法添加。 -
long lastVoteCost
上一次投票的開銷。 -
StateMaintainer stateMaintainer
狀態機管理器。
2.2 啓動選舉狀態管理器
通過 DLedgerLeaderElector 的 startup 方法啓動狀態管理機,代碼如下:
DLedgerLeaderElector#startup
public void startup() {
stateMaintainer.start(); // @1
for (RoleChangeHandler roleChangeHandler : roleChangeHandlers) { // @2
roleChangeHandler.startup();
}
}
代碼 @1:啓動狀態維護管理器。
代碼 @2:遍歷狀態改變監聽器並啓動它,可通過 DLedgerLeaderElector 的 addRoleChangeHandler 方法增加狀態變化監聽器。
其中的是啓動狀態管理器線程,其 run 方法實現:
public void run() {
while (running.get()) {
try {
doWork();
} catch (Throwable t) {
if (logger != null) {
logger.error("Unexpected Error in running {} ", getName(), t);
}
}
}
latch.countDown();
}
從上面來看,主要是循環調用 doWork 方法,接下來重點看其 doWork 的實現:
public void doWork() {
try {
if (DLedgerLeaderElector.this.dLedgerConfig.isEnableLeaderElector()) { // @1
DLedgerLeaderElector.this.refreshIntervals(dLedgerConfig); // @2
DLedgerLeaderElector.this.maintainState(); // @3
}
sleep(10); // @4
} catch (Throwable t) {
DLedgerLeaderElector.logger.error("Error in heartbeat", t);
}
}
代碼 @1:如果該節點參與 Leader 選舉,則首先調用 @2 重置定時器,然後驅動狀態機 (@3),是接下來重點需要剖析的。
代碼 @4:每執行一次選主,休息 10ms。
DLedgerLeaderElector#maintainState
private void maintainState() throws Exception {
if (memberState.isLeader()) {
maintainAsLeader();
} else if (memberState.isFollower()) {
maintainAsFollower();
} else {
maintainAsCandidate();
}
}
根據當前的狀態機狀態,執行對應的操作,從 raft 協議中可知,總共存在 3 種狀態:
-
leader
領導者,主節點,該狀態下,需要定時向從節點發送心跳包,用來傳播數據、確保其領導地位。 -
follower
從節點,該狀態下,會開啓定時器,嘗試進入到 candidate 狀態,以便發起投票選舉,同時一旦收到主節點的心跳包,則重置定時器。 -
candidate
候選者,該狀態下的節點會發起投票,嘗試選擇自己爲主節點,選舉成功後,不會存在該狀態下的節點。
我們在繼續往下看之前,需要知道 memberState 的初始值是什麼?我們追溯到創建 MemberState 的地方,發現其初始狀態爲 CANDIDATE。那我們接下從 maintainAsCandidate 方法開始跟進。
溫馨提示:在 raft 協議中,節點的狀態默認爲 follower,DLedger 的實現從 candidate 開始,一開始,集羣內的所有節點都會嘗試發起投票,這樣第一輪要達成選舉幾乎不太可能。
2.3 選舉狀態機狀態流轉
整個狀態機的驅動,由線程反覆執行 maintainState 方法。下面重點來分析其狀態的驅動。
2.3.1 maintainAsCandidate 方法
DLedgerLeaderElector#maintainAsCandidate
if (System.currentTimeMillis() < nextTimeToRequestVote && !needIncreaseTermImmediately) {
return;
}
long term;
long ledgerEndTerm;
long ledgerEndIndex;
Step1:首先先介紹幾個變量的含義:
-
nextTimeToRequestVote
下一次發起的投票的時間,如果當前時間小於該值,說明計時器未過期,此時無需發起投票。 -
needIncreaseTermImmediately
是否應該立即發起投票。如果爲 true,則忽略計時器,該值默認爲 false,當收到從主節點的心跳包並且當前狀態機的輪次大於主節點的輪次,說明集羣中 Leade r 的投票輪次小於從當前收到請求節點的投票輪次,應該立即發起新的投票。 -
term
投票輪次。 -
ledgerEndTerm
Leader 節點當前的投票輪次。 -
ledgerEndIndex
當前日誌的最大序列,即下一條日誌的開始 index,在日誌複製部分會詳細介紹。
DLedgerLeaderElector#maintainAsCandidate
synchronized (memberState) {
if (!memberState.isCandidate()) {
return;
}
if (lastParseResult == VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT || needIncreaseTermImmediately) {
long prevTerm = memberState.currTerm();
term = memberState.nextTerm();
logger.info("{}_[INCREASE_TERM] from {} to {}", memberState.getSelfId(), prevTerm, term);
lastParseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;
} else {
term = memberState.currTerm();
}
ledgerEndIndex = memberState.getLedgerEndIndex();
ledgerEndTerm = memberState.getLedgerEndTerm();
}
Step2:初始化 team、ledgerEndIndex 、ledgerEndTerm 屬性,其實現關鍵點如下:
-
如果上一次的投票結果爲待下一次投票或應該立即開啓投票,並且根據當前狀態機獲取下一輪的投票輪次,稍後會着重講解一下狀態機輪次的維護機制。
-
如果上一次的投票結果不是 WAIT_TO_VOTE_NEXT(等待下一輪投票),則投票輪次依然爲狀態機內部維護的輪次。
DLedgerLeaderElector#maintainAsCandidate
if (needIncreaseTermImmediately) {
nextTimeToRequestVote = getNextTimeToRequestVote();
needIncreaseTermImmediately = false;
return;
}
Step3:如果 needIncreaseTermImmediately 爲 true,則重置該標記位爲 false,並重新設置下一次投票超時時間,其實現代碼如下:
private long getNextTimeToRequestVote() {
return System.currentTimeMillis() + lastVoteCost + minVoteIntervalMs + random.nextInt(maxVoteIntervalMs - minVoteIntervalMs);
}
下一次倒計時:當前時間戳 + 上次投票的開銷 + 最小投票間隔 (300ms) + (1000- 300 )之間的隨機值。
final List<CompletableFuture<VoteResponse>> quorumVoteResponses = voteForQuorumResponses(term, ledgerEndTerm, ledgerEndIndex);
Step4:向集羣內的其他節點發起投票請求,並返回投票結果列表,稍後會重點分析其投票過程。可以預見,接下來就是根據各投票結果進行仲裁。
final AtomicLong knownMaxTermInGroup = new AtomicLong(-1);
final AtomicInteger allNum = new AtomicInteger(0);
final AtomicInteger validNum = new AtomicInteger(0);
final AtomicInteger acceptedNum = new AtomicInteger(0);
final AtomicInteger notReadyTermNum = new AtomicInteger(0);
final AtomicInteger biggerLedgerNum = new AtomicInteger(0);
final AtomicBoolean alreadyHasLeader = new AtomicBoolean(false);
Step5:在進行投票結果仲裁之前,先來介紹幾個局部變量的含義:
-
knownMaxTermInGroup
已知的最大投票輪次。 -
allNum
所有投票票數。 -
validNum
有效投票數。 -
acceptedNum
獲得的投票數。 -
notReadyTermNum
未準備投票的節點數量,如果對端節點的投票輪次小於發起投票的輪次,則認爲對端未準備好,對端節點使用本次的輪次進入 Candidate 狀態。 -
biggerLedgerNum
發起投票的節點的 ledgerEndTerm 小於對端節點的個數。 -
alreadyHasLeader
是否已經存在 Leader。
for (CompletableFuture<VoteResponse> future : quorumVoteResponses) {
// 省略部分代碼
}
Step5:遍歷投票結果,收集投票結果,接下來重點看其內部實現。
if (x.getVoteResult() != VoteResponse.RESULT.UNKNOWN) {
validNum.incrementAndGet();
}
Step6:如果投票結果不是 UNKNOW,則有效投票數量增 1。
synchronized (knownMaxTermInGroup) {
switch (x.getVoteResult()) {
case ACCEPT:
acceptedNum.incrementAndGet();
break;
case REJECT_ALREADY_VOTED:
break;
case REJECT_ALREADY_HAS_LEADER:
alreadyHasLeader.compareAndSet(false, true);
break;
case REJECT_TERM_SMALL_THAN_LEDGER:
case REJECT_EXPIRED_VOTE_TERM:
if (x.getTerm() > knownMaxTermInGroup.get()) {
knownMaxTermInGroup.set(x.getTerm());
}
break;
case REJECT_EXPIRED_LEDGER_TERM:
case REJECT_SMALL_LEDGER_END_INDEX:
biggerLedgerNum.incrementAndGet();
break;
case REJECT_TERM_NOT_READY:
notReadyTermNum.incrementAndGet();
break;
default:
break;
}
}
Step7:統計投票結構,幾個關鍵點如下:
-
ACCEPT
贊成票,acceptedNum 加一,只有得到的贊成票超過集羣節點數量的一半才能成爲 Leader。 -
REJECT_ALREADY_VOTED
拒絕票,原因是已經投了其他節點的票。 -
REJECT_ALREADY_HAS_LEADER
拒絕票,原因是因爲集羣中已經存在 Leaer 了。alreadyHasLeader 設置爲 true,無需在判斷其他投票結果了,結束本輪投票。 -
REJECT_TERM_SMALL_THAN_LEDGER
拒絕票,如果自己維護的 term 小於遠端維護的 ledgerEndTerm,則返回該結果,如果對端的 team 大於自己的 team,需要記錄對端最大的投票輪次,以便更新自己的投票輪次。 -
REJECT_EXPIRED_VOTE_TERM
拒絕票,如果自己維護的 term 小於遠端維護的 term,更新自己維護的投票輪次。 -
REJECT_EXPIRED_LEDGER_TERM
拒絕票,如果自己維護的 ledgerTerm 小於對端維護的 ledgerTerm,則返回該結果。如果是此種情況,增加計數器 biggerLedgerNum 的值。 -
REJECT_SMALL_LEDGER_END_INDEX
拒絕票,如果對端的 ledgerTeam 與自己維護的 ledgerTeam 相等,但是自己維護的 dedgerEndIndex 小於對端維護的值,返回該值,增加 biggerLedgerNum 計數器的值。 -
REJECT_TERM_NOT_READY
拒絕票,對端的投票輪次小於自己的 team,則認爲對端還未準備好投票,對端使用自己的投票輪次,是自己進入到 Candidate 狀態。
try {
voteLatch.await(3000 + random.nextInt(maxVoteIntervalMs), TimeUnit.MILLISECONDS);
} catch (Throwable ignore) {
}
Step8:等待收集投票結果,並設置超時時間。
lastVoteCost = DLedgerUtils.elapsed(startVoteTimeMs);
VoteResponse.ParseResult parseResult;
if (knownMaxTermInGroup.get() > term) {
parseResult = VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT;
nextTimeToRequestVote = getNextTimeToRequestVote();
changeRoleToCandidate(knownMaxTermInGroup.get());
} else if (alreadyHasLeader.get()) {
parseResult = VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT;
nextTimeToRequestVote = getNextTimeToRequestVote() + heartBeatTimeIntervalMs * maxHeartBeatLeak;
} else if (!memberState.isQuorum(validNum.get())) {
parseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;
nextTimeToRequestVote = getNextTimeToRequestVote();
} else if (memberState.isQuorum(acceptedNum.get())) {
parseResult = VoteResponse.ParseResult.PASSED;
} else if (memberState.isQuorum(acceptedNum.get() + notReadyTermNum.get())) {
parseResult = VoteResponse.ParseResult.REVOTE_IMMEDIATELY;
} else if (memberState.isQuorum(acceptedNum.get() + biggerLedgerNum.get())) {
parseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;
nextTimeToRequestVote = getNextTimeToRequestVote();
} else {
parseResult = VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT;
nextTimeToRequestVote = getNextTimeToRequestVote();
}
Step9:根據收集的投票結果判斷是否能成爲 Leader。
溫馨提示:在講解關鍵點之前,我們先定義先將(當前時間戳 + 上次投票的開銷 + 最小投票間隔 (300ms) + (1000- 300 )之間的隨機值)定義爲 “ 1 個常規計時器”。
其關鍵點如下:
-
如果對端的投票輪次大於發起投票的節點,則該節點使用對端的輪次,重新進入到 Candidate 狀態,並且重置投票計時器,其值爲 “1 個常規計時器”
-
如果已經存在 Leader,該節點重新進入到 Candidate, 並重置定時器,該定時器的時間:“1 個常規計時器” + heartBeatTimeIntervalMs * maxHeartBeatLeak ,其中 heartBeatTimeIntervalMs 爲一次心跳間隔時間,maxHeartBeatLeak 爲允許最大丟失的心跳包,即如果 Flower 節點在多少個心跳週期內未收到心跳包,則認爲 Leader 已下線。
-
如果收到的有效票數未超過半數,則重置計時器爲 “1 個常規計時器”,然後等待重新投票,注意狀態爲 WAIT_TO_REVOTE,該狀態下的特徵是下次投票時不增加投票輪次。
-
如果得到的贊同票超過半數,則成爲 Leader。
-
如果得到的贊成票加上未準備投票的節點數超過半數,則應該立即發起投票,故其結果爲 REVOTE_IMMEDIATELY。
-
如果得到的贊成票加上對端維護的 ledgerEndIndex 超過半數,則重置計時器,繼續本輪次的選舉。
-
其他情況,開啓下一輪投票。
if (parseResult == VoteResponse.ParseResult.PASSED) {
logger.info("[{}] [VOTE_RESULT] has been elected to be the leader in term {}", memberState.getSelfId(), term);
changeRoleToLeader(term);
}
Step10:如果投票成功,則狀態機狀態設置爲 Leader,然後狀態管理在驅動狀態時會調用 DLedgerLeaderElector#maintainState 時,將進入到 maintainAsLeader 方法。
2.3.2 maintainAsLeader 方法
經過 maintainAsCandidate 投票選舉後,被其他節點選舉成爲領導後,會執行該方法,其他節點的狀態還是 Candidate,並在計時器過期後,又嘗試去發起選舉。接下來重點分析成爲 Leader 節點後,該節點會做些什麼?
DLedgerLeaderElector#maintainAsLeader
private void maintainAsLeader() throws Exception {
if (DLedgerUtils.elapsed(lastSendHeartBeatTime) > heartBeatTimeIntervalMs) { // @1
long term;
String leaderId;
synchronized (memberState) {
if (!memberState.isLeader()) { // @2
//stop sending
return;
}
term = memberState.currTerm();
leaderId = memberState.getLeaderId();
lastSendHeartBeatTime = System.currentTimeMillis(); // @3
}
sendHeartbeats(term, leaderId); // @4
}
}
代碼 @1:首先判斷上一次發送心跳的時間與當前時間的差值是否大於心跳包發送間隔,如果超過,則說明需要發送心跳包。
代碼 @2:如果當前不是 leader 節點,則直接返回,主要是爲了二次判斷。
代碼 @3:重置心跳包發送計時器。
代碼 @4:向集羣內的所有節點發送心跳包,稍後會詳細介紹心跳包的發送。
2.3.3 maintainAsFollower 方法
當 Candidate 狀態的節點在收到主節點發送的心跳包後,會將狀態變更爲 follower,那我們先來看一下在 follower 狀態下,節點會做些什麼事情?
private void maintainAsFollower() {
if (DLedgerUtils.elapsed(lastLeaderHeartBeatTime) > 2 * heartBeatTimeIntervalMs) {
synchronized (memberState) {
if (memberState.isFollower() && (DLedgerUtils.elapsed(lastLeaderHeartBeatTime) > maxHeartBeatLeak * heartBeatTimeIntervalMs)) {
logger.info("[{}][HeartBeatTimeOut] lastLeaderHeartBeatTime: {} heartBeatTimeIntervalMs: {} lastLeader={}", memberState.getSelfId(), new Timestamp(lastLeaderHeartBeatTime), heartBeatTimeIntervalMs, memberState.getLeaderId());
changeRoleToCandidate(memberState.currTerm());
}
}
}
}
如果 maxHeartBeatLeak (默認爲 3) 個心跳包週期內未收到心跳,則將狀態變更爲 Candidate。
狀態機的驅動就介紹到這裏,在上面的流程中,其實我們忽略了兩個重要的過程,一個是發起投票請求與投票請求響應、發送心跳包與心跳包響應,那我們接下來將重點介紹這兩個過程。
2.4 投票與投票請求
節點的狀態爲 Candidate 時會向集羣內的其他節點發起投票請求 (個人覺得理解爲拉票更好),向對方詢問是否願意選舉我爲 Leader,對端節點會根據自己的情況對其投贊成票、拒絕票,如果是拒絕票,還會給出拒絕原因,具體由 voteForQuorumResponses、handleVote 這兩個方法來實現,接下來我們分別對這兩個方法進行詳細分析。
2.4.1 voteForQuorumResponses
發起投票請求。
private List<CompletableFuture<VoteResponse>> voteForQuorumResponses(long term, long ledgerEndTerm,
long ledgerEndIndex) throws Exception { // @1
List<CompletableFuture<VoteResponse>> responses = new ArrayList<>();
for (String id : memberState.getPeerMap().keySet()) { // @2
VoteRequest voteRequest = new VoteRequest(); // @3 start
voteRequest.setGroup(memberState.getGroup());
voteRequest.setLedgerEndIndex(ledgerEndIndex);
voteRequest.setLedgerEndTerm(ledgerEndTerm);
voteRequest.setLeaderId(memberState.getSelfId());
voteRequest.setTerm(term);
voteRequest.setRemoteId(id);
CompletableFuture<VoteResponse> voteResponse; // @3 end
if (memberState.getSelfId().equals(id)) { // @4
voteResponse = handleVote(voteRequest, true);
} else {
//async
voteResponse = dLedgerRpcService.vote(voteRequest); // @5
}
responses.add(voteResponse);
}
return responses;
}
代碼 @1:首先先解釋一下參數的含義:
-
long term
發起投票的節點當前的投票輪次。 -
long ledgerEndTerm
發起投票節點維護的已知的最大投票輪次。 -
long ledgerEndIndex
發起投票節點維護的已知的最大日誌條目索引。
代碼 @2:遍歷集羣內的節點集合,準備異步發起投票請求。這個集合在啓動的時候指定,不能修改。
代碼 @3:構建投票請求。
代碼 @4:如果是發送給自己的,則直接調用 handleVote 進行投票請求響應,如果是發送給集羣內的其他節點,則通過網絡發送投票請求,對端節點調用各自的 handleVote 對集羣進行響應。
接下來重點關注 handleVote 方法,重點探討其投票處理邏輯。
2.4.2 handleVote 方法
由於 handleVote 方法會併發被調用,因爲可能同時收到多個節點的投票請求,故本方法都被 synchronized 方法包含,鎖定的對象爲狀態機 memberState 對象。
if (!memberState.isPeerMember(request.getLeaderId())) {
logger.warn("[BUG] [HandleVote] remoteId={} is an unknown member", request.getLeaderId());
return CompletableFuture.completedFuture(newVoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_UNKNOWN_LEADER));
}
if (!self && memberState.getSelfId().equals(request.getLeaderId())) {
logger.warn("[BUG] [HandleVote] selfId={} but remoteId={}", memberState.getSelfId(), request.getLeaderId());
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_UNEXPECTED_LEADER));
}
Step1:爲了邏輯的完整性對其請求進行檢驗,除非有 BUG 存在,否則是不會出現上述問題的。
if (request.getTerm() < memberState.currTerm()) { // @1
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_EXPIRED_VOTE_TERM));
} else if (request.getTerm() == memberState.currTerm()) { // @2
if (memberState.currVoteFor() == null) {
//let it go
} else if (memberState.currVoteFor().equals(request.getLeaderId())) {
//repeat just let it go
} else {
if (memberState.getLeaderId() != null) {
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_ALREADY__HAS_LEADER));
} else {
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_ALREADY_VOTED));
}
}
} else { // @3
//stepped down by larger term
changeRoleToCandidate(request.getTerm());
needIncreaseTermImmediately = true;
//only can handleVote when the term is consistent
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_TERM_NOT_READY));
}
Step2:判斷髮起節點、響應節點維護的 team 進行投票 “仲裁”,分如下 3 種情況討論:
-
如果發起投票節點的 term 小於當前節點的 term。
此種情況下投拒絕票,也就是說在 raft 協議的世界中,誰的 term 越大,越有話語權。 -
如果發起投票節點的 term 等於當前節點的 term
如果兩者的 term 相等,說明兩者都處在同一個投票輪次中,地位平等,接下來看該節點是否已經投過票。
-
如果未投票、或已投票給請求節點,則繼續後面的邏輯(請看 step3)。
-
如果該節點已存在的 Leader 節點,則拒絕並告知已存在 Leader 節點。
-
如果該節點還未有 Leader 節點,但已經投了其他節點的票,則拒絕請求節點,並告知已投票。
-
如果發起投票節點的 term 大於當前節點的 term。
拒絕請求節點的投票請求,並告知自身還未準備投票,自身會使用請求節點的投票輪次立即進入到 Candidate 狀態。
if (request.getLedgerEndTerm() < memberState.getLedgerEndTerm()) {
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_EXPIRED_LEDGER_TERM));
} else if (request.getLedgerEndTerm() == memberState.getLedgerEndTerm() && request.getLedgerEndIndex() < memberState.getLedgerEndIndex()) {
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_SMALL_LEDGER_END_INDEX));
}
if (request.getTerm() < memberState.getLedgerEndTerm()) {
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.getLedgerEndTerm()).voteResult(VoteResponse.RESULT.REJECT_TERM_SMALL_THAN_LEDGER));
}
Step3:判斷請求節點的 ledgerEndTerm 與當前節點的 ledgerEndTerm,這裏主要是判斷日誌的複製進度。
-
如果請求節點的 ledgerEndTerm 小於當前節點的 ledgerEndTerm 則拒絕,其原因是請求節點的日誌複製進度比當前節點低,這種情況是不能成爲主節點的。
-
如果 ledgerEndTerm 相等,但是 ledgerEndIndex 比當前節點小,則拒絕,原因與上一條相同。
-
如果請求的 term 小於 ledgerEndTerm 以同樣的理由拒絕。
memberState.setCurrVoteFor(request.getLeaderId());
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.ACCEPT));
Step4:經過層層條件帥選,將寶貴的贊成票投給請求節點。
經過幾輪投票,最終一個節點能成功被推舉出來,選爲主節點。主節點爲了維持其領導地位,需要定時向從節點發送心跳包,接下來我們重點看一下心跳包的發送與響應。
2.5 心跳包與心跳包響應
2.5.1 sendHeartbeats
Step1:遍歷集羣中的節點,異步發送心跳包。
CompletableFuture<HeartBeatResponse> future = dLedgerRpcService.heartBeat(heartBeatRequest);
future.whenComplete((HeartBeatResponse x, Throwable ex) -> {
try {
if (ex != null) {
throw ex;
}
switch (DLedgerResponseCode.valueOf(x.getCode())) {
case SUCCESS:
succNum.incrementAndGet();
break;
case EXPIRED_TERM:
maxTerm.set(x.getTerm());
break;
case INCONSISTENT_LEADER:
inconsistLeader.compareAndSet(false, true);
break;
case TERM_NOT_READY:
notReadyNum.incrementAndGet();
break;
default:
break;
}
if (memberState.isQuorum(succNum.get())
|| memberState.isQuorum(succNum.get() + notReadyNum.get())) {
beatLatch.countDown();
}
} catch (Throwable t) {
logger.error("Parse heartbeat response failed", t);
} finally {
allNum.incrementAndGet();
if (allNum.get() == memberState.peerSize()) {
beatLatch.countDown();
}
}
});
}
Step2:統計心跳包發送響應結果,關鍵點如下:
-
SUCCESS
心跳包成功響應。 -
EXPIRED_TERM
主節點的投票 term 小於從節點的投票輪次。 -
INCONSISTENT_LEADER
從節點已經有了新的主節點。 -
TERM_NOT_READY
從節點未準備好。
這些響應值,我們在處理心跳包時重點探討。
beatLatch.await(heartBeatTimeIntervalMs, TimeUnit.MILLISECONDS);
if (memberState.isQuorum(succNum.get())) { // @1
lastSuccHeartBeatTime = System.currentTimeMillis();
} else {
logger.info("[{}] Parse heartbeat responses in cost={} term={} allNum={} succNum={} notReadyNum={} inconsistLeader={} maxTerm={} peerSize={} lastSuccHeartBeatTime={}",
memberState.getSelfId(), DLedgerUtils.elapsed(startHeartbeatTimeMs), term, allNum.get(), succNum.get(), notReadyNum.get(), inconsistLeader.get(), maxTerm.get(), memberState.peerSize(), new Timestamp(lastSuccHeartBeatTime));
if (memberState.isQuorum(succNum.get() + notReadyNum.get())) { // @2
lastSendHeartBeatTime = -1;
} else if (maxTerm.get() > term) { // @3
changeRoleToCandidate(maxTerm.get());
} else if (inconsistLeader.get()) { // @4
changeRoleToCandidate(term);
} else if (DLedgerUtils.elapsed(lastSuccHeartBeatTime) > maxHeartBeatLeak * heartBeatTimeIntervalMs) {
changeRoleToCandidate(term);
}
}
對收集的響應結果做仲裁,其實現關鍵點:
-
如果成功的票數大於集羣內的半數,則表示集羣狀態正常,正常按照心跳包間隔發送心跳包 (見代碼 @1)。
-
如果成功的票數加上未準備的投票的節點數量超過集羣內的半數,則立即發送心跳包 (見代碼 @2)。
-
如果從節點的投票輪次比主節點的大,則使用從節點的投票輪次,或從節點已經有了另外的主節點,節點狀態從 Leader 轉換爲 Candidate。
接下來我們重點看一下心跳包的處理邏輯。
2.5.2 handleHeartBeat
if (request.getTerm() < memberState.currTerm()) {
return CompletableFuture.completedFuture(new HeartBeatResponse().term(memberState.currTerm()).code(DLedgerResponseCode.EXPIRED_TERM.getCode()));
} else if (request.getTerm() == memberState.currTerm()) {
if (request.getLeaderId().equals(memberState.getLeaderId())) {
lastLeaderHeartBeatTime = System.currentTimeMillis();
return CompletableFuture.completedFuture(new HeartBeatResponse());
}
}
Step1:如果主節點的 term 小於 從節點的 term,發送反饋給主節點,告知主節點的 term 已過時;如果投票輪次相同,並且發送心跳包的節點是該節點的主節點,則返回成功。
下面重點討論主節點的 term 大於從節點的情況。
synchronized (memberState) {
if (request.getTerm() < memberState.currTerm()) { // @1
return CompletableFuture.completedFuture(new HeartBeatResponse().term(memberState.currTerm()).code(DLedgerResponseCode.EXPIRED_TERM.getCode()));
} else if (request.getTerm() == memberState.currTerm()) { // @2
if (memberState.getLeaderId() == null) {
changeRoleToFollower(request.getTerm(), request.getLeaderId());
return CompletableFuture.completedFuture(new HeartBeatResponse());
} else if (request.getLeaderId().equals(memberState.getLeaderId())) {
lastLeaderHeartBeatTime = System.currentTimeMillis();
return CompletableFuture.completedFuture(new HeartBeatResponse());
} else {
//this should not happen, but if happened
logger.error("[{}][BUG] currTerm {} has leader {}, but received leader {}", memberState.getSelfId(), memberState.currTerm(), memberState.getLeaderId(), request.getLeaderId());
return CompletableFuture.completedFuture(new HeartBeatResponse().code(DLedgerResponseCode.INCONSISTENT_LEADER.getCode()));
}
} else {
//To make it simple, for larger term, do not change to follower immediately
//first change to candidate, and notify the state-maintainer thread
changeRoleToCandidate(request.getTerm());
needIncreaseTermImmediately = true;
//TOOD notify
return CompletableFuture.completedFuture(new HeartBeatResponse().code(DLedgerResponseCode.TERM_NOT_READY.getCode()));
}
}
Step2:加鎖來處理(這裏更多的是從節點第一次收到主節點的心跳包)
代碼 @1:如果主節的投票輪次小於當前投票輪次,則返回主節點投票輪次過期。
代碼 @2:如果投票輪次相同。
-
如果當前節點的主節點字段爲空,則使用主節點的 ID,並返回成功。
-
如果當前節點的主節點就是發送心跳包的節點,則更新上一次收到心跳包的時間戳,並返回成功。
-
如果從節點的主節點與發送心跳包的節點 ID 不同,說明有另外一個 Leaer,按道理來說是不會發送的,如果發生,則返回已存在 - 主節點,標記該心跳包處理結束。
代碼 @3:如果主節點的投票輪次大於從節點的投票輪次,則認爲從節點併爲準備好,則從節點進入 Candidate 狀態,並立即發起一次投票。
心跳包的處理就介紹到這裏。
RocketMQ 多副本之 Leader 選舉的源碼分析就介紹到這裏了,爲了加強對源碼的理解,先梳理流程圖如下:
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/wCYH7y2m_Oqcpt_vGNAU1A