5 張圖帶你理解 RocketMQ 消費者啓動過程

大家好,我是君哥。

今天來分享 RocketMQ 中一個關鍵的知識點,消費者的啓動過程。

多數消息隊列中,消費者和 Broker 通信的方式有兩種,PUSH 模式和 PULL 模式:

注意,RocketMQ 並沒有真正實現 PUSH 模式, RocketMQ 中的 PUSH 模式,本質上也是 PULL 模式,只是消費端封裝了輪詢過程,相當於開啓一個定時線程不停地從 Broker 拉取消息,拉取到消息後喚醒本地業務線程來處理。本文講解 PULL 模式 的啓動過程。涉及到到的啓動過程如下圖:

首先看下面這張圖:

圖中可以看出,消費者需要註冊到 Name Server,拉取消息的時候可以從 Broker 主節點拉取,也可以從 Broker 從節點拉取。

在 RocketMQ 的源碼中,拉模式有兩個消費者相關的類,其中 DefaultMQPullCons umer 類已經被廢棄,官方推薦使用 Defau ltLitePullConsumer 類。下面代碼來自官方示例:

public static void main(String[] args) throws Exception {
    DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");
    litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    litePullConsumer.subscribe("TopicTest", "*");
    //啓動方法
    litePullConsumer.start();
    try {
        while (running) {
            //這裏可以看到,PULL 模式下消費者需要業務代碼主動去拉取消息
            List<MessageExt> messageExts = litePullConsumer.poll();
            System.out.printf("%s%n", messageExts);
        }
    } finally {
        litePullConsumer.shutdown();
    }
}

上面代碼中消費者屬於消費組 lite_pull _consumer_test,訂閱了【TopicTest 】這個 Topic 下的所有 tag。下面一起看一下啓動方法。下圖是消費者啓動過程中類調用關係圖,圖中心的 pullRequestQueu e 是核心,pull 請求會先發送到這個隊列,然後循環地拉取處理。

檢查啓動配置

消費者啓動時首先會檢查配置,檢查的配置項如下:

這部分源代碼見 DefaultLitePullConsum erImpl#checkConfig。

修改消費者實例名稱

如果是集羣模式,實例名稱改爲【進程 ID + “#” + 系統時間(納秒 )】,代碼如下:

//ClientConfig類
public void changeInstanceNameToPID() {
    if (this.instanceName.equals("DEFAULT")) {
        this.instanceName = UtilAll.getPid() + "#" + System.nanoTime();
    }
}

初始化 MQ 客戶端

創建一個 MQClientInstance 實例,然後把消費者註冊到 MQClientInstance。

private void initMQClientFactory() throws MQClientException {
    this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultLitePullConsumer, this.rpcHook);
    boolean registerOK = mQClientFactory.registerConsumer(this.defaultLitePullConsumer.getConsumerGroup(), this);
    if (!registerOK) {
        this.serviceState = ServiceState.CREATE_JUST;
        throw new MQClientException("The consumer group[" + this.defaultLitePullConsumer.getConsumerGroup()
            + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
            null);
    }
}

初始化負載均衡器

對  RebalanceLitePullImpl 實例初始化,給下面的參數賦值:

負載均衡線程啓動後,默認每 20s 做一次負載均衡,見如下代碼:

//RebalanceService 類
public void run() {
    while (!this.isStopped()) {
        //waitInterval 默認 20s,可以配置
        this.waitForRunning(waitInterval);
        this.mqClientFactory.doRebalance();
    }
}

初始化 Wrapper

PullAPIWrapper 這個 Wrapper 類是 MQ-ClientInstance 類的 Wrapper 類,類中 pullKernelImpl 方法對  MQClientInstance 類中的 pullMessage 方法進行了裝飾,這個裝飾類主要增加了下面功能:

  1. 獲取 Broker 地址;

  2. 檢查 RocketMQ 版本;

  3. 如果 Broker 是從節點,把 sysFlag 標記偏移量的位改爲 0,(偏移量 0x1);

  4. 封裝請求 header;

  5. 獲取 filterServer 地址(如果消費者是通過 filterServer 從 Broker 拉取消息,這裏隨機獲取一個 filterServer  地址)。

