Netty、Kafka 中的零拷貝技術到底有多牛?

來源:rrd.me/ggFBd

零拷貝,從字面意思理解就是數據不需要來回的拷貝,大大提升了系統的性能。我們也經常在 Java NIO,Netty,Kafka,RocketMQ 等框架中聽到零拷貝,它經常作爲其提升性能的一大亮點;下面從 I/O 的幾個概念開始,進而再分析零拷貝。

I/O 概念

緩衝區

緩衝區是所有 I/O 的基礎,I/O 講的無非就是把數據移進或移出緩衝區;進程執行 I/O 操作,就是向操作系統發出請求,讓它要麼把緩衝區的數據排幹 (寫),要麼填充緩衝區(讀)。

下面看一個 Java 進程發起 Read 請求加載數據大致的流程圖:

進程發起 Read 請求之後,內核接收到 Read 請求之後,會先檢查內核空間中是否已經存在進程所需要的數據,如果已經存在,則直接把數據 Copy 給進程的緩衝區。

如果沒有內核隨即向磁盤控制器發出命令,要求從磁盤讀取數據,磁盤控制器把數據直接寫入內核 Read 緩衝區,這一步通過 DMA 完成。

接下來就是內核將數據 Copy 到進程的緩衝區;如果進程發起 Write 請求,同樣需要把用戶緩衝區裏面的數據 Copy 到內核的 Socket 緩衝區裏面,然後再通過 DMA 把數據 Copy 到網卡中,發送出去。

你可能覺得這樣挺浪費空間的,每次都需要把內核空間的數據拷貝到用戶空間中,所以零拷貝的出現就是爲了解決這種問題的。

關於零拷貝提供了兩種方式分別是:

虛擬內存

所有現代操作系統都使用虛擬內存,使用虛擬的地址取代物理地址,這樣做的好處是:

利用第一條特性可以把內核空間地址和用戶空間的虛擬地址映射到同一個物理地址,這樣 DMA 就可以填充對內核和用戶空間進程同時可見的緩衝區了。

大致如下圖所示:

省去了內核與用戶空間的往來拷貝,Java 也利用操作系統的此特性來提升性能,下面重點看看 Java 對零拷貝都有哪些支持。

mmap+write 方式

使用 mmap+write 方式代替原來的 read+write 方式,mmap 是一種內存映射文件的方法,即將一個文件或者其他對象映射到進程的地址空間,實現文件磁盤地址和進程虛擬地址空間中一段虛擬地址的一一對應關係。

這樣就可以省掉原來內核 Read 緩衝區 Copy 數據到用戶緩衝區,但是還是需要內核 Read 緩衝區將數據 Copy 到內核 Socket 緩衝區。

大致如下圖所示:

Sendfile 方式

Sendfile 系統調用在內核版本 2.1 中被引入,目的是簡化通過網絡在兩個通道之間進行的數據傳輸過程。

Sendfile 系統調用的引入,不僅減少了數據複製,還減少了上下文切換的次數,大致如下圖所示:

數據傳送只發生在內核空間,所以減少了一次上下文切換;但是還是存在一次 Copy,能不能把這一次 Copy 也省略掉?

Linux2.4 內核中做了改進,將 Kernel buffer 中對應的數據描述信息(內存地址,偏移量)記錄到相應的 Socket 緩衝區當中,這樣連內核空間中的一次 CPU Copy 也省掉了。

Java 零拷貝

MappedByteBuffer

Java NIO 提供的 FileChannel 提供了 map() 方法,該方法可以在一個打開的文件和 MappedByteBuffer 之間建立一個虛擬內存映射。

MappedByteBuffer 繼承於 ByteBuffer,類似於一個基於內存的緩衝區,只不過該對象的數據元素存儲在磁盤的一個文件中。

調用 get() 方法會從磁盤中獲取數據,此數據反映該文件當前的內容,調用 put() 方法會更新磁盤上的文件,並且對文件做的修改對其他閱讀者也是可見的。

