RocketMQ 之消費者啓動與消費流程

‍‍vivo 互聯網服務器團隊 - Li Kui

一、簡介

1.1 RocketMQ 簡介

RocketMQ 是由阿里巴巴開源的分佈式消息中間件,支持順序消息、定時消息、自定義過濾器、負載均衡、pull/push 消息等功能。RocketMQ 主要由 Producer、Broker、Consumer 、NameServer 四部分組成,其中 Producer 負責生產消息,Consumer 負責消費消息,Broker 負責存儲消息。NameServer 充當名字路由服務,整體架構圖如下所示:

本文基於 Apache RocketMQ 最新版本主要講述 RocketMQ 的消費者機制,分析其啓動流程、pull/push 機制,消息 ack 機制以及定時消息和順序消息的不同。

1.2 工作流程

(1)啓動 NameServer。

NameServer 起來後監聽端口,等待 Broker、Producer、Consumer 連上來,相當於一個路由控制中心。

(2)啓動 Broker。

跟所有的 NameServer 保持長連接,定時發送心跳包。心跳包中包含當前 Broker 信息 (IP + 端口等) 以及存儲所有 Topic 信息。註冊成功後,NameServer 集羣中就有 Topic 跟 Broker 的映射關係。

(3)創建 Topic。

創建 Topic 時需要指定該 Topic 要存儲在哪些 Broker 上,也可以在發送消息時自動創建 Topic。

(4)Producer 發送消息。

啓動時先跟 NameServer 集羣中的其中一臺建立長連接,並從 NameServer 中獲取當前發送的 Topic 存在哪些 Broker 上,輪詢從隊列列表中選擇一個隊列,然後與隊列所在的 Broker 建立長連接從而向 Broker 發消息。

(5)Consumer 消費消息。

跟其中一臺 NameServer 建立長連接,獲取當前訂閱 Topic 存在哪些 Broker 上,然後直接跟 Broker 建立連接通道,開始消費消息。

二、消費者啓動流程

官方給出的消費者實現代碼如下所示:

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        // 實例化消費者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TestConsumer");
        // 設置NameServer的地址
        consumer.setNamesrvAddr("localhost:9876");
        // 訂閱一個Topic,以及Tag來過濾需要消費的消息
        consumer.subscribe("Test", "*");
        // 註冊回調實現類來處理從broker拉取回來的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                // 標記該消息已經被成功消費
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 啓動消費者實例
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

下面讓我們來分析消費者在啓動中每一階段中做了什麼吧,let’s go.

2.1 實例化消費者

第一步主要是實例化消費者,這裏採取默認的 Push 消費者模式,構造器中參數爲對應的消費者分組,指定同一分組可以消費同一類型的消息,如果沒有指定,將會採取默認的分組模式,這裏實例化了一個 DefaultMQPushConsumerImpl 對象,它是後面消費功能的主要實現類。

// 實例化消費者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TestConsumer");

主要通過 DefaultMQPushConsumer 實例化 DefaultMQPushConsumerImpl,它是主要的消費功能實現類。

public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
       AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
       this.consumerGroup = consumerGroup;
       this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
       defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
   }

2.2 設置 NameServer 和訂閱 topic 過程

// 設置NameServer的地址
consumer.setNamesrvAddr("localhost:9876");
// 訂閱一個或者多個Topic,以及Tag來過濾需要消費的消息
consumer.subscribe("Test", "*");

2.2.1 添加 tag

設置 NameServer 地址後,這個地址爲你名字服務集羣的地址,類似於 zookeeper 集羣地址,樣例給出的是單機本地地址,搭建集羣后,可以設置爲集羣地址,接下來我們需要訂閱一個主題 topic 下的消息,設置對應的 topic,可以進行分類,通過設置不同的 tag 來實現,但目前只支持 "||" 進行連接,如:"tag1 || tag2 || tag3"。歸根在於構造訂閱數據時,源碼通過 "||" 進行了字符串的分割,如下所示:

