Dapr 入門教程之消息隊列
前面我們瞭解了 Dapr 對發佈訂閱的支持,本節我們將來介紹了 Dapr 中對消息隊列的支持。消息隊列,分爲兩種綁定,一種是輸出綁定,一種是輸入綁定。出和入是看數據的流向,輸出綁定就是作爲生產者的服務把消息通過 Dapr 傳給消息隊列,輸入綁定就是作爲消費者的服務通過 Dapr 從消息隊列裏得到消息。
這裏的消息隊列和發佈訂閱裏的消息總線有什麼區別呢?一個消息進入消息總線的話,所有訂閱者都能得到這個消息,而一個消息進入消息隊列的話,由消費者來取,一次只有一個人能得到。此外,消息總線是不要求處理順序的,兩個消息進入消息總線,誰先被拿到順序是不一定的,而消息隊列可以保證是先入先出的。
本節我們將創建兩個微服務,一個具有輸入綁定,另一個具有輸出綁定,前面我們都使用的 Redis 這種中間件,這裏我們將綁定到 Kafka。
-
Node.js 微服務使用輸入綁定
-
Python 微服務利用輸出綁定
綁定連接到 Kafka,允許我們將消息推送到 Kafka 實例(從 Python 微服務)中,並從該實例(從 Node.js 微服務)接收消息,而不必知道實例的位置。相反,同樣只需要直接使用 Dapr API 通過 sidecars 連接即可。
本地運行
首先我們在本地來運行示例應用,對應的架構圖如下所示:
Bindings 本地模式
同樣使用 quickstarts
這個代碼倉庫:
git clone [-b <dapr_version_tag>] https://github.com/dapr/quickstarts.git
由於我們這裏是使用 Kafka 來做消息隊列的中間件,所以我們首先需要在本地環境運行 Kafka,我們可以直接使用 https://github.com/wurstmeister/kafka-docker
這個項目以 Docker 方式運行。
定位到 quickstarts
的 tutorials/bindings
目錄,下面有一個 docker-compose-single-kafka.yml
文件:
$ cd tutorials/bindings
$ cat docker-compose-single-kafka.yml
version: '2'
services:
zookeeper:
image: ghcr.io/dapr/3rdparty/zookeeper:latest
ports:
- "2181:2181"
kafka:
image: ghcr.io/dapr/3rdparty/kafka:latest
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_CREATE_TOPICS: "sample:1:1"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
我們可以直接而使用 docker-compose 來啓動一個單實例的 Kafka:
$ docker-compose -f ./docker-compose-single-kafka.yml up -d
隔一段時間鏡像拉取完成後以容器方式啓動 Kafka:
$ docker-compose -f ./docker-compose-single-kafka.yml ps
NAME COMMAND SERVICE STATUS PORTS
bindings-kafka-1 "start-kafka.sh" kafka running 0.0.0.0:9092->9092/tcp
bindings-zookeeper-1 "/bin/sh -c '/usr/sb…" zookeeper running 0.0.0.0:2181->2181/tcp
在本地運行了 Kafka 後,接着我們可以運行輸入綁定的 Node.js 微服務:
$ cd nodeapp
同樣先安裝服務依賴:
$ npm install # 或者執行 yarn 命令
然後我們就可以使用 dapr run
命令來啓動該微服務了,啓動方式我們應該比較熟悉了,如下所示:
$ dapr run --app-id bindings-nodeapp --app-port 3000 node app.js --components-path ../components
上面的命令和前面有點不一樣的地方是多了一個 --components-path
用來指定組件路徑,這是因爲現在我們要使用 Kafka 這種中間件來作爲我們的消息隊列組件,那麼我們就需要告訴 Dapr,在 ./components
目錄下面就包含一個對應的 kafka_bindings.yaml
文件,內容如下所示:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: sample-topic
spec:
type: bindings.kafka
version: v1
metadata:
# Kafka broker connection setting
- name: brokers
value: localhost:9092
# consumer configuration: topic and consumer group
- name: topics
value: sample
- name: consumerGroup
value: group1
# publisher configuration: topic
- name: publishTopic
value: sample
- name: authRequired
value: "false"
前面在本地模式下面我們沒有主動聲明組件,是因爲我們使用的就是默認的 Redis,而 Kafka 並不是內置就有的,所以需要我們主動聲明,注意上面組件的類型爲 type: bindings.kafka
,metadata
下面是訪問 Kafka 相關的元數據。正常情況下上面的啓動命令會輸出如下所示的日誌信息:
ℹ️ Starting Dapr with id bindings-nodeapp. HTTP Port: 54215. gRPC Port: 54216
INFO[0000] starting Dapr Runtime -- version 1.8.4 -- commit 18575823c74318c811d6cd6f57ffac76d5debe93 app_id=bindings-nodeapp instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
# ......
INFO[0000] dapr initialized. Status: Running. Init Elapsed 347.136ms app_id=bindings-nodeapp instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
ℹ️ Updating metadata for app command: node app.js
✅ You're up and running! Both Dapr and your app logs will appear here.
INFO[0001] placement tables updated, version: 0 app_id=bindings-nodeapp instance=MBP2022.local scope=dapr.runtime.actor.internal.placement type=log ver=1.8.4
接下來,需要運行輸出綁定的 Python 微服務,定位到 pythonapp
目錄,安裝 requests
依賴:
$ cd pythonapp
$ pip3 install requests
然後同樣用 dapr run
命令來啓動該微服務,也要注意帶上後面的 --components-path
參數:
$ dapr run --app-id bindings-pythonapp python3 app.py --components-path ../components
ℹ️ Starting Dapr with id bindings-pythonapp. HTTP Port: 54554. gRPC Port: 54555
ℹ️ Checking if Dapr sidecar is listening on HTTP port 54554
INFO[0000] starting Dapr Runtime -- version 1.8.4 -- commit 18575823c74318c811d6cd6f57ffac76d5debe93 app_id=bindings-pythonapp instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
# ......
ℹ️ Checking if Dapr sidecar is listening on GRPC port 54555
ℹ️ Dapr sidecar is up and running.
ℹ️ Updating metadata for app command: python3 app.py
✅ You're up and running! Both Dapr and your app logs will appear here.
啓動完成後,觀察 Python 服務的日誌,可以看到不斷輸出如下所示成功輸出綁定到 Kafka 的日誌:
== APP == {'data': {'orderId': 1}, 'operation': 'create'}
== APP == <Response [204]>
== APP == {'data': {'orderId': 2}, 'operation': 'create'}
== APP == <Response [204]>
== APP == {'data': {'orderId': 3}, 'operation': 'create'}
== APP == <Response [204]>
# ......
同樣這個時候 Node.js 微服務中也不斷有新的日誌數據產生:
== APP == <Response [204]>
== APP == {'data': {'orderId': 1}, 'operation': 'create'}
== APP == <Response [204]>
== APP == {'data': {'orderId': 2}, 'operation': 'create'}
== APP == <Response [204]>
== APP == {'data': {'orderId': 3}, 'operation': 'create'}
== APP == <Response [204]>
# ......
這是因爲 Python 微服務每隔 1s 就會向我們綁定的消息隊列發送一條消息,而 Node.js 微服務作爲消費者當然會接收到對應的消息數據。
在 Kubernetes 中運行
上面在本地環境下可以正常運行 Dapr bindings 服務,接下來我們再次將該示例部署到 Kubernetes 集羣中來進行觀察。
同樣首先需要提供一個可用的 Kafka 實例,這裏我們仍然使用 Helm Chart 方式來進行安裝:
$ helm repo add bitnami https://charts.bitnami.com/bitnami
$ helm repo update
然後使用如下所示的命令來安裝 Kafka:
$ helm upgrade --install dapr-kafka bitnami/kafka --wait --namespace kafka -f ./kafka-non-persistence.yaml --create-namespace
這裏我們指定了一個無需持久化數據(僅供測試)的 values 文件 kafka-non-persistence.yaml
,內容如下所示:
replicas: 1
# Disable persistent storage
persistence:
enabled: false
zookeeper:
persistence:
enabled: false
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
- matchExpressions:
- key: kubernetes.io/os
operator: In
values:
- linux
- key: kubernetes.io/arch
operator: In
values:
- amd64
autoCreateTopicsEnable: true
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
- matchExpressions:
- key: kubernetes.io/os
operator: In
values:
- linux
- key: kubernetes.io/arch
operator: In
values:
- amd64
安裝完成後可以查看 Pod 的狀態來保證 Kafka 啓動成功:
$ kubectl -n kafka get pods -w
NAME READY STATUS RESTARTS AGE
dapr-kafka-0 1/1 Running 0 2m7s
dapr-kafka-zookeeper-0 1/1 Running 0 2m57s
接下來我們首先需要在 Kubernetes 集羣中配置使用 Kafka 作爲 Binding 消息中間件的 Component 組件:
# kafka_bindings.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: sample-topic
spec:
type: bindings.kafka
version: v1
metadata:
# Kafka broker connection setting
- name: brokers
value: dapr-kafka.kafka:9092
# consumer configuration: topic and consumer group
- name: topics
value: sample
- name: consumerGroup
value: group1
# publisher configuration: topic
- name: publishTopic
value: sample
- name: authRequired
value: "false"
注意該對象上面指定的組件類型爲 bindings.kafka
,metadata
下面的元信息包括 Kafka brokers 地址、生產者和消費者的配置等等,直接應用上面的資源清單即可:
$ kubectl apply -f kafka_bindings.yaml
$ kubectl get components sample-topic
NAME AGE
sample-topic 13s
創建完成後在 Dapr Dashboard 中也可以看到對應的組件信息:
dapr dashboard components
接着部署兩個 Node.js 和 Python 微服務即可:
$ kubectl apply -f deploy/node.yaml
service/bindings-nodeapp created
deployment.apps/bindings-nodeapp created
$ kubectl apply -f deploy/python.yaml
deployment.apps/bindings-pythonapp created
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
bindings-nodeapp-8bcdd744d-pj2j7 2/2 Running 0 3m44s
bindings-pythonapp-7b7fcc579b-kqx6p 2/2 Running 0 3m39s
部署完成後可以同樣分別觀察 Node.js 和 Python 微服務的日誌:
$ kubectl logs --selector app=bindingspythonapp -c python --tail=-1
{'data': {'orderId': 1}, 'operation': 'create'}
HTTPConnectionPool(host='localhost', port=3500): Max retries exceeded with url: /v1.0/bindings/sample-topic (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f9e75181390>: Failed to establish a new connection: [Errno 111] Connection refused'))
{'data': {'orderId': 2}, 'operation': 'create'}
<Response [204]>
{'data': {'orderId': 3}, 'operation': 'create'}
<Response [204]>
# ......
$ kubectl logs --selector app=bindingsnodeapp -c node --tail=-1
Node App listening on port 3000!
Hello from Kafka!
{ orderId: 2 }
Hello from Kafka!
{ orderId: 3 }
# ......
可以看到兩個微服務的日誌也服務我們的預期的。
如何工作
前面我們在本地或 Kubernetes 中都運行了示例應用,而且沒有更改任何代碼,應用結果都符合預期,接下來我們看看這是如何工作的。
在查看應用程序代碼之前,我們先看看 Kafka 綁定組件的資源清單文件,它們爲 Kafka 連接指定 brokers
,爲消費者指定 topics
和 consumerGroup
,爲生產者指定了 publishTopic
。
我們創建了名爲 sample-topic
的組件,然後我們通過該組件配置的 Kafka 中的 sample
主題來設置輸入和輸出綁定。
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: sample-topic
spec:
type: bindings.kafka
version: v1
metadata:
# Kafka broker connection setting
- name: brokers
value: [kafka broker address]
# consumer configuration: topic and consumer group
- name: topics
value: sample
- name: consumerGroup
value: group1
# publisher configuration: topic
- name: publishTopic
value: sample
- name: authRequired
value: "false"
現在我們先導航到 nodeapp
目錄下面打開 app.js
文件,這是 Node.js 輸入綁定示例應用的代碼。這裏使用 Express
暴露了一個 API 端點,需要注意的是 API 名稱必須與在 Kafka 綁定組件中聲明的組件名稱相同,然後 Dapr 運行時將使用來自 sample
主題的事件,然後將 POST 請求與事件負載一起發送給 Node 應用程序。
const express = require("express");
const bodyParser = require("body-parser");
const port = process.env.APP_PORT ?? "3000";
require("isomorphic-fetch");
const app = express();
app.use(bodyParser.json());
// 這裏的 api 端點需要與聲明的組件名稱相同
app.post("/sample-topic", (req, res) => {
console.log("Hello from Kafka!");
console.log(req.body);
res.status(200).send();
});
app.listen(port, () => console.log(`Node App listening on port ${port}!`));
所以當 Kafka 中收到消息後就會打印類似如下所示的日誌:
Hello from Kafka!
{ orderId: 3 }
然後我們導航到 pythonapp
目錄下面打開 app.py
文件,這是輸出綁定示例(生產者)應用程序的代碼,該服務會每秒發送一次 POST 請求到 Dapr 的 http 端點的 http://localhost:3500/v1.0/bindings/<output_bindings_name>
,並帶有事件的 payload 數據。這個應用程序使用 bindings 組件名 sample-topic
作爲 <output_bindings_name>
,然後 Dapr 運行時將事件發送到上面的 Kafka 綁定組件中指定的 sample
主題上去。
import time
import requests
import os
dapr_port = os.getenv("DAPR_HTTP_PORT", 3500)
dapr_url = "http://localhost:{}/v1.0/bindings/sample-topic".format(dapr_port)
n = 0
while True:
n += 1
payload = { "data": {"orderId": n}, "operation": "create" }
print(payload, flush=True)
try:
response = requests.post(dapr_url, json=payload)
print(response, flush=True)
except Exception as e:
print(e, flush=True)
time.sleep(1)
上面代碼中最重要的依然是 Dapr API 地址 dapr_url
的拼接 "http://localhost:{}/v1.0/bindings/sample-topic".format(dapr_port)
,注意我們依然是面向 localhost
編程,而 v1.0/bindings/<output_bindings_name>
端點則是 Dapr API 爲我們封裝的輸出消息綁定的統一接口,非常簡單方便。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/Sg4jIGm_31ZTFF8LJXU_sA