Dapr 入門教程之發佈訂閱

前面我們瞭解瞭如果在 Dapr 下面進行服務調用,以及最簡單的狀態管理,本節我們來了解如何啓用 Dapr 的發佈 / 訂閱模式,發佈者將生成特定主題的消息,而訂閱者將監聽特定主題的信息。

接下來我們使用的這個示例包含一個發佈者:

包含另外 3 個消息訂閱者:

Dapr 使用可插拔的消息總線來支持發佈 - 訂閱,並將消息傳遞給 CloudEvents(一個 CNCF 項目) 作爲通用的事件信封格式,以提高連接服務的互操作性。

我們這裏將使用 Redis Streams(在 Redis version = > 5 中啓用),當然也可以使用 RabbitMQ、Kafka 等中間件。下圖是用來說明組件之間是如何在本地模式下互相連接的。

dapr pub/sub

本地初始化

Dapr 允許你將相同的微服務從本地機器部署到雲環境中去,這裏爲了和大家說明這種便利性,我們先在本地部署這個實例項目,然後再將其部署到 Kubernetes 環境中去。

要在本地使用 Dapr 服務,需要先在本地初始化 Dapr:

$ dapr init

由於某些網絡原因使用上面的命令可能並不能初始化成功,我們可以使用離線的方式進行安裝,前往 https://github.com/dapr/installer-bundle/releases 下載對應系統的 Bundle 👝 包,然後解壓,比如我這裏是 Mac M1,使用下面的命令下載:

$ wget https://github.91chi.fun/https://github.com/dapr/installer-bundle/releases/download/v1.8.4/daprbundle_darwin_arm64.tar.gz
$ tar -xvf daprbundle_darwin_arm64.tar.gz
x daprbundle/
x daprbundle/README.md
x daprbundle/dapr
x daprbundle/details.json
x daprbundle/dist/
x daprbundle/dist/daprd_darwin_arm64.tar.gz
x daprbundle/dist/dashboard_darwin_arm64.tar.gz
x daprbundle/dist/placement_darwin_arm64.tar.gz
x daprbundle/docker/
x daprbundle/docker/daprio-dapr-1.8.4.tar.gz

然後我們可以重新使用下面的命令進行初始化:

$ dapr init --from-dir daprbundle/
⌛  Making the jump to hyperspace...
⚠  Local bundle installation using --from-dir flag is currently a preview feature and is subject to change. It is only available from CLI version 1.7 onwards.
ℹ️  Installing runtime version 1.8.4
↙  Extracting binaries and setting up components...
Dapr runtime installed to /Users/cnych/.dapr/bin, you may run the following to add it to your path if you want to run daprd directly:
    export PATH=$PATH:/Users/cnych/.dapr/bin
8d7366c22fd8: Loading layer [==================================================>]  3.697MB/3.697MB
61f7f94319f6: Loading layer [==================================================>]  238.6MB/238.6MB
←  Extracting binaries and setting up components... Loaded image: daprio/dapr:1.8.4
✅  Extracting binaries and setting up components...
✅  Extracted binaries and completed components set up.
ℹ️  daprd binary has been installed to /Users/cnych/.dapr/bin.
ℹ️  dapr_placement container is running.
ℹ️  Use `docker ps` to check running containers.
$ dapr version
CLI version: 1.8.0
Runtime version: 1.8.4

默認會啓用 zipkin 這個 tracing 服務,使用上面的命令初始化如果沒有對應的容器,則可以使用 docker run --name dapr_zipkin -d -p 9411:9411 dockerproxy.com/openzipkin/zipkin 啓動該服務。同樣也需要運行一個 Redis 服務:docker run --name dapr_redis -d -p 6379:6379 dockerproxy.com/redislabs/rejson

消息訂閱服務

這裏我們還是使用前面使用的 quickstarts 這個項目,克隆項目到本地:

git clone [-b <dapr_version_tag>] https://github.com/dapr/quickstarts.git

進入 tutorials/pub_sub 目錄下面:

➜  pub-sub git:(622b7d9) ls
README.md         deploy            makefile          message_b.json    node-subscriber   react-form
csharp-subscriber img               message_a.json    message_c.json    python-subscriber

運行 Node 消息訂閱服務

首先我們使用 Dapr 運行 node 消息訂閱服務,導航到 node-subscriber 目錄,安裝依賴:

cd node-subscriber
$ npm install  # 或者 yarn

執行如下所示命令運行 node 消息訂閱服務:

