搭建基於 MQTT 的高速行情轉發系統
MQTT 是基於 TCP/IP 協議棧構建的異步通信消息協議,是一種輕量級的發佈、訂閱信息傳輸協議。使用 MQTT 協議,消息發送者與接收者不受時間和空間的限制。本文介紹如何基於 MQTT 搭建一個高速的內網行情轉發系統,進行股票、期權、Level2 逐筆行情等多種行情的共享。
一、MQTT 系統的配置
1.1 安裝程序下載和配置
我們首先下載比較流行的 emqx 發行版程序:https://www.emqx.io/zh/downloads
然後進入http://xxx.xxx.xxx.xxx:18083/#/authentication
網頁進行系統的配置,默認的用戶名密碼爲 admin/public。
這裏主要是客戶端認證和授權的權限配置,通過自帶的內置數據庫進行用戶管理。
1.2 客戶端程序和庫安裝
-
Python:直接
pip3 install paho-mqtt==1.6.1
-
C++:
-
克隆源碼:
git clone --recursive https://github.com/eclipse/paho.mqtt.cpp
-
CMake 構建:
cmake -Bbuild -H. -DPAHO_BUILD_STATIC=OFF -DPAHO_BUILD_DOCUMENTATION=OFF -DPAHO_BUILD_SAMPLES=OFF
-
安裝:
make install
二、C++ 高速行情繫統轉發
2.1 異步客戶端實例化
#include "mqtt/async_client.h"
const std::string DFLT_ADDRESS { "mqtt://localhost:1883" }; // 連接地址
const std::string CLIENT_ID { "market-data-publish" }; // 客戶端ID
const int maxBufferedMessages = 1073741824;
const int QOS = 1;
mqtt::connect_options connopts("xxxxxx", "xxxxxx"); // 用戶名/密碼認證
mqtt::async_client mqtt_client(DFLT_ADDRESS, CLIENT_ID, maxBufferedMessages); // 客戶端實例
2.2 回報函數中轉發數據包
/// 期權行情通知
virtual void OnRtnSPMarketData(TORALEV1API::CTORATstpMarketDataField *pMarketDataField)
{
// SecurityID SecurityName UpdateTime UpdateMillisec SysTime PreClosePrice OpenPrice Volume Turnover TradingCount LastPrice HighestPrice LowestPrice PriceUpDown1 PriceUpDown2 UpperLimitPrice LowerLimitPrice OpenInterest BidPrice1 AskPrice1 BidVolume1 AskVolume1
std::string SecurityID(pMarketDataField->SecurityID);
std::string TradingTime(pMarketDataField->UpdateTime);
json market_data = {
{"SecurityID", SecurityID},
{"TradeDate", today},
{"TradingTime", today + " " + TradingTime},
{"SysTime", duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count()},
{"PreClosePrice", pMarketDataField->PreClosePrice},
{"OpenPrice", pMarketDataField->OpenPrice},
{"Volume", pMarketDataField->Volume},
{"Turnover", pMarketDataField->Turnover},
{"TradingCount", pMarketDataField->TradingCount},
{"LastPrice", pMarketDataField->LastPrice},
{"HighestPrice", pMarketDataField->HighestPrice},
{"LowestPrice", pMarketDataField->LowestPrice},
{"PriceUpDown1", pMarketDataField->PriceUpDown1},
{"PriceUpDown2", pMarketDataField->PriceUpDown2},
{"UpperLimitPrice", pMarketDataField->UpperLimitPrice},
{"LowerLimitPrice", pMarketDataField->LowerLimitPrice},
{"OpenInterest", pMarketDataField->OpenInterest},
{"BidPrice1", pMarketDataField->BidPrice1},
{"AskPrice1", pMarketDataField->AskPrice1},
{"BidVolume1", pMarketDataField->BidVolume1},
{"AskVolume1", pMarketDataField->AskVolume1},
{"BidPrice2", pMarketDataField->BidPrice2},
{"AskPrice2", pMarketDataField->AskPrice2},
{"BidVolume2", pMarketDataField->BidVolume2},
{"AskVolume2", pMarketDataField->AskVolume2},
{"BidPrice3", pMarketDataField->BidPrice3},
{"AskPrice3", pMarketDataField->AskPrice3},
{"BidVolume3", pMarketDataField->BidVolume3},
{"AskVolume3", pMarketDataField->AskVolume3},
{"BidPrice4", pMarketDataField->BidPrice4},
{"AskPrice4", pMarketDataField->AskPrice4},
{"BidVolume4", pMarketDataField->BidVolume4},
{"AskVolume4", pMarketDataField->AskVolume4},
{"BidPrice5", pMarketDataField->BidPrice5},
{"AskPrice5", pMarketDataField->AskPrice5},
{"BidVolume5", pMarketDataField->BidVolume5},
{"AskVolume5", pMarketDataField->AskVolume5},
};
mqtt::message_ptr pubmsg = mqtt::make_message("level1/option/"+SecurityID, market_data.dump());
mqtt_client.publish(pubmsg);
例如上面的期權行情,我們可以將其盤口 5 檔行情整合成一個 json 字符串,按照"level1/option/"+SecurityID
的 topic 發送至 MQTT 服務端。
三、客戶端程序訂閱行情
3.1 客戶端讀取
以 Python 爲例,可以訂閱 level2 的逐筆、level1 快照、期權等:
from paho.mqtt import client as mqtt_client
import random
broker = 'xxx.xxx.xxx.xxx'
port = xxxxxx
#topic = "level1/option/+" # 訂閱所有期權行情
#topic = "level2/xts_market/118009" # level2-3s快照(上海轉債)
#topic = "level2/xts_tick/118009" # level2-逐筆報單(上海轉債)
#topic = "level2/ngts_tick/601888" # level2-逐筆報單(上海股票,暫無行情)
topic = "level2/order/601888" # level2-逐筆報單(上海股票,深圳股票,深圳轉債)
#topic = "level2/trans/601888" # level2-逐筆成交(不包含上海轉債)
#topic = "level2/index/000001" # level2-指數
#topic = "level2/market/000001" # level2-3s快照
#topic = "level1/market/837748" # level1-3s快照
client_id = f'python-mqtt-{random.randint(0, 1000)}'
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
# Set Connecting Client ID
client = mqtt_client.Client(client_id)
client.username_pw_set('******', password='******')
client.on_connect = on_connect
client.connect(broker, port)
return client
def subscribe(client: mqtt_client):
def on_message(client, userdata, msg):
print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
client.subscribe(topic)
client.on_message = on_message
mqttclient = connect_mqtt()
subscribe(mqttclient)
mqttclient.loop_start()
-
期權行情
-
逐筆行情
3.2 基於 MQTT 的流處理平臺
MQTT 本身也建立了一個實時數據處理生態,基於 MQTT 也可以進行流計算:
比如ekuiper
(https://ekuiper.org/docs/zh/latest/),可以進行快速的聚合統計分析,有興趣的小夥伴可以去閱讀官方文檔更加深入的瞭解。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/xT_P-hxfMJhf9z8rIbf3DQ