源碼分析 RocketMQ 多副本之 Leader 選主

本文將按照 《RocketMQ 多副本前置篇:初探 raft 協議》 的思路來學習 RocketMQ 選主邏輯。首先先回顧一下關於 Leader 的一些思考:

  1. 節點狀態
    需要引入 3 種節點狀態:Follower(跟隨者)、Candidate(候選者),該狀態下的節點總是會嘗試發起投票,Leader(主節點)。

  2. 選舉計時器
    Follower、Candidate 兩個狀態時,需要維護一個定時器,每次定時時間從 150ms-300ms  之間進行隨機,即每個節點的定時過期不一樣,Follower 狀態時,定時器到點後,觸發一輪投票。節點在收到投票請求、Leader 的心跳請求並作出響應後,需要重置定時器。

  3. 投票輪次 Team
    Candidate 狀態的節點,每發起一輪投票,Team 加一。

  4. 投票機制
    每一輪一個節點只能爲一個節點投贊成票,例如節點 A 中維護的輪次爲 3,並且已經爲節點 B 投了贊成票,如果收到其他節點,投票輪次爲 3,則會投反對票,如果收到輪次爲 4 的節點,是又可以投贊成票的。

  5. 成爲 Leader 的條件
    必須得到集羣中初始數量的大多數,例如如果集羣中有 3 臺集羣,則必須得到兩票,如果其中一臺服務器宕機,剩下的兩個節點,還能進行選主嗎?答案是可以的,因爲可以得到 2 票,超過初始集羣中 3 的一半,所以通常集羣中的機器各位儘量爲奇數,因爲 4 臺的可用性與 3 臺一樣。

溫馨提示:本文是從源碼的角度分析 DLedger 選主實現原理,可能比較枯燥,文末給出了選主流程圖。

DLedger 關於選主的核心類圖


1.1 DLedgerConfig

多副本模塊相關的配置信息,例如集羣節點信息。

1.2 MemberState

節點狀態機,即 raft 協議中的 follower、candidate、leader 三種狀態的狀態機實現。

1.3 raft 協議相關

1.3.1 DLedgerClientProtocol

DLedger 客戶端協議,主要定義如下三個方法,在後面的日誌複製部分會重點闡述。

1.3.2 DLedgerProtocol

DLedger 服務端協議,主要定義如下三個方法。

1.3.3 協議處理 Handler

DLedgerClientProtocolHandler、DLedgerProtocolHander 協議處理器。

1.4 DLedgerRpcService

DLedger Server(節點) 之間的網絡通信,默認基於 Netty 實現,其實現類爲:DLedgerRpcNettyService。

1.5 DLedgerLeaderElector

Leader 選舉實現器。

1.6 DLedgerServer

Dledger Server,Dledger 節點的封裝類。

接下來將從 DLedgerLeaderElector 開始剖析 DLedger 是如何實現 Leader 選舉的。(基於 raft 協議)。

源碼分析 Leader 選舉


2.1 DLedgerLeaderElector 類圖

我們先一一來介紹其屬性的含義:

2.2 啓動選舉狀態管理器

通過 DLedgerLeaderElector 的 startup 方法啓動狀態管理機,代碼如下:
DLedgerLeaderElector#startup

public void startup() {
    stateMaintainer.start();   // @1
    for (RoleChangeHandler roleChangeHandler : roleChangeHandlers) {   // @2
        roleChangeHandler.startup();
    }
}

代碼 @1:啓動狀態維護管理器。

代碼 @2:遍歷狀態改變監聽器並啓動它,可通過 DLedgerLeaderElector 的 addRoleChangeHandler 方法增加狀態變化監聽器。

其中的是啓動狀態管理器線程,其 run 方法實現:

public void run() {
    while (running.get()) {
        try {
            doWork();    
        } catch (Throwable t) {
            if (logger != null) {
                logger.error("Unexpected Error in running {} ", getName(), t);
            }
        }
    }
    latch.countDown();
}

從上面來看,主要是循環調用 doWork 方法,接下來重點看其 doWork 的實現:

public void doWork() {
    try {
        if (DLedgerLeaderElector.this.dLedgerConfig.isEnableLeaderElector()) {   // @1
            DLedgerLeaderElector.this.refreshIntervals(dLedgerConfig);                 // @2
            DLedgerLeaderElector.this.maintainState();                                           // @3
        }
        sleep(10);                                                                                                    // @4
    } catch (Throwable t) {
        DLedgerLeaderElector.logger.error("Error in heartbeat", t);
    }
}

