深入理解 RPC 底層原理與設計實踐

在微服務系統當中,各個服務之間進行遠程調用的時候需要考慮各種各樣的場景,例如以下幾種異常情況:

等等......

國內也有一些有先見之明的技術專家們對於這些技術有了較早的認知,因此很早便開始了關於遠程服務調用中間件的開發。慢慢地,一些國內大廠自研的 RPC 調用框架開始變做了一款產品向市面上去進行推廣。

今年年初的時候,我花了大概一個半月的業餘時間自己打磨了一套 RPC 框架,通過實踐嘗試後發現,要想真正地落地一款給公司內部使用的 RPC 框架難度真的超乎想象。本文不會過多地去介紹市面上某一款中間件的底層源代碼是如何執行和編寫的,更多是通過結合一些中間件底層設計的原理來闡述 我自己是如何設計一款 RPC 框架的。

準備工作

爲了寫一款可用的 RPC 框架,我大概準備了這些技術工作:

RPC 的整體設計思想

起初在設計 RPC 遠程調用框架的時候,主要的設計思路是採用了經典的生產者 - 消費者思想。客戶端發送請求,服務端接收之後匹配本地已有的服務方法進行處理執行。

但是在實際到落地過程中卻發現,其中的技術複雜性遠遠超出預期~~

最終結果如下圖所示:

整個項目的包結構整理

客戶端調用:

服務端使用:

ps: 這裏面的每個 api 和設計思路大部分都是模仿了 Dubbo 框架內部的源代碼設計以及部分自己的改編。

本地代理的設計

爲了能夠保證遠程方法的調用使用起來和本地方法調用一樣簡單,通常可以使用代理模式去實現。場景的代理模式有好 2 大類:靜態代理和動態代理,靜態代理需要通過硬編碼的方式實現,不現實,這裏直接不合適。

動態代理主要有以下兩種:

Java 給出了動態代理,動態代理具有如下特點:

1.Proxy 對象不需要 implements 接口;
2.Proxy 對象的生成利用 JDK 的 Api,在 JVM 內存中動態的構建 Proxy 對象。需要使用 java.lang.reflect.Proxy 類的 newProxyInstance 接口

public static <T> T getProxy(final Class interfaceClass, ReferenceConfig referenceConfig) {
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                //每次執行目標方法的時候都會回調到這個invoke方法處
                return null;
            }
        });
    }

JDK 動態代理要求 target 對象是一個接口的實現對象,假如 target 對象只是一個單獨的對象,並沒有實現任何接口,這時候就會用到 Cglib 代理 (Code Generation Library),即通過構建一個子類對象,從而實現對 target 對象的代理,因此目標對象不能是 final 類 (報錯),且目標對象的方法不能是 final 或 static(不執行代理功能)。

 //給目標對象創建一個代理對象
    public Object getProxyInstance() {
        //工具類
        Enhancer en = new Enhancer();
        //設置父類
        en.setSuperclass(target.getClass());
        //設置回調函數
        en.setCallback(this);
        //創建子類代理對象
        return en.create();
    }
    public Object intercept(Object object, Method method, Object[] arg2, MethodProxy proxy) throws Throwable {
        System.out.println("before");
        Object obj = method.invoke(target);
        System.out.println("after");
        return obj;
    }

我最終選擇了 JDK 作爲基本的動態代理實現方案,一開始的技術選型並沒有選擇更加完美的方案,而是採用了最爲簡單熟悉的技術。

如果讀者感興趣的話,可以閱讀我之前介紹 aop 原理的文章,內部有詳細講解 cglib 底層原理的細節。點擊這跳轉

遠程調用的數據傳輸

本地代理設計好了之後,需要考慮如何將數據發送給到服務端的問題了。底層採用的是 netty 框架,爲了避免粘包和拆包的問題,我嘗試使用了 ObjectEncoderObjectDecoder 兩個 netty 內置的組件。

關於 netty 內部出現粘包,拆包現象的解決手段,可以細看這篇文章:

https://www.cnblogs.com/rickiyang/p/12904552.html

協議體的內部需要設計哪些字段?

大概整理了一下代碼,基本結構如下所示:

