淺談緩衝的理論與實踐

深入理解緩衝的本質

緩衝(Buffer)通過對數據進行暫存,然後批量進行傳輸或者操作,多采用順序方式,來緩解不同設備之間次數頻繁但速度緩慢的隨機讀寫。

你可以把緩衝區,想象成一個蓄水池。放水的水龍頭一直開着,如果池子裏有水,它就以恆定的速度流淌,不需要暫停;供水的水龍頭速度卻不確定,有時候會快一些,有時候會特別慢。它通過判斷水池裏水的狀態,就可以自由控制進水的速度。

或者再想象一下包餃子的過程,包餡的需要等着擀皮的。如果擀皮的每擀一個就交給包餡的,速度就會很慢;但如果中間放一個盆子,擀皮的只管往裏扔,包餡的只管從盆裏取,這個過程就快得多。許多工廠流水線也經常使用這種方法,可見 “緩衝” 這個理念的普及性和實用性。

從宏觀上來說,JVM 的堆就是一個大的緩衝區,代碼不停地在堆空間中生產對象,而垃圾回收器進程則在背後默默地進行垃圾回收。

通過上述比喻和釋意,你可以發現緩衝區的好處

Java 語言廣泛應用了緩衝,在 IDEA 中搜索 Buffer,可以看到長長的類列表,其中最典型的就是文件讀取和寫入字符流。

文件讀寫流

接下來,本文將以文件讀取和寫入字符流爲例進行講解。

Java 的 I/O 流設計,採用的是裝飾器模式,當需要給類添加新的功能時,就可以將被裝飾者通過參數傳遞到裝飾者,封裝成新的功能方法。下圖是裝飾器模式的典型示意圖,就增加功能來說,裝飾模式比生成子類更爲靈活。

在讀取和寫入流的 API 中,BufferedInputStream 和 BufferedReader 可以加快讀取字符的速度,BufferedOutputStream 和 BufferedWriter 可以加快寫入的速度。

下面是直接讀取文件的代碼實現:

int result = 0;
try(Reader reader=new FileReader(FILE_PATH)){
    int value;
    while((value=reader.read())!=-1){
        result+=value;
    }
}
return result;

要使用緩衝方式讀取,只需要將 FileReader 裝飾一下即可:

int result = 0;
try (Reader reader = new BufferedReader(new FileReader(FILE_PATH))) {
    int value;
    while ((value = reader.read()) != -1) {
        result += value;
    }
}
return result;

我們先看一下與之類似的,BufferedInputStream 類的具體實現方法:

//代碼來自JDK 
public synchronized int read () throws IOException {
    if (pos >= count) {
        fill();
        if (pos >= count)
            return -1;
    }
    return getBufIfOpen()[pos++] & 0xff;
}

當緩衝區的內容讀取完畢,將嘗試使用 fill 函數把輸入流讀入緩衝區:

//代碼來自JDK 
private void fill() throws IOException {
    byte[] buffer = getBufIfOpen();
    if (markpos < 0)
        pos = 0;            /* no mark: throw away the buffer */
    else if (pos >= buffer.length)  /* no room left in buffer */
        if (markpos > 0) {  /* can throw away early part of the buffer */
            int sz = pos - markpos;
            System.arraycopy(buffer, markpos, buffer, 0, sz);
            pos = sz;
            markpos = 0;
        } else if (buffer.length >= marklimit) {
            markpos = -1;   /* buffer got too big, invalidate mark */
            pos = 0;        /* drop buffer contents */
        } else if (buffer.length >= MAX_BUFFER_SIZE) {
            throw new OutOfMemoryError("Required array size too large");
        } else {            /* grow buffer */
            int nsz = (pos <= MAX_BUFFER_SIZE - pos) ?
                    pos * 2 : MAX_BUFFER_SIZE;
            if (nsz > marklimit)
                nsz = marklimit;
            byte[] nbuf = new byte[nsz];
            System.arraycopy(buffer, 0, nbuf, 0, pos);
            if (!U.compareAndSetObject(this, BUF_OFFSET, buffer, nbuf)) {
                // Can't replace buf if there was an async close.
                // Note: This would need to be changed if fill()
                // is ever made accessible to multiple threads.
                // But for now, the only way CAS can fail is via close.
                // assert buf == null;
                throw new IOException("Stream closed");
            }
            buffer = nbuf;
        }
    count = pos;
    int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
    if (n > 0)
        count = n + pos;
}

程序會調整一些讀取的位置,並對緩衝區進行位置更新,然後使用被裝飾的 InputStream 進行數據讀取:

int n = getInIfOpen().read(buffer, pos, buffer.length - pos);

那麼爲什麼要這麼做呢?直接讀寫不行嗎?

