搭建基於 MQTT 的高速行情轉發系統

MQTT 是基於 TCP/IP 協議棧構建的異步通信消息協議,是一種輕量級的發佈、訂閱信息傳輸協議。使用 MQTT 協議,消息發送者與接收者不受時間和空間的限制。本文介紹如何基於 MQTT 搭建一個高速的內網行情轉發系統,進行股票、期權、Level2 逐筆行情等多種行情的共享。

一、MQTT 系統的配置

1.1 安裝程序下載和配置

我們首先下載比較流行的 emqx 發行版程序:https://www.emqx.io/zh/downloads

然後進入http://xxx.xxx.xxx.xxx:18083/#/authentication網頁進行系統的配置,默認的用戶名密碼爲 admin/public。

這裏主要是客戶端認證和授權的權限配置,通過自帶的內置數據庫進行用戶管理。

1.2 客戶端程序和庫安裝

二、C++ 高速行情繫統轉發

2.1 異步客戶端實例化

#include "mqtt/async_client.h"

const std::string DFLT_ADDRESS { "mqtt://localhost:1883" }; // 連接地址
const std::string CLIENT_ID { "market-data-publish" }; // 客戶端ID
const int maxBufferedMessages = 1073741824;
const int QOS = 1; 
mqtt::connect_options connopts("xxxxxx""xxxxxx"); // 用戶名/密碼認證
mqtt::async_client mqtt_client(DFLT_ADDRESS, CLIENT_ID, maxBufferedMessages); // 客戶端實例

2.2 回報函數中轉發數據包

    /// 期權行情通知
    virtual void OnRtnSPMarketData(TORALEV1API::CTORATstpMarketDataField *pMarketDataField)
 {
        // SecurityID  SecurityName UpdateTime UpdateMillisec SysTime PreClosePrice OpenPrice Volume  Turnover TradingCount LastPrice HighestPrice LowestPrice PriceUpDown1 PriceUpDown2 UpperLimitPrice LowerLimitPrice  OpenInterest BidPrice1 AskPrice1 BidVolume1 AskVolume1 
  std::string SecurityID(pMarketDataField->SecurityID);
        std::string TradingTime(pMarketDataField->UpdateTime);
        json market_data = {
            {"SecurityID", SecurityID},
            {"TradeDate", today},
            {"TradingTime", today + " " + TradingTime},
            {"SysTime", duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count()},
            {"PreClosePrice", pMarketDataField->PreClosePrice},
            {"OpenPrice", pMarketDataField->OpenPrice},
            {"Volume", pMarketDataField->Volume},
            {"Turnover", pMarketDataField->Turnover},
            {"TradingCount", pMarketDataField->TradingCount},
            {"LastPrice", pMarketDataField->LastPrice},
            {"HighestPrice", pMarketDataField->HighestPrice},
            {"LowestPrice", pMarketDataField->LowestPrice},
            {"PriceUpDown1", pMarketDataField->PriceUpDown1},
            {"PriceUpDown2", pMarketDataField->PriceUpDown2},
            {"UpperLimitPrice", pMarketDataField->UpperLimitPrice},
            {"LowerLimitPrice", pMarketDataField->LowerLimitPrice},
            {"OpenInterest", pMarketDataField->OpenInterest},
            {"BidPrice1", pMarketDataField->BidPrice1},
            {"AskPrice1", pMarketDataField->AskPrice1},
            {"BidVolume1", pMarketDataField->BidVolume1},
            {"AskVolume1", pMarketDataField->AskVolume1},
            {"BidPrice2", pMarketDataField->BidPrice2},
            {"AskPrice2", pMarketDataField->AskPrice2},
            {"BidVolume2", pMarketDataField->BidVolume2},
            {"AskVolume2", pMarketDataField->AskVolume2},
            {"BidPrice3", pMarketDataField->BidPrice3},
            {"AskPrice3", pMarketDataField->AskPrice3},
            {"BidVolume3", pMarketDataField->BidVolume3},
            {"AskVolume3", pMarketDataField->AskVolume3},
            {"BidPrice4", pMarketDataField->BidPrice4},
            {"AskPrice4", pMarketDataField->AskPrice4},
            {"BidVolume4", pMarketDataField->BidVolume4},
            {"AskVolume4", pMarketDataField->AskVolume4},
            {"BidPrice5", pMarketDataField->BidPrice5},
            {"AskPrice5", pMarketDataField->AskPrice5},
            {"BidVolume5", pMarketDataField->BidVolume5},
            {"AskVolume5", pMarketDataField->AskVolume5},
        };
        mqtt::message_ptr pubmsg = mqtt::make_message("level1/option/"+SecurityID, market_data.dump());
        mqtt_client.publish(pubmsg);

例如上面的期權行情,我們可以將其盤口 5 檔行情整合成一個 json 字符串,按照"level1/option/"+SecurityID的 topic 發送至 MQTT 服務端。

三、客戶端程序訂閱行情

3.1 客戶端讀取

以 Python 爲例,可以訂閱 level2 的逐筆、level1 快照、期權等:

from paho.mqtt import client as mqtt_client
import random

broker = 'xxx.xxx.xxx.xxx'
port = xxxxxx

#topic = "level1/option/+" # 訂閱所有期權行情
#topic = "level2/xts_market/118009" # level2-3s快照(上海轉債)
#topic = "level2/xts_tick/118009" # level2-逐筆報單(上海轉債)
#topic = "level2/ngts_tick/601888" # level2-逐筆報單(上海股票,暫無行情)
topic = "level2/order/601888" # level2-逐筆報單(上海股票,深圳股票,深圳轉債)
#topic = "level2/trans/601888" # level2-逐筆成交(不包含上海轉債)
#topic = "level2/index/000001" # level2-指數
#topic = "level2/market/000001" # level2-3s快照
#topic = "level1/market/837748" # level1-3s快照
client_id = f'python-mqtt-{random.randint(0, 1000)}'

def connect_mqtt():
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", rc)
    # Set Connecting Client ID
    client = mqtt_client.Client(client_id)
    client.username_pw_set('******'password='******')
    client.on_connect = on_connect
    client.connect(broker, port)
    return client

def subscribe(client: mqtt_client):
    def on_message(client, userdata, msg):
        print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")

    client.subscribe(topic)
    client.on_message = on_message

mqttclient = connect_mqtt()
subscribe(mqttclient)
mqttclient.loop_start()

3.2 基於 MQTT 的流處理平臺

MQTT 本身也建立了一個實時數據處理生態,基於 MQTT 也可以進行流計算:

比如ekuiper(https://ekuiper.org/docs/zh/latest/),可以進行快速的聚合統計分析,有興趣的小夥伴可以去閱讀官方文檔更加深入的瞭解。


本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/xT_P-hxfMJhf9z8rIbf3DQ