Dapr 入門教程之消息隊列

前面我們瞭解了 Dapr 對發佈訂閱的支持,本節我們將來介紹了 Dapr 中對消息隊列的支持。消息隊列,分爲兩種綁定,一種是輸出綁定,一種是輸入綁定。出和入是看數據的流向,輸出綁定就是作爲生產者的服務把消息通過 Dapr 傳給消息隊列,輸入綁定就是作爲消費者的服務通過 Dapr 從消息隊列裏得到消息。

這裏的消息隊列和發佈訂閱裏的消息總線有什麼區別呢?一個消息進入消息總線的話,所有訂閱者都能得到這個消息,而一個消息進入消息隊列的話,由消費者來取,一次只有一個人能得到。此外,消息總線是不要求處理順序的,兩個消息進入消息總線,誰先被拿到順序是不一定的,而消息隊列可以保證是先入先出的。

本節我們將創建兩個微服務,一個具有輸入綁定,另一個具有輸出綁定,前面我們都使用的 Redis 這種中間件,這裏我們將綁定到 Kafka。

綁定連接到 Kafka,允許我們將消息推送到 Kafka 實例(從 Python 微服務)中,並從該實例(從 Node.js 微服務)接收消息,而不必知道實例的位置。相反,同樣只需要直接使用 Dapr API 通過 sidecars 連接即可。

本地運行

首先我們在本地來運行示例應用,對應的架構圖如下所示:

Bindings 本地模式

同樣使用 quickstarts 這個代碼倉庫:

git clone [-b <dapr_version_tag>] https://github.com/dapr/quickstarts.git

由於我們這裏是使用 Kafka 來做消息隊列的中間件,所以我們首先需要在本地環境運行 Kafka,我們可以直接使用 https://github.com/wurstmeister/kafka-docker 這個項目以 Docker 方式運行。

定位到 quickstartstutorials/bindings 目錄,下面有一個 docker-compose-single-kafka.yml 文件:

cd tutorials/bindings
$ cat docker-compose-single-kafka.yml
version: '2'
services:
  zookeeper:
    image: ghcr.io/dapr/3rdparty/zookeeper:latest
    ports:
      - "2181:2181"
  kafka:
    image: ghcr.io/dapr/3rdparty/kafka:latest
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_CREATE_TOPICS: "sample:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

我們可以直接而使用 docker-compose 來啓動一個單實例的 Kafka:

$ docker-compose -f ./docker-compose-single-kafka.yml up -d

隔一段時間鏡像拉取完成後以容器方式啓動 Kafka:

$ docker-compose -f ./docker-compose-single-kafka.yml ps
NAME                   COMMAND                  SERVICE             STATUS              PORTS
bindings-kafka-1       "start-kafka.sh"         kafka               running             0.0.0.0:9092->9092/tcp
bindings-zookeeper-1   "/bin/sh -c '/usr/sb…"   zookeeper           running             0.0.0.0:2181->2181/tcp

在本地運行了 Kafka 後,接着我們可以運行輸入綁定的 Node.js 微服務:

cd nodeapp

同樣先安裝服務依賴:

$ npm install  # 或者執行 yarn 命令

然後我們就可以使用 dapr run 命令來啓動該微服務了,啓動方式我們應該比較熟悉了,如下所示:

$ dapr run --app-id bindings-nodeapp --app-port 3000 node app.js --components-path ../components

上面的命令和前面有點不一樣的地方是多了一個 --components-path 用來指定組件路徑,這是因爲現在我們要使用 Kafka 這種中間件來作爲我們的消息隊列組件,那麼我們就需要告訴 Dapr,在 ./components 目錄下面就包含一個對應的 kafka_bindings.yaml 文件,內容如下所示:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: sample-topic
spec:
  type: bindings.kafka
  version: v1
  metadata:
    # Kafka broker connection setting
    - name: brokers
      value: localhost:9092
    # consumer configuration: topic and consumer group
    - name: topics
      value: sample
    - name: consumerGroup
      value: group1
    # publisher configuration: topic
    - name: publishTopic
      value: sample
    - name: authRequired
      value: "false"

前面在本地模式下面我們沒有主動聲明組件,是因爲我們使用的就是默認的 Redis,而 Kafka 並不是內置就有的,所以需要我們主動聲明,注意上面組件的類型爲 type: bindings.kafkametadata 下面是訪問 Kafka 相關的元數據。正常情況下上面的啓動命令會輸出如下所示的日誌信息:

ℹ️  Starting Dapr with id bindings-nodeapp. HTTP Port: 54215. gRPC Port: 54216
INFO[0000] starting Dapr Runtime -- version 1.8.4 -- commit 18575823c74318c811d6cd6f57ffac76d5debe93  app_id=bindings-nodeapp instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
# ......
INFO[0000] dapr initialized. Status: Running. Init Elapsed 347.136ms  app_id=bindings-nodeapp instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
ℹ️  Updating metadata for app command: node app.js
✅  You're up and running! Both Dapr and your app logs will appear here.