public class IettyProtocol implements Serializable {
    private static final long serialVersionUID = -7523782352702351753L;
    /**
     * 魔數
     */
    protected long MAGIC = 0;
    /**
     * 客戶端的請求id
     */
    private String requestId;
    /**
     * netty專屬
     */
    private ChannelHandlerContext channelHandlerContext;
    /**
     * 0請求 1響應
     * @see CommonConstants.ReqOrRespTypeEnum
     */
    protected byte reqOrResp = 0;
    /**
     * 0需要從服務端返回數據 1不需要從服務端響應數據
     */
    protected final byte way = 0;
    /**
     * 0是心跳時間,1不是心跳事件
     */
    private byte event = 0;
    /**
     * 序列化類型
     */
    private String serializationType;
    /**
     * 狀態
     */
    private short status;
    /**
     * 返回的數據類型格式
     */
    private Type type;
    /**
     * 消息體 請求方發送的函數類型,參數信息都存在這裏, 接收方響應的信息也都存在這裏
     */
    private byte[] body;
}

稍微解釋幾個字段:

requestId 客戶端的請求 id(用於請求響應做必配使用,下文中會介紹到)

reqOrResp 協議數據包的類型(標示該數據包是請求類型還是響應類型)

type 是指調用該方法的返回數據格式類型 (例如 int,String, 返回類型在做數據的序列化轉換的時候會非常有用)

body 這裏面是核心重點,主要的調用服務名稱,參數,方法等詳細信息都會先轉換爲字節數組,然後再通過網絡將其發送出去。

如何將不同格式的數據轉換爲字節數組

數字類型

將數字類型轉換爲二進制,在之前的一篇文章中我有寫過詳細的底層實現機制,核心是通過將數字的二進制數右移 8 位,然後存入 byte 數組當中。核心代碼爲:

   /**
     * 字節轉成數字 int 大小是4個字節
     *
     * @param bytes
     * @return
     */
    public static int byteToInt(byte[] bytes) {
        if (bytes.length != 4) {
            return 0;
        }
        return (bytes[0]) & 0xff | (bytes[1] << 8) & 0xff00 | (bytes[2] << 16) & 0xff0000 | (bytes[3] << 24) & 0xff000000;
    }
    /**
     * 數字轉成字節 int 大小是4個字節
     *
     * @param n
     * @return
     */
    public static byte[] intToByte(int n) {
        byte[] buf = new byte[4];
        for (int i = 0; i < buf.length; i++) {
            buf[i] = (byte) (n >> (8 * i));
        }
        return buf;
    }

字符串類型

將字符串轉換爲對應的字符數組,然後每個數組的 char 類型使用 asc 碼映射爲數字,接下來又是迴歸到數字轉換的思路上了。

集合,複雜對象類型

這些類型可以嘗試先通過 json 轉換爲字符串,然後再將字符串轉換爲 char 數組,再轉換爲數字數組類型,後還是要回歸到數字轉換的思路上。

數據接收與響應設計

早期在做 RPC 通訊設計的時候,採用的是簡單的生產者消費者模型。下邊給出早期自己在進行實現過程中所思考的一些點:

同步發送數據

這類同步發送設計案例來看,consumer 端發送數據之後,consumer 會一直處於等待狀態,只有等到數據抵達到 provider 端並且處理完畢之後,consumer 端纔會繼續進行下去。

這樣設計的弊端很明顯:

consumer 和 provider 的吞吐量都不高,而且一旦某個接口出現了超時還會影響其他接口的調用堵塞。

consumer 端異步發送,provider 端異步接收處理

這裏需要引入兩個新的概念,io 線程和業務線程。整體設計如下圖所示:

客戶端發送數據的時候,不再是處於等待的狀態,它會只需要將數據放入到一個本地的請求隊列中即可。客戶端的 io 線程會不斷地嘗試從隊列中取出數據,然後進行網絡發送。服務端也會專門有一個 io 線程負責接收這類數據,接着將數據放入到服務端的一個隊列緩衝中,然後再交給服務端的業務線程池去慢慢消費掉服務端的緩衝隊列內部的數據。

服務端的核心設計如下:

provider 端的數據處理完畢之後該如何正確返回?

爲了解決這個問題,我嘗試閱讀了一下 Dubbo 的底層源代碼,然後借鑑了其中的設計思路進行了一波實現。

客戶端如何接收響應

其核心的本質是客戶端在發送請求到時候會生成一個唯一的 requestId,然後客戶端在發送數據之後,會有一個 Map 集合(key 是 requestId,value 是接口響應值)管理接口響應到數據,客戶端的調用線程在執行了寫入數據到發送隊列之後需要不斷監聽 Map 集合中對應 requestId 的 value 是否有值,如果超過指定時間都沒有數據,那麼就拋出超時異常,如果收到了響應數據則正常返回即可。

服務端返回響應

服務端的本地代碼正常處理完數據之後要將數據寫入一個 Map 集合中,服務端的 io 線程會不斷輪訓這份 Map 集合(key 是客戶端發送過來的 requestId,value 是本地代碼處理完之後寫入的數據),如果發現對應的 requestId 有寫好的返回數據,就會將其發送給客戶端。

