Dapr - 發佈 - 訂閱

前言

 前篇文章對 Dapr 的狀態管理進行了解,本篇繼續對 訂閱 / 發佈 構建塊進行了解。

一、定義:

 發佈訂閱的概念來自於事件驅動架構(EDA)的設計思想,這是一種讓程序(應用、服務)之間解耦的主要方式,通過發佈訂閱的思想也可以實現服務之間的異步調用。而大部分分佈式應用都會依賴這樣的發佈訂閱解耦模式。

 

** 步驟:**

  1. 發佈服務器將消息發送到消息代理。

  2. 訂閱服務器將綁定到消息代理上的訂閱。

  3. 消息代理將消息的副本轉發給感興趣的訂閱。

  4. 訂閱服務器從其訂閱使用消息。

 但是不同的消息中間件之間存在細微的差異,項目使用不同的產品需要實現不同的實現類,雖然是明智的決策,但必須編寫和維護抽象及其基礎實現。此方法需要複雜、重複且容易出錯的自定義代碼。

 Dapr 爲了解決這種問題,提供開箱即用的消息傳送抽象和實現,封裝在 Dapr 構建基塊中。業務系統只需調用跟據 Dapr 的要求實現訂閱發佈即可。

二、工作原理:

 Dapr 發佈 & 訂閱構建基塊提供了平臺無關的 API 框架來發送和接收消息。你的服務將消息發佈到一個命名主題 (topic)。服務訂閱主題(topic) 來使用消息。

 服務在 Dapr Sidecar 上調用 pub/sub API。然後,Sidecar 將調用一個預定義的 Dapr pub/sub 組件來封裝特定的消息代理產品。下圖 顯示了 Dapr 發佈 / 訂閱 消息傳遞堆棧。

 

三、功能:

  Dapr 發佈 & 訂閱構建基塊提供了一個與平臺無關的 API 框架來發送和接收消息。

  服務將消息發佈到指定主題, 業務服務訂閱主題以使用消息。

  服務在 Dapr sidecar 上調用 pub/sub API。然後,sidecar 調用預定義 Dapr pub/sub 組件。

  任何編程平臺都可以使用 Dapr 本機 API 通過 HTTP 或 gRPC 調用構建基塊。若要發佈消息,請進行以下 API 調用:

http://localhost:<dapr-port>/v1.0/publish/<pub-sub-name>/<topic>

  上述調用中有幾個特定於 Dapr 的 URL 段:

  要啓用消息路由併爲每個消息提供附加上下文,Dapr 使用 CloudEvents 1.0 規範 作爲其消息格式。使用 Dapr 應用程序發送的任何信息都將自動包入 Cloud Events 信封中,datacontenttype 屬性使用 Content-Type 頭部值。

  Dapr 實現以下 Cloud Events 字段:

  下面的示例顯示了 CloudEvent v1.0 中序列化爲 JSON 的 XML 內容:

{
    "specversion" : "1.0",
    "type" : "xml.message",
    "source" : "https://example.com/message",
    "subject" : "Test XML Message",
    "id" : "id-1234-5678-9101",
    "time" : "2020-09-23T06:23:21Z",
    "datacontenttype" : "text/xml",
    "data" : "<note><to>User1</to><from>user2</from><message>hi</message></note>"
}

  Dapr 應用程序可以訂閱已發佈的 topics。Dapr 允許您的應用程序有兩種方法來訂閱 topics:

**   聲明式**:其中定義在外部文件中:

apiVersion: dapr.io/v1alpha1
kind: Subscription
metadata:
  name: myevent-subscription
spec:
  topic: test_topic  //主題
  route: /TestPubSub //路由
  pubsubname: pubsub //名稱
scopes:
- frontend       //爲該應用啓用訂閱

   上面的示例顯示了 test_topic主題的事件訂閱,使用組件 pubsub

**   編程方式**:訂閱在用戶代碼中定義

   Dapr 保證消息傳遞 at-least-once 語義。這意味着,當應用程序使用發佈 / 訂閱 API 將消息發佈到主題時,Dapr 可確保此消息至少傳遞給每個訂閱者一次(at least once)

   多個消費組、多個應用程序實例使用一個消費組,這些都將由 Dapr 自動處理。當同一個應用程序的多個實例 (相同的 ID) 訂閱主題時,Dapr 只將每個消息傳遞給該應用程序的一個實例。

   

   同樣,如果兩個不同的應用程序 (不同的 ID) 訂閱同一主題,那麼 Dapr 將每個消息僅傳遞到每個應用程序的一個實例。

   默認情況下,支持 Dapr 發佈 / 訂閱組件的所有主題 (例如,Kafka、Redis、RabbitMQ) 都可用於配置該組件的每個應用程序。爲了限制哪個應用程序可以發佈或訂閱 topic,Dapr 提供了 topic 作用域限定。這使您能夠讓應用程序允許發佈哪些主題以及應用程序允許訂閱哪些主題。