public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,
    String subString) throws Exception {
    SubscriptionData subscriptionData = new SubscriptionData();
    subscriptionData.setTopic(topic);
    subscriptionData.setSubString(subString);
    if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
        subscriptionData.setSubString(SubscriptionData.SUB_ALL);
    } else {
        String[] tags = subString.split("\\|\\|");
        if (tags.length > 0) {
            for (String tag : tags) {
                if (tag.length() > 0) {
                    String trimString = tag.trim();
                    if (trimString.length() > 0) {
                        subscriptionData.getTagsSet().add(trimString);
                        subscriptionData.getCodeSet().add(trimString.hashCode());
                    }
                }
            }
        } else {
            throw new Exception("subString split error");
        }
    }
    return subscriptionData;
}

2.2.2 發送心跳至 Broker

前面構造好訂閱主題和分類後,將其放入了一個 ConcurrentMap 中,並調用

sendHeartbeatToAllBrokerWithLock() 方法,進行心跳檢測和上傳過濾器類至 broker 集羣(生產者啓動過程也會進行此步驟)。如下所示:

public void sendHeartbeatToAllBrokerWithLock() {
    if (this.lockHeartbeat.tryLock()) {
        try {
            this.sendHeartbeatToAllBroker();
            this.uploadFilterClassSource();
        } catch (final Exception e) {
            log.error("sendHeartbeatToAllBroker exception", e);
        } finally {
            this.lockHeartbeat.unlock();
        }
    } else {
        log.warn("lock heartBeat, but failed.");
    }
}

首先會對 broker 集羣進行心跳檢測,在此過程中會施加鎖,它會執行 sendHeartbeatToAllBroker 方法,構建心跳數據 heartbeatData,然後遍歷消費和生產者 table,將消費者和生產者信息加入到 heartbeatData 中,當都存在消費者和生產者的情況下,會遍歷 brokerAddrTable,往每個 broker 地址發送心跳,相當於往對應地址發送一次 http 請求,用於探測當前 broker 是否存活。 

this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);

2.2.3 上傳過濾器類至 FilterServer

之後會執行 uploadFilterClassSource() 方法,只有 push 模式纔會有此過程,在此模式下,它會循環遍歷訂閱數據 SubscriptionData,如果此訂閱數據使用了類模式過濾,會調用 uploadFilterClassToAllFilterServer() 方法:上傳用戶自定義的過濾消息實現類至過濾器服務器。

private void uploadFilterClassSource() {
    Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, MQConsumerInner> next = it.next();
        MQConsumerInner consumer = next.getValue();
        if (ConsumeType.CONSUME_PASSIVELY == consumer.consumeType()) {
            Set<SubscriptionData> subscriptions = consumer.subscriptions();
            for (SubscriptionData sub : subscriptions) {
                if (sub.isClassFilterMode() && sub.getFilterClassSource() != null) {
                    final String consumerGroup = consumer.groupName();
                    final String className = sub.getSubString();
                    final String topic = sub.getTopic();
                    final String filterClassSource = sub.getFilterClassSource();
                    try {
                        this.uploadFilterClassToAllFilterServer(consumerGroup, className, topic, filterClassSource);
                    } catch (Exception e) {
                        log.error("uploadFilterClassToAllFilterServer Exception", e);
                    }
                }
            }
        }
    }
}

過濾器類的作用:消費端可以上傳一個 Class 類文件到 FilterServer,Consumer 從 FilterServer 拉取消息時,FilterServer 會把請求轉發給 Broker,FilterServer 收取到 Broker 消息後,根據上傳的過濾類中的邏輯做過濾操作,過濾完成後再把消息給到 Consumer,用戶可以自定義過濾消息的實現類。

2.3 註冊回調實現類

接下來就是代碼中的註冊回調實現類了,當然,如果你是 pull 模式的話就不需要實現它了,push 模式需要定義,兩者區別後面會講到,它主要用於從 broker 實時獲取消息,這裏有兩種消費上下文類型,用於不同的消費類型。