INFO[0001] placement tables updated, version: 0          app_id=bindings-nodeapp instance=MBP2022.local scope=dapr.runtime.actor.internal.placement type=log ver=1.8.4

接下來,需要運行輸出綁定的 Python 微服務,定位到 pythonapp 目錄,安裝 requests 依賴:

cd pythonapp
$ pip3 install requests

然後同樣用 dapr run 命令來啓動該微服務,也要注意帶上後面的 --components-path 參數:

$ dapr run --app-id bindings-pythonapp python3 app.py --components-path ../components
ℹ️  Starting Dapr with id bindings-pythonapp. HTTP Port: 54554. gRPC Port: 54555
ℹ️  Checking if Dapr sidecar is listening on HTTP port 54554
INFO[0000] starting Dapr Runtime -- version 1.8.4 -- commit 18575823c74318c811d6cd6f57ffac76d5debe93  app_id=bindings-pythonapp instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
# ......
ℹ️  Checking if Dapr sidecar is listening on GRPC port 54555
ℹ️  Dapr sidecar is up and running.
ℹ️  Updating metadata for app command: python3 app.py
✅  You're up and running! Both Dapr and your app logs will appear here.

啓動完成後,觀察 Python 服務的日誌,可以看到不斷輸出如下所示成功輸出綁定到 Kafka 的日誌:

== APP == {'data'{'orderId': 1}'operation''create'}
== APP == <Response [204]>
== APP == {'data'{'orderId': 2}'operation''create'}
== APP == <Response [204]>
== APP == {'data'{'orderId': 3}'operation''create'}
== APP == <Response [204]>
# ......

同樣這個時候 Node.js 微服務中也不斷有新的日誌數據產生:

== APP == <Response [204]>
== APP == {'data'{'orderId': 1}'operation''create'}
== APP == <Response [204]>
== APP == {'data'{'orderId': 2}'operation''create'}
== APP == <Response [204]>
== APP == {'data'{'orderId': 3}'operation''create'}
== APP == <Response [204]>
# ......

這是因爲 Python 微服務每隔 1s 就會向我們綁定的消息隊列發送一條消息,而 Node.js 微服務作爲消費者當然會接收到對應的消息數據。

在 Kubernetes 中運行

上面在本地環境下可以正常運行 Dapr bindings 服務,接下來我們再次將該示例部署到 Kubernetes 集羣中來進行觀察。

同樣首先需要提供一個可用的 Kafka 實例,這裏我們仍然使用 Helm Chart 方式來進行安裝:

$ helm repo add bitnami https://charts.bitnami.com/bitnami
$ helm repo update

然後使用如下所示的命令來安裝 Kafka:

$ helm upgrade --install dapr-kafka bitnami/kafka --wait --namespace kafka -f ./kafka-non-persistence.yaml --create-namespace

這裏我們指定了一個無需持久化數據(僅供測試)的 values 文件 kafka-non-persistence.yaml,內容如下所示:

replicas: 1

# Disable persistent storage
persistence:
  enabled: false
zookeeper:
  persistence:
    enabled: false
  affinity:
    nodeAffinity:
      requiredDuringSchedulingIgnoredDuringExecution:
        nodeSelectorTerms:
          - matchExpressions:
              - key: kubernetes.io/os
                operator: In
                values:
                  - linux
              - key: kubernetes.io/arch
                operator: In
                values:
                  - amd64

autoCreateTopicsEnable: true

affinity:
  nodeAffinity:
    requiredDuringSchedulingIgnoredDuringExecution:
      nodeSelectorTerms:
        - matchExpressions:
            - key: kubernetes.io/os
              operator: In
              values:
                - linux
            - key: kubernetes.io/arch
              operator: In
              values:
                - amd64

安裝完成後可以查看 Pod 的狀態來保證 Kafka 啓動成功:

$ kubectl -n kafka get pods -w

NAME                     READY   STATUS    RESTARTS   AGE
dapr-kafka-0             1/1     Running   0          2m7s
dapr-kafka-zookeeper-0   1/1     Running   0          2m57s

接下來我們首先需要在 Kubernetes 集羣中配置使用 Kafka 作爲 Binding 消息中間件的 Component 組件:

# kafka_bindings.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: sample-topic
spec:
  type: bindings.kafka
  version: v1
  metadata:
    # Kafka broker connection setting
    - name: brokers
      value: dapr-kafka.kafka:9092
    # consumer configuration: topic and consumer group
    - name: topics
      value: sample
    - name: consumerGroup
      value: group1
    # publisher configuration: topic
    - name: publishTopic
      value: sample
    - name: authRequired
      value: "false"

注意該對象上面指定的組件類型爲 bindings.kafkametadata 下面的元信息包括 Kafka brokers 地址、生產者和消費者的配置等等,直接應用上面的資源清單即可:

$ kubectl apply -f kafka_bindings.yaml
$ kubectl get components sample-topic
NAME           AGE
sample-topic   13s

