爲什麼說 MQTT 適用於大規模物聯網場景
一、MQTT 是什麼
1.1、MQTT 基本概念
MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸協議),是一種基於發佈 / 訂閱(publish/subscribe)模式的 "輕量級" 通訊協議,該協議構建於 TCP/IP 協議上,由 IBM 在 1999 年發佈。MQTT 最大優點在於,可以以極少的代碼和有限的帶寬,爲連接遠程設備提供實時可靠的消息服務。作爲一種低開銷、低帶寬佔用的即時通訊協議,使其在物聯網、小型設備、移動應用等方面有較廣泛的應用。
1.2、MQTT 協議實現方式
在 MQTT 協議通信機制中有三個角色:消息發佈者(publisher)、代理服務器(broker)和消息訂閱者(subscriber)。其中,消息的發佈者和訂閱者都是客戶端,消息代理是服務器,消息發佈者可以同時是訂閱者。消息從發佈者發送到代理服務器,然後被訂閱者接收,而主題就是發佈者與訂閱者之間約定的消息通道。
一個使用 MQTT 協議的應用程序或者設備,它總是建立到服務器的網絡連接。
客戶端可以:
發佈其他客戶端可能會訂閱的信息;
訂閱其它客戶端發佈的消息;
退訂或刪除應用程序的消息;
斷開與服務器連接。
MQTT 服務器又稱爲 " 消息代理 "(Broker),可以是一個應用程序或一臺設備。它是位於消息發佈者和訂閱者之間,它可以:
接受來自客戶的網絡連接;
接受客戶發佈的應用信息;
處理來自客戶端的訂閱和退訂請求;
向訂閱的客戶轉發應用程序消息。
MQTT 傳輸的消息分爲:主題(Topic)和消息(payload)兩部分:
Topic,可以理解爲消息的類型,訂閱者訂閱(Subscribe)後,就會收到該主題的消息內容(payload);
payload,可以理解爲消息的內容,是指訂閱者具體要使用的內容。
1.3、MQTT 協議中的方法
MQTT 協議中定義了一些方法(也被稱爲動作),來於表示對確定資源所進行操作。這個資源可以代表預先存在的數據或動態生成數據,這取決於服務器的實現。通常來說,資源指服務器上的文件或輸出。主要方法有:
CONNECT:客戶端連接到服務器
CONNACK:連接確認
PUBLISH:發佈消息
PUBACK:發佈確認
PUBREC:發佈的消息已接收
PUBREL:發佈的消息已釋放
PUBCOMP:發佈完成
SUBSCRIBE:訂閱請求
SUBACK:訂閱確認
UNSUBSCRIBE:取消訂閱
UNSUBACK:取消訂閱確認
PINGREQ:客戶端發送心跳
PINGRESP:服務端心跳響應
DISCONNECT:斷開連接
AUTH:認證
二、爲什麼用 MQTT
MQTT 協議的特性在各領域中應用廣泛。
2.1、 主要特性
由於物聯網的環境是非常特別的,所以 MQTT 遵循以下設計原則:
-
輕量級:MQTT 的設計非常輕量,協議頭部非常小,傳輸的數據量很小,適用於帶寬有限的網絡環境,如低速、高延遲或不穩定的網絡。
-
簡單:MQTT 的協議規範相對簡單,易於實現和部署。它定義了少量的消息類型和協議操作,使得開發人員可以快速上手。
-
異步通信:MQTT 使用異步通信模式,發佈者發送消息後,不需要等待接收者的響應,可以繼續執行其他操作。這種異步通信模式適合在資源有限的設備和網絡中工作。
-
可靠性(提供服務質量管理):MQTT 支持三種不同的消息傳遞質量(QoS)級別:QoS 0(至多一次),QoS 1(至少一次)和 QoS 2(只有一次)。這使得可以根據應用程序的要求選擇適當的消息交付保證級別。
-
網絡狀況適應性:MQTT 可以適應不穩定的網絡狀況,如網絡中斷、重連等。它具有斷開連接後自動重連的機制,可以確保消息的可靠傳輸。
-
安全:MQTT 可使用 TLS 加密消息,使用 OAuth 進行身份驗證
-
大規模物聯網設備支持:物聯網系統往往涉及大量設備,需要一種能夠處理大規模部署的協議。Mqtt 的輕量級特性,低帶寬消耗和對資源的高效利用成爲大規模物聯網應用的理想選擇。
2.2、 應用領域廣泛
MQTT 協議廣泛應用於物聯網、移動互聯網、智能硬件、車聯網、電力能源等領域。
物聯網 M2M 通信,物聯網大數據採集
Android 消息推送,WEB 消息推送
移動即時消息,例如 Facebook Messenger
智能硬件、智能家居、智能電器
車聯網通信,電動車站樁採集
智慧城市、遠程醫療、遠程教育
電力、石油與能源等行業市場
2.3、 異步通信 (發佈 - 訂閱模式)
發佈訂閱模式(Publish-Subscribe Pattern)是一種消息傳遞模式,它將發送消息的客戶端(發佈者)與接收消息的客戶端(訂閱者)解耦,使得兩者不需要建立直接的聯繫也不需要知道對方的存在。
MQTT 發佈 / 訂閱模式的精髓在於由一個被稱爲代理(Broker)的中間角色負責所有消息的路由和分發工作,發佈者將帶有主題的消息發送給代理,訂閱者則向代理訂閱主題來接收感興趣的消息。
在 MQTT 中,主題和訂閱無法被提前註冊或創建,所以代理也無法預知某一個主題之後是否會有訂閱者,以及會有多少訂閱者,所以只能將消息轉發給當前的訂閱者,如果當前不存在任何訂閱,那麼消息將被直接丟棄。
MQTT 發佈 / 訂閱模式有 4 個主要組成部分:發佈者、訂閱者、代理和主題。
-
發佈者(Publisher):負責將消息發佈到主題上,發佈者一次只能向一個主題發送數據,發佈者發佈消息時也無需關心訂閱者是否在線。
-
訂閱者(Subscriber):訂閱者通過訂閱主題接收消息,且可一次訂閱多個主題。MQTT 還支持通過共享訂閱的方式在多個訂閱者之間實現訂閱的負載均衡。
-
代理(Broker):負責接收發布者的消息,並將消息轉發至符合條件的訂閱者。另外,代理也需要負責處理客戶端發起的連接、斷開連接、訂閱、取消訂閱等請求。
-
主題(Topic):主題是 MQTT 進行消息路由的基礎,它類似 URL 路徑,使用斜槓 / 進行分層,比如 sensor/1/temperature。一個主題可以有多個訂閱者,代理會將該主題下的消息轉發給所有訂閱者;一個主題也可以有多個發佈者,代理將按照消息到達的順序轉發。
發佈者和訂閱者都是 MQTT 客戶端,它們可以同時發佈和訂閱。這意味着一個客戶端可以向特定主題發佈消息,同時也可以訂閱其他主題以接收消息。在 MQTT 中,發佈和訂閱是獨立的操作,客戶端可以根據需要進行發佈和訂閱。
2.4、MQTT 服務質量 QoS
很多時候,使用 MQTT 協議的設備都運行在網絡受限的環境下,而只依靠底層的 TCP 傳輸協議,並不能完全保證消息的可靠到達。因此,MQTT 提供了 QoS 機制,其核心是設計了多種消息交互機制來提供不同的服務質量,來滿足用戶在各種場景下對消息可靠性的要求。每條消息都可以在發佈時獨立設置自己的 QoS。
MQTT 定義了三個 QoS 等級,分別爲:
-
QoS 0:最多交付一次,消息可能丟失
-
QoS 1:至少交付一次,消息可以保證到達,但是可能重複
-
QoS 2:恰好交付一次,消息保證到達,並且不會重複
1、QoS 0
消息最多傳遞一次,如果當時 MQTT 客戶端不可用,則會丟失該消息。Sender (可能是 Publisher 或者 Broker) 發送一條消息之後,就不再關心它有沒有發送到對方,也不設置任何重發機制。
爲什麼 QoS 0 消息會丟失?
當我們使用 QoS 0 傳遞消息時,消息的可靠性完全依賴於底層的 TCP 協議。而 TCP 只能保證在連接穩定不關閉的情況下消息的可靠到達,一旦出現連接關閉、重置,仍有可能丟失當前處於網絡鏈路或操作系統底層緩衝區中的消息,這也是 QoS 0 消息最主要的丟失場景。
2、QoS 1
消息傳遞至少 1 次。包含了簡單的重發機制,Sender 發送消息之後等待接收者的 ACK,如果沒收到 ACK 則重新發送消息。這種模式能保證消息至少能到達一次,但無法保證消息重複。
爲什麼 QoS 1 消息會重複?
對於發送方來說,沒收到 PUBACK 報文:
- PUBLISH 已經到達接收方,接收方的 PUBACK 報文還未到達發送方
在發送方重傳時,接收方已經收到過這個 PUBLISH 報文,這就導致接收方將收到重複的消息。
3、QoS 2
消息僅傳送一次。設計了重發和重複消息發現機制,保證消息到達對方並且嚴格只到達一次。
QoS 2 解決了 QoS 0、1 消息可能丟失或者重複的問題,但相應地,它也帶來了最複雜的交互流程和最高的開銷。每一次的 QoS 2 消息投遞,都要求發送方與接收方進行至少兩次請求 / 響應流程。
2.5、車聯網場景中的消息 QoS 設計
首先需要明確的是 QoS 級別越高,消息交互越複雜,系統資源消耗越大,所以 QoS 等級不是設置的越高越好。應用程序可以根據自己的網絡場景和業務需求,選擇合適的 QoS 級別。
如在車聯網領域內,根據車聯網信息服務相關數據的屬性和特徵,我們可以將其分爲六類:基礎屬性類數據、車輛工控類數據、環境感知類數據、車控類數據、應用服務類數據和用戶個人信息。那麼在不同的車聯網場景中如何選擇 MQTT QoS 等級呢?
以下情況下可以選擇 QoS 0
-
可以接受消息偶爾丟失的場景下可以選擇 QoS 0。
-
車聯網提供的與娛樂相關的多媒體服務,如天氣預報等數據等。還有部分涉車服務類數據,如車輛歷史行車數據的上報、歷史行車操作數據等。
以下情況下可以選擇 QoS 1
-
車聯網 大部分場景都是選用 QoS1,它實現了系統資源性能和消息實時性、可靠性最優化。
-
QoS 1 廣泛運用於控車消息、行車上報數據(含新能源國標和企標)、交通安全管控類數據,和交通安全、道路安全相關的預警數據。
以下情況下可以選擇 QoS 2
-
對於不能忍受消息丟失,且不希望收到重複的消息,數據完整性與及時性要求較高的場景,可以選擇 QoS 2。
-
車聯網場景中 QoS 2 的應用並不多,雖然其可以增加消息可靠性,但同時也使資源消耗和消息時延大幅增加。QoS 2 主要運用於對數據完整性與及時性要求較高的銀行、消防、航空等行業,有些主機廠的行車告警和車輛充電樁計費費單消息會選擇採用 QoS 2。
Tips:
需要注意的是 MQTT 發佈與訂閱操作中的 QoS 代表了不同的含義,發佈時的 QoS 表示消息發送到 MQTT 服務器 使用的 QoS 等級,訂閱時的 QoS 表示 MQTT Broker 向自己轉發消息時可以使用的最大 QoS 等級。需要保障發送與訂閱的 QoS 一致,才能確保最終收到的消息是固定的 QoS 等級,否則會出現消費降級的情況。例如:A 發送的消息 QoS 爲 2,B 訂閱的消息 QoS 爲 1,則最終接收到消息的 QoS 爲 1。
2.6、MQTT 安全
可以通過如下方法增強 MQTT 安全:
-
使用 TLS/SSL:通過使用傳輸層安全協議(TLS)或安全套接層協議(SSL),可以對 MQTT 通信進行加密,防止數據被竊聽和篡改。
-
身份認證:在 MQTT 連接建立時,進行客戶端和服務器之間的身份驗證,以確保通信雙方的合法性。常見的身份認證方式包括用戶名 / 密碼認證、客戶端證書認證等。
-
訪問控制:通過配置合適的訪問控制策略,限制用戶對 MQTT Broker 的訪問權限。例如,可以限制某些主題只允許特定的客戶端訂閱或發佈。
-
消息加密:對於特別敏感的數據,可以在發佈消息時進行端到端的加密,確保只有預期的接收方能夠解密並查看消息內容。
-
安全審計:對 MQTT 通信進行安全審計,記錄關鍵事件和操作,以便監控和追蹤潛在的安全問題。
三、怎麼使用 MQTT
3.1、MQTT Broker(代理服務)
MQTT 代理有多種實現方式,以滿足不同的需求,如開源、商業和託管雲服務。HiveMQ 提供商業版:HiveMQ 自託管和 HiveMQ 雲,一個託管的雲 MQTT 服務,以及 HiveMQ 社區版,一個開源版本。有關 MQTT 代理的詳細列表,請訪問 mqtt.cn 。有 HiveMQ、EMQX 等。
3.2、MQTT Client Libraries
支持 Java、Go、Python、Swift、JS 等等,幾乎常用的都支持。
https://mqtt.org/software/
3.3、使用案例示例
Mqtt Client:java mqbroker:HiveMq
設備或網關在接入物聯網平臺時首先需要和平臺建立連接,從而將設備或網關與平臺進行關聯。開發者通過傳入設備信息,將設備或網關連接到物聯網平臺。
MQTT 基本信息
MQTT 賬號密碼
- 在建立連接之前,先修改以下參數:
// MQTT 服務器地址
private static final String MQTT_BROKER = "tcp://iot.modbus.cn:1883"; MQTT地址/端口號
// MQTT 客戶端 ID
private static final String MQTT_CLIENT_ID = "4QR8TZ9ThuL4G"; //設備號SN碼
// MQTT 用戶名
private static final String MQTT_USERNAME = "ceshi";
// MQTT 密碼
private static final String MQTT_PASSWORD = "Abc123456";
// 訂閱的主題
private static final String MQTT_TOPIC_SUBSCRIBE = "/server/coo/4QR8TZ9ThuL4G";
// 發佈的主題
private static final String MQTT_TOPIC_PUBLISH = "/dev/coo/4QR8TZ9ThuL4G";
-
MQTT_BROKER 爲物聯網平臺的設備對接地址,可參考平臺對接信息獲取(獲取的是域名信息,可通過在 cmd 命令框中執行 “ping 域名”,獲取 IP 地址)。
-
MQTT_CLIENT_ID 爲設備號 SN,在成功添加設備後獲取。
-
修改完 1 中的參數後就可使用 MqttClient 建立連接了。
-
連接成功後,設備顯示在線。
備註:從設備地址是 sensor_device_id ,寄存器是 port_id,數據數值是 Sdata ,具體稍後有具體說明。
使用 MQTT 通訊協議,下面的屬性名稱,Modbus 功能碼,數據格式,數據順序不會影響最終數據,隨便選擇就可以!
MQTT 主題一覽
Modbus 物聯網平臺 作爲物聯網 PaaS 雲平臺,對設備 MQTT 接入提供了內置的訪問協議規範,讓設備和雲平臺的消息通信更加有章可循,大大簡化了物聯網項目的開發難度,縮短了產品的開發週期。
不同於普通的 MQTT 使用方式,我們提供了標準的內置主題,這足以實現絕大多數的物聯網應用場景。
4QR8TZ9ThuL4G 爲示例設備號 SN,請替換爲自己的!
上行通信
Topic:/dev/coo/4QR8TZ9ThuL4G 設備號 SN(請替換爲自己的)
// 連接到 MQTT 服務器
mqttClient.connect(options);
// 訂閱主題
mqttClient.subscribe(MQTT_TOPIC_SUBSCRIBE);
// 創建消息
MqttMessage message = new MqttMessage();
message.setPayload("[{"sensor_device_id": 1, "port_id": 1, "sdata": 66}]".getBytes());
// 發佈消息
mqttClient.publish(MQTT_TOPIC_PUBLISH, message);
備註:從設備地址是 sensor_device_id ,寄存器是 port_id,數據數值是 Sdata
下行通信
Topic:/server/coo/4QR8TZ9ThuL4G 設備號SN(請替換爲自己的)
System.out.println("設備ID: " + sensor_device_id + ", 端口ID: " + port_id + ", 數據: " + sdata);
備註:從設備地址是sensor_device_id ,寄存器是port_id,數據數值是Sdata
完整代碼:
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.json.JSONArray;
import org.json.JSONObject;
public class MqttDemo {
// MQTT 服務器地址
private static final String MQTT_BROKER = "tcp://iot.modbus.cn:1883";
// MQTT 客戶端 ID
private static final String MQTT_CLIENT_ID = "4QR8TZ9ThuL4G";
// MQTT 用戶名
private static final String MQTT_USERNAME = "ceshi";
// MQTT 密碼
private static final String MQTT_PASSWORD = "Abc123456";
// 訂閱的主題
private static final String MQTT_TOPIC_SUBSCRIBE = "/server/coo/4QR8TZ9ThuL4G";
// 發佈的主題
private static final String MQTT_TOPIC_PUBLISH = "/dev/coo/4QR8TZ9ThuL4G";
// 心跳間隔,單位爲秒
private static final int MQTT_KEEP_ALIVE_INTERVAL = 60;
public static void main(String[] args) throws MqttException {
// 創建 MQTT 客戶端
MqttClient mqttClient = new MqttClient(MQTT_BROKER, MQTT_CLIENT_ID, new MemoryPersistence());
// 設置 MQTT 連接選項
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(MQTT_USERNAME);
options.setPassword(MQTT_PASSWORD.toCharArray());
options.setCleanSession(true);
options.setKeepAliveInterval(MQTT_KEEP_ALIVE_INTERVAL);
// 設置回調
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
System.out.println("連接斷開");
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("收到消息。主題: " + topic);
System.out.println("消息: " + new String(message.getPayload()));
// 將消息轉換爲字符串
String msg = new String(message.getPayload());
// 使用 JSON 工具解析字符串
JSONArray jsonArray = new JSONArray(msg);
for (int i = 0; i < jsonArray.length(); i++) {
JSONObject jsonObject = jsonArray.getJSONObject(i);
int sensor_device_id = jsonObject.getInt("sensor_device_id");
int port_id = jsonObject.getInt("port_id");
int sdata = jsonObject.getInt("sdata");
// 在這裏你可以處理接收到的數據
System.out.println("設備ID: " + sensor_device_id + ", 端口ID: " + port_id + ", 數據: " + sdata);
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("消息發佈完成");
}
});
// 連接到 MQTT 服務器
mqttClient.connect(options);
// 訂閱主題
mqttClient.subscribe(MQTT_TOPIC_SUBSCRIBE);
// 創建消息
MqttMessage message = new MqttMessage();
message.setPayload("[{"sensor_device_id": 1, "port_id": 1, "sdata": 66}]".getBytes());
// 發佈消息
mqttClient.publish(MQTT_TOPIC_PUBLISH, message);
}
}
四、小結
1、MQTT 是一種輕量級的遠程傳輸協議,可以以極少的代碼和有限的帶寬,爲連接遠程設備提供實時可靠的消息服務。憑藉着異步、穩定性使其在物聯網應用廣泛。
2、Mqtt 爲什麼能支持大規模物聯網設備?
MQTT 是一種輕量級的消息協議,特別適合大規模物聯網(IoT)設備的通信。它在大規模 IoT 場景中得到廣泛使用,主要因爲以下幾個關鍵特性:
1)、低帶寬和高效性
MQTT 設計簡單,傳輸協議非常輕量,數據包開銷小,特別適合帶寬有限的網絡環境。它的傳輸效率高,不會佔用大量網絡資源,這樣成千上萬的 IoT 設備也可以同時進行通信。
2)、發佈 - 訂閱模式
MQTT 使用發佈 - 訂閱模型,設備之間的通信通過中心化的消息代理(Broker)來管理。設備可以發佈消息到主題(Topic),並且只需訂閱自己關心的主題即可獲取消息。這種模式使得數十萬設備能夠高效地共享信息,減少了點對點通信的負擔。
3)、可靠安全性
支持三種服務質量(QoS)級別,確保數據的傳遞可靠性。根據不同的應用場景和網絡條件,用戶可以選擇不同的 QoS,以平衡數據可靠性和網絡開銷。
總之,MQTT 協議設計簡潔,資源佔用低,通過 Broker 管理設備通信,支持大規模併發連接,因此特別適合大規模 IoT 部署場景。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/n3dycD3bKGahBOIzN-d8Vg