下面看一個簡單的讀取實例,然後再對 MappedByteBuffer 進行分析:

public class MappedByteBufferTest {

    public static void main(String[] args) throws Exception {
        File file = new File("D://db.txt");
        long len = file.length();
        byte[] ds = new byte[(int) len];
        MappedByteBuffer mappedByteBuffer = new FileInputStream(file).getChannel().map(FileChannel.MapMode.READ_ONLY, 0,
                len);
        for (int offset = 0; offset < len; offset++) {
            byte b = mappedByteBuffer.get();
            ds[offset] = b;
        }
        Scanner scan = new Scanner(new ByteArrayInputStream(ds)).useDelimiter(" ");
        while (scan.hasNext()) {
            System.out.print(scan.next() + " ");
        }
    }
}

主要通過 FileChannel 提供的 map() 來實現映射,map() 方法如下:

    public abstract MappedByteBuffer map(MapMode mode,
                                         long position, long size)
        throws IOException;

分別提供了三個參數,MapMode,Position 和 Size,分別表示:

重點看一下 MapMode,前兩個分別表示只讀和可讀可寫,當然請求的映射模式受到 Filechannel 對象的訪問權限限制,如果在一個沒有讀權限的文件上啓用 READ_ONLY,將拋出 NonReadableChannelException。

PRIVATE 模式表示寫時拷貝的映射,意味着通過 put() 方法所做的任何修改都會導致產生一個私有的數據拷貝並且該拷貝中的數據只有 MappedByteBuffer 實例可以看到。

該過程不會對底層文件做任何修改,而且一旦緩衝區被施以垃圾收集動作(garbage collected),那些修改都會丟失。

大致瀏覽一下 map() 方法的源碼:

    public MappedByteBuffer map(MapMode mode, long position, long size)
        throws IOException
    {
            ...省略...
            int pagePosition = (int)(position % allocationGranularity);
            long mapPosition = position - pagePosition;
            long mapSize = size + pagePosition;
            try {
                // If no exception was thrown from map0, the address is valid
                addr = map0(imode, mapPosition, mapSize);
            } catch (OutOfMemoryError x) {
                // An OutOfMemoryError may indicate that we've exhausted memory
                // so force gc and re-attempt map
                System.gc();
                try {
                    Thread.sleep(100);
                } catch (InterruptedException y) {
                    Thread.currentThread().interrupt();
                }
                try {
                    addr = map0(imode, mapPosition, mapSize);
                } catch (OutOfMemoryError y) {
                    // After a second OOME, fail
                    throw new IOException("Map failed", y);
                }
            }

            // On Windows, and potentially other platforms, we need an open
            // file descriptor for some mapping operations.
            FileDescriptor mfd;
            try {
                mfd = nd.duplicateForMapping(fd);
            } catch (IOException ioe) {
                unmap0(addr, mapSize);
                throw ioe;
            }

            assert (IOStatus.checkAll(addr));
            assert (addr % allocationGranularity == 0);
            int isize = (int)size;
            Unmapper um = new Unmapper(addr, mapSize, isize, mfd);
            if ((!writable) || (imode == MAP_RO)) {
                return Util.newMappedByteBufferR(isize,
                                                 addr + pagePosition,
                                                 mfd,
                                                 um);
            } else {
                return Util.newMappedByteBuffer(isize,
                                                addr + pagePosition,
                                                mfd,
                                                um);
            }
     }

大致意思就是通過 Native 方法獲取內存映射的地址,如果失敗,手動 GC 再次映射。

最後通過內存映射的地址實例化出 MappedByteBuffer,MappedByteBuffer 本身是一個抽象類,其實這裏真正實例化出來的是 DirectByteBuffer。

DirectByteBuffer

DirectByteBuffer 繼承於 MappedByteBuffer,從名字就可以猜測出開闢了一段直接的內存,並不會佔用 JVM 的內存空間。

上一節中通過 Filechannel 映射出的 MappedByteBuffer 其實際也是 DirectByteBuffer,當然除了這種方式,也可以手動開闢一段空間:

