RocketMQ 的推模式和拉模式有什麼區別?
大家好,我是君哥。
RocketMQ 消息消費有兩種模式,PULL 和 PUSH, 今天我們來看一下這兩種模式有什麼區別。
PUSH 模式
首先看一段 RocketMQ 推模式的一個官方示例:
public static void main(String[] args) throws InterruptedException, MQClientException {
Tracer tracer = initTracer();
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
consumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(new ConsumeMessageOpenTracingHookImpl(tracer));
consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setConsumeTimestamp("20181109221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
消費者會定義一個消息監聽器,並且把這個監聽器註冊到 DefaultMQPushConsumer,同時也會註冊到 DefaultMQPushConsumerIm-pl,當拉取到消息時,就會使用這個監聽器來處理消息。那這個監聽器是什麼時候調用呢?看下面的 UML 類圖:
消費者真正拉取請求的類是 DefaultMQPush-ConsumerImpl,這個類的 pullMessage 方法調用了 PullAPIWrapper 的 pullKernelImpl 方法,這個方法有一個參數是回調函數 Pull-Callback,當 PULL 狀態是 PullStatus.FOU-ND 時,代表拉取消息成功,處理邏輯如下:
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
switch (pullResult.getPullStatus()) {
case FOUND:
//省略部分邏輯
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
//省略部分邏輯
break;
//省略其他case
default:
break;
}
}
}
@Override
public void onException(Throwable e) {
//省略
}
};
這個處理邏輯調用了 ConsumeMessage-Service 類的 submitConsumeRequest 方法,我們看一下併發消費消息的處理邏輯,代碼如下:
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispatchToConsume) {
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
if (msgs.size() <= consumeBatchSize) {
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
this.submitConsumeRequestLater(consumeRequest);
}
} else {
//分批處理,跟上面邏輯一致
}
ConsumeRequest 類是一個線程類,run 方法裏面調用了消費者定義的消息處理方法,代碼如下:
public void run() {
//省略邏輯
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
//省略邏輯
try {
//調用消費方法
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
//省略邏輯
}
//省略邏輯
}
下面以併發消費方式下的同步拉取消息爲例總結一下消費者消息處理過程:
-
在 MessageListenerConcurrently 中定義消費者處理邏輯,消費者啓動時註冊到 DefaultMQPushConsumer 和 DefaultMQ-PushConsumerImpl;
-
消費者啓動時,啓動消費拉取線程 PullMessageService,裏面死循環不停地從 Broker 拉取消息。這裏調用了 DefaultMQPushConsumerImpl 類的 pullMessage 方法;
-
DefaultMQPushConsumerImpl 類的 pullMessage 方法調用 PullAPIWrapper 的 pullKernelImpl 方法真正去發送 PULL 請求,並傳入 PullCallback 的 回調函數;
-
拉取到消息後,調用 PullCallback 的 onSuccess 方法處理結果,這裏調用了 ConsumeMessageConcurrentlyService 的 submitConsumeRequest 方法,裏面用 ConsumeRequest 線程來處理拉取到的消息;
-
ConsumeRequest 處理消息時調用了消費端定義的消費邏輯,也就是 Message-ListenerConcurrently 的 consumeMessage 方法。
PULL 模式
下面是來自官方的一段 PULL 模式拉取消息的代碼:
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");
litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
litePullConsumer.subscribe("TopicTest", "*");
litePullConsumer.start();
try {
while (running) {
List<MessageExt> messageExts = litePullConsumer.poll();
System.out.printf("%s%n", messageExts);
}
} finally {
litePullConsumer.shutdown();
}
這裏我們看到,PULL 模式需要在處理邏輯裏不停的去拉取消息,比如上面代碼中寫了一個死循環。那 PULL 模式中 poll 函數是怎麼實現的呢?我們看下面的 UML 類圖:
跟蹤源碼可以看到,消息拉取最終是從 DefaultLitePullConsumerImpl 類中的一個 LinkedBlockingQueue 上面拉取。那消息是什麼時候 put 到 LinkedBlockingQueue 呢?
官方拉取消息的代碼中有一個 subscribe 方法訂閱了 Topic,這裏相關的 UML 類圖如下:
public synchronized void subscribe(String topic, String subExpression) throws MQClientException {
try {
//省略邏輯
this.defaultLitePullConsumer.setMessageQueueListener(new MessageQueueListenerImpl());
assignedMessageQueue.setRebalanceImpl(this.rebalanceImpl);
//省略邏輯
} catch (Exception e) {
throw new MQClientException("subscribe exception", e);
}
}
這裏給 DefaultLitePullConsumer 類的 messageQueueListener 這個監聽器進行了賦值。當監聽器監聽到 MessageQueue 發送變化時,就會啓動消息拉取消息的線程 Pull-TaskImpl,代碼如下:
public void run() {
//省略部分邏輯
if (!this.isCancelled()) {
long pullDelayTimeMills = 0;
try {
PullResult pullResult = pull(messageQueue, subscriptionData, offset, defaultLitePullConsumer.getPullBatchSize());
switch (pullResult.getPullStatus()) {
case FOUND:
final Object objLock = messageQueueLock.fetchLockObject(messageQueue);
synchronized (objLock) {
if (pullResult.getMsgFoundList() != null && !pullResult.getMsgFoundList().isEmpty() && assignedMessageQueue.getSeekOffset(messageQueue) == -1) {
processQueue.putMessage(pullResult.getMsgFoundList());
submitConsumeRequest(new ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue));
}
}
break;
//省略其他 case
}
}
//省略 catch
if (!this.isCancelled()) {
//啓動下一次拉取
scheduledThreadPoolExecutor.schedule(this, pullDelayTimeMills, TimeUnit.MILLISECONDS);
} else {
log.warn("The Pull Task is cancelled after doPullTask, {}", messageQueue);
}
}
}
拉取消息成功後,調用 submitConsume-Request 方法把拉取到的消息放到 consumeRequestCache,然後啓動下一次拉取。
這樣就清除了示例代碼中 poll 消息的邏輯,那還有一個問題,監聽器是什麼時候觸發監聽事件呢?
在消費者啓動時,會啓動 RebalanceService 這個線程,這個線程的 run 方法如下:
public void run() {
while (!this.isStopped()) {
this.waitForRunning(waitInterval);
this.mqClientFactory.doRebalance();
}
}
下面的 UML 類圖顯示了 doRebalance 方法的調用關係:
可以看到最終調用了 最終調用了 Rebalance-LitePullImpl 的 messageQueueChanged 方法,代碼如下:
public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
MessageQueueListener messageQueueListener = this.litePullConsumerImpl.getDefaultLitePullConsumer().getMessageQueueListener();
if (messageQueueListener != null) {
try {
messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);
} catch (Throwable e) {
log.error("messageQueueChanged exception", e);
}
}
}
這裏最終觸發了監聽器。
下面以併發消費方式下的同步拉取消息爲例總結一下消費者消息處理過程:
-
消費者啓動,向 DefaultLitePullConsumer 訂閱了 Topic,這個訂閱過程會向 DefaultLitePullConsumer 註冊一個監聽器;
-
消費者啓動過程中,會啓動 Message-Queue 重平衡線程 Rebalance-Service,當重平衡過程發現 ProcessQueueTable 發生變化時,啓動消息拉取線程;
-
消息拉取線程拉取到消息後,把消息放到 consumeRequestCache,然後進行下一次拉取;
-
消費者啓動後,不停地從 consumeReq-uestCache 拉取消息進行處理。
總結
通過本文的講解,可以看到 PUSH 模式和 PULL 模式本質上都是客戶端主動拉取,RocketMQ 並沒有真正實現 Broker 推送消息的 PUSH 模式。RocketMQ 中 PULL 模式和 PUSH 模式的區別如下:
-
PULL 模式是從 Broker 拉取消息後放入緩存,然後消費端不停地從緩存取出消息來執行客戶端定義的處理邏輯,而 PUSH 模式是在死循環中不停的從 Broker 拉取消息,拉取到後調用回調函數進行處理,回調函數中調用客戶端定義的處理邏輯;
-
PUSH 模式拉取消息依賴死循環來不停喚起業務,而 PULL 模式拉取消息是通過 MessageQueue 監聽器來觸發消息拉取線程,消息拉取線程會在拉取完一次後接着下一次拉取。
君哥聊技術 後端架構師,定期分享技術乾貨,包括後端開發、分佈式、中間件、雲原生等。同時也會分享職場心得、程序人生。關注我,一起進階。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/TGtDKLGK03z-JWUgqh90jA