初探可編程網關 Pipy

有幸參加了 Flomesh[1] 組織的 workshop,瞭解了他們的 Pipy 網絡代理,以及圍繞 Pipy 構建起來的生態。Pipy 在生態中,不止是代理的角色,還是 Flomesh 服務網格中的數據平面。

整理一下,做個記錄,順便瞄一下 Pipy 的部分源碼。

介紹

下面是摘自 Github 上關於 Pipy 的介紹:

Pipy 是一個輕量級、高性能、高穩定、可編程的網絡代理。Pipy 核心框架使用 C++ 開發,網絡 IO 採用 ASIO 庫。 Pipy 的可執行文件僅有 5M 左右,運行期的內存佔用 10M 左右,因此 Pipy 非常適合做 Sidecar proxy。

Pipy 內置了自研的 pjs 作爲腳本擴展,使得 Pipy 可以用 JS 腳本根據特定需求快速定製邏輯與功能。

Pipy 採用了模塊化、鏈式的處理架構,用順序執行的模塊來對網絡數據塊進行處理。這種簡單的架構使得 Pipy 底層簡單可靠,同時具備了動態編排流量的能力,兼顧了簡單和靈活。通過使用 REUSE_PORT 的機制(主流 Linux 和 BSD 版本都支持該功能),Pipy 可以以多進程模式運行,使得 Pipy 不僅適用於 Sidecar 模式,也適用於大規模的流量處理場景。 在實踐中,Pipy 獨立部署的時候用作 “軟負載”,可以在低延遲的情況下,實現媲美硬件的負載均衡吞吐能力,同時具有靈活的擴展性。

Pipy 的核心是消息流處理器:

Pipy 流量處理的流程:

核心概念

• 流(Stream)• 管道(Pipeline)• 模塊(Module)• 會話(Session)• 上下文(Context)

以下是個人淺見:

Pipy 使用 pjs 引擎將 JavaScript 格式的配置,解析成其抽象的 Configuration 對象。每個 Configuration 中包含了多個 Pipeline,每個 Configuration 中又會用到多個 Filter。這些都屬於 Pipy 的靜態配置部分。(後面會提到 Pipeline 的三種不同類型)

而屬於運行時的就是流、會話和上下文了,在 Pipy 中,數據流是由對象(Pipy 的抽象)組成的。而這些對象抵達 Pipy,被抽象成不同的事件。而事件觸發不同的過濾器的執行。

我個人更喜歡將其核心理解爲:對數據流的事件處理引擎。

理解歸理解,實踐出真知。“大膽假設,小心求證!”

本地編譯

從編譯 Pipy 開始。

環境準備

#安裝 nodejs
$ nvm install lts/erbium 
#安裝 cmake
$ brew install cmake

編譯 Pipy

從 https://github.com/flomesh-io/pipy.git 克隆代碼。

Pipy 的編譯包括了兩個部分,GUI 和 Pipy 本體。

GUI 是 Pipy 提供的一個用於開發模式下進行配置的界面,首先編譯 Pipy GUI。

# pipy root folder
$ cd gui
$ npm install
$ npm run build

接着編譯 Pipy 的本體

# pipy root folder
$ mkdir build
$ cd build
$ cmake -DCMAKE_BUILD_TYPE=Release -DPIPY_GUI=ON ..
$ make

完成後檢查根目錄下的 bin 目錄,可以看到 pipy 的可執行文件,大小隻有 11M。

Demo:Hello Pipy

開發模式下可以讓 Pipy 攜帶 GUI 啓動,通過 GUI 進行配置。

瀏覽器中打開

配置界面

展開 002-hello 子目錄點選 pipy 並點擊運行按鈕:

$ curl -i localhost:6080
HTTP/1.1 200 OK
Connection: keep-alive
Content-Length: 7
Hello!

Pipy 過濾器

通過 pipe 的命令可以輸出其支持的過濾器列表,一共 31 個。通過將一系列過濾器進行組裝,可以實現複雜的流處理。

