通過 Dapr 實現一個簡單的基於 dotnet 的微服務電商系統——一步一步教你如何擼 Dapr 之 Actor 服務
我個人認爲 Actor 應該是 Dapr 裏比較重頭的部分也是 Dapr 一直在講的所謂 “stateful applications” 真正具體的一個實現(個人認爲),上一章講到有狀態服務可能很多同學看到後的第一反應是“不就是個分佈式緩存嗎”。那今天就講講 Actor, 看看這個東西到底能不能算得上有狀態服務,同時由於篇幅有限,這裏只會快速的過一遍 Actor 相關的概念,着重還是代碼層面的實現。
目錄:
一、通過 Dapr 實現一個簡單的基於. net 的微服務電商系統
二、通過 Dapr 實現一個簡單的基於. net 的微服務電商系統 (二)——通訊框架講解
三、通過 Dapr 實現一個簡單的基於. net 的微服務電商系統 (三)——一步一步教你如何擼 Dapr
四、通過 Dapr 實現一個簡單的基於. net 的微服務電商系統 (四)——一步一步教你如何擼 Dapr 之訂閱發佈
五、通過 Dapr 實現一個簡單的基於. net 的微服務電商系統 (五)——一步一步教你如何擼 Dapr 之狀態管理
附錄:(如果你覺得對你有用,請給個 star)
一、電商 Demo 地址:https://github.com/sd797994/Oxygen-Dapr.EshopSample
二、通訊框架地址:https://github.com/sd797994/Oxygen-Dapr
最早我接觸到 Actor 應該是微軟的 Orleans 框架 (熟悉 Actor 或者 Orleans 的同學這一大段可以直接跳過),百度 Actor 關鍵詞一大堆 “通用併發編程模型” 可能讓人云裏霧裏的,其實它並不是一個特別複雜的概念。什麼是併發編程?這個概念大家應該很熟悉了,現在主流的 web 服務器 (如. netcore 的 kestrel 或者 dotnetty) 幾乎都是支持並行訪問的,通過線程池充分調度操作系統的多線程來並行完成任務。在傳統的多線程模式中如果多個線程同時訪問某個數據並對其進行非冪等操作,往往是線程不安全的。
在單應用時代我們可以很方便的通過 lock 關鍵字或者 semaphore 信號量或者 concurrent 線程安全集合或者 Interlocked 這樣的 CAS 原子操作去規避多線程訪問導致的數據不安全,亦或者直接採用以數據庫事務爲基礎的樂觀 or 悲觀事務來實現,而一旦我們的應用由於吞吐瓶頸需要以集羣的方式部署時或者分佈式部署後對數據庫也進行了拆分後,上面的那些方案都會失效或者會導致高昂的成本 (比如數據庫分佈式事務協調機制)。這個時候往往需要引入一些分佈式組件比如 zookeeper 或者 redis 鎖來解決。這也是分佈式系統比較常用的數據一致性方案。而 actor 則是提出了一個新的在分佈式環境下解決多線程污染數據的思路。
actor 概念相對比較複雜這裏就不展開了,簡單粗暴的來理解就是在內存裏爲每一個 actor 對象維護了一個消息隊列,當任意的請求不管該請求是來自於其他進程的線程亦或是當前進程的線程,都會將請求寫入該消息隊列,而 Actor 對象會監聽該隊列,當收到消息後 Actor 會處理該請求,在請求處理期間,外部線程會被阻塞在消息隊列中,並且新的請求也會入隊等待,直到 actor 對象完成操作後從隊列裏取出下一個請求處理直到整個隊列爲空。同時每一個 actor 對象在其臨界區內的內存是私有的,並不會被其他線程共享,從而就實現了內存安全。這樣當我們客戶端發起數個請求訪問一個或多個 Actor 對象時每個請求都會進入對應的 Actor 對象的消息隊列 (術語叫 Mailboxs) 並等待 actor 消費。同時 Dapr 框架會確保同一個 Actor 對象在同一時間在整個分佈式系統中只會被激活一個實例!從而確保了你無論從分佈式系統的任意角落訪問某個 Actor 對象(user?id=1),總能得到唯一的一個實例
Dapr 框架會確保你的 Actor 實例永遠能夠被訪問到 (正確激活),哪怕對象在長時間未被訪問後系統回收休眠亦或者在未處理的異常導致其崩潰後
正確使用 Actor 唯一的要求就只有一條,由於 Actor 是一個內存併發模型所以不要在併發訪問 Actor 時去做任意的可能的 IO 阻塞 (比如讀取數據庫)!
開始擼碼,首先我們做一個 RPC 服務,看看多線程訪問下的數據會是什麼個情況,再對比一下 Actor 模式!在 RPC 層我們創建一個接口,代表產品服務,其有兩個方法對應讀取產品以及減扣庫存
接着我們在 servicesample 層實現一下這個服務(這裏直接創建一個靜態變量模擬多線程下訪問共享內存數據的場景)
接着我們在 clientsample 發起對着兩個服務的 RPC 調用
減庫存前
並行訪問 1000 次
[RemoteService("servicesample", "product")]
public interface IProductService : IActorService
{
[RemoteFunc(FuncType.Actor)]
Task<ProductOutput> Get(ProductInput input);
[RemoteFunc(FuncType.Actor)]
Task<ProductOutput> ReduceStock(ProductInput input);
}
接着我們讓入參類繼承一個基類,這個基類需要派生類重寫其 Actorid 字段。原因是 Actor 是通過全局唯一標識符在內部被標識的,訪問相同標識會被路由到同一個 actor。
public class ProductInput : ActorSendDto
{
public int PorductId { get; set; }
public int ReduceStock { get; set; }
public override string ActorId { get; set; }
}
接下來我們改造一下 clientsample 的調用方法,這裏修改的部分不多,只是把代理生成的方式替換了一下
public async Task<dynamic> GetProduct()
{
var actorService = serviceProxyFactory.CreateActorProxy<IProductService>();
return await actorService.Get(new ProductInput() { ActorId = "1", PorductId = 1 });
}
public async Task<dynamic> ProductReduceStock()
{
var actorService = serviceProxyFactory.CreateActorProxy<IProductService>();
return await actorService.ReduceStock(new ProductInput() { ActorId = "1", PorductId = 1, ReduceStock = 1 });
}
接着我們對 servicesample 進行改造,首先我們需要在 hostbuilder 裏替換掉默認的 OxygenStartup,OxygenActorStartup 會幫我們掃描類型生成對應的 actor 代理 (其他代碼無變化,略)
.ConfigureWebHostDefaults(webhostbuilder => {
//註冊成爲oxygen服務節點
webhostbuilder.StartOxygenServer<OxygenActorStartup>((config) => {
config.Port = 80;
config.PubSubCompentName = "pubsub";
config.StateStoreCompentName = "statestore";
config.TracingHeaders = "Authentication";
});
})
接着我們需要將之前的商品持久化 PO 類繼承一個基類 ActorStateModel,該基類會強制派生類重寫兩個屬性 AutoSave 和 ReminderSeconds,前者代表是否自動持久化 (調用 Actor SDK 的 Statemanage 持久化到中間件,第二個代表如果開啓持久化,是瞬時持久化還是由 Actor 的 Timer 按照週期持久化,這裏的設計有點類似於 redis aof 模式下的 always 和 everysec,前者(ReminderSeconds=0) 採用每一次變更同步一次,性能損耗較大,後者採用每 n(取決於 ReminderSeconds 設置)秒通過 timer 異步同步一次,同時我在 Actor 代理中添加了版本管理,並不會導致你的 ReminderSeconds 設置了週期同步後到時間就會請求你的同步委託,而是檢測到版本變化後纔會請求), 這裏我測試就直接開啓自動同步並使用 always 模式
public class ProductPo : ActorStateModel
{
public int Id { get; set; }
public string Name { get; set; }
public int Stock { get; set; }
public override bool AutoSave { get; set; } = true;
public override int ReminderSeconds => 0;
}
最後我們對 ProductService 進行改造,如下:
public class ProductService : BaseActorService<ProductPo>, IProductService
{
static int visitCount = 0;
static ProductPo ProductPoInstance;
public async Task<ProductOutput> Get(ProductInput input)
{
ActorData ??= new ProductPo() { Id = 1, Name = "小白菜", Stock = 100 };
return new ProductOutput() { Message = $"第{visitCount}次請求成功,當前庫存剩餘{ActorData.Stock}" };
}
public async Task<ProductOutput> ReduceStock(ProductInput input)
{
Interlocked.Increment(ref visitCount);
await Task.Delay(new Random(Guid.NewGuid().GetHashCode()).Next(20, 50));//模擬數據庫耗時
ActorData ??= new ProductPo() { Id = 1, Name = "小白菜", Stock = 100 };
if (ActorData.Stock >= input.ReduceStock)
{
await Task.Delay(new Random(Guid.NewGuid().GetHashCode()).Next(50, 100));//模擬數據庫耗時
ActorData.Stock -= input.ReduceStock;
}
return new ProductOutput() { Message = $"第{visitCount}次請求成功,當前庫存剩餘{ActorData.Stock}" };
}
public override async Task SaveData(ProductPo model, ILifetimeScope scope)
{
Console.WriteLine("同步請求被調用了,此處可以進行數據庫持久化!");
await Task.CompletedTask;
}
}
可以看到我的服務繼承了一個基類 BaseActorService,並需要傳遞一個類型爲 ActorStateModel 的泛型,這樣在我的服務裏不再通過 IO 去拉取 ProductPoInstance,而是直接使用 ActorData 這個泛型實例進行各種操作即可,所以我刪除掉了對應的數據庫模擬耗時 (避免 actor 隊列訪問阻塞),最後你必須重寫 BaseActorService 的 SaveData 方法,該方法就是上文提到的同步委託,當我們開啓 AutoSave 時,ReminderSeconds=0 會在 actor 被調用操作完成後激活該委託,ReminderSeconds>0 時會被定時器定期根據 actor 對比版本後判斷是否需要激活。同時無論哪種方式我都在 actor 代理內部維護了一個 channel 異步隊列通過異步訂閱發佈的方式實現非阻塞式的 actor 持久化而不用擔心持久化導致的 io 阻塞問題。SaveData 入參返回的一個 ILifetimeScope 容器可以很方便的獲取到你的 repository 或者直接獲取 ef 的上下文進行對應的數據庫持久化操作 (這裏需要注意一下,Actor 持久化有兩層意思,第一層意思是 Actor sdk 會自帶一個 StateManager,當 Component 開啓 actor 支持後,可以通過 StateManager 將 actor 對象寫入中間件,而這裏提供的 SaveData 是我封裝的一個通過訂閱發佈異步調用的委託,方便開發人員持久化到數據庫用的,非 actor 原生自帶的設計)。
最後我們需要擴展我們的 Component,需要開啓 Actor 持久化支持,編輯文件後用 kubectl apply -f x.yaml 即可:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: statestore
spec:
type: state.redis
version: v1
metadata:
- name: actorStateStore
value: "true"
- name: redisHost
value: redis.infrastructure.svc.cluster.local:6379
- name: keyPrefix
value: none
接下來我們看看通過 jmter 重新請求後的情況
可以看到 Actor 確實解決了併發訪問安全的問題,同時也能看到我們的委託被正確的調用了。
總結一下,Actor 確實通過其特殊的設計模式解決了併發訪問數據安全的問題,同時也帶來了一些問題諸如需要特定框架支持,諸如 Actor 行爲內不能阻塞等等限制,不過相比其帶來的無鎖對象訪問來講,這點限制都是可以克服的,至少在特定場景下比如搶票、發紅包等等有一定併發同時又需要確保數據一致的場景,Actor 算是一個可選方案。至於更多的場景探索則需要同學們自己去摸索了,今天的分享就到這裏。下期不出意外的話我們會分享一下 Dapr 的服務限流
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/_2nKuhcEg9tzkZIOqhGI7g