創建完成後在 Dapr Dashboard 中也可以看到對應的組件信息:

dapr dashboard components

接着部署兩個 Node.js 和 Python 微服務即可:

$ kubectl apply -f deploy/node.yaml
service/bindings-nodeapp created
deployment.apps/bindings-nodeapp created
$ kubectl apply -f deploy/python.yaml
deployment.apps/bindings-pythonapp created
$ kubectl get pods
NAME                                  READY   STATUS    RESTARTS         AGE
bindings-nodeapp-8bcdd744d-pj2j7      2/2     Running   0                3m44s
bindings-pythonapp-7b7fcc579b-kqx6p   2/2     Running   0                3m39s

部署完成後可以同樣分別觀察 Node.js 和 Python 微服務的日誌:

$ kubectl logs --selector app=bindingspythonapp -c python --tail=-1
{'data'{'orderId': 1}'operation''create'}
HTTPConnectionPool(host='localhost'port=3500): Max retries exceeded with url: /v1.0/bindings/sample-topic (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f9e75181390>: Failed to establish a new connection: [Errno 111] Connection refused'))
{'data'{'orderId': 2}'operation''create'}
<Response [204]>
{'data'{'orderId': 3}'operation''create'}
<Response [204]>
# ......
$ kubectl logs --selector app=bindingsnodeapp -c node --tail=-1
Node App listening on port 3000!
Hello from Kafka!
{ orderId: 2 }
Hello from Kafka!
{ orderId: 3 }
# ......

可以看到兩個微服務的日誌也服務我們的預期的。

如何工作

前面我們在本地或 Kubernetes 中都運行了示例應用,而且沒有更改任何代碼,應用結果都符合預期,接下來我們看看這是如何工作的。

在查看應用程序代碼之前,我們先看看 Kafka 綁定組件的資源清單文件,它們爲 Kafka 連接指定 brokers,爲消費者指定 topicsconsumerGroup,爲生產者指定了 publishTopic

我們創建了名爲 sample-topic 的組件,然後我們通過該組件配置的 Kafka 中的 sample 主題來設置輸入和輸出綁定。

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: sample-topic
spec:
  type: bindings.kafka
  version: v1
  metadata:
    # Kafka broker connection setting
    - name: brokers
      value: [kafka broker address]
    # consumer configuration: topic and consumer group
    - name: topics
      value: sample
    - name: consumerGroup
      value: group1
    # publisher configuration: topic
    - name: publishTopic
      value: sample
    - name: authRequired
      value: "false"

現在我們先導航到 nodeapp 目錄下面打開 app.js 文件,這是 Node.js 輸入綁定示例應用的代碼。這裏使用 Express 暴露了一個 API 端點,需要注意的是 API 名稱必須與在 Kafka 綁定組件中聲明的組件名稱相同,然後 Dapr 運行時將使用來自 sample 主題的事件,然後將 POST 請求與事件負載一起發送給 Node 應用程序。

const express = require("express");
const bodyParser = require("body-parser");
const port = process.env.APP_PORT ?? "3000";

require("isomorphic-fetch");

const app = express();
app.use(bodyParser.json());

// 這裏的 api 端點需要與聲明的組件名稱相同
app.post("/sample-topic"(req, res) ={
  console.log("Hello from Kafka!");
  console.log(req.body);
  res.status(200).send();
});

app.listen(port, () => console.log(`Node App listening on port ${port}!`));

所以當 Kafka 中收到消息後就會打印類似如下所示的日誌:

Hello from Kafka!
{ orderId: 3 }

然後我們導航到 pythonapp 目錄下面打開 app.py 文件,這是輸出綁定示例(生產者)應用程序的代碼,該服務會每秒發送一次 POST 請求到 Dapr 的 http 端點的 http://localhost:3500/v1.0/bindings/<output_bindings_name>,並帶有事件的 payload 數據。這個應用程序使用 bindings 組件名 sample-topic 作爲 <output_bindings_name>,然後 Dapr 運行時將事件發送到上面的 Kafka 綁定組件中指定的 sample 主題上去。

import time
import requests
import os

dapr_port = os.getenv("DAPR_HTTP_PORT", 3500)

dapr_url = "http://localhost:{}/v1.0/bindings/sample-topic".format(dapr_port)
n = 0
while True:
    n += 1
    payload = { "data"{"orderId": n}"operation""create" }
    print(payload, flush=True)
    try:
        response = requests.post(dapr_url, json=payload)
        print(response, flush=True)

    except Exception as e:
        print(e, flush=True)

    time.sleep(1)

上面代碼中最重要的依然是 Dapr API 地址 dapr_url 的拼接 "http://localhost:{}/v1.0/bindings/sample-topic".format(dapr_port),注意我們依然是面向 localhost 編程,而 v1.0/bindings/<output_bindings_name> 端點則是 Dapr API 爲我們封裝的輸出消息綁定的統一接口,非常簡單方便。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/Sg4jIGm_31ZTFF8LJXU_sA