代碼 @1:如果該節點參與 Leader 選舉,則首先調用 @2 重置定時器,然後驅動狀態機 (@3),是接下來重點需要剖析的。

代碼 @4:每執行一次選主,休息 10ms。

DLedgerLeaderElector#maintainState

private void maintainState() throws Exception {
    if (memberState.isLeader()) {  
        maintainAsLeader();
    } else if (memberState.isFollower()) {
        maintainAsFollower();
    } else {
        maintainAsCandidate();
    }
}

根據當前的狀態機狀態,執行對應的操作,從 raft 協議中可知,總共存在 3 種狀態:

我們在繼續往下看之前,需要知道 memberState 的初始值是什麼?我們追溯到創建 MemberState 的地方,發現其初始狀態爲 CANDIDATE。那我們接下從 maintainAsCandidate 方法開始跟進。

溫馨提示:在 raft 協議中,節點的狀態默認爲 follower,DLedger 的實現從 candidate 開始,一開始,集羣內的所有節點都會嘗試發起投票,這樣第一輪要達成選舉幾乎不太可能。

2.3 選舉狀態機狀態流轉

整個狀態機的驅動,由線程反覆執行 maintainState 方法。下面重點來分析其狀態的驅動。

2.3.1  maintainAsCandidate 方法

DLedgerLeaderElector#maintainAsCandidate

if (System.currentTimeMillis() < nextTimeToRequestVote && !needIncreaseTermImmediately) {
    return;
}
long term;
long ledgerEndTerm;
long ledgerEndIndex;

Step1:首先先介紹幾個變量的含義:

DLedgerLeaderElector#maintainAsCandidate

synchronized (memberState) {
    if (!memberState.isCandidate()) {
        return;
    }
    if (lastParseResult == VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT || needIncreaseTermImmediately) {
        long prevTerm = memberState.currTerm();
        term = memberState.nextTerm();
        logger.info("{}_[INCREASE_TERM] from {} to {}", memberState.getSelfId(), prevTerm, term);
        lastParseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;
    } else {
        term = memberState.currTerm();
    }
    ledgerEndIndex = memberState.getLedgerEndIndex();
    ledgerEndTerm = memberState.getLedgerEndTerm();
}

Step2:初始化 team、ledgerEndIndex 、ledgerEndTerm 屬性,其實現關鍵點如下:

DLedgerLeaderElector#maintainAsCandidate

if (needIncreaseTermImmediately) {
    nextTimeToRequestVote = getNextTimeToRequestVote();
    needIncreaseTermImmediately = false;
    return;
}

Step3:如果 needIncreaseTermImmediately 爲 true,則重置該標記位爲 false,並重新設置下一次投票超時時間,其實現代碼如下:

private long getNextTimeToRequestVote() {
    return System.currentTimeMillis() + lastVoteCost + minVoteIntervalMs + random.nextInt(maxVoteIntervalMs - minVoteIntervalMs);
}

下一次倒計時:當前時間戳 + 上次投票的開銷 + 最小投票間隔 (300ms) +  (1000- 300 )之間的隨機值。

final List<CompletableFuture<VoteResponse>> quorumVoteResponses = voteForQuorumResponses(term, ledgerEndTerm, ledgerEndIndex);

Step4:向集羣內的其他節點發起投票請求,並返回投票結果列表,稍後會重點分析其投票過程。可以預見,接下來就是根據各投票結果進行仲裁。

final AtomicLong knownMaxTermInGroup = new AtomicLong(-1);
final AtomicInteger allNum = new AtomicInteger(0);
final AtomicInteger validNum = new AtomicInteger(0);
final AtomicInteger acceptedNum = new AtomicInteger(0);
final AtomicInteger notReadyTermNum = new AtomicInteger(0);
final AtomicInteger biggerLedgerNum = new AtomicInteger(0);
final AtomicBoolean alreadyHasLeader = new AtomicBoolean(false);

Step5:在進行投票結果仲裁之前,先來介紹幾個局部變量的含義:

for (CompletableFuture<VoteResponse> future : quorumVoteResponses) {
   // 省略部分代碼
}