ConsumeConcurrentlyContext: 延時類消息上下文,用於延時消息,即定時消息,默認不延遲,可以設置延遲等級,每個等級對應固定時間刻度,RocketMQ 中不能自定義延遲時間,延遲等級從 1 開始,對應的時間間隔如下所示:

 "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

ConsumeOrderlyContext : 順序類消息上下文,控制發送消息的順序,生產者設置分片路由規則後,相同 key 只落到指定 queue 上,消費過程中會對順序消息所在的 queue 加鎖,保證消息的有序性。

2.4 消費者啓動

我們先來看下消費者啓動的過程,如下所示:

(1)this.checkConfig(): 首先是檢測消費配置項,包括消費分組 group、消息模型 (集羣、廣播)、訂閱數據、消息監聽器等是否存在,如果不存在的話,會拋出異常。

(2)copySubscription(): 構建主題訂閱信息 SubscriptionData 並加入到 RebalanceImpl 負載均衡方法的訂閱信息中。

(3)getAndCreateMQClientInstance(): 初始化 MQ 客戶端實例。

(4)offsetStore.load(): 根據不同消息模式創建消費進度 offsetStore 並加載:BROADCASTING - 廣播模式,同一個消費 group 中的 consumer 都消費一次,CLUSTERING - 集羣模式,默認方式,只被消費一次。

switch (this.defaultMQPushConsumer.getMessageModel()) {
    case BROADCASTING:
        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
        break;
    case CLUSTERING:
        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
        break;
    default:
        break;
}

可以通過 setMessageModel 方式設置不同模式;廣播模式下同消費組的消費者相互獨立,消費進度在本地單獨進行存儲;集羣模式下,同一條消息只會被同一個消費組消費一次,消費進度會參與到負載均衡中,消費進度是共享在整個消費組中的。

(5)consumeMessageService.start(): 根據不同消息監聽類型實例化並啓動。這裏有延時消息和順序消息。

這裏主要講下順序消息,RocketMQ 也幫我們實現了,在啓動時,如果是集羣模式並是順序類型,它會啓動定時任務,定時向 broker 發送批量鎖,鎖住當前順序消費發往的消息隊列,順序消息因爲生產者生產消息時指定了分片策略和消息上下文,只會發往一個消費隊列。

定時任務發送批量鎖,鎖住當前順序消息隊列。

public void start() {
        if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    ConsumeMessageOrderlyService.this.lockMQPeriodically();
                }
            }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
        }
    }

發送鎖住隊列的消息至 broker,broker 端返回鎖住成功的隊列集合 lockOKMQSet,順序消息具體實現可查看後面第四節。

(6)mQClientFactory.registerConsumer():MQClientInstance 註冊消費者,並啓動 MQClientInstance,沒有註冊成功會結束消費服務。

(7)mQClientFactory.start():最後會啓動如下服務:遠程客戶端、定時任務、pull 消息服務、負載均衡服務、push 消息服務,然後將狀態改爲運行中。

switch (this.serviceState) {
    case CREATE_JUST:
        this.serviceState = ServiceState.START_FAILED;
        // If not specified,looking address from name server
        if (null == this.clientConfig.getNamesrvAddr()) {
            this.mQClientAPIImpl.fetchNameServerAddr();
        }
        // Start request-response channel
        this.mQClientAPIImpl.start();
        // Start various schedule tasks
        this.startScheduledTask();
        // Start pull service
        this.pullMessageService.start();
        // Start rebalance service
        this.rebalanceService.start();
        // Start push service
        this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
        log.info("the client factory [{}] start OK", this.clientId);
        this.serviceState = ServiceState.RUNNING;
        break;
    case RUNNING:
        break;
    case SHUTDOWN_ALREADY:
        break;
    case START_FAILED:
        throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
    default:
        break;
}

全部啓動完畢後,整個消費者也就啓動好了,接下來就可以對生產者發送過來的消息進行消費了,那麼是如何進行消息消費的呢?不同的消息模式有何區別呢?

三、pull/push 模式消費

