Dapr - 狀態管理
前言:
前一篇 Dapr - 服務調用 對 Dapr 的服務調用方式進行了解,本篇繼續對狀態管理進行了解。
一、狀態管理 - 解決的問題
在分佈式應用程序中跟蹤狀態存在一下問題:
-
應用程序可能需要不同類型的數據存儲。
-
訪問和更新數據時可能需要不同的一致性級別。
-
多個用戶可以同時更新數據,需要衝突解決。
-
在與數據存儲交互時,服務必須重試發生的任何短暫的暫時性錯誤 。
Dapr 狀態管理構建塊解決了這些難題。它簡化了跟蹤狀態,而無需依賴關係或第三方存儲 Sdk 上的學習曲線。
Dapr 狀態管理提供 密鑰 / 值 API。此功能不支持關係數據存儲或圖形數據存儲。
二、狀態管理 - 工作原理
應用程序可以使用 Dapr 的狀態管理 API,使用狀態存儲組件保存和讀取鍵 / 值對。通過 HTTP 或 gRPC 調用 API。
例如,通過使用 HTTP POST 可以保存鍵 / 值對,通過使用 HTTP GET 可以讀取一個鍵並返回它的值。
三、狀態管理 - 特點
-
可插拔狀態存儲
Dapr 數據存儲被建模爲組件,可以在不修改你的服務代碼的情況下進行替換。
-
一致性
CAP 定理是一組適用於存儲狀態的分佈式系統的原則。下圖展示了 CAP 定理的三個屬性。
定理指出,分佈式數據系統提供一致性、可用性和分區容差之間的權衡。而且,任何數據存儲只能 保證三個屬性中的兩個:
-
一致性 (C) 。羣集中的每個節點都將使用最新的數據做出響應,即使在所有副本更新之前,系統都必須阻止請求。如果查詢當前正在更新的項的 "一致性系統",則在所有副本都成功更新之前,將不會收到響應。但是,您將始終接收最新的數據。
-
可用性 () 。即使該響應不是最新的數據,每個節點都將返回立即響應。如果您在 "可用系統" 中查詢正在更新的項,則您將獲得該服務在此時可以提供的最佳可能的答案。
-
分區容差 (P) 。即使複製的數據節點發生故障或失去與其他複製的數據節點的連接,保證系統仍可繼續運行。
分佈式應用程序必須處理 P 屬性。隨着服務彼此間的網絡調用通信,會發生網絡中斷 (P) 。考慮到這一點,分佈式應用程序必須是 AP 或 CP。
AP 應用程序選擇 "可用性一致性"。Dapr 通過其 最終一致性 策略支持此選擇。請考慮使用基礎數據存儲(例如 Azure CosmosDB)將冗餘數據存儲在多個副本上。對於最終一致性,狀態存儲會將更新寫入一個副本並完成與客戶端的寫入請求。此時間過後,存儲將以異步方式更新其副本。讀取請求可以返回任何副本的數據,包括尚未收到最新更新的副本。
CP 應用程序選擇一致性和可用性。Dapr 通過其 強一致性 策略支持此選擇。在此方案中,狀態存儲將同步更新 所有 (或在某些情況下,在完成寫入請求 之前) 必需副本的 仲裁。讀取操作將跨副本持續返回最新數據。
-
併發
在多用戶應用程序中,有可能多個用戶同時更新同一時間) (相同的數據。Dapr 支持樂觀併發控制 (OCC) 來管理衝突。OCC 基於一個假設,因爲用戶處理數據的不同部分,所以更新衝突很少見。更有效的方法是將更新成功,如果不成功,則重試。實現悲觀鎖定的替代方法可能會影響長時間運行的鎖定,導致數據爭用。
Dapr 支持使用 Etag) (OCC 的樂觀併發控制。ETag 是與存儲的鍵 / 值對的特定版本相關聯的值。鍵 / 值對的每次更新時,ETag 值也會更新。當客戶端檢索鍵 / 值對時,響應包括當前 ETag 值。當客戶端更新或刪除鍵 / 值對時,它必須在請求正文中發送回該 ETag 值。如果其他客戶端同時更新了數據,則 Etag 不會匹配,請求將失敗。此時,客戶端必須檢索更新的數據,重新進行更改,然後重新提交更新。此策略稱爲 第一次寫入 - wins。
Dapr 還支持 最後寫入 wins 策略。使用此方法時,客戶端不會將 ETag 附加到寫入請求。狀態存儲組件將始終允許更新,即使基礎值在會話期間已更改也是如此。最後寫入 - wins 對於數據爭用較少的高吞吐量寫入方案非常有用。同樣,可以容忍偶爾的用戶更新。
-
事務
Dapr 可以將 多項更改 作爲一個作爲事務實現的操作寫入數據存儲區。此功能僅適用於支持 ACID 事務的數據存儲。在撰寫本文時,這些存儲包括 Redis、MongoDB、PostgreSQL、SQL Server 和 Azure CosmosDB。
在下面的示例中,多項操作將發送到單個事務中的狀態存儲。所有操作都必須成功,事務才能提交。如果一個或多個操作失敗,則回滾整個事務。
四、.Net Core 中應用
1、在項目【DaprFrontEnd】中添加控制器 - DaprStateController 用於展示狀態的各種操作
[Route("[controller]")]
[ApiController]
public class StateController : ControllerBase
{
private readonly ILogger<DaprStateController> _logger;
private readonly DaprClient _daprClient;
public StateController(ILogger<StateController> logger, DaprClient daprClient)
{
_logger = logger;
_daprClient = daprClient;
}
/// <summary>
/// 獲取值
/// </summary>
/// <returns></returns>
[HttpGet]
public async Task<ActionResult> GetAsync()
{
var result = await _daprClient.GetStateAsync<string>("statestore", "guid");
return Ok(result);
}
/// <summary>
/// 保存值
/// </summary>
/// <returns></returns>
[HttpPost]
public async Task<ActionResult> PostAsync()
{
await _daprClient.SaveStateAsync<string>("statestore", "guid", Guid.NewGuid().ToString(), new StateOptions() { Consistency = ConsistencyMode.Strong });
return Ok("done");
}
/// <summary>
/// 刪除值
/// </summary>
/// <returns></returns>
[HttpDelete]
public async Task<ActionResult> DeleteAsync()
{
await _daprClient.DeleteStateAsync("statestore", "guid");
return Ok("done");
}
/// <summary>
/// 通過tag防止併發衝突,保存值
/// </summary>
/// <returns></returns>
[HttpPost("withtag")]
public async Task<ActionResult> PostWithTagAsync()
{
var (value, etag) = await _daprClient.GetStateAndETagAsync<string>("statestore", "guid");
await _daprClient.TrySaveStateAsync<string>("statestore", "guid", Guid.NewGuid().ToString(), etag);
return Ok("done");
}
/// <summary>
/// 通過tag防止併發衝突,刪除值
/// </summary>
/// <returns></returns>
[HttpDelete("withtag")]
public async Task<ActionResult> DeleteWithTagAsync()
{
var (value, etag) = await _daprClient.GetStateAndETagAsync<string>("statestore", "guid");
return Ok(await _daprClient.TryDeleteStateAsync("statestore", "guid", etag));
}
/// <summary>
/// 從綁定獲取值,健值name從路由模板獲取
/// </summary>
/// <param ></param>
/// <returns></returns>
[HttpGet("frombinding/{name}")]
public async Task<ActionResult> GetFromBindingAsync([FromState("statestore", "name")] StateEntry<string> state)
{
return await Task.FromResult<ActionResult>(Ok(state.Value));
}
/// <summary>
/// 根據綁定獲取並修改值,健值name從路由模板獲取
/// </summary>
/// <param ></param>
/// <returns></returns>
[HttpPost("withbinding/{name}")]
public async Task<ActionResult> PostWithBindingAsync([FromState("statestore", "name")] StateEntry<string> state)
{
state.Value = Guid.NewGuid().ToString();
return Ok(await state.TrySaveAsync());
}
/// <summary>
/// 獲取多個值
/// </summary>
/// <returns></returns>
[HttpGet("list")]
public async Task<ActionResult> GetListAsync()
{
var result = await _daprClient.GetBulkStateAsync("statestore", new List<string> { "guid" }, 10);
return Ok(result);
}
/// <summary>
/// 刪除多個值
/// </summary>
/// <returns></returns>
[HttpDelete("list")]
public async Task<ActionResult> DeleteListAsync()
{
var data = await _daprClient.GetBulkStateAsync("statestore", new List<string> { "guid" }, 10);
var removeList = new List<BulkDeleteStateItem>();
foreach (var item in data)
{
removeList.Add(new BulkDeleteStateItem(item.Key, item.ETag));
}
await _daprClient.DeleteBulkStateAsync("statestore", removeList);
return Ok("done");
}
}
2、啓動程序
dapr run --dapr-http-port 3501 --app-port 8230 --app-id frontend dotnet .\DaprFrontEnd.dll
3、調用過程:
五、總結:
Dapr 狀態管理構建塊提供了一個 API,用於在各種數據存儲區中存儲鍵 / 值數據。API 爲以下內容提供支持:
-
批量操作
-
強一致性和最終一致性
-
樂觀併發控制
-
多項事務
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/F_tbAF58TA0FCLCD9ilL3A