這是因爲:字符流操作的對象,一般是文件或者 Socket,要從這些緩慢的設備中,通過頻繁的交互獲取數據,效率非常慢;而緩衝區的數據是保存在內存中的,能夠顯著地提升讀寫速度。

既然好處那麼多,爲什麼不把所有的數據全部讀到緩衝區呢?

這就是一個權衡的問題,緩衝區開得太大,會增加單次讀寫的時間,同時內存價格很高,不能無限制使用,緩衝流的默認緩衝區大小是 8192 字節,也就是 8KB,算是一個比較折中的值。

這好比搬磚,如果一塊一塊搬,時間便都耗費在往返路上了;但若給你一個小推車,往返的次數便會大大降低,效率自然會有所提升。

下圖是使用 FileReader 和 BufferedReader 讀取文件的 JMH 對比,可以看到,使用了緩衝,讀取效率有了很大的提升(暫未考慮系統文件緩存)。

package cn.wja;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.Reader;
import java.util.concurrent.TimeUnit;
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Fork(1)
public class BenchmarkReader {
    private static final String FILE_PATH = "F:\\農民工老王\\servertools.zip";
    @Benchmark
    public int bufferedReaderTest() throws Exception {
        int result = 0;
        try (Reader reader = new BufferedReader(new FileReader(FILE_PATH))) {
            int value;
            while ((value = reader.read()) != -1) {
                result += value;
            }
        }
        return result;
    }
    @Benchmark
    public int fileReadTest() throws Exception {
        int result = 0;
        try (Reader reader = new FileReader(FILE_PATH)) {
            int value;
            while ((value = reader.read()) != -1) {
                result += value;
            }
        }
        return result;
    }
    public static void main(String[] args) throws Exception {
        Options opts = new OptionsBuilder().include(BenchmarkReader.class.getSimpleName()).build();
        new Runner(opts).run();
    }
}

日誌緩衝

日誌是程序員們最常打交道的地方。在高併發應用中,即使對日誌進行了採樣,日誌數量依舊驚人,所以選擇高速的日誌組件至關重要。

SLF4J 是 Java 裏標準的日誌記錄庫,它是一個允許你使用任何 Java 日誌記錄庫的抽象適配層,最常用的實現是 Logback,支持修改後自動 reload,它比 Java 自帶的 JUL 還要流行。

Logback 性能也很高,其中一個原因就是異步日誌,它在記錄日誌時,使用了一個緩衝隊列,當緩衝的內容達到一定的閾值時,纔會把緩衝區的內容寫到文件裏。使用異步日誌有兩個考慮:

Logback 的異步日誌也比較好配置,我們需要在正常配置的基礎上,包裝一層異步輸出的邏輯

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <property />
    <property 
              value="%d{yyyy-MM-dd  HH:mm:ss.SSS} %-5level [%10.10thread] [%X{X-B3-TraceId}] %logger{20} - %msg%n"/>
    <appender >
        <file>${LOG_HOME}/test.log</file>
        <rollingPolicy>
            <fileNamePattern>${LOG_HOME}/backup/log.log.%d{yyyy-MM-dd}</fileNamePattern>
            <maxHistory>100</maxHistory>
        </rollingPolicy>
        <encoder>
            <pattern>${ENCODER_PATTERN}</pattern>
        </encoder>
    </appender>
    <appender >
        <file>${LOG_HOME}/test2.log</file>
        <rollingPolicy>
            <fileNamePattern>${LOG_HOME}/backup/log2.log.%d{yyyy-MM-dd}</fileNamePattern>
            <maxHistory>100</maxHistory>
        </rollingPolicy>
        <encoder>
            <pattern>${ENCODER_PATTERN}</pattern>
        </encoder>
    </appender>
    <appender >
        <discardingThreshold>0</discardingThreshold>
        <queueSize>512</queueSize>
        <appender-ref ref="FILE"/>
    </appender>
    <appender >
        <layout>
            <pattern>${ENCODER_PATTERN}</pattern>
        </layout>
    </appender>
    <logger >
        <appender-ref ref="ASYNC"/>
    </logger>
    <logger >
        <appender-ref ref="FILE2"/>
    </logger>
</configuration>

如下圖,異步日誌輸出之後,日誌信息將暫存在 ArrayBlockingQueue 列表中,後臺會有一個 Worker 線程不斷地獲取緩衝區內容,然後寫入磁盤中。

上圖中有三個關鍵參數:

緩衝區優化思路

毫無疑問緩衝區是可以提高性能的,但它通常會引入一個異步的問題,使得編程模型變複雜。

通過文件讀寫流和 Logback 兩個例子,我們來看一下對於緩衝區設計的一些常規操作。

