爲什麼說 MQTT 適用於大規模物聯網場景

一、MQTT 是什麼

1.1、MQTT 基本概念

MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸協議),是一種基於發佈 / 訂閱(publish/subscribe)模式的 "輕量級" 通訊協議,該協議構建於 TCP/IP 協議上,由 IBM 在 1999 年發佈。MQTT 最大優點在於,可以以極少的代碼和有限的帶寬,爲連接遠程設備提供實時可靠的消息服務。作爲一種低開銷、低帶寬佔用的即時通訊協議,使其在物聯網、小型設備、移動應用等方面有較廣泛的應用。

1.2、MQTT 協議實現方式

在 MQTT 協議通信機制中有三個角色:消息發佈者(publisher)、代理服務器(broker)和消息訂閱者(subscriber)。其中,消息的發佈者和訂閱者都是客戶端,消息代理是服務器,消息發佈者可以同時是訂閱者。消息從發佈者發送到代理服務器,然後被訂閱者接收,而主題就是發佈者與訂閱者之間約定的消息通道。

一個使用 MQTT 協議的應用程序或者設備,它總是建立到服務器的網絡連接。

客戶端可以:

發佈其他客戶端可能會訂閱的信息;

訂閱其它客戶端發佈的消息;

退訂或刪除應用程序的消息;

斷開與服務器連接。

MQTT 服務器又稱爲 " 消息代理 "(Broker),可以是一個應用程序或一臺設備。它是位於消息發佈者和訂閱者之間,它可以:

接受來自客戶的網絡連接;

接受客戶發佈的應用信息;

處理來自客戶端的訂閱和退訂請求;

向訂閱的客戶轉發應用程序消息。

MQTT 傳輸的消息分爲:主題(Topic)和消息(payload)兩部分:

Topic,可以理解爲消息的類型,訂閱者訂閱(Subscribe)後,就會收到該主題的消息內容(payload);

payload,可以理解爲消息的內容,是指訂閱者具體要使用的內容。

1.3、MQTT 協議中的方法

MQTT 協議中定義了一些方法(也被稱爲動作),來於表示對確定資源所進行操作。這個資源可以代表預先存在的數據或動態生成數據,這取決於服務器的實現。通常來說,資源指服務器上的文件或輸出。主要方法有:

CONNECT:客戶端連接到服務器

CONNACK:連接確認

PUBLISH:發佈消息

PUBACK:發佈確認

PUBREC:發佈的消息已接收

PUBREL:發佈的消息已釋放

PUBCOMP:發佈完成

SUBSCRIBE:訂閱請求

SUBACK:訂閱確認

UNSUBSCRIBE:取消訂閱

UNSUBACK:取消訂閱確認

PINGREQ:客戶端發送心跳

PINGRESP:服務端心跳響應

DISCONNECT:斷開連接

AUTH:認證

二、爲什麼用 MQTT

MQTT 協議的特性在各領域中應用廣泛。

2.1、 主要特性

由於物聯網的環境是非常特別的,所以 MQTT 遵循以下設計原則:

2.2、 應用領域廣泛

MQTT 協議廣泛應用於物聯網、移動互聯網、智能硬件、車聯網、電力能源等領域。

物聯網 M2M 通信,物聯網大數據採集

Android 消息推送,WEB 消息推送

移動即時消息,例如 Facebook Messenger

智能硬件、智能家居、智能電器

車聯網通信,電動車站樁採集

智慧城市、遠程醫療、遠程教育

電力、石油與能源等行業市場

2.3、 異步通信 (發佈 - 訂閱模式)

發佈訂閱模式(Publish-Subscribe Pattern)是一種消息傳遞模式,它將發送消息的客戶端(發佈者)與接收消息的客戶端(訂閱者)解耦,使得兩者不需要建立直接的聯繫也不需要知道對方的存在。

MQTT 發佈 / 訂閱模式的精髓在於由一個被稱爲代理(Broker)的中間角色負責所有消息的路由和分發工作,發佈者將帶有主題的消息發送給代理,訂閱者則向代理訂閱主題來接收感興趣的消息。

在 MQTT 中,主題和訂閱無法被提前註冊或創建,所以代理也無法預知某一個主題之後是否會有訂閱者,以及會有多少訂閱者,所以只能將消息轉發給當前的訂閱者,如果當前不存在任何訂閱,那麼消息將被直接丟棄。