ByteBuffer directByteBuffer = ByteBuffer.allocateDirect(100);

如上開闢了 100 字節的直接內存空間。

Channel-to-Channel 傳輸

經常需要從一個位置將文件傳輸到另外一個位置,FileChannel 提供了 transferTo() 方法用來提高傳輸的效率,首先看一個簡單的實例:

public class ChannelTransfer {
    public static void main(String[] argv) throws Exception {
        String files[]=new String[1];
        files[0]="D://db.txt";
        catFiles(Channels.newChannel(System.out), files);
    }

    private static void catFiles(WritableByteChannel target, String[] files)
            throws Exception {
        for (int i = 0; i < files.length; i++) {
            FileInputStream fis = new FileInputStream(files[i]);
            FileChannel channel = fis.getChannel();
            channel.transferTo(0, channel.size(), target);
            channel.close();
            fis.close();
        }
    }
}

通過 FileChannel 的 transferTo() 方法將文件數據傳輸到 System.out 通道,接口定義如下:

    public abstract long transferTo(long position, long count,
                                    WritableByteChannel target)
        throws IOException;

幾個參數也比較好理解,分別是開始傳輸的位置,傳輸的字節數,以及目標通道;transferTo() 允許將一個通道交叉連接到另一個通道,而不需要一箇中間緩衝區來傳遞數據。

注:這裏不需要中間緩衝區有兩層意思:第一層不需要用戶空間緩衝區來拷貝內核緩衝區,另外一層兩個通道都有自己的內核緩衝區,兩個內核緩衝區也可以做到無需拷貝數據。

Netty 零拷貝

Netty 提供了零拷貝的 Buffer,在傳輸數據時,最終處理的數據會需要對單個傳輸的報文,進行組合和拆分,NIO 原生的 ByteBuffer 無法做到,Netty 通過提供的 Composite(組合)和 Slice(拆分)兩種 Buffer 來實現零拷貝。

看下面一張圖會比較清晰:

TCP 層 HTTP 報文被分成了兩個 ChannelBuffer,這兩個 Buffer 對我們上層的邏輯(HTTP 處理)是沒有意義的。

但是兩個 ChannelBuffer 被組合起來,就成爲了一個有意義的 HTTP 報文,這個報文對應的 ChannelBuffer,纔是能稱之爲 “Message” 的東西,這裏用到了一個詞“Virtual Buffer”。

可以看一下 Netty 提供的 CompositeChannelBuffer 源碼:

public class CompositeChannelBuffer extends AbstractChannelBuffer {

    private final ByteOrder order;
    private ChannelBuffer[] components;
    private int[] indices;
    private int lastAccessedComponentId;
    private final boolean gathering;

    public byte getByte(int index) {
        int componentId = componentId(index);
        return components[componentId].getByte(index - indices[componentId]);
    }
    ...省略...

Components 用來保存的就是所有接收到的 Buffer,Indices 記錄每個 buffer 的起始位置,lastAccessedComponentId 記錄上一次訪問的 ComponentId。

CompositeChannelBuffer 並不會開闢新的內存並直接複製所有 ChannelBuffer 內容,而是直接保存了所有 ChannelBuffer 的引用,並在子 ChannelBuffer 裏進行讀寫,實現了零拷貝。

其他零拷貝

RocketMQ 的消息採用順序寫到 commitlog 文件,然後利用 consume queue 文件作爲索引。

RocketMQ 採用零拷貝 mmap+write 的方式來回應 Consumer 的請求。

同樣 Kafka 中存在大量的網絡數據持久化到磁盤和磁盤文件通過網絡發送的過程,Kafka 使用了 Sendfile 零拷貝方式。

總結

零拷貝如果簡單用 Java 裏面對象的概率來理解的話,其實就是使用的都是對象的引用,每個引用對象的地方對其改變就都能改變此對象,永遠只存在一份對象。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/VD_D_t3XLYNyJrxunS-7NA