Step5:遍歷投票結果,收集投票結果,接下來重點看其內部實現。

if (x.getVoteResult() != VoteResponse.RESULT.UNKNOWN) {
    validNum.incrementAndGet();
}

Step6:如果投票結果不是 UNKNOW,則有效投票數量增 1。

synchronized (knownMaxTermInGroup) {
    switch (x.getVoteResult()) {
        case ACCEPT:
            acceptedNum.incrementAndGet();
            break;
        case REJECT_ALREADY_VOTED:
            break;
        case REJECT_ALREADY_HAS_LEADER:
            alreadyHasLeader.compareAndSet(false, true);
            break;
        case REJECT_TERM_SMALL_THAN_LEDGER:
        case REJECT_EXPIRED_VOTE_TERM:
            if (x.getTerm() > knownMaxTermInGroup.get()) {
                knownMaxTermInGroup.set(x.getTerm());
            }
            break;
        case REJECT_EXPIRED_LEDGER_TERM:
        case REJECT_SMALL_LEDGER_END_INDEX:
            biggerLedgerNum.incrementAndGet();
            break;
        case REJECT_TERM_NOT_READY:
            notReadyTermNum.incrementAndGet();
            break;
        default:
            break;
    }
}

Step7:統計投票結構,幾個關鍵點如下:

try {
    voteLatch.await(3000 + random.nextInt(maxVoteIntervalMs), TimeUnit.MILLISECONDS);
} catch (Throwable ignore) {
}

Step8:等待收集投票結果,並設置超時時間。

lastVoteCost = DLedgerUtils.elapsed(startVoteTimeMs);
VoteResponse.ParseResult parseResult;
if (knownMaxTermInGroup.get() > term) {
    parseResult = VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT;
    nextTimeToRequestVote = getNextTimeToRequestVote();
    changeRoleToCandidate(knownMaxTermInGroup.get());
} else if (alreadyHasLeader.get()) {
    parseResult = VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT;
    nextTimeToRequestVote = getNextTimeToRequestVote() + heartBeatTimeIntervalMs * maxHeartBeatLeak;
} else if (!memberState.isQuorum(validNum.get())) {
    parseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;
    nextTimeToRequestVote = getNextTimeToRequestVote();
} else if (memberState.isQuorum(acceptedNum.get())) {
    parseResult = VoteResponse.ParseResult.PASSED;
} else if (memberState.isQuorum(acceptedNum.get() + notReadyTermNum.get())) {
    parseResult = VoteResponse.ParseResult.REVOTE_IMMEDIATELY;
} else if (memberState.isQuorum(acceptedNum.get() + biggerLedgerNum.get())) {
    parseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;
    nextTimeToRequestVote = getNextTimeToRequestVote();
} else {
    parseResult = VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT;
    nextTimeToRequestVote = getNextTimeToRequestVote();
}

Step9:根據收集的投票結果判斷是否能成爲 Leader。

溫馨提示:在講解關鍵點之前,我們先定義先將(當前時間戳 + 上次投票的開銷 + 最小投票間隔 (300ms) +  (1000- 300 )之間的隨機值)定義爲 “ 1 個常規計時器”。

其關鍵點如下:

if (parseResult == VoteResponse.ParseResult.PASSED) {
    logger.info("[{}] [VOTE_RESULT] has been elected to be the leader in term {}", memberState.getSelfId(), term);
    changeRoleToLeader(term);
}

Step10:如果投票成功,則狀態機狀態設置爲 Leader,然後狀態管理在驅動狀態時會調用 DLedgerLeaderElector#maintainState 時,將進入到 maintainAsLeader 方法。

2.3.2  maintainAsLeader 方法

經過 maintainAsCandidate 投票選舉後,被其他節點選舉成爲領導後,會執行該方法,其他節點的狀態還是 Candidate,並在計時器過期後,又嘗試去發起選舉。接下來重點分析成爲 Leader 節點後,該節點會做些什麼?

DLedgerLeaderElector#maintainAsLeader

private void maintainAsLeader() throws Exception {
    if (DLedgerUtils.elapsed(lastSendHeartBeatTime) > heartBeatTimeIntervalMs) {  // @1
        long term;
        String leaderId;
        synchronized (memberState) {
            if (!memberState.isLeader()) {     // @2
                //stop sending
                return;
            }
            term = memberState.currTerm();
            leaderId = memberState.getLeaderId();
            lastSendHeartBeatTime = System.currentTimeMillis();    // @3
        }
        sendHeartbeats(term, leaderId);    // @4
    }
}

