Dapr-Actor 構建塊

一、Actor 簡介:

 Actors 爲最低級別的 “計算單元”。換句話說,您將代碼寫入獨立單元 ( 稱爲 actor) ,該單元接收消息並一次處理消息,而不進行任何類型的並行或線程處理。

 當代碼處理一條消息時,它可以向其他參與者發送一條或多條消息,或者創建新的 Actors。底層 運行時 將管理每個 actor 的運行方式,時機和位置,並在 Actors 之間傳遞消息。

 大量 Actors 可以同時執行,而 Actors 可以相互獨立執行。

 Dapr 包含專門實現 virtual actors 模式 的運行時。通過 Dapr 的實現,您可以根據 Actors 模型編寫 Dapr Actor,而 Dapr 利用底層平臺提供的可擴展性和可靠性保證。

 應用場景:

 生命週期:

  Dapr Actors 是虛擬的,意思是他們的生命週期與他們的 in - memory 表現不相關。因此,它們不需要顯式創建或銷燬。Dapr Actors 運行時在第一次接收到該 actor ID 的請求時自動激活 actor。如果 actor 在一段時間內未被使用,那麼 Dapr Actors 運行時將回收內存對象。如果以後需要重新啓動,它還將保持對 actor 的一切原有數據。

  調用 actor 方法和 reminders 將重置空閒時間,例如,reminders 觸發將使 actor 保持活動狀態。不論 actor 是否處於活動狀態或不活動狀態 Actor reminders 都會觸發,對不活動 actor ,那麼會首先激活 actor。Actor timers 不會重置空閒時間,因此 timer 觸發不會使參與者保持活動狀態。Timer 僅在 actor 活躍時被觸發。

  空閒超時和掃描時間間隔 Dapr 運行時用於查看是否可以對 actor 進行垃圾收集。當 Dapr 運行時調用 actor 服務以獲取受支持的 actor 類型時,可以傳遞此信息。

  Virtual actors 生命週期抽象會將一些警告作爲 virtual actors 模型的結果,而事實上, Dapr Actors 實施有時會偏離此模型。

  在第一次將消息發送到其 actor 標識時,將自動激活 actor (導致構造 actor 對象) 。在一段時間後,actor 對象將被垃圾回收。以後,再次使用 actor ID 訪問,將構造新的 actor。Actor 的狀態比對象的生命週期更久,因爲狀態存儲在 Dapr 運行時的配置狀態提供程序中(也就是說 Actor 即使不在活躍狀態,仍然可以讀取它的狀態)。

二、工作原理:

 Dapr Sidecar 提供了用於調用執行組件的 HTTP/gRPC API。這是 HTTP API 的 URL 格式: 

http://localhost:<daprPort>/v1.0/actors/<actorType>/<actorId>/

 a)Actor 組件放置服務流程:

  1. 啓動時,Sidecar 調用執行組件服務以獲取註冊的執行組件類型和執行組件的配置設置。

  2. Sidecar 將註冊的執行組件類型的列表發送到放置服務。

  3. 放置服務會將更新的分區信息廣播到所有執行組件服務實例。每個實例都將保留分區信息的緩存副本,並使用它來調用執行組件。

b)調用 Actor 組件方法流程:

  

  1. 服務在 Sidecar 上調用執行組件 API。請求正文中的 JSON 有效負載包含要發送到執行組件的數據。

  2. Sidecar 使用位置服務中的本地緩存的分區信息來確定哪個執行組件服務實例 (分區) 負責託管 ID 爲的執行組件 3 。在此示例中,它是 pod 2 中的服務實例。調用將轉發到相應的 Sidecar 。

  3. Pod 2 中的 Sidecar 實例調用服務實例以調用執行組件。 服務實例激活 actor(如果它還沒有激活)並執行 actor 方法。

三、Actor timers(定時器) 和 reminders(提醒)

 可以使用計時器和提醒來計劃自身的調用。這兩個概念都支持配置截止時間。不同之處在於回調註冊的生存期:

1、Timers 定時器:

Dapr Actor 運行時確保回調方法被順序調用,而非併發調用。這意味着,在此回調完成執行之前,不會有其他 Actor 方法或 timer/remider 回調被執行。

  Timer 的下一個週期在回調完成執行後開始計算。這意味着 timer 在回調執行時停止,並在回調完成時啓動。

  Dapr Actor 運行時在回調完成時保存對 actor 的狀態所作的更改。如果在保存狀態時發生錯誤,那麼將取消激活該 actor 對象,並且將激活新實例。

  當 actor 作爲垃圾回收 (GC) 的一部分被停用時,所有 timer 都會停止。在此之後,將不會再調用 timer 的回調。此外, Dapr Actors 運行時不會保留有關在失活之前運行的 timer 的任何信息。也就是說,重新啓動 actor 後將會激活的 timer 完全取決於註冊時登記的 timer。

