RingBuff 在多核通訊之間的妙用

公衆號

大家好,我是小麥,今天分享一篇 RingBuff 相關的文章。整體感覺很不錯。

前言

兩個核(分爲主核和從核)之間進行通訊,一般使用共享內存的形式進行。

核間通訊的機制,是以共享內存爲媒介,利用核間中斷來通知對方。通過核間對象的句柄進行具體的訪問和操作;

(1)多個核進行核間通訊時,首先由一個核創建一個核間對象,另外一個核通過名稱或索引定位到該對象的句柄,從而對核間通訊對象進行操作。

(2)核間中斷來通知對方,採用 “硬件信號量” 對資源進行臨界保護,再利用操作系統的信號量使得核間任務的通訊如同單核任務上的通訊。

1、定義

RingBuf 又稱爲 Circular Buffer, 分爲兩種:分片形式的 RingBuf內存分割形式的 RingBuf

2、特點

  1. 由主核和從覈定義好要進行數據共享內存區域(地址、大小);

  2. 環形緩衝區 RingBuf 用於兩個核之間通訊;

  3. 環形緩衝區 RingBuf 的使用遵守嚴格的先進先出順序進行處理;

  4. 環形緩衝區 RingBuf 是一項很好的技術,不用頻繁的分配內存,而且在大多數情況下,內存的反覆使用也使得我們能用更少的內存塊做更多的事。

  5. 環形緩衝區 RingBuf 是一個先進先出的循環緩衝區,可以向通信程序提供對緩衝區的互斥訪問。

  6. 環形緩衝區 RingBuf 是點對點的單向通訊;

3、環形緩衝區的實現原理

環形緩衝區通常有一個讀指針和一個寫指針

讀指針指向環形緩衝區中可讀的數據,寫指針指向環形緩衝區中可寫的緩衝區。通過移動讀指針和寫指針就可以實現緩衝區的數據讀取和寫入。

由於 Ringbuf 具體實現上可以分爲分片形式的 RingBuf、內存分割形式的 RingBuf;下面分別講述這兩種實現方式。

分片形式的 RingBuf

1 原理

分片形式的 RingBuf 首先在內存堆中申請一塊兩核共享的內存區域,然後將這塊內存區域分割成固定大小的分片,然後使用相應的內存管理結構進行管理。如下圖所示:

2 RingBuf 管理結構體

typedef struct
{
    u32 WrIndex;//寫入分片的索引值
    u32 RdIndex;//讀出分片的索引值
    u8  *MemBufAddr;//內存堆的起始地址
    u32 MemShardingNum;//內存分片數量
    u32 MemShardingSize;//每個內存分片大小
    u32 InitDoneFlag;//初始化完成標誌
    u32HandleSem;//RingBuf使用的信號量
    u32 SrcCpuID;//寫入端CPU ID號
    u32 DstCpuID;//讀取端CPU ID號
}RingBufMan;

3 RingBuf 要實現的函數接口

  1. 初始化 RingBuf 對象RingBufInit(), 由其中一個核根據 Ringbuf 配置參數創建 RingBuf 對象,創建完成後需要完成 Ringbuf 管理結構體的初始化;

  2. 將新創建的 RingBuf 對象加入到共享對象管理結構體,便於集中管理 (RingBufJoinShareQue());

  3. 查找相應的 RingBuf 對象 ID(根據寫入端 / 讀取端 CPU ID 號在共享對象隊列中查找 RingBufGetID());

  4. 判空RingBufIsEmpty()

(ringBufMan-> WrIndex == ringBufMan-> RdIndex)
  1. 判滿 RingBufIsFull()
Tmp = ringBufMan-> WrIndex - ringBufMan-> RdIndex + 1

(Tmp ==|| Tmp == ringBufMan-> MemShardingNum)
  1. 寫入RingBufPut(RingBufMan*,u8 *BufAddr,u32 LenPut)
pWr=ringBufMan->MemBufAddr+ringBufMan->WrIndex*ringBufMan->MemShardingSize;

*(u32*)pWr = LenPut;

memcpy((void*)( pWr+sizeof(u32) ),(void*)pBufAddr, LenPut);

ringBufMan->WrIndex= (ringBufMan->WrIndex+1)% ringBufMan-> MemShardingNum;
  1. 讀取RingBufGet(RingBufMan*,u8 *BufAddr,u32 LenMaxGut,u8 *Len)
pRd=ringBufMan->MemBufAddr+ringBufMan->RdIndex*ringBufMan->MemShardingSize;

LenValid= *(u32*)pRd;

Lencp=min(LenValid, LenMaxGut);

memcpy((void*)BufAddr ,(void*)( pWr+sizeof(u32) ), Lencp);