比如 007-logging 的配置實現了日誌的功能:記錄請求和響應的數據,並批量發送到 ElasticSearch。這裏就用到了 forkconnectonSessionStartencodeHttpRequestdecodeHttpRequestonMessageStartonMessagedecodeHttpResponsereplaceMessagelinkmuxtask 等十多種過濾器。

$ bin/pipy --list-filters
connect             (target[, options])                         Sends data to a remote endpoint and receives data from it
demux               (target)                                    Sends messages to a different pipline with each one in its own session and context
decodeDubbo         ()                                          Deframes a Dubbo message
decodeHttpRequest   ()                                          Deframes an HTTP request message
decodeHttpResponse  ()                                          Deframes an HTTP response message
dummy               ()                                          Eats up all events
dump                ([tag])                                     Outputs events to the standard output
encodeDubbo         ([head])                                    Frames a Dubbo message
encodeHttpRequest   ([head])                                    Frames an HTTP request message
encodeHttpResponse  ([head])                                    Frames an HTTP response message
exec                (command)                                   Spawns a child process and connects to its input/output
fork                (target[, sessionData])                     Sends copies of events to other pipeline sessions
link                (target[, when[, target2[, when2, ...]]])   Sends events to a different pipeline
mux                 (target[, selector])                        Sends messages from different sessions to a shared pipeline session
onSessionStart      (callback)                                  Handles the initial event in a session
onData              (callback)                                  Handles a Data event
onMessageStart      (callback)                                  Handles a MessageStart event
onMessageEnd        (callback)                                  Handles a MessageEnd event
onSessionEnd        (callback)                                  Handles a SessionEnd event
onMessageBody       (callback)                                  Handles a complete message body
onMessage           (callback)                                  Handles a complete message including the head and the body
print               ()                                          Outputs raw data to the standard output
replaceSessionStart (callback)                                  Replaces the initial event in a session
replaceData         ([replacement])                             Replaces a Data event
replaceMessageStart ([replacement])                             Replaces a MessageStart event
replaceMessageEnd   ([replacement])                             Replaces a MessageEnd event
replaceSessionEnd   ([replacement])                             Replaces a SessionEnd event
replaceMessageBody  ([replacement])                             Replaces an entire message body
replaceMessage      ([replacement])                             Replaces a complete message including the head and the body
tap                 (quota[, account])                          Throttles message rate or data rate
use                 (module, pipeline[, argv...])               Sends events to a pipeline in a different module
wait                (condition)                                 Buffers up events until a condition is fulfilled

原理

“Talk is cheap, show me the code.”

配置加載

個人比較喜歡看源碼來理解實現,即使是 C++。從瀏覽器請求入手發現運行時向/api/program 發送了 POST 請求,請求的內容是配置文件的地址。

檢查源碼後,找到邏輯的實現在 src/gui.cpp:189

  1. 創建新的 worker2. 加載配置,將 JavaScrip 代碼解析成 Configuration 對象 3. 啓動 worker,執行Configuration::apply()4. 卸載舊的 worker

從 src/api/configuration.cpp:267 處看:pipelinelisten 和 task 配置實際在 Pipy 的配置中都是被抽象爲 Pipeline 對象,只是在類型上有差異分別爲:NAMEDLISTEN 和 TASK。比如 listen 中可以通過 fork 過濾器將事件的副本發送到指定的 pipeline 中。

基於數據流事件的處理

src/inbound.cpp:171

結語

Pipy 雖小(只有 11M),但以其可編程的特性提供了靈活的配置能力,潛力無限。

Pipy 像處理 HTTP 一樣處理任意的七層協議。內部版本支持 Dubbo、Redis、Socks 等,目前正在遷移到開源版本。

期待即將開源的 Portal,以及服務網格 Flomesh。持續關注,後面考慮再寫幾篇。

“未來可期!”

引用鏈接

[1] Flomesh: https://flomesh.cn/

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/l8JzYRn350fjuCAOoo8pcg