如下圖所示,資源 A 讀取或寫入一些操作到資源 B,這本是一個正常的操作流程,但由於中間插入了一個額外的存儲層,所以這個流程被生生截斷了,這時就需要你手動處理被截斷兩方的資源協調問題。

根據資源的不同,對正常業務進行截斷後的操作,分爲同步操作和異步操作。

同步操作

同步操作的編程模型相對簡單,在一個線程中就可完成,你只需要控制緩衝區的大小,並把握處理的時機。比如,緩衝區大小達到閾值,或者緩衝區的元素在緩衝區的停留時間超時,這時就會觸發批量操作。

由於所有的操作又都在單線程,或者同步方法塊中完成,再加上資源 B 的處理能力有限,那麼很多操作就會阻塞並等待在調用線程上。比如寫文件時,需要等待前面的數據寫入完畢,才能處理後面的請求。

異步操作

異步操作就複雜很多。

緩衝區的生產者一般是同步調用,但也可以採用異步方式進行填充,一旦採用異步操作,就涉及緩衝區滿了以後,生產者的一些響應策略。

此時,應該將這些策略抽象出來,根據業務的屬性選擇,比如直接拋棄、拋出異常,或者直接在用戶的線程進行等待。你會發現它與線程池的飽和策略是類似的,這部分的詳細概念將在後續的文章中講解。

許多應用系統還會有更復雜的策略,比如在用戶線程等待,設置一個超時時間,以及成功進入緩衝區之後的回調函數等。

對緩衝區的消費,一般採用開啓線程的方式,如果有多個線程消費緩衝區,還會存在信息同步和順序問題。

Kafka 緩衝區示例

這裏以一個常見的面試題來講解上面的知識點:Kafka 的生產者,有可能會丟數據嗎?

如圖,要想解答這個問題,需要先了解 Kafka 對生產者的一些封裝,其中有一個對性能影響非常大的點,就是緩衝。

生產者會把發送到同一個 partition 的多條消息,封裝在一個 batch(緩衝區)中。當 batch 滿了(參數 batch.size),或者消息達到了超時時間(參數 linger.ms),緩衝區中的消息就會被髮送到 broker 上。

這個緩衝區默認是 16KB,如果生產者的業務突然斷電,這 16KB 數據是沒有機會發送出去的。此時,就造成了消息丟失。

解決的辦法有兩種:

另外一個面試的問題是:Kafka 生產者會影響業務的高可用嗎?

這同樣和生產者的緩衝區有關。緩衝區大小畢竟是有限制的,如果消息產生得過快,或者生產者與 broker 節點之間有網絡問題,緩衝區就會一直處於 full 的狀態。此時,有新的消息到達,會如何處理呢?

通過配置生產者的超時參數和重試次數,可以讓新的消息一直阻塞在業務方。一般來說,這個超時值設置成 1 秒就已經夠大了,有的應用在線上把超時參數配置得非常大,比如 1 分鐘,就造成了用戶的線程迅速佔滿,整個業務不能再接受新的請求。

緩衝區的其他案例

使用緩衝區來提升性能的做法非常多,下面再舉幾個例子:

緩衝區的注意事項

雖然緩衝區可以幫我們大大地提高應用程序的性能,但同時它也有不少問題,在我們設計時,要注意這些異常情況。

其中,比較嚴重就是緩衝區內容的丟失。即使你使用 addShutdownHook 做了優雅關閉,有些情形依舊難以防範避免,比如機器突然間斷電,應用程序進程突然死亡等。這時,緩衝區內未處理完的信息便會丟失,尤其金融信息,電商訂單信息的丟失都是比較嚴重的。

所以,內容寫入緩衝區之前,需要先預寫日誌,故障後重啓時,就會根據這些日誌進行數據恢復。在數據庫領域,文件緩衝的場景非常多,一般都是採用 WAL 日誌(Write-Ahead Logging)解決。對數據完整性比較嚴格的系統,甚至會通過電池或者 UPS 來保證緩衝區的落地。這就是性能優化帶來的新問題,必須要解決。

小結

可以看到,緩衝區優化是對正常的業務流程進行截斷,然後加入緩衝組件的一個操作,它分爲同步和異步方式,其中異步方式的實現難度相對更高。

大多數組件,從操作系統到數據庫,從 Java 的 API 到一些中間件,都可以通過設置一些參數,來控制緩衝區大小,從而取得較大的性能提升。但需要注意的是,某些極端場景(斷電、異常退出、kill -9 等)可能會造成數據丟失,若你的業務對此容忍度較低,那麼你需要花更多精力來應對這些異常。

作者:農民工老王

來源:blog.csdn.net/monarch91/article/details/123688424

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/gbi2ZQEQ6knPgsDsl8_mdg