基於靜態內存的循環隊列的實現

爲什麼有循環隊列

基本的標準庫容器比如 vector,map,list,deque 等不能滿足複雜的日常需求時,我們就應該想一個更高效的容器去處理我們的業務場景,以此去提升我們程序性能。

什麼場景下用循環隊列

一個比較常見的業務需求是,當讀寫數據同時往一處填充時,寫的速度和讀取速度不一致,並且需要時刻清理歷史數據,避免內存溢出,這些歷史數據也被參與其他場景的計算中,就導致資源被大量消耗。

可以想到的標準容器

1,有序的插入 std::map,底層是紅黑樹的實現,當寫入和刪除數據時,都會觸發紅黑樹的調整,降低程序效率;

2,無序的插入 std::unordered_map,底層實現是 hash,插入和刪除的效率不錯,但是是動態內存申請,容器的容量時刻更改,頻繁的內存申請降低程序的效率;

3,std::list, std::deque, std::queue 不支持隨機訪問,並且訪問的時間複雜度 O(n), 也是不合理的;

4,std::vector, std::set 對查找不友好,無法支持按特定需求去進行查找。

循環隊列登上歷史舞臺

固定的時間窗口,肯定是靜態內存的使用,並且可以高效訪問,插入容器。此時,只需要申請一塊連續大小的內存,此內存通常大於實際使用的內存大小,不斷在內存區域進行覆寫,新數據覆蓋老數據,無需重複申請新的內存,循環隊列派上用場了。

什麼是循環緩衝區隊列

循環緩衝區 (也稱爲環形緩衝區) 是固定大小的緩衝區,工作原理就像內存是連續的且可循環的一樣。在生成和使用內存時,不需將原來的數據全部重新清理掉,只要調整 head/tail 指針即可。當添加數據時,head 指針前進。當使用數據時,tail 指針向前移動。當到達緩衝區的尾部時,指針又回到緩衝區的起始位置。

如何實現循環緩衝區隊列

1,數據結構

基礎數據緩衝區

緩衝區的最大範圍

“head” 指針的當前位置(添加元素時增加)

“tail” 指針的當前位置(讀取元素後增加)

一個標誌位來指示緩衝區是否已滿

// The hidden definition of our circular buffer structure
typedef struct circular_buf_t
{
    uint8_t *buffer;
    size_t head;
    size_t tail;
    size_t max; //of the buffer
    bool full;
} *cbuf_handle_t;

2,確認緩衝區是否已滿

當 head == tail, 循環緩衝區的 “full” 和 “empty” 看起來是相同的:head 和 tail 指針是相等的。

有兩種方法區分 full 和 empty:

使用緩衝區中的一個數據位:

Full:tail + 1 == head

Empty:head == tail

使用一個 bool 標誌位和其他邏輯來區分:

Full:full

Empty:(head == tail) && (!full)

從上面的數據結構看到我們使用的是通過標誌位來判斷,那麼就需要在 get /set 操作方法中更新該標誌位。

3,初始化和復位

/// Pass in a storage buffer and size 
/// Returns a circular buffer handle
cbuf_handle_t circular_buf_init(uint8_t* buffer, size_t size)
{
    assert(buffer && size);

    cbuf_handle_t cbuf = malloc(sizeof(circular_buf_t));
    assert(cbuf);

    cbuf->buffer = buffer;
    cbuf->max = size;
    circular_buf_reset(cbuf);

    assert(circular_buf_empty(cbuf));

    return cbuf;
}

/// Reset the circular buffer to empty, head == tail
void circular_buf_reset(cbuf_handle_t cbuf)
{
    assert(cbuf);

    cbuf->head = 0;
    cbuf->tail = 0;
    cbuf->full = false;
}

/// Free a circular buffer structure.
/// Does not free data buffer; owner is responsible for that
void circular_buf_free(cbuf_handle_t cbuf)
{
    assert(cbuf);
    free(cbuf);
}

4,狀態檢查