a) 創建定時器:

POST/PUT http://localhost:3500/v1.0/actors/<actorType>/<actorId>/timers/<name>

  Timer 的 duetime 和回調函數可以在請求主體中指定。到期時間(due time)表示註冊後 timer 將首次觸發的時間。 period 表示 timer 在此之後觸發的頻率。到期時間爲 0 表示立即執行。負 due times 和負 periods 都是無效。

  以下請求體配置了一個 timer, dueTime 9 秒, period 3 秒。這意味着它將在 9 秒後首次觸發,然後每 3 秒觸發一次。

{
  "dueTime":"0h0m9s0ms",
  "period":"0h0m3s0ms"
}

b) 刪除定時器:

DELETE http://localhost:3500/v1.0/actors/<actorType>/<actorId>/timers/<name>

2、Reminders 提醒:

  Reminders 是一種在指定時間內觸發 persistent 回調的機制。它們的功能類似於 timer。但與 timer 不同,在所有情況下 reminders 都會觸發,直到 actor 顯式取消註冊 reminders 或刪除 actor 。具體而言, reminders 會在所有 actor 失活和故障時也會觸發觸發,因爲 Dapr Actors 運行時會將 reminders 信息持久化到 Dapr Actors 狀態提供者中。

  a) 創建 Reminders

POST/PUT http://localhost:3500/v1.0/actors/<actorType>/<actorId>/reminders/<name>

Reminders 的 duetime 和回調函數可以在請求主體中指定。到期時間(due time)表示註冊後 reminders 將首次觸發的時間。 period 表示在此之後 reminders 將觸發的頻率。到期時間爲 0 表示立即執行。負 due times 和負 periods 都是無效。若要註冊僅觸發一次的 reminders ,請將 period 設置爲空字符串。

  以下請求體配置了一個 reminders, dueTime 9 秒, period 3 秒。這意味着它將在 9 秒後首次觸發,然後每 3 秒觸發一次。

{
  "dueTime":"0h0m9s0ms",
  "period":"0h0m3s0ms"
}

  b) 獲取 Reminders 

GET http://localhost:3500/v1.0/actors/<actorType>/<actorId>/reminders/<name>

  c) 刪除 Reminders 

DELETE http://localhost:3500/v1.0/actors/<actorType>/<actorId>/reminders/<name>

四、數據持久化:

  使用 Dapr 狀態管理構建塊保存執行組件狀態。由於執行組件可以一輪執行多個狀態操作,因此狀態存儲組件必須支持多項事務

  當前狀態管理組件支持事務 / Actors 支持情況:

g6qmJb

 需要支持 Actor 狀態存儲需添加以下內容:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: statestore
spec:
  type: state.redis
  version: v1
  metadata:
  - name: redisHost
    value: localhost:6379
  - name: redisPassword
    value: ""
  - name: actorStateStore
    value: "true"

五、.NET Core 實例:

1、添加 nuget 包引用:Dapr.ActorsDapr.Actors.AspNetCore。

2、定義 IOrderStatusActor 接口,需要繼承自 IActor

public interface IOrderStatusActor : IActor
{
    Task<string> Paid(string orderId);
    Task<string> GetStatus(string orderId);
}

  執行組件方法的返回類型必須爲 Task 或 Task<T> 。此外,執行組件方法最多隻能有一個參數。返回類型和參數都必須可 System.Text.Json 序列化。

3、定義 OrderStatusActor 實現 IOrderStatusActor,並繼承自 Actor

public class OrderStatusActor : Actor, IOrderStatusActor
{
    public OrderStatusActor(ActorHost host) : base(host)
    {
    }
    public async Task<string> Paid(string orderId)
    {
        // change order status to paid
        await StateManager.AddOrUpdateStateAsync(orderId, "init", (key, currentStatus) => "paid");
        return orderId;
    }
    public async Task<string> GetStatus(string orderId)
    {
        return await StateManager.GetStateAsync<string>(orderId);
    }
}

4、修改 Statup.cs 文件

public void ConfigureServices(IServiceCollection services)
{
    services.AddControllers();
    //註冊Actor
    services.AddActors(option =>
    {
        option.Actors.RegisterActor<OrderStatusActor>();
    });
}

public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
    if (env.IsDevelopment())
    {
        app.UseDeveloperExceptionPage();
    }
    app.UseRouting();
    app.UseAuthorization();
    app.UseEndpoints(endpoints =>
    {
        endpoints.MapControllers();
        //
        endpoints.MapActorsHandlers();
    });
}

5、添加 ActorController 操作 Actor

[Route("api/[controller]")]
[ApiController]
public class ActorController : ControllerBase
{
    private readonly IActorProxyFactory _actorProxyFactory;

    public ActorController(IActorProxyFactory actorProxyFactory)
    {
        _actorProxyFactory = actorProxyFactory;
    }

