A Deep Dive into the RocketMQ Source Code: The Load-Balancing Mechanism

Author: vivo Internet Server Team - Wang Zhi

1. Introduction

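RocketMQ is a distributed messaging middleware with strong overall performance among the message queues in common use. Its consumers use a long-polling pull model by default, and a single machine can hold a backlog on the order of tens of millions of messages, which makes it a good fit for systems that handle massive message volumes.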

RocketMQ consists mainly of the Producer, Broker, Consumer, and NameServer (Namesrv) components: the Producer produces messages, the Consumer consumes them, the Broker stores them, and the NameServer stores metadata.

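Logically, RocketMQ produces and consumes messages per Topic; physically, each message is stored in a particular MessageQueue on a particular Broker. Because one Topic is spread over multiple MessageQueues on multiple Broker nodes, message production and consumption naturally need load balancing.

This article focuses on how the message producer (Producer) and the message consumer (Consumer) achieve load balancing throughout production and consumption, and on the implementation details involved.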

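2. RocketMQ Overall Architecture

[Figure: RocketMQ overall architecture (image from Apache RocketMQ)]

Architecturally, RocketMQ is divided into four main parts, as shown in the figure above: Producer, Consumer, Broker, and NameServer.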

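[Figure: physical distribution of a RocketMQ Topic across Brokers]

The figure above shows how a RocketMQ Topic is physically distributed: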

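A Topic is the logical concept for producing and consuming messages; the messages themselves are stored across different Brokers.

A Queue on a Broker is the physical storage unit for the messages of a Topic.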

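In RocketMQ's overall design, messages are produced and consumed per Topic, and each Topic has corresponding MessageQueues created on the Broker nodes of the RocketMQ cluster.

Producing a message essentially means that the producer takes all MessageQueues of the Topic across the Brokers and selects one of them, according to some rule, to send the message to. Under normal conditions the rule is round-robin.

Consuming messages essentially means that each consumer in a consumerGroup subscribed to the same Topic is responsible, according to some rule, for consuming a subset of the Topic's MessageQueues.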

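Throughout the life cycle of a message in RocketMQ, both production and consumption involve load balancing: production mainly involves load balancing in the choice of Broker, and consumption mainly involves load balancing between multiple consumers and multiple Brokers.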

3. Producer Message Production Process

[Figure: producer message production process]

3.1 Route Synchronization Process

public class MQClientInstance {
    public boolean updateTopicRouteInfoFromNameServer(final String topic) {
        return updateTopicRouteInfoFromNameServer(topic, false, null);
    }
    public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
        DefaultMQProducer defaultMQProducer) {
        try {
            if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    TopicRouteData topicRouteData;
                    if (isDefault && defaultMQProducer != null) {
                        // corresponding code omitted
                    } else {
                        // 1. Query the route information for the given Topic
                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                    }
                    if (topicRouteData != null) {
                        // 2. Check whether the route data topicRouteData has changed
                        TopicRouteData old = this.topicRouteTable.get(topic);
                        boolean changed = topicRouteDataIsChange(old, topicRouteData);
                        if (!changed) {
                            changed = this.isNeedUpdateTopicRouteInfo(topic);
                        }
                        // 3. Convert the route data into producer-side and consumer-side route information
                        if (changed) {
                            TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
                            for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                                this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                            }
                            // Build the producer-side Topic information
                            {
                                TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                                publishInfo.setHaveTopicRouterInfo(true);
                                Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
                                while (it.hasNext()) {
                                    Entry<String, MQProducerInner> entry = it.next();
                                    MQProducerInner impl = entry.getValue();
                                    if (impl != null) {
                                        impl.updateTopicPublishInfo(topic, publishInfo);
                                    }
                                }
                            }
                            // Save to the local route table
                            this.topicRouteTable.put(topic, cloneTopicRouteData);
                            return true;
                        }
                    }
                } finally {
                    this.lockNamesrv.unlock();
                }
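            } else {
                // the lock was not acquired within the timeout (logging omitted)
            }
        } catch (InterruptedException e) {
            // interrupted while waiting for the lock (logging omitted)
        }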
        return false;
    }
}

Route synchronization process

public class TopicRouteData extends RemotingSerializable {
    private String orderTopicConf;
    // Queue information, one entry per broker
    private List<QueueData> queueDatas;
    // Broker information, one entry per broker
    private List<BrokerData> brokerDatas;
    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}
public class QueueData implements Comparable<QueueData> {
    // name of the broker
    private String brokerName;
    // number of read queues
    private int readQueueNums;
    // number of write queues
    private int writeQueueNums;
    // read/write permission
    private int perm;
    private int topicSynFlag;
}
public class BrokerData implements Comparable<BrokerData> {
    // cluster the broker belongs to
    private String cluster;
    // name of the broker
    private String brokerName;
    // addresses of the broker, keyed by brokerId
    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
    private final Random random = new Random();
}
--------------------------------------------------------------------------------------------------
public class TopicPublishInfo {
    private boolean orderTopic = false;
    private boolean haveTopicRouterInfo = false;
    // queue information at the finest granularity
    private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    private TopicRouteData topicRouteData;
}
public class MessageQueue implements Comparable<MessageQueue>, Serializable {
    private static final long serialVersionUID = 6191200464116433425L;
    // the Topic
    private String topic;
    // name of the broker the queue belongs to
    private String brokerName;
    // ID of the queue within the Topic
    private int queueId;
}

Route parsing process

public class MQClientInstance {
    public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
        TopicPublishInfo info = new TopicPublishInfo();
        info.setTopicRouteData(route);
        if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
            // related code omitted
        } else {
            List<QueueData> qds = route.getQueueDatas();
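            // Sort by brokerName
            Collections.sort(qds);
            // Iterate over the QueueData of every broker to build per-queue information
            for (QueueData qd : qds) {
                // Only a writable QueueData is used to generate queues
                if (PermName.isWriteable(qd.getPerm())) {
                    // Find the matching BrokerData and filter out abnormal cases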
                    BrokerData brokerData = null;
                    for (BrokerData bd : route.getBrokerDatas()) {
                        if (bd.getBrokerName().equals(qd.getBrokerName())) {
                            brokerData = bd;
                            break;
                        }
                    }
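                    // Skip the QueueData when no BrokerData matches its brokerName
                    if (null == brokerData) {
                        continue;
                    }
                    // Skip the broker when it has no master address
                    if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
                        continue;
                    }
                    // Create one MessageQueue per write queue of the QueueData and add it to the TopicPublishInfo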
                    for (int i = 0; i < qd.getWriteQueueNums(); i++) {
                        MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
                        info.getMessageQueueList().add(mq);
                    }
                }
            }
            info.setOrderTopic(false);
        }
        return info;
    }
}

Route generation process

{
    "TBW102": [{
        "brokerName": "broker-a",
        "perm": 7,
        "readQueueNums": 8,
        "topicSynFlag": 0,
        "writeQueueNums": 8
    }, {
        "brokerName": "broker-b",
        "perm": 7,
        "readQueueNums": 8,
        "topicSynFlag": 0,
        "writeQueueNums": 8
    }]
}

Route parsing example
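
To make the expansion concrete, the following is a minimal sketch of how per-broker queue counts such as the TBW102 data above turn into a flat queue list. It does not depend on RocketMQ: `PublishQueueExpansionSketch`, `QueueInfo`, and the `topic@broker#queueId` string format are hypothetical stand-ins for `QueueData` and `MessageQueue`, and the check for a master address is left out.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class PublishQueueExpansionSketch {
    // Write-permission bit, mirroring PermName.PERM_WRITE (0x1 << 1) in RocketMQ.
    static final int PERM_WRITE = 0x1 << 1;

    // Simplified stand-in for QueueData (hypothetical class, illustration only).
    static final class QueueInfo {
        final String brokerName;
        final int writeQueueNums;
        final int perm;

        QueueInfo(String brokerName, int writeQueueNums, int perm) {
            this.brokerName = brokerName;
            this.writeQueueNums = writeQueueNums;
            this.perm = perm;
        }
    }

    // Expands per-broker counts into one "topic@broker#queueId" entry per write queue.
    static List<String> expandWritable(String topic, List<QueueInfo> queueInfos) {
        List<QueueInfo> sorted = new ArrayList<>(queueInfos);
        sorted.sort(Comparator.comparing((QueueInfo q) -> q.brokerName));
        List<String> queues = new ArrayList<>();
        for (QueueInfo info : sorted) {
            if ((info.perm & PERM_WRITE) != PERM_WRITE) {
                continue; // not writable, so producers must not send here
            }
            for (int i = 0; i < info.writeQueueNums; i++) {
                queues.add(topic + "@" + info.brokerName + "#" + i);
            }
        }
        return queues;
    }

    public static void main(String[] args) {
        List<QueueInfo> infos = new ArrayList<>();
        infos.add(new QueueInfo("broker-b", 8, 7));
        infos.add(new QueueInfo("broker-a", 8, 7));
        List<String> queues = expandWritable("TBW102", infos);
        System.out.println(queues.size() + " queues, first = " + queues.get(0));
    }
}
```

With two brokers of 8 write queues each, the producer ends up with 16 candidate queues ordered by broker name and then by queue ID, which is the list the round-robin selection later walks over.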

3.2 Load-Balancing Process

public class DefaultMQProducerImpl implements MQProducerInner {
    private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // The declarations of mq, sendResult, times, timesTotal and costTime are omitted from this excerpt
        // 1. Look up the TopicPublishInfo for the message's Topic
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            String[] brokersSent = new String[timesTotal];
            // Send the message, retrying up to timesTotal times
            for (; times < timesTotal; times++) {
                // Record the brokerName used by the previous (failed) attempt
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                // 2. Pick the queue to send to from the TopicPublishInfo
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        // 3. Send and check the result; on failure, select another queue and retry while attempts remain
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                        switch (communicationMode) {
                            case SYNC:
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue;
                                    }
                                }
                                return sendResult;
                            default:
                                break;
                        }
                    } catch (MQBrokerException e) {
                        // related code omitted
                    } catch (InterruptedException e) {
                        // related code omitted
                    }
                } else {
                    break;
                }
            }
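            if (sendResult != null) {
                return sendResult;
            }
            // Otherwise every attempt failed; the exception handling is omitted from this excerpt
        }
        // No usable route information for the Topic; the exception handling is omitted from this excerpt
    }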
}

Message sending process

public class TopicPublishInfo {
    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            // Select the MessageQueue to send to in round-robin order
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int index = this.sendWhichQueue.getAndIncrement();
                int pos = Math.abs(index) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                // Skip queues on the broker where the previous send failed
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            return selectOneMessageQueue();
        }
    }
}

Route selection process
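
The selection rule can be tried in isolation. The following is a minimal sketch rather than RocketMQ's implementation: `RoundRobinSelectorSketch` and its `Queue` class are hypothetical, and a shared `AtomicInteger` starting at 0 stands in for the `ThreadLocalIndex` held by `TopicPublishInfo`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinSelectorSketch {
    // Simplified stand-in for MessageQueue (hypothetical class, illustration only).
    static final class Queue {
        final String brokerName;
        final int queueId;

        Queue(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }
    }

    private final List<Queue> queues;
    // Assumption: a shared counter replaces the per-thread index used by RocketMQ.
    private final AtomicInteger index = new AtomicInteger(0);

    RoundRobinSelectorSketch(List<Queue> queues) {
        this.queues = new ArrayList<>(queues);
    }

    // Returns the next queue in round-robin order, skipping the broker that just failed.
    Queue selectOne(String lastBrokerName) {
        for (int i = 0; i < queues.size(); i++) {
            Queue candidate = next();
            if (lastBrokerName == null || !candidate.brokerName.equals(lastBrokerName)) {
                return candidate;
            }
        }
        // Every queue lives on the failed broker, so fall back to plain round-robin.
        return next();
    }

    private Queue next() {
        // floorMod keeps the position non-negative even after the counter overflows.
        return queues.get(Math.floorMod(index.getAndIncrement(), queues.size()));
    }

    public static void main(String[] args) {
        List<Queue> all = new ArrayList<>();
        all.add(new Queue("broker-a", 0));
        all.add(new Queue("broker-a", 1));
        all.add(new Queue("broker-b", 0));
        all.add(new Queue("broker-b", 1));
        RoundRobinSelectorSketch selector = new RoundRobinSelectorSketch(all);
        Queue first = selector.selectOne(null);
        Queue retry = selector.selectOne(first.brokerName);
        System.out.println(first.brokerName + " then " + retry.brokerName);
    }
}
```

Skipping by broker name rather than by queue reflects the idea that a failed send usually points at a broker-level problem, so the retry moves to a different broker when one is available.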

[Figure: producer message sending diagram]

4. Consumer Message Consumption Process

[Figure: consumer message consumption process]

4.1 Route Synchronization Process

public class MQClientInstance {
    // 1. Start a scheduled task that periodically synchronizes route information from the NameServer
    private void startScheduledTask() {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                } catch (Exception e) {
                    log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
                }
            }
        }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
    }
    public void updateTopicRouteInfoFromNameServer() {
        Set<String> topicList = new HashSet<String>();
        // Collect the Topics subscribed to by every consumer, then fetch their route information from the NameServer
        {
            Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
            while (it.hasNext()) {
                Entry<String, MQConsumerInner> entry = it.next();
                MQConsumerInner impl = entry.getValue();
                if (impl != null) {
                    Set<SubscriptionData> subList = impl.subscriptions();
                    if (subList != null) {
                        for (SubscriptionData subData : subList) {
                            topicList.add(subData.getTopic());
                        }
                    }
                }
            }
        }
        for (String topic : topicList) {
            this.updateTopicRouteInfoFromNameServer(topic);
        }
    }
    public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
        DefaultMQProducer defaultMQProducer) {
        try {
            if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    TopicRouteData topicRouteData;
                    if (isDefault && defaultMQProducer != null) {
                        // code omitted
                    } else {
                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                    }
                    if (topicRouteData != null) {
                        TopicRouteData old = this.topicRouteTable.get(topic);
                        boolean changed = topicRouteDataIsChange(old, topicRouteData);
                        if (!changed) {
                            changed = this.isNeedUpdateTopicRouteInfo(topic);
                        }
                        if (changed) {
                            TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
                            for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                                this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                            }
                            // Build the consumer-side route information
                            {
                                Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                                Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
                                while (it.hasNext()) {
                                    Entry<String, MQConsumerInner> entry = it.next();
                                    MQConsumerInner impl = entry.getValue();
                                    if (impl != null) {
                                        impl.updateTopicSubscribeInfo(topic, subscribeInfo);
                                    }
                                }
                            }
                            this.topicRouteTable.put(topic, cloneTopicRouteData);
                            return true;
                        }
                    }
                } finally {
                    this.lockNamesrv.unlock();
                }
            }
        } catch (InterruptedException e) {
        }
        return false;
    }
}

Route synchronization process
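
The body of `topicRouteData2TopicSubscribeInfo` is not shown in the listing above. The following minimal sketch assumes that it mirrors the producer-side expansion but checks the read permission and iterates over `readQueueNums`. The class `SubscribeQueueExpansionSketch`, its `QueueInfo` type, and the string queue format are hypothetical and independent of RocketMQ.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class SubscribeQueueExpansionSketch {
    // Read-permission bit, mirroring PermName.PERM_READ (0x1 << 2) in RocketMQ.
    static final int PERM_READ = 0x1 << 2;

    // Simplified stand-in for QueueData (hypothetical class, illustration only).
    static final class QueueInfo {
        final String brokerName;
        final int readQueueNums;
        final int perm;

        QueueInfo(String brokerName, int readQueueNums, int perm) {
            this.brokerName = brokerName;
            this.readQueueNums = readQueueNums;
            this.perm = perm;
        }
    }

    // Builds the set of queues a consumer group may read for the topic.
    static Set<String> expandReadable(String topic, List<QueueInfo> queueInfos) {
        Set<String> queues = new HashSet<>();
        for (QueueInfo info : queueInfos) {
            if ((info.perm & PERM_READ) != PERM_READ) {
                continue; // not readable, so consumers must not pull from it
            }
            for (int i = 0; i < info.readQueueNums; i++) {
                queues.add(topic + "@" + info.brokerName + "#" + i);
            }
        }
        return queues;
    }

    public static void main(String[] args) {
        List<QueueInfo> infos = new ArrayList<>();
        infos.add(new QueueInfo("broker-a", 4, 6)); // readable and writable
        infos.add(new QueueInfo("broker-b", 4, 2)); // writable only
        System.out.println(expandReadable("topic_demo", infos).size() + " readable queues");
    }
}
```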

4.2 Load-Balancing Process

public class RebalanceService extends ServiceThread {
    private static long waitInterval =
        Long.parseLong(System.getProperty(
            "rocketmq.client.rebalance.waitInterval", "20000"));
    private final MQClientInstance mqClientFactory;
    public RebalanceService(MQClientInstance mqClientFactory) {
        this.mqClientFactory = mqClientFactory;
    }
    @Override
    public void run() {
        while (!this.isStopped()) {
            this.waitForRunning(waitInterval);
            this.mqClientFactory.doRebalance();
        }
    }
}

Load-balancing process

public abstract class RebalanceImpl {
    private void rebalanceByTopic(final String topic, final boolean isOrder) {
        switch (messageModel) {
            case BROADCASTING: {
                // related code omitted
                break;
            }
            case CLUSTERING: { // load balancing in clustering mode
                // 1. Get all MessageQueues of the topic
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                // 2. Get the IDs of all consumers in this consumerGroup for the topic
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                // 3. Start reallocating queues (rebalance)
                if (mqSet != null && cidAll != null) {
                    List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                    mqAll.addAll(mqSet);
                    Collections.sort(mqAll);
                    Collections.sort(cidAll);
                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
                    List<MessageQueue> allocateResult = null;
                    try {
                        // 4. Reallocate using the allocation strategy
                        allocateResult = strategy.allocate(
                            this.consumerGroup,
                            this.mQClientFactory.getClientId(),
                            mqAll,
                            cidAll);
                    } catch (Throwable e) {
                        return;
                    }
                    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                    if (allocateResult != null) {
                        allocateResultSet.addAll(allocateResult);
                    }
                    // 5. Apply the allocation result to perform the actual rebalance
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                    if (changed) {
                        this.messageQueueChanged(topic, mqSet, allocateResultSet);
                    }
                }
                break;
            }
            default:
                break;
        }
    }
}

Reallocation flow
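
Step 5 is only named in the listing; the body of `updateProcessQueueTableInRebalance` is not shown. As a rough sketch, assume it reconciles the queues this consumer currently owns with the new allocation and reports whether anything changed. `RebalanceDiffSketch` and `applyAllocation` are hypothetical names, and queues are plain strings here.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

final class RebalanceDiffSketch {
    // Reconciles the owned queues with a fresh allocation, in place.
    // Returns true when at least one queue was dropped or newly taken over.
    static boolean applyAllocation(Set<String> owned, Set<String> allocated) {
        boolean dropped = owned.retainAll(allocated); // give up queues no longer assigned
        boolean added = owned.addAll(allocated);      // take over newly assigned queues
        return dropped || added;
    }

    public static void main(String[] args) {
        Set<String> owned = new HashSet<>(Arrays.asList("q0", "q1", "q2"));
        Set<String> allocated = new HashSet<>(Arrays.asList("q2", "q3"));
        System.out.println("changed = " + applyAllocation(owned, allocated) + ", owned = " + owned);
    }
}
```

A second call with the same allocation returns false, which corresponds to the case where `messageQueueChanged` is not invoked.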

public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
    private final InternalLogger log = ClientLogger.getLog();
    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {
        List<MessageQueue> result = new ArrayList<MessageQueue>();
        // Core calculation starts
        // Index of the current cid
        int index = cidAll.indexOf(currentCID);
        // Remainder: queues left over after an even split
        int mod = mqAll.size() % cidAll.size();
        // Average number of queues per consumer (the first `mod` consumers get one extra)
        int averageSize =
            mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
                + 1 : mqAll.size() / cidAll.size());
        // Start index of this consumer's range
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        // Number of queues in this consumer's range
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        // Assemble the result
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        // Core calculation ends
        return result;
    }
    @Override
    public String getName() {
        return "AVG";
    }
}
------------------------------------------------------------------------------------
The RocketMQ cluster has 3 brokers: broker_a, broker_b, and broker_c.
RocketMQ has a topic named topic_demo with 3 write queues on each of the 3 brokers.
After sorting, mqAll has 9 entries, in this order:
[broker_a_0  broker_a_1  broker_a_2  broker_b_0  broker_b_1  broker_b_2  broker_c_0  broker_c_1  broker_c_2]
A consumer_group contains 4 consumers; after sorting, cidAll is:
[192.168.0.6@15956  192.168.0.7@15957  192.168.0.8@15958  192.168.0.9@15959]
MessageQueue allocation for 192.168.0.6@15956
index: 0
mod: 9 % 4 = 1
averageSize: 9 / 4 + 1 = 3
startIndex: 0
range: 3
messageQueue: [broker_a_0, broker_a_1, broker_a_2]
MessageQueue allocation for 192.168.0.7@15957
index: 1
mod: 9 % 4 = 1
averageSize: 9 / 4 = 2
startIndex: 3
range: 2
messageQueue: [broker_b_0, broker_b_1]
MessageQueue allocation for 192.168.0.8@15958
index: 2
mod: 9 % 4 = 1
averageSize: 9 / 4 = 2
startIndex: 5
range: 2
messageQueue: [broker_b_2, broker_c_0]
MessageQueue allocation for 192.168.0.9@15959
index: 3
mod: 9 % 4 = 1
averageSize: 9 / 4 = 2
startIndex: 7
range: 2
messageQueue: [broker_c_1, broker_c_2]

Allocation strategy analysis:

[Figure: allocation of MessageQueues to consumers]
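
The averaging calculation can be run on its own to reproduce the worked example. The sketch below copies the arithmetic of `AllocateMessageQueueAveragely` but uses plain strings for queues and consumer IDs; `AverageAllocationSketch` is a hypothetical name, and the early return for an unknown consumer is an added guard.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class AverageAllocationSketch {
    // Returns the contiguous slice of mqAll that belongs to currentCid.
    static List<String> allocate(String currentCid, List<String> mqAll, List<String> cidAll) {
        List<String> result = new ArrayList<>();
        int index = cidAll.indexOf(currentCid);
        if (index < 0 || mqAll.isEmpty()) {
            return result; // unknown consumer, or nothing to allocate
        }
        int mod = mqAll.size() % cidAll.size();
        int base = mqAll.size() / cidAll.size();
        // The first `mod` consumers take one queue more than the rest.
        int averageSize = mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? base + 1 : base);
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        // A non-positive range means this consumer gets no queue at all.
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> mqAll = Arrays.asList(
            "broker_a_0", "broker_a_1", "broker_a_2",
            "broker_b_0", "broker_b_1", "broker_b_2",
            "broker_c_0", "broker_c_1", "broker_c_2");
        List<String> cidAll = Arrays.asList(
            "192.168.0.6@15956", "192.168.0.7@15957", "192.168.0.8@15958", "192.168.0.9@15959");
        for (String cid : cidAll) {
            System.out.println(cid + " -> " + allocate(cid, mqAll, cidAll));
        }
    }
}
```

Because every consumer sorts `mqAll` and `cidAll` the same way before calling the strategy, each one computes its own slice independently and the slices do not overlap, as long as all consumers see the same queue and consumer lists.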

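5. Design Approach for Consuming on a Designated Machine in RocketMQ

A test environment usually has several consumers consuming at the same time. During development, however, after deploying a new feature to one consumer we often want messages to be consumed only by that machine so that the new logic is exercised. The clustering mode of a consumerGroup gets in the way here, because consumer load balancing makes it uncertain which consumer will receive a given message. We can, however, hook into the consumer's load-balancing mechanism to make a designated machine do the consuming.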

public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
    private final InternalLogger log = ClientLogger.getLog();
    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {
        List<MessageQueue> result = new ArrayList<MessageQueue>();
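        // Rewrite this check to also test whether the current machine has the designated IP;
        // if it does not, return an empty list so that this machine consumes nothing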
        if (!cidAll.contains(currentCID)) {
            return result;
        }
        int index = cidAll.indexOf(currentCID);
        int mod = mqAll.size() % cidAll.size();
        int averageSize =
            mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
                + 1 : mqAll.size() / cidAll.size());
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }
}

Rewritten consumer load-balancing strategy
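
One way to fill in the designated-IP check is sketched below. It is an illustration under stated assumptions, not the author's implementation: client IDs are assumed to start with the machine IP followed by `@` (as in the `192.168.0.6@15956` example earlier), `designatedIp` is a hypothetical parameter that would come from test-environment configuration, and eligible consumers split the queues by index modulo rather than by the averaging formula.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class DesignatedIpAllocationSketch {
    // Only consumers running on designatedIp receive queues; all others get an empty list.
    static List<String> allocate(String currentCid, List<String> mqAll, List<String> cidAll,
        String designatedIp) {
        // Keep only the consumers whose client ID starts with "<designatedIp>@".
        List<String> eligible = new ArrayList<>();
        for (String cid : cidAll) {
            if (cid.startsWith(designatedIp + "@")) {
                eligible.add(cid);
            }
        }
        List<String> result = new ArrayList<>();
        int index = eligible.indexOf(currentCid);
        if (index < 0) {
            return result; // this machine is not the designated one
        }
        // Spread the queues over the eligible consumers by position modulo their count.
        for (int i = 0; i < mqAll.size(); i++) {
            if (i % eligible.size() == index) {
                result.add(mqAll.get(i));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> mqAll = Arrays.asList("broker_a_0", "broker_a_1", "broker_b_0");
        List<String> cidAll = Arrays.asList("192.168.0.6@15956", "192.168.0.7@15957");
        System.out.println(allocate("192.168.0.7@15957", mqAll, cidAll, "192.168.0.7"));
        System.out.println(allocate("192.168.0.6@15956", mqAll, cidAll, "192.168.0.7"));
    }
}
```

With this rule, nothing is consumed while no consumer on the designated IP is online, so a strategy like this is better limited to test environments.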

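6. Summary

This article described RocketMQ's load-balancing mechanisms for producing and consuming messages, combining source code with a worked example in an effort to make them easy to understand; we hope it is a useful reference. Because of limited space, some aspects are not covered and many technical details are not explained in depth. Questions and further discussion are welcome.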

This article was AMP-transcoded by Readfog; copyright belongs to the original author.
Source: https://mp.weixin.qq.com/s/y_c6jD-r9DVgSotOXfk4tQ