MQTT 發佈 / 訂閱模式有 4 個主要組成部分:發佈者、訂閱者、代理和主題。

發佈者和訂閱者都是 MQTT 客戶端,它們可以同時發佈和訂閱。這意味着一個客戶端可以向特定主題發佈消息,同時也可以訂閱其他主題以接收消息。在 MQTT 中,發佈和訂閱是獨立的操作,客戶端可以根據需要進行發佈和訂閱。

2.4、MQTT 服務質量 QoS

很多時候,使用 MQTT 協議的設備都運行在網絡受限的環境下,而只依靠底層的 TCP 傳輸協議,並不能完全保證消息的可靠到達。因此,MQTT 提供了 QoS 機制,其核心是設計了多種消息交互機制來提供不同的服務質量,來滿足用戶在各種場景下對消息可靠性的要求。每條消息都可以在發佈時獨立設置自己的 QoS。

MQTT 定義了三個 QoS 等級,分別爲:

1、QoS 0

消息最多傳遞一次,如果當時 MQTT 客戶端不可用,則會丟失該消息。Sender (可能是 Publisher 或者 Broker) 發送一條消息之後,就不再關心它有沒有發送到對方,也不設置任何重發機制。

爲什麼 QoS 0 消息會丟失?

當我們使用 QoS 0 傳遞消息時,消息的可靠性完全依賴於底層的 TCP 協議。而 TCP 只能保證在連接穩定不關閉的情況下消息的可靠到達,一旦出現連接關閉、重置,仍有可能丟失當前處於網絡鏈路或操作系統底層緩衝區中的消息,這也是 QoS 0 消息最主要的丟失場景。

2、QoS 1

消息傳遞至少 1 次。包含了簡單的重發機制,Sender 發送消息之後等待接收者的 ACK,如果沒收到 ACK 則重新發送消息。這種模式能保證消息至少能到達一次,但無法保證消息重複。

爲什麼 QoS 1 消息會重複?

對於發送方來說,沒收到 PUBACK 報文:

在發送方重傳時,接收方已經收到過這個 PUBLISH 報文,這就導致接收方將收到重複的消息。

3、QoS 2

消息僅傳送一次。設計了重發和重複消息發現機制,保證消息到達對方並且嚴格只到達一次。

QoS 2 解決了 QoS 0、1 消息可能丟失或者重複的問題,但相應地,它也帶來了最複雜的交互流程和最高的開銷。每一次的 QoS 2 消息投遞,都要求發送方與接收方進行至少兩次請求 / 響應流程。

2.5、車聯網場景中的消息 QoS 設計

首先需要明確的是 QoS 級別越高,消息交互越複雜,系統資源消耗越大,所以 QoS 等級不是設置的越高越好。應用程序可以根據自己的網絡場景和業務需求,選擇合適的 QoS 級別。

如在車聯網領域內,根據車聯網信息服務相關數據的屬性和特徵,我們可以將其分爲六類:基礎屬性類數據、車輛工控類數據、環境感知類數據、車控類數據、應用服務類數據和用戶個人信息。那麼在不同的車聯網場景中如何選擇 MQTT QoS 等級呢?

以下情況下可以選擇 QoS 0

以下情況下可以選擇 QoS 1

以下情況下可以選擇 QoS 2

Tips:

需要注意的是 MQTT 發佈與訂閱操作中的 QoS 代表了不同的含義,發佈時的 QoS 表示消息發送到 MQTT 服務器 使用的 QoS 等級,訂閱時的 QoS 表示 MQTT Broker 向自己轉發消息時可以使用的最大 QoS 等級。需要保障發送與訂閱的 QoS 一致,才能確保最終收到的消息是固定的 QoS 等級,否則會出現消費降級的情況。例如:A 發送的消息 QoS 爲 2,B 訂閱的消息 QoS 爲 1,則最終接收到消息的 QoS 爲 1。

2.6、MQTT 安全

可以通過如下方法增強 MQTT 安全:

  1. 使用 TLS/SSL:通過使用傳輸層安全協議(TLS)或安全套接層協議(SSL),可以對 MQTT 通信進行加密,防止數據被竊聽和篡改。

  2. 身份認證:在 MQTT 連接建立時,進行客戶端和服務器之間的身份驗證,以確保通信雙方的合法性。常見的身份認證方式包括用戶名 / 密碼認證、客戶端證書認證等。

  3. 訪問控制:通過配置合適的訪問控制策略,限制用戶對 MQTT Broker 的訪問權限。例如,可以限制某些主題只允許特定的客戶端訂閱或發佈。

  4. 消息加密:對於特別敏感的數據,可以在發佈消息時進行端到端的加密,確保只有預期的接收方能夠解密並查看消息內容。

  5. 安全審計:對 MQTT 通信進行安全審計,記錄關鍵事件和操作,以便監控和追蹤潛在的安全問題。