3.1 pull 模式 - DefaultMQPullConsumer

pull 拉取式消費: 應用通常主動調用 Consumer 的拉消息方法從 Broker 服務器拉消息、主動權由應用程序控制,可以指定消費的位移,【僞代碼】如下所示:

DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("TestConsumer");
// 設置NameServer的地址
consumer.setNamesrvAddr("localhost:9876");
// 啓動消費者實例
consumer.start();
//獲取主題下所有的消息隊列,這裏根據主題從nameserver獲取的
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("Test");
for (MessageQueue queue : mqs) {
    //獲取當前隊列的消費位移,指定消費進度offset,fromstore:從broker中獲取還是本地獲取,true-broker
    long offset = consumer.fetchConsumeOffset(queue, true);
    PullResult pullResult = null;
    while (offset < pullResult.getMaxOffset()) {
        //第二個參數爲tag,獲取指定topic下的tag
        //第三個參數表示從哪個位移下開始消費消息
        //第四個參數表示一次最大拉取多少個消息
        try {
            pullResult = consumer.pullBlockIfNotFound(queue, "*", offset, 32);
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("pull拉取消息失敗");
        }
        //代碼省略,記錄消息位移
        offset = pullResult.getNextBeginOffset();
        //代碼省略,這裏爲消費消息
    }
}

可以看到我們是主動拉取 topic 對應下的消息隊列,然後遍歷它們,獲取當前消費進度並進行消費。

3.2 push 模式 - DefaultMQPushConsumer

該模式下 Broker 收到數據後會主動推送給消費端,該消費模式一般實時性較高,現在一般推薦使用該方式,具體示例可以觀看第一章開頭的官方 demo。

它也是通過實現 pull 方式來實現的,首先,前面 2.4 消費者啓動之後,最後會啓動拉取消息服務 pullMessageService 和負載均衡 rebalanceService 服務,它們啓動後會一直有線程進行消費。

case CREATE_JUST:
               //......
                // Start pull service
                this.pullMessageService.start();
                // Start rebalance service
                this.rebalanceService.start();
                //.......
                this.serviceState = ServiceState.RUNNING;
                break;
  case RUNNING:

這裏面調用 doRebalance() 方法,進行負載均衡,默認每 20s 做一次,會輪詢所有訂閱該實例的 topic。

public class RebalanceService extends ServiceThread {
    //初始化,省略....
    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");
        while (!this.isStopped()) {
            this.waitForRunning(waitInterval);
            //做負載均衡
            this.mqClientFactory.doRebalance();
        }
        log.info(this.getServiceName() + " service end");
    }
    @Override
    public String getServiceName() {
        return RebalanceService.class.getSimpleName();
    }
}

然後根據每個 topic,以及它是否順序消息模式來做 rebalance。

具體做法就是先對 Topic 下的消息消費隊列、消費者 Id 進行排序,然後用消息隊列的平均分配算法,計算出待拉取的消息隊列,將分配到的消息隊列集合與 processQueueTable 做一個過濾比對,新隊列不包含或已過期,則進行移除 。

public void doRebalance(final boolean isOrder) {
      Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
      if (subTable != null) {
          for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
              final String topic = entry.getKey();
              try {
                  /根據 /每個topic以及它是否順序消息模式來做rebalance
                  this.rebalanceByTopic(topic, isOrder);
              } catch (Throwable e) {
                  if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                      log.warn("rebalanceByTopic Exception", e);
                  }
              }
          }
      }
      this.truncateMessageQueueNotMyTopic();
  }

rebalanceByTopic 中廣播和集羣模式都會執行 updateProcessQueueTableInRebalance() 方法,最後會分發請求 dispatchPullRequest,通過 executePullRequestImmediately() 方法將 pull 請求放入 pull 請求隊列 pullRequestQueue 中,注意,pull 模式下分發請求方法 dispatchPullRequest() 實際實現是一個空方法,這裏兩者很大不同,push 模式實現如下:

@Override
 public void dispatchPullRequest(List<PullRequest> pullRequestList) {
     for (PullRequest pullRequest : pullRequestList) {
         this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
         log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
     }
 }

然後再 PullMessageService 中,因爲前面 consumer 啓動成功了,PullMessageService 線程會實時去取 pullRequestQueue 中的 pull 請求。

@Override
  public void run() {
      log.info(this.getServiceName() + " service started");
      while (!this.isStopped()) {
          try {
              PullRequest pullRequest = this.pullRequestQueue.take();
              if (pullRequest != null) {
                  this.pullMessage(pullRequest);
              }
          } catch (InterruptedException e) {
          } catch (Exception e) {
              log.error("Pull Message Service Run Method exception", e);
          }
      }
      log.info(this.getServiceName() + " service end");
  }

取出來的 pull 請求又會經由 DefaultMQPushConsumerImpl 的消息監聽類,調用 pullMessage() 方法。

private void pullMessage(final PullRequest pullRequest) {
     final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
     if (consumer != null) {
         DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
         impl.pullMessage(pullRequest);
     } else {
         log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
     }
 }

pullMessage() 中 pullKernelImpl() 有一個 Pullback 方法用於執行消息的回調,它會通過 submitConsumeRequest() 這個方法來處理消息,總而言之就是通過線程回調的方式讓 push 模式下的監聽器能夠感知到。

//Pull回調
PullCallback pullCallback = new PullCallback() {
            @Override
            public void onSuccess(PullResult pullResult) {
                if (pullResult != null) {
                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                        subscriptionData);
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                         //省略...消費位移更新
                                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                    pullResult.getMsgFoundList(),
                                    processQueue,
                                    pullRequest.getMessageQueue(),
                                    dispathToConsume);

這個方法對應的不同消費模式有着不同實現,但都是會構建一個消費請求 ConsumeRequest,裏面有一個 run() 方法,構建完畢後,會把它放入到 listener 監聽器中。

//監聽消息
 status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);

還記得前面我們樣例給出的註冊監聽器回調處理方法嗎?

我們可以點擊上面的 consumeMessage 方法,查看它在源碼中的實現位置,發現它就回到了我們前面的 2.3 註冊回調實現類裏面了,整個流程是不是通順了呢?這個監聽器中就會收到 push 的消息,拉取出來進行業務消費邏輯,下面是我們自己定義的消息回調處理方法。

// 註冊回調實現類來處理從broker拉取回來的消息
 consumer.registerMessageListener(new MessageListenerConcurrently() {
     @Override
     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
         System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
         // 標記該消息已經被成功消費
         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
     }
 });

3.3  小結

push 模式相比較於 pull 模式不同的是,做負載均衡時,pullRequest 請求會放入 pullRequestQueue,然後 PullMessageService 線程會實時去取出這個請求,將消息存入 ProcessQueue,通過線程回調的方式讓 push 模式下的監聽器能夠感知到,這樣消息從分發請求到接收都是實時的,而 pull 模式是消費端主動去拉取指定消息的,需要指定消費進度。

對於我們開發者來說,選取哪種模式實現我們的業務邏輯比較合適呢?別急,先讓我們總結下他們的特點:

共同點:

  1. 兩者底層實際一樣,push 模式也是基於 pull 模式來實現的。

  2. pull 模式需要我們通過程序主動通過 consumer 向 broker 拉消息,而消息的 push 模式則只需要我們提供一個 listener 監聽,實時獲取消息。

優點:

  1. push 模式採用長輪詢阻塞的方式獲取消息,實時性非常高;

  2. push 模式 rocketMQ 處理了獲取消息的細節,使用起來比較簡單方便;

  3. pull 模式可以指定消費進度,想消費多少就消費多少,靈活性大。

缺點:

  1. push 模式當消費者能力遠遠低於生產者能力的時候,會產生一定的消費者消息堆積;

  2. pull 模式實時性很低,頻率不好設置;

  3. 拉取消息的間隔不好設置,太短則產生很多無效 Pull 請求的 RPC 開銷,影響 MQ 整體的網絡性能,太長則實時性差。