$ dapr run --app-id node-subscriber --app-port 3000 node app.js
ℹ️  Starting Dapr with id node-subscriber. HTTP Port: 50728. gRPC Port: 50729
INFO[0000] starting Dapr Runtime -- version 1.8.4 -- commit 18575823c74318c811d6cd6f57ffac76d5debe93  app_id=node-subscriber instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
# ......
INFO[0000] app is subscribed to the following topics: [A B] through pubsub=pubsub  app_id=node-subscriber instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
INFO[0000] dapr initialized. Status: Running. Init Elapsed 312.69599999999997ms  app_id=node-subscriber instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
INFO[0000] placement tables updated, version: 0          app_id=node-subscriber instance=MBP2022.local scope=dapr.runtime.actor.internal.placement type=log ver=1.8.4
ℹ️  Updating metadata for app command: node app.js
✅  You're up and running! Both Dapr and your app logs will appear here.

上面命令中的 app-id 是微服務的唯一標識符,--app-port 是 Node 應用程序運行的端口,最後,運行應用程序的命令是 node app.js

運行 Python 消息訂閱服務

接下來使用 Dapr 運行 Python 消息訂閱服務,導航到 python-subscriber 目錄:

cd python-subscriber

安裝應用依賴:

$ pip3 install -r requirements.txt

同樣再次使用 dapr run 來運行該訂閱服務:

$ dapr run --app-id python-subscriber --app-port 5001 python3 app.py
ℹ️  Starting Dapr with id python-subscriber. HTTP Port: 55508. gRPC Port: 55509
INFO[0000] starting Dapr Runtime -- version 1.8.4 -- commit 18575823c74318c811d6cd6f57ffac76d5debe93  app_id=python-subscriber instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
INFO[0000] log level set to: info                        app_id=python-subscriber instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
INFO[0000] enabled gRPC metrics middleware               app_id=python-subscriber instance=MBP2022.local scope=dapr.runtime.grpc.internal type=log ver=1.8.4
INFO[0000] internal gRPC server is running on port 55514  app_id=python-subscriber instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
INFO[0000] application protocol: http. waiting on port 5001.  This will block until the app is listening on that port.  app_id=python-subscriber instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
INFO[0000] application discovered on port 5001           app_id=python-subscriber instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
WARN[0000] [DEPRECATION NOTICE] Adding a default content type to incoming service invocation requests is deprecated and will be removed in the future. See https://docs.dapr.io/operations/support/support-preview-features/ for more details. You can opt into the new behavior today by setting the configuration option `ServiceInvocation.NoDefaultContentType` to true.  app_id=python-subscriber instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
== APP ==  * Serving Flask app "app" (lazy loading)
== APP ==  * Environment: production
== APP ==    WARNING: This is a development server. Do not use it in a production deployment.
== APP ==    Use a production WSGI server instead.
== APP ==  * Debug mode: off
== APP ==  * Running on http://127.0.0.1:5001/ (Press CTRL+C to quit)
ℹ️  Updating metadata for app command: python3 app.py
✅  You're up and running! Both Dapr and your app logs will appear here.

由於我們這裏沒有 C# 環境,所以只運行 Node 和 Python 這兩個消息訂閱服務了。

消息發佈服務

接下來我們來運行 React 這個前端消息發佈服務,同樣先導航到 react-form 項目目錄下面:

cd react-form

然後執行下面的命令安裝依賴並構建服務:

$ npm run buildclient
$ npm install

構建完成後可以使用下面的 dapr 命令來啓動該前端服務:

$ dapr run --app-id react-form --app-port 8080 npm run start
ℹ️  Starting Dapr with id react-form. HTTP Port: 57303. gRPC Port: 57304
INFO[0000] starting Dapr Runtime -- version 1.8.4 -- commit 18575823c74318c811d6cd6f57ffac76d5debe93  app_id=react-form instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
# ......
== APP ==
== APP == > react-form@1.0.0 start
== APP == > node server.js
== APP ==
== APP == Listening on port 8080!
INFO[0000] application discovered on port 8080           app_id=react-form instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
# ......
INFO[0000] dapr initialized. Status: Running. Init Elapsed 760.39ms  app_id=react-form instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
ℹ️  Updating metadata for app command: npm run start
✅  You're up and running! Both Dapr and your app logs will appear here.

INFO[0001] placement tables updated, version: 0          app_id=react-form instance=MBP2022.local scope=dapr.runtime.actor.internal.placement type=log ver=1.8.4

當看到 == APP == Listening on port 8080! 這樣的日誌時,表示應用啓動成功了。然後我們就可以在瀏覽器中訪問 http://localhost:8080 訪問前端應用了。

比如現在我們選擇消息類型 A,然後隨便輸入一些消息內容,點擊 Submit 發送,然後觀察上面的 Node 和 Python 這兩個消息訂閱者服務的日誌。

選擇一個主題,輸入一些文字,然後發送一條信息!觀察通過你們各自的 Dapr 的日誌。

注意,Node 訂閱者接收類型爲 AB 的消息,而 Python 訂閱者接收類型爲 AC 的消息,所以注意每個控制檯窗口的日誌顯示。