pub/sub 主題作用域限定

爲每個 pub/sub 組件定義發佈 / 訂閱範圍。您可能有一個名爲 pubsub 的 pub/sub 組件,它有一組範圍設置,另一個 pubsub2 另有一組範圍設置。

要使用這個主題範圍,可以設置一個 pub/sub 組件的三個元數據屬性:

   Dapr 可以在每個消息的基礎上設置超時。表示如果消息未從 Pub/Sub 組件讀取,則消息將被丟棄。這是爲了防止未讀消息的積累。在隊列中超過配置的 TTL 的消息就可以說它掛了。  

四、.NET Core 應用

 1、設置 Pub/Sub 組件:

  本機默認下安裝了 Redis Staram, 在 Windows 上打開%UserProfile%\.dapr\components\pubsub.yaml 組件文件以驗證:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.redis
  metadata:
  - name: redisHost
    value: localhost:6379
  - name: redisPassword
    value: ""

 2、實現發佈 / 訂閱功能 :

  添加控制器(PubSubController)

[Route("api/[controller]")]
[ApiController]
public class PubSubController : ControllerBase
{
    private readonly ILogger<PubSubController> _logger;
    private readonly DaprClient _daprClient;
    public PubSubController(ILogger<PubSubController> logger, DaprClient daprClient)
    {
        _logger = logger;
        _daprClient = daprClient;
    }

    /// <summary>
    /// 發佈消息
    /// </summary>
    /// <returns></returns>
    [HttpGet("pub")]
    public async Task<ActionResult> PubAsync()
    {
        var data = new WeatherForecast() { Summary = "city", Date = DateTime.Now };
        await _daprClient.PublishEventAsync<WeatherForecast>("pubsub", "test_topic", data);
        return Ok();
    }

    /// <summary>
    /// 消費消息
    /// </summary>
    /// <returns></returns>
    [Topic("pubsub", "test_topic")]
    [HttpPost("sub")]
    public async Task<ActionResult> Sub()
    {
        Stream stream = Request.Body;
        byte[] buffer = new byte[Request.ContentLength.Value];
        stream.Position = 0L;
        await stream.ReadAsync(buffer, 0, buffer.Length);
        string content = Encoding.UTF8.GetString(buffer);
        _logger.LogInformation("testsub" + content);
        return Ok(content);
    }
}

  Startup.cs 中調整:

public class Startup
{
    public Startup(IConfiguration configuration)
    {
        Configuration = configuration;
    }
    public IConfiguration Configuration { get; }
    // This method gets called by the runtime. Use this method to add services to the container.
    public void ConfigureServices(IServiceCollection services)
    {
        //注入Dapr
        services.AddControllers().AddDapr();
    }
    // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
    public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
    {
        // 使用CoudEvent
        app.UseCloudEvents();
        if (env.IsDevelopment())
        {
            app.UseDeveloperExceptionPage();
        }
        app.Use((context, next) =>
        {
            context.Request.EnableBuffering();
            return next();
        });
        app.UseRouting();
        app.UseAuthorization();
        app.UseEndpoints(endpoints =>
        {
            endpoints.MapControllers()
            //訂閱處理
            endpoints.MapSubscribeHandler();;
        });
    }
}

 3、dapr 運行程序:

dapr run --dapr-http-port 3501 --app-port 5001  --app-id frontend dotnet  .\FrontEnd.dll

 4、調用發佈命令:

http://127.0.0.1:3501/v1.0/invoke/frontend/method/api/pubsub/pub

 5、通過 Dapr cli 發佈消息:

dapr publish --publish-app-id frontend --pubsub pubsub --topic test_topic --data '{"date":"0001-01-01T00:00:00","temperatureC":0,"temperatureF":32,"summary":null}'

總結

 pub/sub 模式可幫助你分離分佈式應用程序中的服務。Dapr 發佈 & 訂閱構建基塊簡化了在應用程序中實現此行爲。

 通過 Dapr pub/sub,可以將消息發佈到特定 主題。構建基塊還將查詢服務,以確定 (訂閱) 主題。

 可以通過 HTTP 或特定於語言的 SDK 之一(例如用於 Dapr 的 .NET SDK)本機使用 Dapr pub/sub。.NET SDK 與 ASP.NET 平臺緊密集成。

 使用 Dapr,可以將受支持的消息代理產品插入應用程序。然後,無需更改應用程序的代碼,即可交換消息代理。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/kBHeewyqvYv_xc-y0IT5MA