如何優雅地開始研究一個新技術

不講過多大道理,本文以我開始研究 rocketmq 爲例,就最近的事。

大家不要嘲笑我,我對 rocketmq 一無所知,所以寫這篇文章剛好合適,正好也記錄下我開始學習 rocketmq 這個冷啓動階段,大家看看對自己是否有幫助。

這篇文章是我邊看邊寫的,力求還原一下我最真實的啓動過程,不瞭解 rocketmq 的不用擔心,甚至更好,因爲本文不會涉及到 rocketmq 的細節,只是突出從一無所知開始研究它的啓動階段。

開始吧。

先把源碼跑起來

第一步,先把源碼搞到手。

百度出來它的官方網站:

http://rocketmq.apache.org/

傻瓜式的首頁,非常友好,直接點擊最大的那個 Getting Started 按鈕。

然後就進入了 quick-start 文檔頁。

根據開頭給的 4.8.0 版本的 rocketmq 源碼地址 

https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.8.0/rocketmq-all-4.8.0-source-release.zip 

把它下載了下來。

是個 Java 項目,還是 maven 構建的,那好辦了,直接 idea 打開!

緊接着人家就教我如何構建它。

> mvn -Prelease-all -DskipTests clean install -U
> cd distribution/target/rocketmq-4.8.0/rocketmq-4.8.0

mvn 構建這一步就受挫了,好像是 maven 默認中央倉庫好多 jar 包下載不了,於是我把 maven 配置文件的中央倉庫改成阿里的,就好了。

<mirror>
  <id>alimaven</id>
  <name>aliyun maven</name
    <url>http://maven.aliyun.com/nexus/content/repositories/central/</url>
  <mirrorOf>central</mirrorOf>
</mirror>

體驗一下最簡操作

接着往下讀文檔,現在代碼已經到手,也構建成功,接下來講的是如果體驗一下發個消息,再收個消息的過程。

我是 windows 系統,就照着下面 windows 的教程來,就是這麼任性。

嗯,配置一下環境變量而已。

接下來是四個啓動腳本,很清晰。

## Start Name Server
.\bin\mqnamesrv.cmd
## Start Broker
.\bin\mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true
## Send Messages
.\bin\tools.cmd  org.apache.rocketmq.example.quickstart.Producer
## Receive Messages
.\bin\tools.cmd  org.apache.rocketmq.example.quickstart.Consumer

我看了一下這些 cmd 文件,意思就是執行這些類的 main 方法而已,於是我改在 idea 裏分別執行他們,大概長這個樣子。

‍‍‍‍‍‍‍

具體看下較爲關心的兩個 main 方法。

Producer 的 main 方法,很簡單,簡化版如下。

DefaultMQProducer producer = new DefaultMQProducer("example");
producer.start();
for (int i = 0; i < 1000; i++) {
    Message msg = new Message(...);
    SendResult sendResult = producer.send(msg);
    System.out.printf("%s%n", sendResult);
}

執行後,會看到一堆消息作爲生產者被生產出來。

...
SendResult [sendStatus=SEND_OK, msgId=7F000001162C18B4AAC28D83A84C03DF, offsetMsgId=0AE84A8400002A9F00000000000F7133, messageQueue=MessageQueue [topic=TopicTest, brokerName=DESKTOP-Q6M76VI, queueId=2]queueOffset=1244]
SendResult [sendStatus=SEND_OK, msgId=7F000001162C18B4AAC28D83A84C03E0, offsetMsgId=0AE84A8400002A9F00000000000F71FE, messageQueue=MessageQueue [topic=TopicTest, brokerName=DESKTOP-Q6M76VI, queueId=3]queueOffset=1249]
SendResult [sendStatus=SEND_OK, msgId=7F000001162C18B4AAC28D83A84D03E1, offsetMsgId=0AE84A8400002A9F00000000000F72C9, messageQueue=MessageQueue [topic=TopicTest, brokerName=DESKTOP-Q6M76VI, queueId=0]queueOffset=1244]
SendResult [sendStatus=SEND_OK, msgId=7F000001162C18B4AAC28D83A84D03E2, offsetMsgId=0AE84A8400002A9F00000000000F7394, messageQueue=MessageQueue [topic=TopicTest, brokerName=DESKTOP-Q6M76VI, queueId=1]queueOffset=1251]
SendResult [sendStatus=SEND_OK, msgId=7F000001162C18B4AAC28D83A84D03E3, offsetMsgId=0AE84A8400002A9F00000000000F745F, messageQueue=MessageQueue [topic=TopicTest, brokerName=DESKTOP-Q6M76VI, queueId=2]queueOffset=1245]
SendResult [sendStatus=SEND_OK, msgId=7F000001162C18B4AAC28D83A84D03E4, offsetMsgId=0AE84A8400002A9F00000000000F752A, messageQueue=MessageQueue [topic=TopicTest, brokerName=DESKTOP-Q6M76VI, queueId=3]queueOffset=1250]
SendResult [sendStatus=SEND_OK, msgId=7F000001162C18B4AAC28D83A84E03E5, offsetMsgId=0AE84A8400002A9F00000000000F75F5, messageQueue=MessageQueue [topic=TopicTest, brokerName=DESKTOP-Q6M76VI, queueId=0]queueOffset=1245]
SendResult [sendStatus=SEND_OK, msgId=7F000001162C18B4AAC28D83A84E03E6, offsetMsgId=0AE84A8400002A9F00000000000F76C0, messageQueue=MessageQueue [topic=TopicTest, brokerName=DESKTOP-Q6M76VI, queueId=1]queueOffset=1252]
SendResult [sendStatus=SEND_OK, msgId=7F000001162C18B4AAC28D83A84E03E7, offsetMsgId=0AE84A8400002A9F00000000000F778B, messageQueue=MessageQueue [topic=TopicTest, brokerName=DESKTOP-Q6M76VI, queueId=2]queueOffset=1246]
...