代碼 @1:首先判斷上一次發送心跳的時間與當前時間的差值是否大於心跳包發送間隔,如果超過,則說明需要發送心跳包。

代碼 @2:如果當前不是 leader 節點,則直接返回,主要是爲了二次判斷。

代碼 @3:重置心跳包發送計時器。

代碼 @4:向集羣內的所有節點發送心跳包,稍後會詳細介紹心跳包的發送。

2.3.3  maintainAsFollower 方法

當 Candidate 狀態的節點在收到主節點發送的心跳包後,會將狀態變更爲 follower,那我們先來看一下在 follower 狀態下,節點會做些什麼事情?

private void maintainAsFollower() {
    if (DLedgerUtils.elapsed(lastLeaderHeartBeatTime) > 2 * heartBeatTimeIntervalMs) {   
        synchronized (memberState) {
            if (memberState.isFollower() && (DLedgerUtils.elapsed(lastLeaderHeartBeatTime) > maxHeartBeatLeak * heartBeatTimeIntervalMs)) {
                logger.info("[{}][HeartBeatTimeOut] lastLeaderHeartBeatTime: {} heartBeatTimeIntervalMs: {} lastLeader={}", memberState.getSelfId(), new Timestamp(lastLeaderHeartBeatTime), heartBeatTimeIntervalMs, memberState.getLeaderId());
                changeRoleToCandidate(memberState.currTerm());
            }
        }
    }
}

如果 maxHeartBeatLeak (默認爲 3) 個心跳包週期內未收到心跳,則將狀態變更爲 Candidate。

狀態機的驅動就介紹到這裏,在上面的流程中,其實我們忽略了兩個重要的過程,一個是發起投票請求與投票請求響應、發送心跳包與心跳包響應,那我們接下來將重點介紹這兩個過程。

2.4 投票與投票請求

節點的狀態爲 Candidate 時會向集羣內的其他節點發起投票請求 (個人覺得理解爲拉票更好),向對方詢問是否願意選舉我爲 Leader,對端節點會根據自己的情況對其投贊成票、拒絕票,如果是拒絕票,還會給出拒絕原因,具體由 voteForQuorumResponses、handleVote 這兩個方法來實現,接下來我們分別對這兩個方法進行詳細分析。

2.4.1 voteForQuorumResponses

發起投票請求。

private List<CompletableFuture<VoteResponse>> voteForQuorumResponses(long term, long ledgerEndTerm,
    long ledgerEndIndex) throws Exception {   // @1
    List<CompletableFuture<VoteResponse>> responses = new ArrayList<>();
    for (String id : memberState.getPeerMap().keySet()) {               // @2
        VoteRequest voteRequest = new VoteRequest();                  // @3 start
        voteRequest.setGroup(memberState.getGroup());
        voteRequest.setLedgerEndIndex(ledgerEndIndex);
        voteRequest.setLedgerEndTerm(ledgerEndTerm);
        voteRequest.setLeaderId(memberState.getSelfId());
        voteRequest.setTerm(term);
        voteRequest.setRemoteId(id);
        CompletableFuture<VoteResponse> voteResponse;          // @3 end
        if (memberState.getSelfId().equals(id)) {                             // @4
            voteResponse = handleVote(voteRequest, true);
        } else {
            //async
            voteResponse = dLedgerRpcService.vote(voteRequest);  // @5
        }
        responses.add(voteResponse);
    }
    return responses;
}

代碼 @1:首先先解釋一下參數的含義:

代碼 @2:遍歷集羣內的節點集合,準備異步發起投票請求。這個集合在啓動的時候指定,不能修改。

代碼 @3:構建投票請求。

代碼 @4:如果是發送給自己的,則直接調用 handleVote 進行投票請求響應,如果是發送給集羣內的其他節點,則通過網絡發送投票請求,對端節點調用各自的 handleVote 對集羣進行響應。

接下來重點關注 handleVote 方法,重點探討其投票處理邏輯。

2.4.2 handleVote 方法

由於 handleVote 方法會併發被調用,因爲可能同時收到多個節點的投票請求,故本方法都被 synchronized 方法包含,鎖定的對象爲狀態機 memberState 對象。