此外 Dapr CLI 提供了一個機制來發布消息用於測試,比如我們可以使用如下命令來發布一條消息:

$ dapr publish --publish-app-id react-form --pubsub pubsub --topic A --data-file message_a.json

到這裏我們就完成了使用 Dapr 來進行消息訂閱發佈的功能演示。

在 Kubernetes 中運行

上面我們是將演示服務在本地部署的,我們知道使用 Dapr 開發的服務是和平臺沒關係的,可以很輕鬆遷移到雲環境,比如現在我們再將上面的示例應用部署到 Kubernetes 集羣中。

要在 Kubernetes 中運行相同的代碼,首先需要設置 Redis 存儲,然後部署微服務,將使用相同的微服務,但最終架構有所不同:

前面我們已經使用 Helm 安裝了 bitnami 下面的 redis 應用:

$ helm repo add bitnami https://charts.bitnami.com/bitnami
$ helm repo update
$ helm install redis bitnami/redis

有了 Redis 服務過後,接着我們需要創建一個發佈訂閱的 Component 組件,前文是創建的一個使用 Redis 的狀態管理組件,對應的組件資源清單如下所示:

# deploy/redis.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.redis
  version: v1
  metadata:
    # These settings will work out of the box if you use `helm install
    # bitnami/redis`.  If you have your own setup, replace
    # `redis-master:6379` with your own Redis master address, and the
    # Redis password with your own Secret's name. For more information,
    # see https://docs.dapr.io/operations/components/component-secrets .
    - name: redisHost
      value: redis-master:6379
    - name: redisPassword
      secretKeyRef:
        name: redis
        key: redis-password
auth:
  secretStore: kubernetes

直接應用上面的資源清單即可:

$ kubectl apply -f deploy/redis.yaml
component.dapr.io/pubsub created
$ kubectl get components
NAME         AGE
pubsub       26s
statestore   45h

現在我們就有了一個使用 Redis 爲中間件的發佈訂閱組件了,注意上面對象的類型爲 pubsub.redis

接着我們就可以部署 Python、Node 和 React-form 這 3 個微服了:

$ kubectl apply -f deploy/node-subscriber.yaml
$ kubectl apply -f deploy/python-subscriber.yaml
$ kubectl apply -f deploy/react-form.yaml

部署後查看 Pod 的狀態:

$ kubectl get pods
NAME                                 READY   STATUS    RESTARTS         AGE
node-subscriber-5b5777c785-z8jzn     2/2     Running   0                30m
python-subscriber-76d9fc6c87-ffj7r   2/2     Running   0                30m
react-form-68db4b7777-7qmtj          2/2     Running   0                30m

react-form 這個微服務會通過一個 LoadBalancer 類型的 Service 來對外暴露服務:

$ kubectl get svc
NAME                     TYPE           CLUSTER-IP       EXTERNAL-IP    PORT(S)                               AGE
node-subscriber-dapr     ClusterIP      None             <none>         80/TCP,50001/TCP,50002/TCP,9090/TCP   31m
python-subscriber-dapr   ClusterIP      None             <none>         80/TCP,50001/TCP,50002/TCP,9090/TCP   31m
react-form               LoadBalancer   10.110.199.146   192.168.0.51   80:32510/TCP                          30m
react-form-dapr          ClusterIP      None             <none>         80/TCP,50001/TCP,50002/TCP,9090/TCP   30m

然後我們就可以通過分配的 EXTERNAL-IP 訪問前端服務了。同樣在前端頁面發送幾個不同的消息通知,然後使用 kubectl logs 觀察 Node 和 Python 訂閱服務的日誌。

$ kubectl logs --selector app=node-subscriber -c node-subscriber
$ kubectl logs --selector app=python-subscriber -c python-subscriber

如何工作

現在,我們已經在本地和 Kubernetes 中運行了訂閱發佈示例應用,接下來我們來分析下這是如何工作的。該應用程序分爲兩個訂閱者和一個發佈者。

Node 消息訂閱服務

重新導航到 node-scriber 目錄並查看 Node.js 訂閱者代碼 app.js,該服務通過 Express 暴露了三個 API 端點。第一個是 GET 端點:

app.get("/dapr/subscribe"(_req, res) ={
  res.json([
    {
      pubsubname: "pubsub",
      topic: "A",
      route: "A",
    },
    {
      pubsubname: "pubsub",
      topic: "B",
      route: "B",
    },
  ]);
});

該段代碼是告訴 Dapr 要訂閱 pubsub 這個組件的哪些主題,其中的 route 表示使用路由到那個端點來處理消息,當部署(本地或 Kubernetes)時,Dapr 將調用服務以確定它是否訂閱了任何內容。其他兩個端點是後端點:

app.post("/A"(req, res) ={
  console.log("A: ", req.body.data.message);
  res.sendStatus(200);
});

