Dapr - 狀態管理

前言:

 前一篇 Dapr - 服務調用 對 Dapr 的服務調用方式進行了解,本篇繼續對狀態管理進行了解。

一、狀態管理 - 解決的問題

 在分佈式應用程序中跟蹤狀態存在一下問題:

 Dapr 狀態管理構建塊解決了這些難題。它簡化了跟蹤狀態,而無需依賴關係或第三方存儲 Sdk 上的學習曲線。

Dapr 狀態管理提供 密鑰 / 值 API。此功能不支持關係數據存儲或圖形數據存儲

二、狀態管理 - 工作原理

 應用程序可以使用 Dapr 的狀態管理 API,使用狀態存儲組件保存和讀取鍵 / 值對。通過 HTTP 或 gRPC 調用 API。 

 例如,通過使用 HTTP POST 可以保存鍵 / 值對,通過使用 HTTP GET 可以讀取一個鍵並返回它的值。

 

三、狀態管理 - 特點

  Dapr 數據存儲被建模爲組件,可以在不修改你的服務代碼的情況下進行替換。

   CAP 定理是一組適用於存儲狀態的分佈式系統的原則。下圖展示了 CAP 定理的三個屬性。

  

  定理指出,分佈式數據系統提供一致性、可用性和分區容差之間的權衡。而且,任何數據存儲只能 保證三個屬性中的兩個

  分佈式應用程序必須處理 P 屬性。隨着服務彼此間的網絡調用通信,會發生網絡中斷 (P) 。考慮到這一點,分佈式應用程序必須是 AP 或 CP。

  AP 應用程序選擇 "可用性一致性"。Dapr 通過其 最終一致性 策略支持此選擇。請考慮使用基礎數據存儲(例如 Azure CosmosDB)將冗餘數據存儲在多個副本上。對於最終一致性,狀態存儲會將更新寫入一個副本並完成與客戶端的寫入請求。此時間過後,存儲將以異步方式更新其副本。讀取請求可以返回任何副本的數據,包括尚未收到最新更新的副本。

  CP 應用程序選擇一致性和可用性。Dapr 通過其 強一致性 策略支持此選擇。在此方案中,狀態存儲將同步更新 所有 (或在某些情況下,在完成寫入請求 之前) 必需副本的 仲裁。讀取操作將跨副本持續返回最新數據。

   在多用戶應用程序中,有可能多個用戶同時更新同一時間) (相同的數據。Dapr 支持樂觀併發控制 (OCC) 來管理衝突。OCC 基於一個假設,因爲用戶處理數據的不同部分,所以更新衝突很少見。更有效的方法是將更新成功,如果不成功,則重試。實現悲觀鎖定的替代方法可能會影響長時間運行的鎖定,導致數據爭用。

  Dapr 支持使用 Etag) (OCC 的樂觀併發控制。ETag 是與存儲的鍵 / 值對的特定版本相關聯的值。鍵 / 值對的每次更新時,ETag 值也會更新。當客戶端檢索鍵 / 值對時,響應包括當前 ETag 值。當客戶端更新或刪除鍵 / 值對時,它必須在請求正文中發送回該 ETag 值。如果其他客戶端同時更新了數據,則 Etag 不會匹配,請求將失敗。此時,客戶端必須檢索更新的數據,重新進行更改,然後重新提交更新。此策略稱爲 第一次寫入 - wins。

  Dapr 還支持 最後寫入 wins 策略。使用此方法時,客戶端不會將 ETag 附加到寫入請求。狀態存儲組件將始終允許更新,即使基礎值在會話期間已更改也是如此。最後寫入 - wins 對於數據爭用較少的高吞吐量寫入方案非常有用。同樣,可以容忍偶爾的用戶更新。

   Dapr 可以將 多項更改 作爲一個作爲事務實現的操作寫入數據存儲區。此功能僅適用於支持 ACID 事務的數據存儲。在撰寫本文時,這些存儲包括 Redis、MongoDB、PostgreSQL、SQL Server 和 Azure CosmosDB。

在下面的示例中,多項操作將發送到單個事務中的狀態存儲。所有操作都必須成功,事務才能提交。如果一個或多個操作失敗,則回滾整個事務。

四、.Net Core 中應用

  1、在項目【DaprFrontEnd】中添加控制器 - DaprStateController  用於展示狀態的各種操作

[Route("[controller]")]
[ApiController]
public class StateController : ControllerBase
{
    private readonly ILogger<DaprStateController> _logger;
    private readonly DaprClient _daprClient;
    public StateController(ILogger<StateController> logger, DaprClient daprClient)
    {
        _logger = logger;
        _daprClient = daprClient;
    }
    /// <summary>
    /// 獲取值
    /// </summary>
    /// <returns></returns>
    [HttpGet]
    public async Task<ActionResult> GetAsync()
    {
        var result = await _daprClient.GetStateAsync<string>("statestore", "guid");
        return Ok(result);
    }
    /// <summary>
    /// 保存值
    /// </summary>
    /// <returns></returns>
    [HttpPost]
    public async Task<ActionResult> PostAsync()
    {
        await _daprClient.SaveStateAsync<string>("statestore", "guid", Guid.NewGuid().ToString(), new StateOptions() { Consistency = ConsistencyMode.Strong });
        return Ok("done");
    }
    /// <summary>
    /// 刪除值
    /// </summary>
    /// <returns></returns>
    [HttpDelete]
    public async Task<ActionResult> DeleteAsync()
    {
        await _daprClient.DeleteStateAsync("statestore", "guid");
        return Ok("done");
    }
    /// <summary>
    /// 通過tag防止併發衝突,保存值
    /// </summary>
    /// <returns></returns>
    [HttpPost("withtag")]
    public async Task<ActionResult> PostWithTagAsync()
    {
        var (value, etag) = await _daprClient.GetStateAndETagAsync<string>("statestore", "guid");
        await _daprClient.TrySaveStateAsync<string>("statestore", "guid", Guid.NewGuid().ToString(), etag);
        return Ok("done");
    }
    /// <summary>
    /// 通過tag防止併發衝突,刪除值
    /// </summary>
    /// <returns></returns>
    [HttpDelete("withtag")]
    public async Task<ActionResult> DeleteWithTagAsync()
    {
        var (value, etag) = await _daprClient.GetStateAndETagAsync<string>("statestore", "guid");
        return Ok(await _daprClient.TryDeleteStateAsync("statestore", "guid", etag));
    }
    /// <summary>
    /// 從綁定獲取值,健值name從路由模板獲取
    /// </summary>
    /// <param ></param>
    /// <returns></returns>
    [HttpGet("frombinding/{name}")]
    public async Task<ActionResult> GetFromBindingAsync([FromState("statestore", "name")] StateEntry<string> state)
    {
        return await Task.FromResult<ActionResult>(Ok(state.Value));
    }
    /// <summary>
    /// 根據綁定獲取並修改值,健值name從路由模板獲取
    /// </summary>
    /// <param ></param>
    /// <returns></returns>
    [HttpPost("withbinding/{name}")]
    public async Task<ActionResult> PostWithBindingAsync([FromState("statestore", "name")] StateEntry<string> state)
    {
        state.Value = Guid.NewGuid().ToString();
        return Ok(await state.TrySaveAsync());
    }
    /// <summary>
    /// 獲取多個值
    /// </summary>
    /// <returns></returns>
    [HttpGet("list")]
    public async Task<ActionResult> GetListAsync()
    {
        var result = await _daprClient.GetBulkStateAsync("statestore", new List<string> { "guid" }, 10);
        return Ok(result);
    }
    /// <summary>
    /// 刪除多個值
    /// </summary>
    /// <returns></returns>
    [HttpDelete("list")]
    public async Task<ActionResult> DeleteListAsync()
    {
        var data = await _daprClient.GetBulkStateAsync("statestore", new List<string> { "guid" }, 10);
        var removeList = new List<BulkDeleteStateItem>();
        foreach (var item in data)
        {
            removeList.Add(new BulkDeleteStateItem(item.Key, item.ETag));
        }
        await _daprClient.DeleteBulkStateAsync("statestore", removeList);
        return Ok("done");
    }
}

 2、啓動程序

dapr run --dapr-http-port 3501 --app-port 8230  --app-id frontend dotnet  .\DaprFrontEnd.dll

 3、調用過程:

  

  

五、總結:

  Dapr 狀態管理構建塊提供了一個 API,用於在各種數據存儲區中存儲鍵 / 值數據。API 爲以下內容提供支持:

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/F_tbAF58TA0FCLCD9ilL3A