Dapr 入門教程之發佈訂閱
前面我們瞭解瞭如果在 Dapr 下面進行服務調用,以及最簡單的狀態管理,本節我們來了解如何啓用 Dapr 的發佈 / 訂閱模式,發佈者將生成特定主題的消息,而訂閱者將監聽特定主題的信息。
-
使用發佈服務,開發人員可以重複發佈消息到一個主題上。
-
Pub/sub 組件對這些消息進行排隊處理。
-
該主題訂閱者將從隊列中獲取到消息並處理他們。
接下來我們使用的這個示例包含一個發佈者:
- React 前端消息生成器
包含另外 3 個消息訂閱者:
-
Node.js 訂閱者
-
Python 訂閱者
-
C# 訂閱者
Dapr 使用可插拔的消息總線來支持發佈 - 訂閱,並將消息傳遞給 CloudEvents(一個 CNCF 項目) 作爲通用的事件信封格式,以提高連接服務的互操作性。
我們這裏將使用 Redis Streams(在 Redis version = > 5 中啓用),當然也可以使用 RabbitMQ、Kafka 等中間件。下圖是用來說明組件之間是如何在本地模式下互相連接的。
dapr pub/sub
本地初始化
Dapr 允許你將相同的微服務從本地機器部署到雲環境中去,這裏爲了和大家說明這種便利性,我們先在本地部署這個實例項目,然後再將其部署到 Kubernetes 環境中去。
要在本地使用 Dapr 服務,需要先在本地初始化 Dapr:
$ dapr init
由於某些網絡原因使用上面的命令可能並不能初始化成功,我們可以使用離線的方式進行安裝,前往 https://github.com/dapr/installer-bundle/releases
下載對應系統的 Bundle 👝 包,然後解壓,比如我這裏是 Mac M1,使用下面的命令下載:
$ wget https://github.91chi.fun/https://github.com/dapr/installer-bundle/releases/download/v1.8.4/daprbundle_darwin_arm64.tar.gz
$ tar -xvf daprbundle_darwin_arm64.tar.gz
x daprbundle/
x daprbundle/README.md
x daprbundle/dapr
x daprbundle/details.json
x daprbundle/dist/
x daprbundle/dist/daprd_darwin_arm64.tar.gz
x daprbundle/dist/dashboard_darwin_arm64.tar.gz
x daprbundle/dist/placement_darwin_arm64.tar.gz
x daprbundle/docker/
x daprbundle/docker/daprio-dapr-1.8.4.tar.gz
然後我們可以重新使用下面的命令進行初始化:
$ dapr init --from-dir daprbundle/
⌛ Making the jump to hyperspace...
⚠ Local bundle installation using --from-dir flag is currently a preview feature and is subject to change. It is only available from CLI version 1.7 onwards.
ℹ️ Installing runtime version 1.8.4
↙ Extracting binaries and setting up components...
Dapr runtime installed to /Users/cnych/.dapr/bin, you may run the following to add it to your path if you want to run daprd directly:
export PATH=$PATH:/Users/cnych/.dapr/bin
8d7366c22fd8: Loading layer [==================================================>] 3.697MB/3.697MB
61f7f94319f6: Loading layer [==================================================>] 238.6MB/238.6MB
← Extracting binaries and setting up components... Loaded image: daprio/dapr:1.8.4
✅ Extracting binaries and setting up components...
✅ Extracted binaries and completed components set up.
ℹ️ daprd binary has been installed to /Users/cnych/.dapr/bin.
ℹ️ dapr_placement container is running.
ℹ️ Use `docker ps` to check running containers.
$ dapr version
CLI version: 1.8.0
Runtime version: 1.8.4
默認會啓用
zipkin
這個 tracing 服務,使用上面的命令初始化如果沒有對應的容器,則可以使用docker run --name dapr_zipkin -d -p 9411:9411 dockerproxy.com/openzipkin/zipkin
啓動該服務。同樣也需要運行一個 Redis 服務:docker run --name dapr_redis -d -p 6379:6379 dockerproxy.com/redislabs/rejson
。
消息訂閱服務
這裏我們還是使用前面使用的 quickstarts
這個項目,克隆項目到本地:
git clone [-b <dapr_version_tag>] https://github.com/dapr/quickstarts.git
進入 tutorials/pub_sub
目錄下面:
➜ pub-sub git:(622b7d9) ls
README.md deploy makefile message_b.json node-subscriber react-form
csharp-subscriber img message_a.json message_c.json python-subscriber
運行 Node 消息訂閱服務
首先我們使用 Dapr 運行 node 消息訂閱服務,導航到 node-subscriber
目錄,安裝依賴:
$ cd node-subscriber
$ npm install # 或者 yarn
執行如下所示命令運行 node 消息訂閱服務:
$ dapr run --app-id node-subscriber --app-port 3000 node app.js
ℹ️ Starting Dapr with id node-subscriber. HTTP Port: 50728. gRPC Port: 50729
INFO[0000] starting Dapr Runtime -- version 1.8.4 -- commit 18575823c74318c811d6cd6f57ffac76d5debe93 app_id=node-subscriber instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
# ......
INFO[0000] app is subscribed to the following topics: [A B] through pubsub=pubsub app_id=node-subscriber instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
INFO[0000] dapr initialized. Status: Running. Init Elapsed 312.69599999999997ms app_id=node-subscriber instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
INFO[0000] placement tables updated, version: 0 app_id=node-subscriber instance=MBP2022.local scope=dapr.runtime.actor.internal.placement type=log ver=1.8.4
ℹ️ Updating metadata for app command: node app.js
✅ You're up and running! Both Dapr and your app logs will appear here.
上面命令中的 app-id
是微服務的唯一標識符,--app-port
是 Node 應用程序運行的端口,最後,運行應用程序的命令是 node app.js
。
運行 Python 消息訂閱服務
接下來使用 Dapr 運行 Python 消息訂閱服務,導航到 python-subscriber
目錄:
$ cd python-subscriber
安裝應用依賴:
$ pip3 install -r requirements.txt
同樣再次使用 dapr run
來運行該訂閱服務:
$ dapr run --app-id python-subscriber --app-port 5001 python3 app.py
ℹ️ Starting Dapr with id python-subscriber. HTTP Port: 55508. gRPC Port: 55509
INFO[0000] starting Dapr Runtime -- version 1.8.4 -- commit 18575823c74318c811d6cd6f57ffac76d5debe93 app_id=python-subscriber instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
INFO[0000] log level set to: info app_id=python-subscriber instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
INFO[0000] enabled gRPC metrics middleware app_id=python-subscriber instance=MBP2022.local scope=dapr.runtime.grpc.internal type=log ver=1.8.4
INFO[0000] internal gRPC server is running on port 55514 app_id=python-subscriber instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
INFO[0000] application protocol: http. waiting on port 5001. This will block until the app is listening on that port. app_id=python-subscriber instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
INFO[0000] application discovered on port 5001 app_id=python-subscriber instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
WARN[0000] [DEPRECATION NOTICE] Adding a default content type to incoming service invocation requests is deprecated and will be removed in the future. See https://docs.dapr.io/operations/support/support-preview-features/ for more details. You can opt into the new behavior today by setting the configuration option `ServiceInvocation.NoDefaultContentType` to true. app_id=python-subscriber instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
== APP == * Serving Flask app "app" (lazy loading)
== APP == * Environment: production
== APP == WARNING: This is a development server. Do not use it in a production deployment.
== APP == Use a production WSGI server instead.
== APP == * Debug mode: off
== APP == * Running on http://127.0.0.1:5001/ (Press CTRL+C to quit)
ℹ️ Updating metadata for app command: python3 app.py
✅ You're up and running! Both Dapr and your app logs will appear here.
由於我們這裏沒有 C# 環境,所以只運行 Node 和 Python 這兩個消息訂閱服務了。
消息發佈服務
接下來我們來運行 React 這個前端消息發佈服務,同樣先導航到 react-form
項目目錄下面:
$ cd react-form
然後執行下面的命令安裝依賴並構建服務:
$ npm run buildclient
$ npm install
構建完成後可以使用下面的 dapr 命令來啓動該前端服務:
$ dapr run --app-id react-form --app-port 8080 npm run start
ℹ️ Starting Dapr with id react-form. HTTP Port: 57303. gRPC Port: 57304
INFO[0000] starting Dapr Runtime -- version 1.8.4 -- commit 18575823c74318c811d6cd6f57ffac76d5debe93 app_id=react-form instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
# ......
== APP ==
== APP == > react-form@1.0.0 start
== APP == > node server.js
== APP ==
== APP == Listening on port 8080!
INFO[0000] application discovered on port 8080 app_id=react-form instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
# ......
INFO[0000] dapr initialized. Status: Running. Init Elapsed 760.39ms app_id=react-form instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
ℹ️ Updating metadata for app command: npm run start
✅ You're up and running! Both Dapr and your app logs will appear here.
INFO[0001] placement tables updated, version: 0 app_id=react-form instance=MBP2022.local scope=dapr.runtime.actor.internal.placement type=log ver=1.8.4
當看到 == APP == Listening on port 8080!
這樣的日誌時,表示應用啓動成功了。然後我們就可以在瀏覽器中訪問 http://localhost:8080
訪問前端應用了。
比如現在我們選擇消息類型 A
,然後隨便輸入一些消息內容,點擊 Submit
發送,然後觀察上面的 Node 和 Python 這兩個消息訂閱者服務的日誌。
選擇一個主題,輸入一些文字,然後發送一條信息!觀察通過你們各自的 Dapr 的日誌。
注意,Node 訂閱者接收類型爲 A
和 B
的消息,而 Python 訂閱者接收類型爲 A
和 C
的消息,所以注意每個控制檯窗口的日誌顯示。
此外 Dapr CLI 提供了一個機制來發布消息用於測試,比如我們可以使用如下命令來發布一條消息:
$ dapr publish --publish-app-id react-form --pubsub pubsub --topic A --data-file message_a.json
到這裏我們就完成了使用 Dapr 來進行消息訂閱發佈的功能演示。
在 Kubernetes 中運行
上面我們是將演示服務在本地部署的,我們知道使用 Dapr 開發的服務是和平臺沒關係的,可以很輕鬆遷移到雲環境,比如現在我們再將上面的示例應用部署到 Kubernetes 集羣中。
要在 Kubernetes 中運行相同的代碼,首先需要設置 Redis 存儲,然後部署微服務,將使用相同的微服務,但最終架構有所不同:
前面我們已經使用 Helm 安裝了 bitnami 下面的 redis 應用:
$ helm repo add bitnami https://charts.bitnami.com/bitnami
$ helm repo update
$ helm install redis bitnami/redis
有了 Redis 服務過後,接着我們需要創建一個發佈訂閱的 Component 組件,前文是創建的一個使用 Redis 的狀態管理組件,對應的組件資源清單如下所示:
# deploy/redis.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
spec:
type: pubsub.redis
version: v1
metadata:
# These settings will work out of the box if you use `helm install
# bitnami/redis`. If you have your own setup, replace
# `redis-master:6379` with your own Redis master address, and the
# Redis password with your own Secret's name. For more information,
# see https://docs.dapr.io/operations/components/component-secrets .
- name: redisHost
value: redis-master:6379
- name: redisPassword
secretKeyRef:
name: redis
key: redis-password
auth:
secretStore: kubernetes
直接應用上面的資源清單即可:
$ kubectl apply -f deploy/redis.yaml
component.dapr.io/pubsub created
$ kubectl get components
NAME AGE
pubsub 26s
statestore 45h
現在我們就有了一個使用 Redis 爲中間件的發佈訂閱組件了,注意上面對象的類型爲 pubsub.redis
。
接着我們就可以部署 Python、Node 和 React-form 這 3 個微服了:
$ kubectl apply -f deploy/node-subscriber.yaml
$ kubectl apply -f deploy/python-subscriber.yaml
$ kubectl apply -f deploy/react-form.yaml
部署後查看 Pod 的狀態:
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
node-subscriber-5b5777c785-z8jzn 2/2 Running 0 30m
python-subscriber-76d9fc6c87-ffj7r 2/2 Running 0 30m
react-form-68db4b7777-7qmtj 2/2 Running 0 30m
react-form
這個微服務會通過一個 LoadBalancer
類型的 Service 來對外暴露服務:
$ kubectl get svc
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
node-subscriber-dapr ClusterIP None <none> 80/TCP,50001/TCP,50002/TCP,9090/TCP 31m
python-subscriber-dapr ClusterIP None <none> 80/TCP,50001/TCP,50002/TCP,9090/TCP 31m
react-form LoadBalancer 10.110.199.146 192.168.0.51 80:32510/TCP 30m
react-form-dapr ClusterIP None <none> 80/TCP,50001/TCP,50002/TCP,9090/TCP 30m
然後我們就可以通過分配的 EXTERNAL-IP
訪問前端服務了。同樣在前端頁面發送幾個不同的消息通知,然後使用 kubectl logs
觀察 Node 和 Python 訂閱服務的日誌。
$ kubectl logs --selector app=node-subscriber -c node-subscriber
$ kubectl logs --selector app=python-subscriber -c python-subscriber
如何工作
現在,我們已經在本地和 Kubernetes 中運行了訂閱發佈示例應用,接下來我們來分析下這是如何工作的。該應用程序分爲兩個訂閱者和一個發佈者。
Node 消息訂閱服務
重新導航到 node-scriber
目錄並查看 Node.js 訂閱者代碼 app.js
,該服務通過 Express
暴露了三個 API 端點。第一個是 GET 端點:
app.get("/dapr/subscribe", (_req, res) => {
res.json([
{
pubsubname: "pubsub",
topic: "A",
route: "A",
},
{
pubsubname: "pubsub",
topic: "B",
route: "B",
},
]);
});
該段代碼是告訴 Dapr 要訂閱 pubsub 這個組件的哪些主題,其中的 route
表示使用路由到那個端點來處理消息,當部署(本地或 Kubernetes)時,Dapr 將調用服務以確定它是否訂閱了任何內容。其他兩個端點是後端點:
app.post("/A", (req, res) => {
console.log("A: ", req.body.data.message);
res.sendStatus(200);
});
app.post("/B", (req, res) => {
console.log("B: ", req.body.data.message);
res.sendStatus(200);
});
這兩個端點處理來自每個主題類型的消息,我們這裏只是記錄消息,當然在更復雜的應用程序中,這裏就是需要處理業務邏輯的地方了。
此外我們也可以直接通過創建一個 Subscription 的對象來聲明在哪些服務裏面來訂閱組件中的哪些主題。
Python 消息訂閱服務
同樣導航到 python-subscriber
目錄,查看 Python 訂閱服務的代碼文件 app.py
。與 Node.js 訂閱者一樣,我們暴露了三個 API 端點,只是這裏使用的是 flask
,第一個是 GET 端點:
@app.route('/dapr/subscribe', methods=['GET'])
def subscribe():
subscriptions = [{
'pubsubname': 'pubsub', 'topic': 'A', 'route': 'A'
}, {
'pubsubname': 'pubsub', 'topic': 'C', 'route': 'C'
}]
return jsonify(subscriptions)
同樣的方式,這是告訴 Dapr 要訂閱 pubsub
組件的哪些主題,這裏我們訂閱的組件名爲 pubsub
的,主題爲 A
和 C
,這些主題的消息通過其他兩個路由進行處理:
@app.route('/A', methods=['POST'])
def a_subscriber():
print(f'A: {request.json}', flush=True)
print('Received message "{}" on topic "{}"'.format(request.json['data']['message'], request.json['topic']), flush=True)
return json.dumps({'success':True}), 200, {'ContentType':'application/json'}
@app.route('/C', methods=['POST'])
def c_subscriber():
print(f'C: {request.json}', flush=True)
print('Received message "{}" on topic "{}"'.format(request.json['data']['message'], request.json['topic']), flush=True)
return json.dumps({'success':True}), 200, {'ContentType':'application/json'}
React 前端應用
上面是兩個訂閱服務,接下來查看下發布者,我們的發佈者分爲客戶端和服務器。
客戶端是一個簡單的單頁 React 應用程序,使用 Create React App
啓動,相關的客戶端代碼位於react-form/client/src/MessageForm.js
,當用戶提交表單時,將使用最新的聚合 JSON 數據更新 React 狀態。默認情況下,數據設置爲:
{
messageType: "A",
message: ""
};
提交表單後,聚合的 JSON 數據將發送到服務器:
fetch("/publish", {
headers: {
Accept: "application/json",
"Content-Type": "application/json",
},
method: "POST",
body: JSON.stringify(this.state),
});
服務端是一個典型的 express
應用程序,它暴露了一個 POST 端點:/publish
。這樣可以從客戶端接收請求,並根據 Dapr 發佈它們。Express 內置的 JSON 中間件函數用於解析傳入請求中的 JSON:
app.use(express.json());
這樣我們可以獲取到提交的 messageType
,可以確定使用哪個主題來發布消息。要使用 Dapr 來發布消息,同樣也是直接使用 Dapr 提供的 API 端點 http://localhost:<DAPR_URL>/publish/<PUBSUB_NAME>/<TOPIC>
即可,根據獲取到的數據構建 Dapr 消息發佈的 URL,提交 JSON 數據,POST 請求還需要在成功完成後返回響應中的成功代碼。
const publishUrl = `${daprUrl}/publish/${pubsubName}/${req.body?.messageType}`;
await axios.post(publishUrl, req.body);
return res.sendStatus(200);
daprUrl
的地址所在的端口可以用下面的代碼來獲取:
const daprUrl = `http://localhost:${process.env.DAPR_HTTP_PORT || 3500}/v1.0`;
默認情況下,Dapr 在 3500 上運行,但如果我們在本地運行 Dapr 並將其設置爲其他端口(使用 CLI run
命令中的 --app-port
標誌),則該端口將作爲環境變量注入應用程序。
此外服務端還通過將默認主頁 /
路由請求轉發到構建的客戶端代碼來託管 React 應用程序本身:
app.get("/", function (_req, res) {
res.sendFile(path.join(__dirname, "client/build", "index.html"));
});
所以我們可以直接通過服務端來訪問到前端頁面。
發佈 - 訂閱模式是我們微服務開發中非常重要的一個模式,可以用來實現高可伸縮性和松耦合。發佈訂閱通常用於需要高度可伸縮的大型應用程序,發佈和訂閱應用程序通常比傳統的 client/server 應用程序具有更好的伸縮性。Pub-sub 允許我們完全解耦組件,發佈者不必知道他們的任何訂閱者,訂閱者也不必知道發佈者。這使得開發人員可以編寫更精簡的微服務,而不會直接依賴彼此。
從上面的示例可以看出 Dapr 中使用發佈訂閱模式進行開發就完全變成了面向
localhost
編程了。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/lE6C1FBssgr8gu15sbim4A