三、怎麼使用 MQTT

3.1、MQTT Broker(代理服務)

MQTT 代理有多種實現方式,以滿足不同的需求,如開源、商業和託管雲服務。HiveMQ 提供商業版:HiveMQ 自託管和 HiveMQ 雲,一個託管的雲 MQTT 服務,以及 HiveMQ 社區版,一個開源版本。有關 MQTT 代理的詳細列表,請訪問 mqtt.cn 。有 HiveMQ、EMQX 等。

3.2、MQTT Client Libraries

支持 Java、Go、Python、Swift、JS 等等,幾乎常用的都支持。

https://mqtt.org/software/

3.3、使用案例示例

Mqtt Client:java   mqbroker:HiveMq

設備或網關在接入物聯網平臺時首先需要和平臺建立連接,從而將設備或網關與平臺進行關聯。開發者通過傳入設備信息,將設備或網關連接到物聯網平臺。

MQTT 基本信息

Gxl1x3

MQTT 賬號密碼

uc6KUc

  1. 在建立連接之前,先修改以下參數:
// MQTT 服務器地址
private static final String MQTT_BROKER = "tcp://iot.modbus.cn:1883"; MQTT地址/端口號
// MQTT 客戶端 ID
private static final String MQTT_CLIENT_ID = "4QR8TZ9ThuL4G";  //設備號SN碼
// MQTT 用戶名
private static final String MQTT_USERNAME = "ceshi";
// MQTT 密碼
private static final String MQTT_PASSWORD = "Abc123456";
// 訂閱的主題
private static final String MQTT_TOPIC_SUBSCRIBE = "/server/coo/4QR8TZ9ThuL4G";
// 發佈的主題
private static final String MQTT_TOPIC_PUBLISH = "/dev/coo/4QR8TZ9ThuL4G";
  1. 修改完 1 中的參數後就可使用 MqttClient 建立連接了。

  2. 連接成功後,設備顯示在線。

備註:從設備地址是 sensor_device_id ,寄存器是 port_id,數據數值是 Sdata ,具體稍後有具體說明。

使用 MQTT 通訊協議,下面的屬性名稱,Modbus 功能碼,數據格式,數據順序不會影響最終數據,隨便選擇就可以!

MQTT 主題一覽

Modbus 物聯網平臺 作爲物聯網 PaaS 雲平臺,對設備 MQTT 接入提供了內置的訪問協議規範,讓設備和雲平臺的消息通信更加有章可循,大大簡化了物聯網項目的開發難度,縮短了產品的開發週期。

不同於普通的 MQTT 使用方式,我們提供了標準的內置主題,這足以實現絕大多數的物聯網應用場景。

XWR5Um

4QR8TZ9ThuL4G 爲示例設備號 SN,請替換爲自己的!

上行通信

Topic:/dev/coo/4QR8TZ9ThuL4G 設備號 SN(請替換爲自己的)

// 連接到 MQTT 服務器
mqttClient.connect(options);
// 訂閱主題
mqttClient.subscribe(MQTT_TOPIC_SUBSCRIBE);
// 創建消息
MqttMessage message = new MqttMessage();
message.setPayload("[{"sensor_device_id": 1, "port_id": 1, "sdata": 66}]".getBytes());
// 發佈消息
mqttClient.publish(MQTT_TOPIC_PUBLISH, message);

備註:從設備地址是 sensor_device_id ,寄存器是 port_id,數據數值是 Sdata

下行通信

Topic:/server/coo/4QR8TZ9ThuL4G 設備號SN(請替換爲自己的)
System.out.println("設備ID: " + sensor_device_id + ", 端口ID: " + port_id + ", 數據: " + sdata);

備註:從設備地址是sensor_device_id ,寄存器是port_id,數據數值是Sdata

完整代碼:

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.json.JSONArray;
import org.json.JSONObject;