同樣,Consumer 的 mian 方法,也非常簡單。

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest""*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
        ConsumeConcurrentlyContext context) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();

執行後會看到一堆消息被成功消費

...
ConsumeMessageThread_3 Receive New Messages: [MessageExt [brokerName=DESKTOP-Q6M76VI, queueId=3, storeSize=203, queueOffset=1239, sysFlag=0, bornTimestamp=1619580615742, bornHost=/10.232.74.132:55094, storeTimestamp=1619580615742, storeHost=/10.232.74.132:10911, msgId=0AE84A8400002A9F00000000000F5246, commitLogOffset=1004102, bodyCRC=493758879, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest'flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1251, CONSUME_START_TIME=1619580616221, UNIQ_KEY=7F000001162C18B4AAC28D83A83E03B8, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 53, 50]transactionId='null'}]] 
ConsumeMessageThread_8 Receive New Messages: [MessageExt [brokerName=DESKTOP-Q6M76VI, queueId=3, storeSize=203, queueOffset=1238, sysFlag=0, bornTimestamp=1619580615740, bornHost=/10.232.74.132:55094, storeTimestamp=1619580615740, storeHost=/10.232.74.132:10911, msgId=0AE84A8400002A9F00000000000F4F1A, commitLogOffset=1003290, bodyCRC=1688269248, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest'flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1251, CONSUME_START_TIME=1619580616221, UNIQ_KEY=7F000001162C18B4AAC28D83A83C03B4, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 52, 56]transactionId='null'}]] 
ConsumeMessageThread_5 Receive New Messages: [MessageExt [brokerName=DESKTOP-Q6M76VI, queueId=3, storeSize=203, queueOffset=1237, sysFlag=0, bornTimestamp=1619580615737, bornHost=/10.232.74.132:55094, storeTimestamp=1619580615737, storeHost=/10.232.74.132:10911, msgId=0AE84A8400002A9F00000000000F4BEE, commitLogOffset=1002478, bodyCRC=1830206955, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest'flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1251, CONSUME_START_TIME=1619580616221, UNIQ_KEY=7F000001162C18B4AAC28D83A83903B0, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 52, 52]transactionId='null'}]] 
ConsumeMessageThread_4 Receive New Messages: [MessageExt [brokerName=DESKTOP-Q6M76VI, queueId=3, storeSize=203, queueOffset=1236, sysFlag=0, bornTimestamp=1619580615735, bornHost=/10.232.74.132:55094, storeTimestamp=1619580615735, storeHost=/10.232.74.132:10911, msgId=0AE84A8400002A9F00000000000F48C2, commitLogOffset=1001666, bodyCRC=1786477042, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest'flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1251, CONSUME_START_TIME=1619580616221, UNIQ_KEY=7F000001162C18B4AAC28D83A83703AC, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 52, 48]transactionId='null'}]] 
ConsumeMessageThread_18 Receive New Messages: [MessageExt [brokerName=DESKTOP-Q6M76VI, queueId=3, storeSize=203, queueOffset=1235, sysFlag=0, bornTimestamp=1619580615733, bornHost=/10.232.74.132:55094, storeTimestamp=1619580615733, storeHost=/10.232.74.132:10911, msgId=0AE84A8400002A9F00000000000F4596, commitLogOffset=1000854, bodyCRC=1280920064, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest'flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1251, CONSUME_START_TIME=1619580616221, UNIQ_KEY=7F000001162C18B4AAC28D83A83503A8, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 51, 54]transactionId='null'}]] 
ConsumeMessageThread_16 Receive New Messages: [MessageExt [brokerName=DESKTOP-Q6M76VI, queueId=3, storeSize=203, queueOffset=1234, sysFlag=0, bornTimestamp=1619580615731, bornHost=/10.232.74.132:55094, storeTimestamp=1619580615731, storeHost=/10.232.74.132:10911, msgId=0AE84A8400002A9F00000000000F426A, commitLogOffset=1000042, bodyCRC=1261735449, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest'flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1251, CONSUME_START_TIME=1619580616221, UNIQ_KEY=7F000001162C18B4AAC28D83A83303A4, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 51, 50]transactionId='null'}]] 
ConsumeMessageThread_10 Receive New Messages: [MessageExt [brokerName=DESKTOP-Q6M76VI, queueId=3, storeSize=203, queueOffset=1233, sysFlag=0, bornTimestamp=1619580615729, bornHost=/10.232.74.132:55094, storeTimestamp=1619580615729, storeHost=/10.232.74.132:10911, msgId=0AE84A8400002A9F00000000000F3F3E, commitLogOffset=999230, bodyCRC=855266886, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest'flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1251, CONSUME_START_TIME=1619580616221, UNIQ_KEY=7F000001162C18B4AAC28D83A83103A0, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 50, 56]transactionId='null'}]] 
ConsumeMessageThread_14 Receive New Messages: [MessageExt [brokerName=DESKTOP-Q6M76VI, queueId=3, storeSize=203, queueOffset=1232, sysFlag=0, bornTimestamp=1619580615727, bornHost=/10.232.74.132:55094, storeTimestamp=1619580615728, storeHost=/10.232.74.132:10911, msgId=0AE84A8400002A9F00000000000F3C12, commitLogOffset=998418, bodyCRC=994843245, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest'flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1251, CONSUME_START_TIME=1619580616221, UNIQ_KEY=7F000001162C18B4AAC28D83A82F039C, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 50, 52]transactionId='null'}]] 
ConsumeMessageThread_11 Receive New Messages: [MessageExt [brokerName=DESKTOP-Q6M76VI, queueId=3, storeSize=203, queueOffset=1231, sysFlag=0, bornTimestamp=1619580615725, bornHost=/10.232.74.132:55094, storeTimestamp=1619580615726, storeHost=/10.232.74.132:10911, msgId=0AE84A8400002A9F00000000000F38E6, commitLogOffset=997606, bodyCRC=1008852596, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest'flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1251, CONSUME_START_TIME=1619580616221, UNIQ_KEY=7F000001162C18B4AAC28D83A82D0398, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 50, 48]transactionId='null'}]] 
ConsumeMessageThread_1 Receive New Messages: [MessageExt [brokerName=DESKTOP-Q6M76VI, queueId=3, storeSize=203, queueOffset=1230, sysFlag=0, bornTimestamp=1619580615723, bornHost=/10.232.74.132:55094, storeTimestamp=1619580615724, storeHost=/10.232.74.132:10911, msgId=0AE84A8400002A9F00000000000F35BA, commitLogOffset=996794, bodyCRC=2121214082, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest'flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1251, CONSUME_START_TIME=1619580616221, UNIQ_KEY=7F000001162C18B4AAC28D83A82B0394, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 49, 54]transactionId='null'}]] 
...

一個生產,一個消費,清晰明瞭,體驗很好。

嗯,至此爲止,quick-start 就基本通了,也就知道這玩意的最簡單的用法了。

看看整體架構

此時已經上手體驗了一把用法,來總結下。

1. 我啓動了一個 namesrv,暴露的端口是 9876。

2. 我又啓動了一個 broker,啓動時加了個參數 -n localhost:9876,很明顯,就是指向了剛剛那個 namesrv。

3. 然後我啓動 produer 發了一堆消息。

4. 最後我啓動了 consumer,就神奇地收到了這個消息。

啓動 producer 和 consumer 時都必須使用一個環境變量叫 NAMESRV_ADDR=localhost:9876

OK 了,此時我已經有了合理的猜測,producer 和 consumer 都是通過環境變量連接了 9876 這個端口,也就是 namesrv,然後通過這個 namesrv 能找到 broker,那我可以簡單畫出這樣的架構圖(這是我猜的,不一定對)。

嗯,有了這個自己的猜測,我去看了官方文檔中的架構部分:

https://github.com/apache/rocketmq/blob/master/docs/cn/architecture.md

上來就是一張圖。

哇塞,大差不差,只是官方文檔的圖是考慮到了集羣情況,其實我反倒覺得開始的圖不應該整合太多內容。

緊接着,下面就解釋了這四個東西都是啥,我簡寫下。

Producer:消息發佈的角色,選擇相應的 Broker 集羣隊列進行消息投遞。

Consumer:消息消費的角色,支持以 push 推,pull 拉兩種模式對消息進行消費。

NameServer:一個非常簡單的 Topic 路由註冊中心,支持 Broker 的動態註冊與發現。

BrokerServer:Broker 主要負責消息的存儲、投遞和查詢,包括以下幾個子模塊:

Remoting Module:整個 Broker 的實體,負責處理來自 clients 端的請求。

Client Manager:負責管理客戶端(Producer/Consumer)和維護 Consumer 的 Topic 訂閱信息。

Store Service:提供方便簡單的 API 接口處理消息存儲到物理硬盤和查詢功能。

HA Service:高可用服務,提供 Master Broker 和 Slave Broker 之間的數據同步功能。

Index Service:根據特定的 Message key 對投遞到 Broker 的消息進行索引服務,以提供消息的快速查詢。

嗯,又有了些新概念,BrokerServer 還分成五個子模塊,看着有些蒙,先不管。

再往下看,到了部署架構,有了部署步驟。

嗯,跟我們剛剛 quick-start 部分的部署順序一樣,只不過步驟三的創建 Topic,可以提前創建,也可以在發送消息時自動創建,我們剛剛用的應該就是發消息時自動創建啦!這裏再拿個小本本記下來,如何提前創建 Topic,OK 不去管它。

再往下,就是最最最最重要的部分了,也就是設計原理

地址是這個:

https://github.com/apache/rocketmq/blob/master/docs/cn/design.md

這也就是我們研究 rocketmq 的原理,閱讀源碼,最需要看的部分,當然,也包括面試,滋滋滋。

設計中包含六個部分,分別是消息存儲、通信機制、消息過濾、負載均衡、事務消息、消息查詢

比如消息存儲,是整個設計部分的第一個版塊,上來就是一張勸退圖。

下面配上了文字講解。

之前的 quick-start 和整體架構的描述,可以被快速地理解,到這就不行了,就真得花時間開始細琢磨了。

但實際上呀,仔細看消息存儲版塊裏面的內容,包含三個部分:

1.1 消息存儲整體架構

1.2 頁緩存與內存映射

1.3 消息刷盤

這裏麪包括 IO 模型,內存映射,磁盤順序讀寫,PageCache,同步異步刷盤等通用的底層知識,如果這些都統統掌握,整個這一部分就跟拼積木一樣,很順利拼起來了。

這回知道底層知識有啥用了吧?起碼能讓你更深入和快速理解一個由它們拼起來的上層技術。

這塊就沒法完全展開啦,不然就成一篇講 rocketmq 原理的文章了,我拿出這裏的一句話。

另外,RocketMQ 主要通過 MappedByteBuffer 對文件進行讀寫操作。其中,利用了 NIO 中的 FileChannel 模型將磁盤上的物理文件直接映射到用戶態的內存地址中(這種 Mmap 的方式減少了傳統 IO 將磁盤文件數據在操作系統內核地址空間的緩衝區和用戶應用程序地址空間的緩衝區之間來回進行拷貝的性能開銷),將對文件的操作轉化爲直接對內存地址進行操作,從而極大地提高了文件的讀寫效率(正因爲需要使用內存映射機制,故 RocketMQ 的文件存儲都使用定長結構來存儲,方便一次將整個文件映射至內存)。

這段話如果你對零拷貝這個底層概念瞭解,其實整段話一秒鐘就看完了,而且會覺得它是廢話,就像是在湊字數一樣。(當然這個我做不到哈,我對底層還沒有了解很透徹,所以用了 10 秒,理解到了一個馬馬虎虎的程度)

見仁見智

再接下來就是見仁見智的部分啦,如果是已經有了非常多技術深入學習經驗的大牛,直接根據設計文檔,再對照源碼即可,對大牛來說是最快的方式。

如果不是的話,可以先找一些市面上比較經典的入門書籍,或者質量高的視頻教程,過一遍,然後再配合設計文檔和源碼,基本就能把這個技術喫透了。

學多了之後你會發現,好多底層的技術或中間件,和我們應用層一樣,也是有套路的,也是拼積木拼起來的。所以今後在你想學一門新技術時,別想着學它有沒有用,面試會不會考,起碼你看多了之後會發現,越新技術越學越快,學到最後你會發現根本就沒有新技術,一切都是在拼積木。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/G4w9kEuZqojOXct9ijJXkw