if (!memberState.isPeerMember(request.getLeaderId())) {
    logger.warn("[BUG] [HandleVote] remoteId={} is an unknown member", request.getLeaderId());
    return CompletableFuture.completedFuture(newVoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_UNKNOWN_LEADER));
}
if (!self && memberState.getSelfId().equals(request.getLeaderId())) {
    logger.warn("[BUG] [HandleVote] selfId={} but remoteId={}", memberState.getSelfId(), request.getLeaderId());
    return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_UNEXPECTED_LEADER));
}

Step1:爲了邏輯的完整性對其請求進行檢驗,除非有 BUG 存在,否則是不會出現上述問題的。

if (request.getTerm() < memberState.currTerm()) {    // @1
    return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_EXPIRED_VOTE_TERM));
} else if (request.getTerm() == memberState.currTerm()) {   // @2
    if (memberState.currVoteFor() == null) {
        //let it go
    } else if (memberState.currVoteFor().equals(request.getLeaderId())) {
         //repeat just let it go
    } else {
        if (memberState.getLeaderId() != null) {
             return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_ALREADY__HAS_LEADER));
        } else {
                return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_ALREADY_VOTED));
        }
    }
} else {            // @3
    //stepped down by larger term
    changeRoleToCandidate(request.getTerm());
    needIncreaseTermImmediately = true;
    //only can handleVote when the term is consistent
    return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_TERM_NOT_READY));
}

Step2:判斷髮起節點、響應節點維護的 team 進行投票 “仲裁”,分如下 3 種情況討論:

if (request.getLedgerEndTerm() < memberState.getLedgerEndTerm()) {
    return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_EXPIRED_LEDGER_TERM));
} else if (request.getLedgerEndTerm() == memberState.getLedgerEndTerm() && request.getLedgerEndIndex() < memberState.getLedgerEndIndex()) {
    return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_SMALL_LEDGER_END_INDEX));
}

if (request.getTerm() < memberState.getLedgerEndTerm()) {
    return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.getLedgerEndTerm()).voteResult(VoteResponse.RESULT.REJECT_TERM_SMALL_THAN_LEDGER));
}

Step3:判斷請求節點的 ledgerEndTerm 與當前節點的 ledgerEndTerm,這裏主要是判斷日誌的複製進度。

memberState.setCurrVoteFor(request.getLeaderId());
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.ACCEPT));

Step4:經過層層條件帥選,將寶貴的贊成票投給請求節點。

經過幾輪投票,最終一個節點能成功被推舉出來,選爲主節點。主節點爲了維持其領導地位,需要定時向從節點發送心跳包,接下來我們重點看一下心跳包的發送與響應。

2.5 心跳包與心跳包響應

2.5.1 sendHeartbeats

Step1:遍歷集羣中的節點,異步發送心跳包。

 CompletableFuture<HeartBeatResponse> future = dLedgerRpcService.heartBeat(heartBeatRequest);
    future.whenComplete((HeartBeatResponse x, Throwable ex) -> {
        try {

            if (ex != null) {
                throw ex;
            }
            switch (DLedgerResponseCode.valueOf(x.getCode())) {
                case SUCCESS:
                    succNum.incrementAndGet();
                    break;
                case EXPIRED_TERM:
                    maxTerm.set(x.getTerm());
                    break;
                case INCONSISTENT_LEADER:
                    inconsistLeader.compareAndSet(false, true);
                    break;
                case TERM_NOT_READY:
                    notReadyNum.incrementAndGet();
                    break;
                default:
                    break;
            }
            if (memberState.isQuorum(succNum.get())
                || memberState.isQuorum(succNum.get() + notReadyNum.get())) {
                beatLatch.countDown();
            }
        } catch (Throwable t) {
            logger.error("Parse heartbeat response failed", t);
        } finally {
            allNum.incrementAndGet();
            if (allNum.get() == memberState.peerSize()) {
                beatLatch.countDown();
            }
        }
    });
}

Step2:統計心跳包發送響應結果,關鍵點如下:

這些響應值,我們在處理心跳包時重點探討。