public class MqttDemo {
    // MQTT 服務器地址
    private static final String MQTT_BROKER = "tcp://iot.modbus.cn:1883";
      // MQTT 客戶端 ID
    private static final String MQTT_CLIENT_ID = "4QR8TZ9ThuL4G";
      // MQTT 用戶名
    private static final String MQTT_USERNAME = "ceshi";
      // MQTT 密碼
    private static final String MQTT_PASSWORD = "Abc123456";
      // 訂閱的主題
    private static final String MQTT_TOPIC_SUBSCRIBE = "/server/coo/4QR8TZ9ThuL4G";
      // 發佈的主題
    private static final String MQTT_TOPIC_PUBLISH = "/dev/coo/4QR8TZ9ThuL4G";
      // 心跳間隔,單位爲秒
    private static final int MQTT_KEEP_ALIVE_INTERVAL = 60;

    public static void main(String[] args) throws MqttException {
    // 創建 MQTT 客戶端
    MqttClient mqttClient = new MqttClient(MQTT_BROKER, MQTT_CLIENT_ID, new MemoryPersistence());
    // 設置 MQTT 連接選項
    MqttConnectOptions options = new MqttConnectOptions();
    options.setUserName(MQTT_USERNAME);
    options.setPassword(MQTT_PASSWORD.toCharArray());
    options.setCleanSession(true);
    options.setKeepAliveInterval(MQTT_KEEP_ALIVE_INTERVAL);

    // 設置回調
    mqttClient.setCallback(new MqttCallback() {
      @Override
      public void connectionLost(Throwable cause) {
        System.out.println("連接斷開");
      }

      @Override
      public void messageArrived(String topic, MqttMessage message) throws Exception {
      System.out.println("收到消息。主題: " + topic);
      System.out.println("消息: " + new String(message.getPayload()));
          // 將消息轉換爲字符串
          String msg = new String(message.getPayload());
          // 使用 JSON 工具解析字符串
          JSONArray jsonArray = new JSONArray(msg);
          for (int i = 0; i < jsonArray.length(); i++) {
          JSONObject jsonObject = jsonArray.getJSONObject(i);
          int sensor_device_id = jsonObject.getInt("sensor_device_id");
          int port_id = jsonObject.getInt("port_id");
          int sdata = jsonObject.getInt("sdata");
          // 在這裏你可以處理接收到的數據
          System.out.println("設備ID: " + sensor_device_id + ", 端口ID: " + port_id + ", 數據: " + sdata);
        }
      }

      @Override
      public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("消息發佈完成");
      }
    });

    // 連接到 MQTT 服務器
    mqttClient.connect(options);

    // 訂閱主題
    mqttClient.subscribe(MQTT_TOPIC_SUBSCRIBE);

    // 創建消息
    MqttMessage message = new MqttMessage();
    message.setPayload("[{"sensor_device_id": 1, "port_id": 1, "sdata": 66}]".getBytes());

    // 發佈消息
      mqttClient.publish(MQTT_TOPIC_PUBLISH, message);
    }
}

四、小結

1、MQTT 是一種輕量級的遠程傳輸協議,可以以極少的代碼和有限的帶寬,爲連接遠程設備提供實時可靠的消息服務。憑藉着異步、穩定性使其在物聯網應用廣泛。

2、Mqtt 爲什麼能支持大規模物聯網設備?

MQTT 是一種輕量級的消息協議,特別適合大規模物聯網(IoT)設備的通信。它在大規模 IoT 場景中得到廣泛使用,主要因爲以下幾個關鍵特性:

1)、低帶寬和高效性

MQTT 設計簡單,傳輸協議非常輕量,數據包開銷小,特別適合帶寬有限的網絡環境。它的傳輸效率高,不會佔用大量網絡資源,這樣成千上萬的 IoT 設備也可以同時進行通信。

2)、發佈 - 訂閱模式

MQTT 使用發佈 - 訂閱模型,設備之間的通信通過中心化的消息代理(Broker)來管理。設備可以發佈消息到主題(Topic),並且只需訂閱自己關心的主題即可獲取消息。這種模式使得數十萬設備能夠高效地共享信息,減少了點對點通信的負擔。

3)、可靠安全性

支持三種服務質量(QoS)級別,確保數據的傳遞可靠性。根據不同的應用場景和網絡條件,用戶可以選擇不同的 QoS,以平衡數據可靠性和網絡開銷。

總之,MQTT 協議設計簡潔,資源佔用低,通過 Broker 管理設備通信,支持大規模併發連接,因此特別適合大規模 IoT 部署場景。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/n3dycD3bKGahBOIzN-d8Vg