    /// <summary>
    /// 方式一:ActorProxy.Create方式
    /// </summary>
    /// <param ></param>
    /// <returns></returns>
    [HttpGet("paid/{orderId}")]
    public async Task<ActionResult> PaidAsync(string orderId)
    {
        var actorId = new ActorId("myid-" + orderId);
        var proxy = ActorProxy.Create<IOrderStatusActor>(actorId, "OrderStatusActor");
        var result = await proxy.Paid(orderId);
        return Ok(result);
    }

    /// <summary>
    /// 方式二:依賴注入方式
    /// </summary>
    /// <param ></param>
    /// <returns></returns>
    [HttpGet("get/{orderId}")]
    public async Task<ActionResult> GetAsync(string orderId)
    {
        var proxy = _actorProxyFactory.CreateActorProxy<IOrderStatusActor>(
            new ActorId("myid-" + orderId), "OrderStatusActor");

        return Ok(await proxy.GetStatus(orderId));
    }
}

6、Timer 應用:使用 Actor 基類的 **RegisterTimerAsync **方法註冊計時器:在 OrderStatusActor 類中新增方法

#region Timer操作

/// <summary>
/// 啓動Timer定時器
/// </summary>
/// <param >定時器名稱</param>
/// <param >定時器參數</param>
/// <returns></returns>
public Task StartTimerAsync(string name, string text)
{
    //註冊立即執行的間隔3s執行的定時器
    return RegisterTimerAsync(
        name,
        nameof(TimerCallbackAsync),
        Encoding.UTF8.GetBytes(text),
        TimeSpan.Zero,
        TimeSpan.FromSeconds(3));
}

/// <summary>
/// 定時器回調
/// </summary>
/// <param ></param>
/// <returns></returns>
public Task TimerCallbackAsync(byte[] state)
{
    var text = Encoding.UTF8.GetString(state);

    Console.WriteLine($"Timer fired: {text}");

    return Task.CompletedTask;

}

/// <summary>
/// 停止定時器
/// </summary>
/// <param ></param>
/// <returns></returns>
public Task StopTimerAsync(string name)
{
    //停止計時器 UnregisterTimerAsync
    return UnregisterTimerAsync(name);
}

#endregion

7、Reminder 操作:使用 Actor 基類的 RegisterReminderAsync 方法計劃計時器。在 OrderStatusActor 類中新增方法

#region Reminder 操作

public Task SetReminderAsync(string text)
{
    return RegisterReminderAsync(
        "test-reminder",
        Encoding.UTF8.GetBytes(text),
        TimeSpan.Zero,
        TimeSpan.FromSeconds(1));
}

/// <summary>
/// Reminder觸發處理(實現IRemindable接口處理觸發)
/// </summary>
/// <param ></param>
/// <param ></param>
/// <param ></param>
/// <param ></param>
/// <returns></returns>
public Task ReceiveReminderAsync(string reminderName, byte[] state, TimeSpan dueTime, TimeSpan period)
{
    if (reminderName == "test-reminder")
    {
        var text = Encoding.UTF8.GetString(state);
        Console.WriteLine($"reminder fired: {text}");
    }
    return Task.CompletedTask;
}
#endregion

8、啓動定時器:

public class OrderStatusActor : Actor, IOrderStatusActor, IRemindable
{
    public OrderStatusActor(ActorHost host) : base(host)
    {
        //註冊Timer
        StartTimerAsync("test-timer", "this is a test timer").ConfigureAwait(false).GetAwaiter().GetResult();

        //設置Reminder
        SetReminderAsync("this is a test reminder").ConfigureAwait(false).GetAwaiter().GetResult();
    }
    //其他處理邏輯    
}

總結:

 Dapr 執行組件構建基塊可以更輕鬆地編寫正確的併發系統。執行組件是狀態和邏輯的小單元。它們使用基於輪次的訪問模型,無需使用鎖定機制編寫線程安全代碼。執行組件是隱式創建的,在未執行任何操作時以無提示方式從內存中卸載。重新激活執行組件時,自動持久保存並加載執行組件中存儲的任何狀態。執行組件模型實現通常是爲特定語言或平臺創建的。但是,藉助 Dapr 執行組件構建基塊,可以從任何語言或平臺利用執行組件模型。

 Actor 組件支持計時器和提醒來計劃將來的工作。計時器不會重置空閒計時器,並且允許執行組件在未執行其他操作時停用。提醒會重置空閒計時器,並且也會自動保留。計時器和提醒都遵守基於輪次的訪問模型,確保在處理計時器 / 提醒事件時無法執行任何其他操作。

 使用 Dapr 狀態管理構建基塊 持久保存執行組件狀態。支持多項事務的任何狀態存儲都可用於存儲執行組件狀態。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/mztVlqPjANs7vlr1MnM9fA