代碼如下 :

//PullAPIWrapper 
public PullResult pullKernelImpl(
    //省略所有參數
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    //1.獲取 Broker 地址
    FindBrokerResult findBrokerResult =
        this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                                                          this.recalculatePullFromWhichNode(mq), false);
    //省略從 Name sever 更新本地 Broker 緩存邏輯

    if (findBrokerResult != null) {
        {
            //2.檢查 RocketMQ 版本
            if (!ExpressionType.isTagType(expressionType)
                && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
                throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
                                            + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
            }
        }
        int sysFlagInner = sysFlag;

        if (findBrokerResult.isSlave()) {
            //3.把偏移量的位改爲 0,(偏移量 0x1)
            sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
        }
        //4.封裝請求 header
        PullMessageRequestHeader  = new PullMessageRequestHeader();
        //省略封裝 requestHeader

        String brokerAddr = findBrokerResult.getBrokerAddr();
        if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
            //5.獲取 filterServer 地址
            brokerAddr = computePullFromWhichFilterServer(mq.getTopic(), brokerAddr);
        }

        PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
            brokerAddr,
            requestHeader,
            timeoutMillis,
            communicationMode,
            pullCallback);

        return pullResult;
    }

    throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}

初始化 offset 存儲器

offset 存儲器的 UML 類圖如下:


有兩個實現類分別對應集羣模式和廣播模式,本文討論的集羣模式的實現類是 RemoteBrokerOffsetStore。offset 可以存儲在本地或者遠端服務器。

啓動 MQ 客戶端

啓動 MQ 客戶端主要包括如下步驟:

  1. 把 serviceState 改爲 START_FAIL ED;

  2. 初始化 Netty channel;

  3. 啓動定時任務,包括定時獲取 Name Server 地址、從 Name Server 更新 Topic 路由信息、清理過期的 Broker、向 Broker 發送心跳、持久化 offset、定時調整線程池的數量(源碼裏面這個並沒有實現邏輯);

  4. 啓動拉取消息的線程,拉取線程的邏輯是從請求隊列中不停地取出 pull 請求,然後將請求發送到 Broker 進行拉取消息,代碼如下:

//PullMessageService類
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            PullRequest pullRequest = this.pullRequestQueue.take();
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }

    log.info(this.getServiceName() + " service end");
}

從下面的代碼可以看出,PULL 拉取消息最終使用了 DefaultMQPushConsumer Impl,所以 PULL 模式和 PUSH 模式拉取消息的邏輯是一樣的。

private void pullMessage(final PullRequest pullRequest) {
    final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
    if (consumer != null) {
        DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
        impl.pullMessage(pullRequest);
    } else {
        log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
    }
}
  1. 啓動 MessageQueue 負載均衡線程;

  2. 啓動生產者線程;

  3. 把 serviceState 改爲 Running。

源碼參考 MQClientInstance#start。

啓動定時任務

這個定時任務默認每 30s 執行一次,用於監聽每個 Topic 下的 MessageQueue 是否發生變化。代碼見 startScheduleTask 方法。

啓動軌跡消息

軌跡消息主要用於跟蹤消息發送、消息消費的軌跡,用於記錄詳細日誌。代碼如下:

//AsyncTraceDispatcher 類
public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {
    if (isStarted.compareAndSet(false, true)) {
        traceProducer.setNamesrvAddr(nameSrvAddr);
        traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
        traceProducer.start();
    }
    this.accessChannel = accessChannel;
    this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
    this.worker.setDaemon(true);
    this.worker.start();
    this.registerShutDownHook();
}

這裏不詳細展開了,後面再單獨討論。

總結

本文通過源碼分析講解了 RocketMQ 中 PULL 模式下的消費者啓動過程,在生產上使用比較多的還是 PUSH 模式,PULL 模式拉取消息的方法跟 PUSH 模式一樣,不同的是 PULL 模式需要應用程序進行拉取動作,可以通過 PULL 模式的學習更容易的理解 PUSH 模式。最後,分析一個 PULL 模式啓動過程涉及的 UML 類圖:

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/XlnEvt-4jNG_g2SHx6VNyA