bool circular_buf_full(cbuf_handle_t cbuf)
{
    assert(cbuf);

    return cbuf->full;
}
bool circular_buf_empty(cbuf_handle_t cbuf)
{
    assert(cbuf);

    return (!cbuf->full && (cbuf->head == cbuf->tail));
}
size_t circular_buf_capacity(cbuf_handle_t cbuf)
{
    assert(cbuf);

    return cbuf->max;
}
size_t circular_buf_size(cbuf_handle_t cbuf)
{
    assert(cbuf);

    size_t size = cbuf->max;

    if(!cbuf->full)
    {
        if(cbuf->head >= cbuf->tail)
        {
            size = (cbuf->head - cbuf->tail);
        }
        else
        {
            size = (cbuf->max + cbuf->head - cbuf->tail);
        }
    }

    return size;
}

5,數據填充刪除

從緩衝區讀取數據後刪除,即取出 tail 指針位置的值並更新 tail 指針。如果緩衝區是空的,不修改指針值並且返回 error=-1。

int circular_buf_get(cbuf_handle_t cbuf, uint8_t * data)
{
    assert(cbuf && data && cbuf->buffer);

    int r = -1;

    if(!circular_buf_empty(cbuf))
    {
        *data = cbuf->buffer[cbuf->tail];
        // 當刪除數據時,full 標誌置爲 flase ,tail 指針向前移一位
        cbuf->full = false;
        cbuf->tail = (cbuf->tail + 1) % cbuf->max;
        r = 0;
    }

    return r;
}

當讀跟不上寫速度的時候有兩種方法,一種是覆蓋舊數據,一種是不做處理,阻塞或者丟棄新數據

1,阻塞或者丟棄新數據

以丟棄新數據爲例, 如果 buff 已滿了(circular_buf_full = true),則不做任何處理

int circular_buf_put_block(cbuf_handle_t cbuf, uint8_t data)
{
    int r = -1;

    assert(cbuf && cbuf->buffer);

    if(!circular_buf_full(cbuf))
    {
        cbuf->buffer[cbuf->head] = data;
        //  將寫指針位置往前移動
        cbuf->head = (cbuf->head + 1) % cbuf->max;
        cbuf->full = (cbuf->head == cbuf->tail);
        r = 0;
    }

    return r;
}

以 Demo 爲例

100ms 寫一次,而 400ms 讀一次,buff 大小爲 16B,所以其阻塞住了,並且丟棄了新來寫的數據,只有 buff 有緩衝的時候才能寫。

void *read_thread(void *arg)
{
    uint8_t data;;
    printf("%s %d\n", __FUNCTION__, __LINE__);
    while (1)
    {
        pthread_mutex_lock(&rw_mutex);
        circular_buf_get(arg, &data);
        pthread_mutex_unlock(&rw_mutex);
        printf("%s %d ===>data:0x%x\n", __FUNCTION__, __LINE__, data);

        usleep(400*1000);
    }

    return NULL;
}

int main()
{
    pthread_t ntid;
    uint8_t data;
    uint8_t buff[16] = {0};
    cbuf_handle_t handle = circular_buf_init(buff, sizeof(buff));
    int ret;
    pthread_create(&ntid, NULL, read_thread, handle);
    printf("%s %d data:0x%x\n", __FUNCTION__, __LINE__, data);
    while (1)
    {
        usleep(100*1000);
        data = rand()%0xff;
        pthread_mutex_lock(&rw_mutex);
        ret = circular_buf_put_block(handle, data);
        pthread_mutex_unlock(&rw_mutex);
        if(!ret)
            printf("%s %d write data:0x%x\n", __FUNCTION__, __LINE__, data);
    }
    free(buff);
    circular_buf_free(handle);
    return 0;
}

打印輸出:

main 201 data:0x0
read_thread 179
read_thread 185 ===>data:0x0
main 210 write data:0x29
main 210 write data:0x6b
main 210 write data:0xd6
read_thread 185 ===>data:0x29
main 210 write data:0xeb
main 210 write data:0x2c
main 210 write data:0xa9
main 210 write data:0x3
read_thread 185 ===>data:0x6b
main 210 write data:0x21
main 210 write data:0xbb
main 210 write data:0xef
main 210 write data:0x5f
read_thread 185 ===>data:0xd6
main 210 write data:0x5f
main 210 write data:0x4c
main 210 write data:0xfc
main 210 write data:0x10
read_thread 185 ===>data:0xeb
main 210 write data:0xec
main 210 write data:0xbe
main 210 write data:0xd4
main 210 write data:0xed
read_thread 185 ===>data:0x2c
main 210 write data:0x51
main 210 write data:0x6
read_thread 185 ===>data:0xa9
main 210 write data:0x99
read_thread 185 ===>data:0x3
main 210 write data:0x65
read_thread 185 ===>data:0x21
main 210 write data:0x33

覆蓋舊數據

int circular_buf_put_overwrite(cbuf_handle_t cbuf, uint8_t data)
{
    assert(cbuf && cbuf->buffer);

    cbuf->buffer[cbuf->head] = data;

    // 如果buff 滿了需要複寫讀到的數據,那麼需要將讀位置往前移動
    if (cbuf->full)
    {
        cbuf->tail = (cbuf->tail + 1) % cbuf->max;
    }

    cbuf->head = (cbuf->head + 1) % cbuf->max;
    cbuf->full = (cbuf->head == cbuf->tail);
    return 0;
}

同樣以該 Demo 爲例

100ms 寫一次,而 400ms 讀一次,buff 大小爲 16B,將寫替換爲 circular_buf_put_overwrite 來覆蓋舊數據。

int main()
{
    pthread_t ntid;
    uint8_t data;
    uint8_t buff[16] = {0};
    cbuf_handle_t handle = circular_buf_init(buff, sizeof(buff));
    int ret;
    pthread_create(&ntid, NULL, read_thread, handle);
    printf("%s %d data:0x%x\n", __FUNCTION__, __LINE__, data);
    while (1)
    {
        usleep(100*1000);
        data = rand()%0xff;
        pthread_mutex_lock(&rw_mutex);
        ret = circular_buf_put_overwrite(handle, data);
        pthread_mutex_unlock(&rw_mutex);
        if(!ret)
            printf("%s %d write data:0x%x\n", __FUNCTION__, __LINE__, data);
    }
    free(buff);
    circular_buf_free(handle);
    return 0;
}

100ms 寫一次,而 400ms 讀一次,buff 大小爲 16B,因爲複寫了,所以 buffer 滿了之後,讀到的數據總是 16B 寫之前寫的首次數據

輸出

main 227 write data:0x33
main 227 write data:0xec
main 227 write data:0x3f
main 227 write data:0x54
read_thread 202 ===>data:0x51
main 227 write data:0x16
main 227 write data:0xa7
main 227 write data:0x22
main 227 write data:0xcd
read_thread 202 ===>data:0x99
main 227 write data:0xcc
main 227 write data:0x8f
main 227 write data:0x60
main 227 write data:0xd4
read_thread 202 ===>data:0x65
main 227 write data:0xf3
main 227 write data:0x4e
main 227 write data:0x4a
main 227 write data:0x60
read_thread 202 ===>data:0x33   // 讀到這個數據位置與之前的寫位置正好相差16
main 227 write data:0x3d

認識一個人就是開了一扇窗戶,就能看到不一樣的東西,聽到不一樣的聲音,能讓你思考,覺悟,這已經夠了。其他還有很多,比如機會,幫助,我不確定。這個在一般人看來可能不重要,但是我知道這個很重要。 我是小陽哥,一個喜歡碼字和交流的程序員:2023,我們依然在一起,希望用身邊的人,身邊的事,讓我們少走一些彎路,一點點就好。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/xH7pF456PPnxkH5OpsN26A