beatLatch.await(heartBeatTimeIntervalMs, TimeUnit.MILLISECONDS);
if (memberState.isQuorum(succNum.get())) {   // @1
    lastSuccHeartBeatTime = System.currentTimeMillis();
} else {
    logger.info("[{}] Parse heartbeat responses in cost={} term={} allNum={} succNum={} notReadyNum={} inconsistLeader={} maxTerm={} peerSize={} lastSuccHeartBeatTime={}",
                memberState.getSelfId(), DLedgerUtils.elapsed(startHeartbeatTimeMs), term, allNum.get(), succNum.get(), notReadyNum.get(), inconsistLeader.get(), maxTerm.get(), memberState.peerSize(), new Timestamp(lastSuccHeartBeatTime));
    if (memberState.isQuorum(succNum.get() + notReadyNum.get())) {    // @2
        lastSendHeartBeatTime = -1;
    } else if (maxTerm.get() > term) {                                                          // @3
        changeRoleToCandidate(maxTerm.get());
    } else if (inconsistLeader.get()) {                                                            // @4
        changeRoleToCandidate(term);
    } else if (DLedgerUtils.elapsed(lastSuccHeartBeatTime) > maxHeartBeatLeak * heartBeatTimeIntervalMs) {
        changeRoleToCandidate(term);
    }
}

對收集的響應結果做仲裁,其實現關鍵點:

接下來我們重點看一下心跳包的處理邏輯。

2.5.2 handleHeartBeat

if (request.getTerm() < memberState.currTerm()) {
    return CompletableFuture.completedFuture(new HeartBeatResponse().term(memberState.currTerm()).code(DLedgerResponseCode.EXPIRED_TERM.getCode()));
} else if (request.getTerm() == memberState.currTerm()) {
    if (request.getLeaderId().equals(memberState.getLeaderId())) {
        lastLeaderHeartBeatTime = System.currentTimeMillis();
        return CompletableFuture.completedFuture(new HeartBeatResponse());
    }
}

Step1:如果主節點的 term 小於 從節點的 term,發送反饋給主節點,告知主節點的 term 已過時;如果投票輪次相同,並且發送心跳包的節點是該節點的主節點,則返回成功。

下面重點討論主節點的 term 大於從節點的情況。

synchronized (memberState) {
    if (request.getTerm() < memberState.currTerm()) {   // @1
        return CompletableFuture.completedFuture(new HeartBeatResponse().term(memberState.currTerm()).code(DLedgerResponseCode.EXPIRED_TERM.getCode()));
    } else if (request.getTerm() == memberState.currTerm()) {  // @2
        if (memberState.getLeaderId() == null) {
            changeRoleToFollower(request.getTerm(), request.getLeaderId());
            return CompletableFuture.completedFuture(new HeartBeatResponse());
        } else if (request.getLeaderId().equals(memberState.getLeaderId())) {
            lastLeaderHeartBeatTime = System.currentTimeMillis();
            return CompletableFuture.completedFuture(new HeartBeatResponse());
        } else {
            //this should not happen, but if happened
            logger.error("[{}][BUG] currTerm {} has leader {}, but received leader {}", memberState.getSelfId(), memberState.currTerm(), memberState.getLeaderId(), request.getLeaderId());
            return CompletableFuture.completedFuture(new HeartBeatResponse().code(DLedgerResponseCode.INCONSISTENT_LEADER.getCode()));
        }
    } else {
        //To make it simple, for larger term, do not change to follower immediately
        //first change to candidate, and notify the state-maintainer thread
        changeRoleToCandidate(request.getTerm());
        needIncreaseTermImmediately = true;
        //TOOD notify
        return CompletableFuture.completedFuture(new HeartBeatResponse().code(DLedgerResponseCode.TERM_NOT_READY.getCode()));
    }
}

Step2:加鎖來處理(這裏更多的是從節點第一次收到主節點的心跳包)

代碼 @1:如果主節的投票輪次小於當前投票輪次,則返回主節點投票輪次過期。

代碼 @2:如果投票輪次相同。

代碼 @3:如果主節點的投票輪次大於從節點的投票輪次,則認爲從節點併爲準備好,則從節點進入 Candidate 狀態,並立即發起一次投票。

心跳包的處理就介紹到這裏。

RocketMQ 多副本之 Leader 選舉的源碼分析就介紹到這裏了,爲了加強對源碼的理解,先梳理流程圖如下:

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/wCYH7y2m_Oqcpt_vGNAU1A