ringBufMan->RdIndex= (ringBufMan->RdIndex+1)% ringBufMan-> MemShardingNum;

*Len= Lencp;

當然,這裏的 Put\Get 只是一個分片的讀寫,至於一包數據寫時需要多少個分片,讀時需要讀完幾個分片,需要根據數據包的大小具體的計算;

內存分割形式的 RingBuf

typedef struct { 

        unsigned char *buffer; 
        unsigned int size; 
        unsigned int in; 
        unsigned int out; 
        spinlock_t *lock; 
}kfifo ;

其中 buffer 指向存放數據的緩衝區,size 是緩衝區的大小,in 是寫指針下標,out 是讀指針下標,lock 是加到 struct kfifo 上的自旋鎖(上面說不加鎖不是這裏的鎖),防止多個進程併發訪問此數據結構。

注:我們保有對應的讀寫指針,當第一批數據(藍色)完成,第二批數據(紅色)會根據當前的寫指針位置繼續我們的數據操作,當達到最大的 Buffer_Size 時,會重新回到 Buffer 的開始端。

4、多個應用讀寫 RingBuf 情況下的處理

互斥鎖

在通常情況下,環形緩衝區的讀用戶僅僅會影響讀指針,而寫用戶僅僅會影響寫指針。如果僅僅有一個讀用戶和一個寫用戶,那麼不需要添加互斥保護機制就可以保證數據的正確性。如果有多個讀寫用戶訪問環形緩衝區,那麼必須添加互斥保護機制來確保多個用戶互斥訪問環形緩衝區。

互斥鎖可以採用兩個核共享的自旋鎖來實現,哪個核等到鎖,哪個核有權對 RingBuf 資源進行讀寫操作。

使用異步消息隊列 AsyncMsgQ

AsyncMsgQ 是基於 RingBuf 來實現的,用於主從核內多個線程之間數據交換。

  1. AsyncMsgQ 是基於 RingBuf,實現了兩個核之間兩個線程之間的通訊,通過 MsgAttr 來表示消息來自哪個核的哪個線程 ID;

  2. 可以將 AsyncMsgQ 與線程 ID 進行綁定,並在收發端設置一個守護線程,根據接收到的 MsgAttr 中的目的線程 ID 進行消息分發,進而將消息分發到不同的 AsyncMsgQ 隊列中,一包消息接收完成之後可以調用 AsyncMsgQ 已經掛載好的回調函數,進一步對消息進行解析或使用信號量將消息發送到任務。

多個應用程序讀寫還可以在 RingBuf 的基礎上實現消息隊列,消息隊列通過管理結構體記錄消息的 port,保證寫入到 ringbuf 時數據寫入 / 讀取的原子性。

除了保證寫入 / 讀取的原子性操作,還有一個問題就是,若核 1 中有多個應用程序以臨界訪問的形式向 RingBuf 中寫數據,那麼另外一個核 0 如何知道是哪個應用程序寫入到 RingBuf

爲了實現 Ringbuf 這種攜帶數據的功能,我們可以對寫入 RingBuf 的每一條消息進行標識。

例如,在每一包消息的頭部增加一個數據結構用於表示該報數據來自哪個核的哪個應用,又去往哪個核的哪個應用,包含了多少數據,使用了多少分片等等信息。

typedef struct
{
    u32 MsgShardingNum;//本消息包占用的Ringbuf分片數
    u32 MsgLen;//本消息包的長度
    u32 SrcCpuID;//源端CPU ID號
    u32 DstCpuID;//目的端CPU ID號
    u32 SrcThreadID;//源端線程ID
    u32 DstThreadID;//目的端線程ID

}MsgAttr;

同時爲了表示每個分片所屬消息 ID 及有效數據大小,還涉及了以下結構:

typedef struct
{
    u16ShardingID;//分片ID,用於識別是第幾個分片
    u16ShardingLen;//分片有效數據長度

}ShardingAttr;

RingBuf 在內存中的分佈如下:

AsyncMsgQ 主要的接口函數:

  1. 初始化 AsyncMsgQ

  2. AsyncMsgQ 通道申請

申請未被使用的 AsyncMsgQ 通道,並同 ThreadID 綁定,同時,設置相應的消息接收完成回調函數

  1. 數據發送

計算發送數據所需要的 RingBuf 分片數,將數據拷貝到分片,並設置相應的MsgAttrShardingAttr

  1. 數據接收

包括輪詢接收,信號量阻塞接收,根據接收到的 MsgAttr 進行數據解析和分發;

參考:

https://en.wikipedia.org/wiki/Circular_buffer

原文鏈接:https://blog.csdn.net/u010961173/article/details/79839450

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/VdPWsjmrByCm66YHYfE1lg