適用場景:

  1. 對於服務端生產消息數據比較大時,而消費端處理比較複雜,消費能力相對較低時,這種情況就適用 pull 模式;

  2. 對於數據實時性要求高的場景,就比較適用與 push 模式。

現在的你是否明確業務中該使用哪種模式了呢?

四、順序消息

4.1 實現 MQ 順序消息發送存在問題

(1)一般消息發送會採取輪詢方式把消息發送到不同的 queue(分區隊列);而消費消息的時候從多個 queue 上拉取消息,broker 之間是無感知的,這種情況發送和消費是不能保證順序。

(2)異步方式發送消息時,發送的時候不是按着一條一條順序發送的,保證不了消息到達 Broker 的時間也是按照發送的順序來的。

消息發送到存儲,最後到消費要經歷這麼多步驟,我們該如何在業務中使用順序消息呢?讓咱們來一步步拆解下吧。

4.2 實現 MQ 順序消息關鍵點

既然分散到多個 broker 上無法追蹤順序,那麼可以控制發送的順序消息只依次發送到同一個 queue 中,消費的時候只從這個 queue 上依次拉取,則就保證了順序。在發送時設置分片路由規則,讓相同 key 的消息只落到指定 queue 上,然後消費過程中對順序消息所在的 queue 加鎖,保證消息的有序性,讓這個 queue 上的消息就按照 FIFO 順序來進行消費。因此我們滿足以下三個條件是否就可以呢?

1)消息順序發送: 多線程發送的消息無法保證有序性,因此,需要業務方在發送時,針對同一個業務編號 (如同一筆訂單) 的消息需要保證在一個線程內順序發送,在上一個消息發送成功後,在進行下一個消息的發送。對應到 mq 中,消息發送方法就得使用同步發送,異步發送無法保證順序性。

//採用的同步發送方式,在一個線程內順序發送,異步發送方式爲:producer.send(msg, new SendCallback() {...})
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {//…}

2)消息順序存儲:MQ 的 topic 下會存在多個 queue,要保證消息的順序存儲,同一個業務編號的消息需要被髮送到一個 queue 中。對應到 mq 中,需要使用 MessageQueueSelector 來選擇要發送的 queue。即可以對業務編號設置路由規則,像根據隊列數量對業務字段 hash 取餘,將消息發送到一個 queue 中。

//使用"%"操作,使得訂單id取餘後相同的數據路由到同一個queue中,也可以自定義路由規則
long index = id % mqs.size();  
return mqs.get((int) index);

3)消息順序消費: 要保證消息順序消費,同一個 queue 就只能被一個消費者所消費,因此對 broker 中消費隊列加鎖是無法避免的。同一時刻,一個消費隊列只能被一個消費者消費,消費者內部,也只能有一個消費線程來消費該隊列。這裏 RocketMQ 已經爲我們實現好了

List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
    for (MessageQueue mq : mqSet) {
        if (!this.processQueueTable.containsKey(mq)) {
            if (isOrder && !this.lock(mq)) {
                log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                continue;
            }
         //....省略
        }
    }

消費者重新負載,並且分配完消費隊列後,需要向 mq 服務器發起消息拉取請求,代碼實現在 RebalanceImpl#updateProcessQueueTableInRebalance() 中,針對順序消息的消息拉取,mq 做了以上判斷,即消費客戶端先向 broker 端發起對 messageQueue 的加鎖請求,只有加鎖成功時才創建 pullRequest 進行消息拉取,這裏的 pullRequest 就是前面 pull 和 push 模式消息體,而 updateProcessQueueTableInRebalance 這個方法也是在前面消費者啓動過程中有講到過哦。

具體加鎖邏輯如下:

public boolean lock(final MessageQueue mq) {
     FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
     if (findBrokerResult != null) {
         LockBatchRequestBody requestBody = new LockBatchRequestBody();
         requestBody.setConsumerGroup(this.consumerGroup);
         requestBody.setClientId(this.mQClientFactory.getClientId());
         requestBody.getMqSet().add(mq);
         try {
             Set<MessageQueue> lockedMq =
                 this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
             for (MessageQueue mmqq : lockedMq) {
                 ProcessQueue processQueue = this.processQueueTable.get(mmqq);
                 if (processQueue != null) {
                     processQueue.setLocked(true);
                     processQueue.setLastLockTimestamp(System.currentTimeMillis());
                 }
             }
             boolean lockOK = lockedMq.contains(mq);
             log.info("the message queue lock {}, {} {}",
                 lockOK ? "OK" : "Failed",
                 this.consumerGroup,
                 mq);
             return lockOK;
         } catch (Exception e) {
             log.error("lockBatchMQ exception, " + mq, e);
         }
     }
     return false;
 }

可以看到,就是調用 lockBatchMQ 方法發送了一個加鎖請求,成功獲取到消息處理隊列就設爲獲取到鎖,返回鎖定成功,如果加鎖成功,同一時刻只有一個線程進行消息消費。加鎖失敗,會延遲 1000ms 重新嘗試向 broker 端申請鎖定 messageQueue,鎖定成功後重新提交消費請求。

怎麼樣,這樣的加鎖方式是不是很像我們平時用到的分佈式鎖呢?由你來設計實現你會怎麼做呢?

## 五、消息 ack 機制

5.1 消息消費失敗處理

消息被消費者消費了,那麼如何保證被消費成功呢?消息消費失敗會出現什麼情況呢?

消息被消費,那麼如何保證被消費成功呢?這裏只有使用方控制,只有使用方確認成功了,纔會消費成功,否則會重新投遞。

RocketMQ 其實是通過 ACK 機制來對失敗消息進行重試和通知的,具體流程如下所示:

消息成功與否是由使用方控制,只有使用方確認成功了,纔會消費成功,否則會重新投遞,Consumer 會通過監聽器監聽回調過來的消息,返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS 表示消費成功,如果消費失敗,返回 ConsumeConcurrentlyStatus.RECONSUME_LATER 狀態(消費重試),RocketMQ 就會默認爲這條消息失敗了,延遲一定時間後(默認 10s,可配置),會再次投送到 ConsumerGroup,重試次數與間隔時間關係上圖所示。如果持續這樣,失敗到一定次數(默認 16 次),就會進入到 DLQ 死信隊列,不再投遞,此時可以通過監控人工來干預。

5.2 消息重投帶來問題

RocketMQ 消費消息因爲消息重投很大一個問題就是無法保證消息只被消費一次,因此需要開發人員在業務裏面自己去處理。

六、總結

本文主要介紹了 RocketMQ 的消費者啓動流程,結合官方源碼和示例,一步步講述消費者在啓動和消息消費中的的工作原理及內容,並結合平時業務工作中,對我們所熟悉的順序、push/pull 模式等進行詳細分析,以及對於消息消費失敗和重投帶來問題去進行分析。

對於自己而言,希望通過主動學習源碼方式,能夠明白其中啓動的原理,學習裏面優秀的方案,像對於 pull/push,順序消息這些,學習之後能夠了解到 push 模式是何如做到實時拉取消息的,順序消息是如何保證的,再就是能夠聯想到平時遇到這種問題該如何處理,像順序消息在消息被消費時保持和存儲的順序一致,這裏自己施加分佈式鎖寫能不能實現等,文中也有很多引導性問題,希望能引起讀者自己的思考,能夠對整個消費者啓動和消息消費流程有着較爲直觀的認知,但還有着一些技術細節由於篇幅原因沒做出詳細說明,也歡迎大家一起探討交流~

參考資料:

  1. RocketMQ 官網示例

  2. RocketMQ 系列之 pull(拉) 消息模式(七)

  3. RocketMQ 的順序消息(順序消費)

vivo 互聯網技術 分享 vivo 互聯網技術乾貨與沙龍活動,推薦最新行業動態與熱門會議。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/ajj2oq10PGLvIArBRufs9Q