eBPF Ringbuf 實現與原理

前言

在 eBPF 生態系統中,新的 MPSC(多生產者單消費者)Ring Buffer 的實現允許多個 CPU 向同一個共享的 Ring Buffer 提交數據,而在消費端則只假定一個消費者。本文介紹了該 Ringbuf 的設計動機、語法和 API,並與其他替代方案進行了比較。

動機

有兩個關鍵的動機促使了這個 Ring Buffer 的創建,這些動機並不能通過現有的 perf buffer 得到滿足,從而推動了新 Ringbuf 的產生:

現有的 perf buffer 由於是 per-CPU 設計,因此在解決這兩個問題時都不夠理想。而通過 MPSC Ringbuf 實現可以很好地解決這兩點需求,尤其是在跨 CPU 保持事件順序方面, perf buffer 的 per-CPU 設計容易導致事件順序錯亂。

語法與 API

Ringbuf 在 BPF 程序中被表示爲 BPF_MAP_TYPE_RINGBUF 類型的 BPF map。此設計比創建每個 CPU 一個獨立的 Ringbuf 更加高效和簡單。

核心 API

BPF 程序使用以下 API 來與 Ringbuf 進行交互:

需要注意的是, bpf_ringbuf_reserve() 必須與 bpf_ringbuf_commit()bpf_ringbuf_discard() 成對使用,否則無法通過 eBPF 校驗器。此外,由於 bpf_ringbuf_reserve() 中可能存在鎖的爭用,爲了減少性能損失,建議儘可能晚地調用該函數以申請內存空間。

示例代碼

以下是一個簡單的 BPF 程序示例,展示如何使用 Ringbuf:

#include <linux/bpf.h>
#include <bpf/bpf_helpers.h>
struct {  // specify the type, eBPF specific syntax  __uint(type, BPF_MAP_TYPE_RINGBUF);    // specify the size of the buffer  // has to be a multiple of the page size   __uint(max_entries, 256 * 4096);} my_ringbuf SEC(".maps");  /* placed in maps section */
SEC("tracepoint/sched/sched_switch")
int handle_sched_switch(struct bpf_pt_regs *ctx) {
    void *record;
    // 預留空間
    record = bpf_ringbuf_reserve(&my_ringbuf, sizeof(struct sched_event), 0);
    if (!record) {
        return 0; // 預留失敗
    }
    // 在這裏填充數據
    struct sched_event *event = record;
    event->pid = bpf_get_current_pid_tgid() >> 32;
    event->cpu = bpf_get_smp_processor_id();
    // 提交數據
    bpf_ringbuf_commit(record,0);
    return 0;
}
char _license[] SEC("license") = "GPL";

在用戶空間,可以使用 ring_buffer__poll 來讀取數據。下面是一個示例代碼:

#include <stdio.h>
#include <bpf/libbpf.h>
#include <bpf/ringbuf.h>
struct sched_event {
    __u32 pid;
    __u32 cpu;
};
int main() {
    struct ring_buffer *ring_buf;
    struct bpf_object *obj;
    int err;
    // 加載 BPF 程序
    ......
    // 獲取 ring buffer
    ring_buf = ring_buffer__new(bpf_map__fd(obj->maps->my_ringbuf), NULL, NULL);
    if (!ring_buf) {
        fprintf(stderr, "Failed to create ring buffer\n");
        return 1;
    }
    // 輪詢數據
    while (true) {
        err = ring_buffer__poll(ring_buf, 100);
        if (err < 0) {
            fprintf(stderr, "Polling error: %d\n", err);
            break;
        }
    }
    ring_buffer__free(ring_buf);
    bpf_object__close(obj);
    return 0;
}

在這個示例中,我們定義了一個簡單的 BPF 程序,該程序在任務調度點記錄進程 ID 和 CPU 信息。在用戶空間,我們通過 ring_buffer__poll 函數來讀取 Ringbuf 中的數據。

設計與實現

這種預留 / 提交機制允許多個生產者(無論是在不同的 CPU 上還是在同一 CPU / 同一 BPF 程序中)獨立地預留記錄並進行操作,而不會阻塞其他生產者。這意味着,如果一個 BPF 程序被另一個共享同一 Ringbuf 的 BPF 程序中斷,它們都會成功預留記錄(只要有足夠的空間),並可以獨立工作並提交。

Ringbuf 內部實現爲大小爲 2 的冪的循環緩衝區,使用兩個邏輯上不斷增加的計數器:

與替代方案的比較

實現 BPF Ring Buffer 之前,作者評估了內核中現有的替代方案,但發現它們無法滿足需求。現有方案主要分爲幾類:

通過引入 MPSC Ring Buffer,eBPF 的使用場景更加靈活,同時保持了高效的內存使用和事件順序。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/s18ncQ-wd52I6dAKa-KerQ