整體設計大概如下圖所示:

過濾器的設計

好了基本的調用鏈路大概是如同上邊的描述給梳理出來了。接下來就是一些擴展功能模塊了。

發送過程中需要做一些裝飾包裝,以及過濾的相關功能。此時就可以採用責任鏈的方式進行設計。

過濾器部分我大概分了兩種類型,一種是消費者使用的過濾器,一種是服務提供者專屬的過濾器。

過濾器部分的設計主要是用了責任鏈的模式實現,這塊比較簡單,不打算做過多介紹了。

延時任務的設計

在微服務調用的中間件中,延時任務是一種經常會使用到的設計,例如在超時重試,定時心跳發送,註冊中心發佈失敗重試等場景下。其核心的共同點都是在當前時間戳過後的指定時間點執行某個任務。這類設計我看了下 JDK 內部的 Timer 和 DelayedQueue 設計的原理。

常規的 JDK 的 java.util.Timer 和 DelayedQueue 等工具類,可實現簡單的定時任務,底層用的是數據結構,存取複雜度都是 O(nlog(n)),無法支撐海量定時任務。

而在定時任務量大、性能要求高的場景,爲將任務存取及取消操作時間複雜度降爲 O(1),會使用時間輪方案。

在自己實現 RPC 框架中,嘗試使用了時間輪的機制來實現心跳包發送部分。

什麼是時間輪

一種高效批量管理定時任務的調度模型。時間輪一般會實現成一個環形結構,類似一個時鐘,分爲很多槽,一個槽代表一個時間間隔,每個槽使用雙向鏈表存儲定時任務。指針週期性地跳動,跳動到一個槽位,就執行該槽位的定時任務。

Dubbo 的時間輪實現位於 dubbo-common 模塊的 org.apache.dubbo.common.timer 包中,如果感興趣的朋友可以深入閱讀下內部的源代碼設計與實現。

註冊中心的引入

爲了能夠保證服務發佈之後及時通知到各個服務的調用方,註冊中心的設計必不可少。除此之外,註冊中心的角色還能夠較好地協調各個微服務調用之間的一些配置參數,例如權重,分組,版本隔離等等屬性。

在自己進行實現落地的過程中,我選擇了 zookeeper 作爲默認的註冊中心。爲了方便後期的擴展,也是參考了 Dubbo 內部關於註冊中心的實現思路,通過一個 Registry 的接口抽象,隨機擴展了一些模版類等等。大概的設計如下圖所示:

整體的服務註冊接口代碼如下:

public interface RegistryService {
    /**
     * 註冊url
     *
     * 將dubbo服務寫入註冊中心節點
     * 當出現網絡抖動的時候需要進行適當的重試做法
     * 註冊服務url的時候需要寫入持久化文件中
     *
     * @param url
     */
    void register(URL url);
    /**
     * 服務下線
     *
     * 持久化節點是無法進行服務下線操作的
     * 下線的服務必須保證url是完整匹配的
     * 移除持久化文件中的一些內容信息
     *
     * @param url
     */
    void unRegister(URL url);
    /**
     * 消費方訂閱服務
     *
     * @param urlStr
     * @param providerServiceName
     */
    void subscribe(String urlStr,String providerServiceName);
    /**
     * 更新節點屬性之後通知這裏
     *
     * @param url
     */
    void doSubscribeAfterUpdate(URL url);
    /**
     * 新增節點之後通知這裏
     *
     * @param url
     */
    void doSubscribeAfterAdd(URL url);
    /**
     * 執行取消訂閱內部的邏輯
     *
     * @param url
     */
    void doUnSubscribe(URL url);
}

爲了預防註冊中心掛了之後,服務無法進行通信,每個通信節點都會將 zk 的服務註冊節點信息提前預先持久化到本地進行暫存一份數據,從而保證一個服務的可用性。

負載均衡策略的實現

在集羣進行調用的時候,不可避免會有負載均衡的問題,這塊的設計邏輯我參考了 Dubbo 的設計思路將其通過 spi 加載組件的方式進行框架的注入。

統一抽取了一個叫做 LoadBalance 的接口,然後底層實現了具體的負載均衡策略:

public class WeightLoadBalance implements LoadBalance {
    public static Map<String, URL[]> randomWeightMap = new ConcurrentHashMap<>();
    public static Map<String, Integer> lastIndexVisitMap = new ConcurrentHashMap<>();
    @Override
    public void doSelect(Invocation invocation) {
        URL[] weightArr = randomWeightMap.get(invocation.getServiceName());
        if (weightArr == null) {
            List<URL> urls = invocation.getUrls();
            Integer totalWeight = 0;
            for (URL url : urls) {
                //weight如果設置地過大,容易造成內存佔用過高情況發生,所以weight統一限制最大大小應該爲100
                Integer weight = Integer.valueOf(url.getParameters().get("weight"));
                totalWeight += weight;
            }
            weightArr = new URL[totalWeight];
            RandomList<URL> randomList = new RandomList(totalWeight);
            for (URL url : urls) {
                int weight = Integer.parseInt(url.getParameters().get("weight"));
                for (int i = 0; i < weight; i++) {
                    randomList.randomAdd(url);
                }
            }
            int len = randomList.getRandomList().size();
            for (int i = 0; i < len; i++) {
                URL url = randomList.getRandomList().get(i);
                weightArr[i] = url;
            }
            randomWeightMap.put(invocation.getServiceName(), weightArr);
        }
        Integer lastIndex = lastIndexVisitMap.get(invocation.getServiceName());
        if (lastIndex == null) {
            lastIndex = 0;
        }
        if (lastIndex >= weightArr.length) {
            lastIndex = 0;
        }
        URL referUrl = weightArr[lastIndex];
        lastIndex++;
        lastIndexVisitMap.put(invocation.getServiceName(), lastIndex);
        invocation.setReferUrl(referUrl);
    }
}

這裏面的負載均衡實現手段並不是實時計算的思路,而是提前隨機算好一組調用順序,然後每次請求的時候按照這個已經具備隨機性的數組進行挨個輪訓發送服務調用。

這樣可以避免每次請求過來都需要進行機器實時篩選計算的性能開銷。

SPI 擴展機制的設計

其實 Spi 的加載實現部分的關鍵就是將一份配置文件按照規定格式寫好,然後通過某個 loader 對象將配置文件內部的每個類都提前加載到一份 Map 中進行管理。

下邊我給出一份自己手寫的簡單案例,但是不包含自適應 spi 加載和 spi 內部自動依賴注入的功能。

public class ExtensionLoader {
    /**
     * 存儲擴展spi的map,key是spi文件裏面寫入的key
     */
    private static Map<String, Class<?>> extensionClassMap = new ConcurrentHashMap<>();
    private static final String EXTENSION_LOADER_DIR_PREFIX = "META-INF/ietty/";
    public static  Map<String, Class<?>> getExtensionClassMap(){
        return extensionClassMap;
    }
    public void loadDirectory(Class clazz) throws IOException {
        synchronized (ExtensionLoader.class){
            String fileName = EXTENSION_LOADER_DIR_PREFIX + clazz.getName();
            ClassLoader classLoader = this.getClass().getClassLoader();
            Enumeration<URL> enumeration = classLoader.getResources(fileName);
            while (enumeration.hasMoreElements()) {
                URL url = enumeration.nextElement();
                InputStreamReader inputStreamReader = new InputStreamReader(url.openStream(), "utf-8");
                BufferedReader bufferedReader = new BufferedReader(inputStreamReader);
                String line;
                while ((line = bufferedReader.readLine()) != null) {
                    if(line.startsWith("#")){
                        continue;
                    }
                    String[] keyClassInstance = line.split("=");
                    try {
                        extensionClassMap.put(keyClassInstance[0],Class.forName(keyClassInstance[1],true,classLoader));
                    } catch (ClassNotFoundException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    public static <T>Object initClassInstance(String className) {
        if(extensionClassMap!=null && extensionClassMap.size()>0){
            try {
                return (T)extensionClassMap.get(className).newInstance();
            } catch (InstantiationException e) {
                e.printStackTrace();
            } catch (IllegalAccessException e) {
                e.printStackTrace();
            }
        }
        return null;
    }
}

底層通信組件

整套 RPC 框架的底層部分是採用了 Netty 組件進行實現的,主要的寫法其實和通用的 netty 編程沒有太大的差別,這裏我簡單貼出下代碼截圖吧:

客戶端:

服務端:

小結

可能整篇文章寫下來,很多的技術細節點和實現方式因爲篇幅問題不能很好的展示出來。但是整體設計的幾個大難點以及難點的解決思路都基本貼出來了,希望能夠對你有一定的啓發。

整個基礎中間件寫下來之後感覺頭髮掉了不少,因爲底層的細節點實在是太多了,不管是結構設計,數據併發問題,異步處理設計等諸多都需要考慮,所以感覺這是一件非常具有綜合挑戰性的事情。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/GaQqg9Vh-PaZdcgKn_R7Hw