app.post("/B"(req, res) ={
  console.log("B: ", req.body.data.message);
  res.sendStatus(200);
});

這兩個端點處理來自每個主題類型的消息,我們這裏只是記錄消息,當然在更復雜的應用程序中,這裏就是需要處理業務邏輯的地方了。

此外我們也可以直接通過創建一個 Subscription 的對象來聲明在哪些服務裏面來訂閱組件中的哪些主題。

Python 消息訂閱服務

同樣導航到 python-subscriber 目錄,查看 Python 訂閱服務的代碼文件 app.py。與 Node.js 訂閱者一樣,我們暴露了三個 API 端點,只是這裏使用的是 flask,第一個是 GET 端點:

@app.route('/dapr/subscribe'methods=['GET'])
def subscribe():
    subscriptions = [{
        'pubsubname''pubsub''topic''A''route''A'
    }{
        'pubsubname''pubsub''topic''C''route''C'
    }]
    return jsonify(subscriptions)

同樣的方式,這是告訴 Dapr 要訂閱 pubsub 組件的哪些主題,這裏我們訂閱的組件名爲 pubsub 的,主題爲 AC,這些主題的消息通過其他兩個路由進行處理:

@app.route('/A'methods=['POST'])
def a_subscriber():
    print(f'A: {request.json}'flush=True)
    print('Received message "{}" on topic "{}"'.format(request.json['data']['message'], request.json['topic'])flush=True)
    return json.dumps({'success':True}), 200, {'ContentType':'application/json'}

@app.route('/C'methods=['POST'])
def c_subscriber():
    print(f'C: {request.json}'flush=True)
    print('Received message "{}" on topic "{}"'.format(request.json['data']['message'], request.json['topic'])flush=True)
    return json.dumps({'success':True}), 200, {'ContentType':'application/json'}

React 前端應用

上面是兩個訂閱服務,接下來查看下發布者,我們的發佈者分爲客戶端和服務器。

客戶端是一個簡單的單頁 React 應用程序,使用 Create React App 啓動,相關的客戶端代碼位於react-form/client/src/MessageForm.js,當用戶提交表單時,將使用最新的聚合 JSON 數據更新 React 狀態。默認情況下,數據設置爲:

{
    messageType: "A",
    message: ""
};

提交表單後,聚合的 JSON 數據將發送到服務器:

fetch("/publish"{
  headers: {
    Accept: "application/json",
    "Content-Type""application/json",
  },
  method: "POST",
  body: JSON.stringify(this.state),
});

服務端是一個典型的 express 應用程序,它暴露了一個 POST 端點:/publish。這樣可以從客戶端接收請求,並根據 Dapr 發佈它們。Express 內置的 JSON 中間件函數用於解析傳入請求中的 JSON:

app.use(express.json());

這樣我們可以獲取到提交的 messageType,可以確定使用哪個主題來發布消息。要使用 Dapr 來發布消息,同樣也是直接使用 Dapr 提供的 API 端點 http://localhost:<DAPR_URL>/publish/<PUBSUB_NAME>/<TOPIC> 即可,根據獲取到的數據構建 Dapr 消息發佈的 URL,提交 JSON 數據,POST 請求還需要在成功完成後返回響應中的成功代碼。

const publishUrl = `${daprUrl}/publish/${pubsubName}/${req.body?.messageType}`;
await axios.post(publishUrl, req.body);
return res.sendStatus(200);

daprUrl 的地址所在的端口可以用下面的代碼來獲取:

const daprUrl = `http://localhost:${process.env.DAPR_HTTP_PORT || 3500}/v1.0`;

默認情況下,Dapr 在 3500 上運行,但如果我們在本地運行 Dapr 並將其設置爲其他端口(使用 CLI run 命令中的 --app-port 標誌),則該端口將作爲環境變量注入應用程序。

此外服務端還通過將默認主頁 / 路由請求轉發到構建的客戶端代碼來託管 React 應用程序本身:

app.get("/"function (_req, res) {
  res.sendFile(path.join(__dirname, "client/build""index.html"));
});

所以我們可以直接通過服務端來訪問到前端頁面。

發佈 - 訂閱模式是我們微服務開發中非常重要的一個模式,可以用來實現高可伸縮性和松耦合。發佈訂閱通常用於需要高度可伸縮的大型應用程序,發佈和訂閱應用程序通常比傳統的 client/server 應用程序具有更好的伸縮性。Pub-sub 允許我們完全解耦組件,發佈者不必知道他們的任何訂閱者,訂閱者也不必知道發佈者。這使得開發人員可以編寫更精簡的微服務,而不會直接依賴彼此。

從上面的示例可以看出 Dapr 中使用發佈訂閱模式進行開發就完全變成了面向 localhost 編程了。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/lE6C1FBssgr8gu15sbim4A