深入剖析 RocketMQ 源碼 - 負載均衡機制
作者:vivo 互聯網服務器團隊 - Wang Zhi
一、引言
RocketMQ 是一款優秀的分佈式消息中間件,在各方面的性能都比目前已有的消息隊列要好,RocketMQ 默認採用長輪詢的拉模式, 單機支持千萬級別的消息堆積,可以非常好的應用在海量消息系統中。
RocketMQ 主要由 Producer、Broker、Consumer、Namesvr 等組件組成,其中 Producer 負責生產消息,Consumer 負責消費消息,Broker 負責存儲消息,Namesvr 負責存儲元數據,各組件的主要功能如下:
-
消息生產者(Producer):負責生產消息,一般由業務系統負責生產消息。一個消息生產者會把業務應用系統裏產生的消息發送到 Broker 服務器。RocketMQ 提供多種發送方式,同步發送、異步發送、順序發送、單向發送。同步和異步方式均需要 Broker 返回確認信息,單向發送不需要。
-
消息消費者(Consumer):負責消費消息,一般是後臺系統負責異步消費。一個消息消費者會從 Broker 服務器拉取消息、並將其提供給應用程序。從用戶應用的角度而言提供了兩種消費形式:拉取式消費、推動式消費。
-
代理服務器(Broker Server):消息中轉角色,負責存儲消息、轉發消息。代理服務器在 RocketMQ 系統中負責接收從生產者發送來的消息並存儲、同時爲消費者的拉取請求作準備。代理服務器也存儲消息相關的元數據,包括消費者組、消費進度偏移和主題和隊列消息等。
-
名字服務(Name Server):名稱服務充當路由消息的提供者。生產者或消費者能夠通過名字服務查找各主題相應的 Broker IP 列表。多個 Namesrv 實例組成集羣,但相互獨立,沒有信息交換。
-
生產者組(Producer Group):同一類 Producer 的集合,這類 Producer 發送同一類消息且發送邏輯一致。如果發送的是事務消息且原始生產者在發送之後崩潰,則 Broker 服務器會聯繫同一生產者組的其他生產者實例以提交或回溯消費。
-
消費者組(Consumer Group):同一類 Consumer 的集合,這類 Consumer 通常消費同一類消息且消費邏輯一致。消費者組使得在消息消費方面,實現負載均衡和容錯的目標變得非常容易。
RocketMQ 整體消息處理邏輯上以 Topic 維度進行生產消費、物理上會存儲到具體的 Broker 上的某個 MessageQueue 當中,正因爲一個 Topic 會存在多個 Broker 節點上的多個 MessageQueue,所以自然而然就產生了消息生產消費的負載均衡需求。
本篇文章分析的核心在於介紹 RocketMQ 的消息生產者(Producer)和消息消費者(Consumer)在整個消息的生產消費過程中如何實現負載均衡以及其中的實現細節。
二、RocketMQ 的整體架構
(圖片來自於 Apache RocketMQ)
RocketMQ 架構上主要分爲四部分,如上圖所示:
-
Producer:消息發佈的角色,支持分佈式集羣方式部署。Producer 通過 MQ 的負載均衡模塊選擇相應的 Broker 集羣隊列進行消息投遞,投遞的過程支持快速失敗並且低延遲。
-
Consumer:消息消費的角色,支持分佈式集羣方式部署。支持以 push 推,pull 拉兩種模式對消息進行消費。同時也支持集羣方式和廣播方式的消費,它提供實時消息訂閱機制,可以滿足大多數用戶的需求。
-
NameServer:NameServer 是一個非常簡單的 Topic 路由註冊中心,支持分佈式集羣方式部署,其角色類似 Dubbo 中的 zookeeper,支持 Broker 的動態註冊與發現。
-
BrokerServer:Broker 主要負責消息的存儲、投遞和查詢以及服務高可用保證,支持分佈式集羣方式部署。
RocketMQ 的 Topic 的物理分佈如上圖所示:
Topic 作爲消息生產和消費的邏輯概念,具體的消息存儲分佈在不同的 Broker 當中。
Broker 中的 Queue 是 Topic 對應消息的物理存儲單元。
在 RocketMQ 的整體設計理念當中,消息的生產消費以 Topic 維度進行,每個 Topic 會在 RocketMQ 的集羣中的 Broker 節點創建對應的 MessageQueue。
producer 生產消息的過程本質上就是選擇 Topic 在 Broker 的所有的 MessageQueue 並按照一定的規則選擇其中一個進行消息發送,正常情況的策略是輪詢。
consumer 消費消息的過程本質上就是一個訂閱同一個 Topic 的 consumerGroup 下的每個 consumer 按照一定的規則負責 Topic 下一部分 MessageQueue 進行消費。
在 RocketMQ 整個消息的生命週期內,不管是生產消息還是消費消息都會涉及到負載均衡的概念,消息的生成過程中主要涉及到 Broker 選擇的負載均衡,消息的消費過程主要涉及多 consumer 和多 Broker 之間的負責均衡。
三、producer 消息生產過程
producer 消息生產過程:
-
producer 首先訪問 namesvr 獲取路由信息,namesvr 存儲 Topic 維度的所有路由信息(包括每個 topic 在每個 Broker 的隊列分佈情況)。
-
producer 解析路由信息生成本地的路由信息,解析 Topic 在 Broker 隊列信息並轉化爲本地的消息生產的路由信息。
-
producer 根據本地路由信息向 Broker 發送消息,選擇本地路由中具體的 Broker 進行消息發送。
3.1 路由同步過程
public class MQClientInstance {
public boolean updateTopicRouteInfoFromNameServer(final String topic) {
return updateTopicRouteInfoFromNameServer(topic, false, null);
}
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer) {
try {
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) {
// 省略對應的代碼
} else {
// 1、負責查詢指定的Topic對應的路由信息
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}
if (topicRouteData != null) {
// 2、比較路由數據topicRouteData是否發生變更
TopicRouteData old = this.topicRouteTable.get(topic);
boolean changed = topicRouteDataIsChange(old, topicRouteData);
if (!changed) {
changed = this.isNeedUpdateTopicRouteInfo(topic);
}
// 3、解析路由信息轉化爲生產者的路由信息和消費者的路由信息
if (changed) {
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
}
// 生成生產者對應的Topic信息
{
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQProducerInner> entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicPublishInfo(topic, publishInfo);
}
}
}
// 保存到本地生產者路由表當中
this.topicRouteTable.put(topic, cloneTopicRouteData);
return true;
}
}
} finally {
this.lockNamesrv.unlock();
}
} else {
}
} catch (InterruptedException e) {
}
return false;
}
}
路由同步過程:
-
路由同步過程是消息生產者發送消息的前置條件,沒有路由的同步就無法感知具體發往那個 Broker 節點。
-
路由同步主要負責查詢指定的 Topic 對應的路由信息,比較路由數據 topicRouteData 是否發生變更,最終解析路由信息轉化爲生產者的路由信息和消費者的路由信息。
public class TopicRouteData extends RemotingSerializable {
private String orderTopicConf;
// 按照broker維度保存的Queue信息
private List<QueueData> queueDatas;
// 按照broker維度保存的broker信息
private List<BrokerData> brokerDatas;
private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}
public class QueueData implements Comparable<QueueData> {
// broker的名稱
private String brokerName;
// 讀隊列大小
private int readQueueNums;
// 寫隊列大小
private int writeQueueNums;
// 讀寫權限
private int perm;
private int topicSynFlag;
}
public class BrokerData implements Comparable<BrokerData> {
// broker所屬集羣信息
private String cluster;
// broker的名稱
private String brokerName;
// broker對應的ip地址信息
private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
private final Random random = new Random();
}
--------------------------------------------------------------------------------------------------
public class TopicPublishInfo {
private boolean orderTopic = false;
private boolean haveTopicRouterInfo = false;
// 最細粒度的隊列信息
private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
private TopicRouteData topicRouteData;
}
public class MessageQueue implements Comparable<MessageQueue>, Serializable {
private static final long serialVersionUID = 6191200464116433425L;
// Topic信息
private String topic;
// 所屬的brokerName信息
private String brokerName;
// Topic下的隊列信息Id
private int queueId;
}
路由解析過程:
-
TopicRouteData 核心變量 QueueData 保存每個 Broker 的隊列信息,BrokerData 保存 Broker 的地址信息。
-
TopicPublishInfo 核心變量 MessageQueue 保存最細粒度的隊列信息。
-
producer 負責將從 namesvr 獲取的 TopicRouteData 轉化爲 producer 本地的 TopicPublishInfo。
public class MQClientInstance {
public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
TopicPublishInfo info = new TopicPublishInfo();
info.setTopicRouteData(route);
if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
// 省略相關代碼
} else {
List<QueueData> qds = route.getQueueDatas();
// 按照brokerName進行排序
Collections.sort(qds);
// 遍歷所有broker生成隊列維度信息
for (QueueData qd : qds) {
// 具備寫能力的QueueData能夠用於隊列生成
if (PermName.isWriteable(qd.getPerm())) {
// 遍歷獲得指定brokerData進行異常條件過濾
BrokerData brokerData = null;
for (BrokerData bd : route.getBrokerDatas()) {
if (bd.getBrokerName().equals(qd.getBrokerName())) {
brokerData = bd;
break;
}
}
if (null == brokerData) {
continue;
}
if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
continue;
}
// 遍歷QueueData的寫隊列的數量大小,生成MessageQueue保存指定TopicPublishInfo
for (int i = 0; i < qd.getWriteQueueNums(); i++) {
MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
info.getMessageQueueList().add(mq);
}
}
}
info.setOrderTopic(false);
}
return info;
}
}
路由生成過程:
-
路由生成過程主要是根據 QueueData 的 BrokerName 和 writeQueueNums 來生成 MessageQueue 對象。
-
MessageQueue 是消息發送過程中選擇的最細粒度的可發送消息的隊列。
{
"TBW102": [{
"brokerName": "broker-a",
"perm": 7,
"readQueueNums": 8,
"topicSynFlag": 0,
"writeQueueNums": 8
}, {
"brokerName": "broker-b",
"perm": 7,
"readQueueNums": 8,
"topicSynFlag": 0,
"writeQueueNums": 8
}]
}
路由解析舉例:
-
topic(TBW102)在 broker-a 和 broker-b 上存在隊列信息,其中讀寫隊列個數都爲 8。
-
先按照 broker-a、broker-b 的名字順序針對 broker 信息進行排序。
-
針對 broker-a 會生成 8 個 topic 爲 TBW102 的 MessageQueue 對象,queueId 分別是 0-7。
-
針對 broker-b 會生成 8 個 topic 爲 TBW102 的 MessageQueue 對象,queueId 分別是 0-7。
-
topic(名爲 TBW102)的 TopicPublishInfo 整體包含 16 個 MessageQueue 對象,其中有 8 個 broker-a 的 MessageQueue,有 8 個 broker-b 的 MessageQueue。
-
消息發送過程中的路由選擇就是從這 16 個 MessageQueue 對象當中獲取一個進行消息發送。
3.2 負載均衡過程
public class DefaultMQProducerImpl implements MQProducerInner {
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
// 1、查詢消息發送的TopicPublishInfo信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
String[] brokersSent = new String[timesTotal];
// 根據重試次數進行消息發送
for (; times < timesTotal; times++) {
// 記錄上次發送失敗的brokerName
String lastBrokerName = null == mq ? null : mq.getBrokerName();
// 2、從TopicPublishInfo獲取發送消息的隊列
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
try {
// 3、執行發送並判斷髮送結果,如果發送失敗根據重試次數選擇消息隊列進行重新發送
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
switch (communicationMode) {
case SYNC:
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}
return sendResult;
default:
break;
}
} catch (MQBrokerException e) {
// 省略相關代碼
} catch (InterruptedException e) {
// 省略相關代碼
}
} else {
break;
}
}
if (sendResult != null) {
return sendResult;
}
}
}
}
消息發送過程:
-
查詢 Topic 對應的路由信息對象 TopicPublishInfo。
-
從 TopicPublishInfo 中通過
selectOneMessageQueue 獲取發送消息的隊列,該隊列代表具體落到具體的 Broker 的 queue 隊列當中。
-
執行發送並判斷髮送結果,如果發送失敗根據重試次數選擇消息隊列進行重新發送,重新選擇隊列會避開上一次發送失敗的 Broker 的隊列。
public class TopicPublishInfo {
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName == null) {
return selectOneMessageQueue();
} else {
// 按照輪詢進行選擇發送的MessageQueue
for (int i = 0; i < this.messageQueueList.size(); i++) {
int index = this.sendWhichQueue.getAndIncrement();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
// 避開上一次上一次發送失敗的MessageQueue
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return selectOneMessageQueue();
}
}
}
路由選擇過程:
-
MessageQueue 的選擇按照輪詢進行選擇,通過全局維護索引進行累加取模選擇發送隊列。
-
MessageQueue 的選擇過程中會避開上一次發送失敗 Broker 對應的 MessageQueue。
Producer 消息發送示意圖:
-
某 Topic 的隊列分佈爲 Broker_A_Queue1、Broker_A_Queue2、Broker_B_Queue1、Broker_B_Queue2、Broker_C_Queue1、Broker_C_Queue2,根據輪詢策略依次進行選擇。
-
發送失敗的場景下如 Broker_A_Queue1 發送失敗那麼就會跳過 Broker_A 選擇 Broker_B_Queue1 進行發送。
四、consumer 消息消費過程
consumer 消息消費過程:
-
consumer 訪問 namesvr 同步 topic 對應的路由信息。
-
consumer 在本地解析遠程路由信息並保存到本地。
-
consumer 在本地進行 Reblance 負載均衡確定本節點負責消費的 MessageQueue。
-
consumer 訪問 Broker 消費指定的 MessageQueue 的消息。
4.1 路由同步過程
public class MQClientInstance {
// 1、啓動定時任務從namesvr定時同步路由信息
private void startScheduledTask() {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
}
public void updateTopicRouteInfoFromNameServer() {
Set<String> topicList = new HashSet<String>();
// 遍歷所有的consumer訂閱的Topic並從namesvr獲取路由信息
{
Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQConsumerInner> entry = it.next();
MQConsumerInner impl = entry.getValue();
if (impl != null) {
Set<SubscriptionData> subList = impl.subscriptions();
if (subList != null) {
for (SubscriptionData subData : subList) {
topicList.add(subData.getTopic());
}
}
}
}
}
for (String topic : topicList) {
this.updateTopicRouteInfoFromNameServer(topic);
}
}
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer) {
try {
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) {
// 省略代碼
} else {
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}
if (topicRouteData != null) {
TopicRouteData old = this.topicRouteTable.get(topic);
boolean changed = topicRouteDataIsChange(old, topicRouteData);
if (!changed) {
changed = this.isNeedUpdateTopicRouteInfo(topic);
}
if (changed) {
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
}
// 構建consumer側的路由信息
{
Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQConsumerInner> entry = it.next();
MQConsumerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicSubscribeInfo(topic, subscribeInfo);
}
}
}
this.topicRouteTable.put(topic, cloneTopicRouteData);
return true;
}
}
} finally {
this.lockNamesrv.unlock();
}
}
} catch (InterruptedException e) {
}
return false;
}
}
路由同步過程:
-
路由同步過程是消息消費者消費消息的前置條件,沒有路由的同步就無法感知具體待消費的消息的 Broker 節點。
-
consumer 節點通過定時任務定期從 namesvr 同步該消費節點訂閱的 topic 的路由信息。
-
consumer 通過 updateTopicSubscribeInfo 將同步的路由信息構建成本地的路由信息並用以後續的負責均衡。
4.2 負載均衡過程
public class RebalanceService extends ServiceThread {
private static long waitInterval =
Long.parseLong(System.getProperty(
"rocketmq.client.rebalance.waitInterval", "20000"));
private final MQClientInstance mqClientFactory;
public RebalanceService(MQClientInstance mqClientFactory) {
this.mqClientFactory = mqClientFactory;
}
@Override
public void run() {
while (!this.isStopped()) {
this.waitForRunning(waitInterval);
this.mqClientFactory.doRebalance();
}
}
}
負載均衡過程:
-
consumer 通過 RebalanceService 來定期進行重新負載均衡。
-
RebalanceService 的核心在於完成 MessageQueue 和 consumer 的分配關係。
public abstract class RebalanceImpl {
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: {
// 省略相關代碼
break;
}
case CLUSTERING: { // 集羣模式下的負載均衡
// 1、獲取topic下所有的MessageQueue
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
// 2、獲取topic下該consumerGroup下所有的consumer對象
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
// 3、開始重新分配進行rebalance
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
Collections.sort(mqAll);
Collections.sort(cidAll);
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
// 4、通過分配策略重新進行分配
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
return;
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
// 5、根據分配結果執行真正的rebalance動作
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
break;
}
default:
break;
}
}
重新分配流程:
-
獲取 topic 下所有的 MessageQueue。
-
獲取 topic 下該 consumerGroup 下所有的 consumer 的 cid(如 192.168.0.8@15958)。
-
針對 mqAll 和 cidAll 進行排序,mqAll 排序順序按照先 BrokerName 後 BrokerId,cidAll 排序按照字符串排序。
-
通過分配策略
AllocateMessageQueueStrategy 重新分配。
-
根據分配結果執行真正的 rebalance 動作。
public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
private final InternalLogger log = ClientLogger.getLog();
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
List<MessageQueue> result = new ArrayList<MessageQueue>();
// 核心邏輯計算開始
// 計算當前cid的下標
int index = cidAll.indexOf(currentCID);
// 計算多餘的模值
int mod = mqAll.size() % cidAll.size();
// 計算平均大小
int averageSize =
mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
+ 1 : mqAll.size() / cidAll.size());
// 計算起始下標
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
// 計算範圍大小
int range = Math.min(averageSize, mqAll.size() - startIndex);
// 組裝結果
for (int i = 0; i < range; i++) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
return result;
}
// 核心邏輯計算結束
@Override
public String getName() {
return "AVG";
}
}
------------------------------------------------------------------------------------
rocketMq的集羣存在3個broker,分別是broker_a、broker_b、broker_c。
rocketMq上存在名爲topic_demo的topic,writeQueue寫隊列數量爲3,分佈在3個broker。
排序後的mqAll的大小爲9,依次爲
[broker_a_0 broker_a_1 broker_a_2 broker_b_0 broker_b_1 broker_b_2 broker_c_0 broker_c_1 broker_c_2]
rocketMq存在包含4個consumer的consumer_group,排序後cidAll依次爲
[192.168.0.6@15956 192.168.0.7@15957 192.168.0.8@15958 192.168.0.9@15959]
192.168.0.6@15956 的分配MessageQueue結算過程
index:0
mod:9%4=1
averageSize:9 / 4 + 1 = 3
startIndex:0
range:3
messageQueue:[broker_a_0、broker_a_1、broker_a_2]
192.168.0.6@15957 的分配MessageQueue結算過程
index:1
mod:9%4=1
averageSize:9 / 4 = 2
startIndex:3
range:2
messageQueue:[broker_b_0、broker_b_1]
192.168.0.6@15958 的分配MessageQueue結算過程
index:2
mod:9%4=1
averageSize:9 / 4 = 2
startIndex:5
range:2
messageQueue:[broker_b_2、broker_c_0]
192.168.0.6@15959 的分配MessageQueue結算過程
index:3
mod:9%4=1
averageSize:9 / 4 = 2
startIndex:7
range:2
messageQueue:[broker_c_1、broker_c_2]
分配策略分析:
- 整體分配策略可以參考上圖的具體例子,可以更好的理解分配的邏輯。
consumer 的分配:
-
同一個 consumerGroup 下的 consumer 對象會分配到同一個 Topic 下不同的 MessageQueue。
-
每個 MessageQueue 最終會分配到具體的 consumer 當中。
五、RocketMQ 指定機器消費設計思路
日常測試環境當中會存在多臺 consumer 進行消費,但實際開發當中某臺 consumer 新上了功能後希望消息只由該機器進行消費進行邏輯覆蓋,這個時候 consumerGroup 的集羣模式就會給我們造成困擾,因爲消費負載均衡的原因不確定消息具體由那臺 consumer 進行消費。當然我們可以通過介入 consumer 的負載均衡機制來實現指定機器消費。
public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
private final InternalLogger log = ClientLogger.getLog();
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
List<MessageQueue> result = new ArrayList<MessageQueue>();
// 通過改寫這部分邏輯,增加判斷是否是指定IP的機器,如果不是直接返回空列表表示該機器不負責消費
if (!cidAll.contains(currentCID)) {
return result;
}
int index = cidAll.indexOf(currentCID);
int mod = mqAll.size() % cidAll.size();
int averageSize =
mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
+ 1 : mqAll.size() / cidAll.size());
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
return result;
}
}
consumer 負載均衡策略改寫:
-
通過改寫負載均衡策略 AllocateMessageQueueAveragely 的 allocate 機制保證只有指定 IP 的機器能夠進行消費。
-
通過 IP 進行判斷是基於 RocketMQ 的 cid 格式是 192.168.0.6@15956,其中前面的 IP 地址就是對於的消費機器的 ip 地址,整個方案可行且可以實際落地。
六、小結
本文主要介紹了 RocketMQ 在生產和消費過程中的負載均衡機制,結合源碼和實際案例力求給讀者一個易於理解的技術普及,希望能對讀者有參考和借鑑價值。囿於文章篇幅,有些方面未涉及,也有很多技術細節未詳細闡述,如有疑問歡迎繼續交流。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/y_c6jD-r9DVgSotOXfk4tQ