eBPF Ringbuf 實現與原理
前言
在 eBPF 生態系統中,新的 MPSC(多生產者單消費者)Ring Buffer 的實現允許多個 CPU 向同一個共享的 Ring Buffer 提交數據,而在消費端則只假定一個消費者。本文介紹了該 Ringbuf 的設計動機、語法和 API,並與其他替代方案進行了比較。
動機
有兩個關鍵的動機促使了這個 Ring Buffer 的創建,這些動機並不能通過現有的 perf buffer
得到滿足,從而推動了新 Ringbuf 的產生:
- 更高效的內存利用:通過多個 CPU 共享同一個 Ring Buffer 實現;
- 事件順序保持:即使是跨多個 CPU 的順序事件(例如
fork/exec/exit
事件),也能按時間順序保留。
現有的 perf buffer
由於是 per-CPU 設計,因此在解決這兩個問題時都不夠理想。而通過 MPSC Ringbuf 實現可以很好地解決這兩點需求,尤其是在跨 CPU 保持事件順序方面, perf buffer
的 per-CPU 設計容易導致事件順序錯亂。
語法與 API
Ringbuf 在 BPF 程序中被表示爲 BPF_MAP_TYPE_RINGBUF
類型的 BPF map。此設計比創建每個 CPU 一個獨立的 Ringbuf 更加高效和簡單。
核心 API
BPF 程序使用以下 API 來與 Ringbuf 進行交互:
bpf_ringbuf_output()
:將數據從一個位置複製到 Ringbuf 中,類似於bpf_perf_event_output()
;bpf_ringbuf_reserve()/bpf_ringbuf_commit()/bpf_ringbuf_discard()
:通過分兩步完成數據提交。首先調用bpf_ringbuf_reserve()
預留固定大小的空間,如果成功,返回一個指向 Ringbuf 內存區域的指針,程序可以使用這個指針操作數據,完成後使用bpf_ringbuf_commit()
提交數據或bpf_ringbuf_discard()
丟棄數據。
需要注意的是, bpf_ringbuf_reserve()
必須與 bpf_ringbuf_commit()
或 bpf_ringbuf_discard()
成對使用,否則無法通過 eBPF 校驗器。此外,由於 bpf_ringbuf_reserve()
中可能存在鎖的爭用,爲了減少性能損失,建議儘可能晚地調用該函數以申請內存空間。
示例代碼
以下是一個簡單的 BPF 程序示例,展示如何使用 Ringbuf:
#include <linux/bpf.h>
#include <bpf/bpf_helpers.h>
struct { // specify the type, eBPF specific syntax __uint(type, BPF_MAP_TYPE_RINGBUF); // specify the size of the buffer // has to be a multiple of the page size __uint(max_entries, 256 * 4096);} my_ringbuf SEC(".maps"); /* placed in maps section */
SEC("tracepoint/sched/sched_switch")
int handle_sched_switch(struct bpf_pt_regs *ctx) {
void *record;
// 預留空間
record = bpf_ringbuf_reserve(&my_ringbuf, sizeof(struct sched_event), 0);
if (!record) {
return 0; // 預留失敗
}
// 在這裏填充數據
struct sched_event *event = record;
event->pid = bpf_get_current_pid_tgid() >> 32;
event->cpu = bpf_get_smp_processor_id();
// 提交數據
bpf_ringbuf_commit(record,0);
return 0;
}
char _license[] SEC("license") = "GPL";
在用戶空間,可以使用 ring_buffer__poll
來讀取數據。下面是一個示例代碼:
#include <stdio.h>
#include <bpf/libbpf.h>
#include <bpf/ringbuf.h>
struct sched_event {
__u32 pid;
__u32 cpu;
};
int main() {
struct ring_buffer *ring_buf;
struct bpf_object *obj;
int err;
// 加載 BPF 程序
......
// 獲取 ring buffer
ring_buf = ring_buffer__new(bpf_map__fd(obj->maps->my_ringbuf), NULL, NULL);
if (!ring_buf) {
fprintf(stderr, "Failed to create ring buffer\n");
return 1;
}
// 輪詢數據
while (true) {
err = ring_buffer__poll(ring_buf, 100);
if (err < 0) {
fprintf(stderr, "Polling error: %d\n", err);
break;
}
}
ring_buffer__free(ring_buf);
bpf_object__close(obj);
return 0;
}
在這個示例中,我們定義了一個簡單的 BPF 程序,該程序在任務調度點記錄進程 ID 和 CPU 信息。在用戶空間,我們通過 ring_buffer__poll 函數來讀取 Ringbuf 中的數據。
設計與實現
這種預留 / 提交機制允許多個生產者(無論是在不同的 CPU 上還是在同一 CPU / 同一 BPF 程序中)獨立地預留記錄並進行操作,而不會阻塞其他生產者。這意味着,如果一個 BPF 程序被另一個共享同一 Ringbuf 的 BPF 程序中斷,它們都會成功預留記錄(只要有足夠的空間),並可以獨立工作並提交。
Ringbuf 內部實現爲大小爲 2 的冪的循環緩衝區,使用兩個邏輯上不斷增加的計數器:
- 消費者計數器:表示消費者消費到的數據的邏輯位置;
- 生產者計數器:表示所有生產者保留的數據的數量。 每當一個記錄被預留時,"擁有" 該記錄的生產者會成功推進生產者計數器。此時,數據尚未準備好供消費。每個記錄都有一個 8 字節的頭部,其中包含保留記錄的長度,以及兩個附加位:忙碌位和丟棄位。
與替代方案的比較
實現 BPF Ring Buffer 之前,作者評估了內核中現有的替代方案,但發現它們無法滿足需求。現有方案主要分爲幾類:
- per-CPU 緩衝區(如 perf、ftrace 等),無法滿足排序和內存消耗的需求;
- 基於鏈表的實現,儘管有些是多生產者設計,但從用戶空間消費時可能非常複雜且性能較差;
- io_uring 是 SPSC,但也要求固定大小的元素。簡單地將 SPSC 隊列轉換爲 MPSC 會導致性能不足;
- 專用實現(例如新的 printk Ring Buffer),具有很多特定的限制,無法很好地適應 BPF 程序的需求。
通過引入 MPSC Ring Buffer,eBPF 的使用場景更加靈活,同時保持了高效的內存使用和事件順序。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/s18ncQ-wd52I6dAKa-KerQ