自己動手從 0 開始實現一個分佈式 RPC 框架

前言

爲什麼要自己寫一個 RPC 框架,我覺得從個人成長上說,如果一個程序員能清楚的瞭解 RPC 框架所具備的要素,掌握 RPC 框架中涉及的服務註冊發現、負載均衡、序列化協議、RPC 通信協議、Socket 通信、異步調用、熔斷降級等技術,可以全方位的提升基本素質。雖然也有相關源碼,但是隻看源碼容易眼高手低,動手寫一個纔是自己真正掌握這門技術的最優路徑。

一  什麼是 RPC

RPC(Remote Procedure Call)遠程過程調用,簡言之就是像調用本地方法一樣調用遠程服務。目前外界使用較多的有 gRPC、Dubbo、Spring Cloud 等。相信大家對 RPC 的概念都已經很熟悉了,這裏不做過多介紹。

二  分佈式 RPC 框架要素

一款分佈式 RPC 框架離不開三個基本要素:

圍繞上面三個基本要素可以進一步擴展服務路由、負載均衡、服務熔斷降級、序列化協議、通信協議等等。

1  註冊中心

主要是用來完成服務註冊和發現的工作。雖然服務調用是服務消費方直接發向服務提供方的,但是現在服務都是集羣部署,服務的提供者數量也是動態變化的,所以服務的地址也就無法預先確定。因此如何發現這些服務就需要一個統一註冊中心來承載。

2  服務提供方(RPC 服務端)

其需要對外提供服務接口,它需要在應用啓動時連接註冊中心,將服務名及其服務元數據發往註冊中心。同時需要提供服務服務下線的機制。需要維護服務名和真正服務地址映射。服務端還需要啓動 Socket 服務監聽客戶端請求。

3  服務消費方(RPC 客戶端)

客戶端需要有從註冊中心獲取服務的基本能力,它需要在應用啓動時,掃描依賴的 RPC 服務,併爲其生成代理調用對象,同時從註冊中心拉取服務元數據存入本地緩存,然後發起監聽各服務的變動做到及時更新緩存。在發起服務調用時,通過代理調用對象,從本地緩存中獲取服務地址列表,然後選擇一種負載均衡策略篩選出一個目標地址發起調用。調用時會對請求數據進行序列化,並採用一種約定的通信協議進行 socket 通信。

三  技術選型

1  註冊中心

目前成熟的註冊中心有 Zookeeper,Nacos,Consul,Eureka,它們的主要比較如下:

本實現中支持了兩種註冊中心 Nacos 和 Zookeeper,可根據配置進行切換。

2  IO 通信框架 

本實現採用 Netty 作爲底層通信框架,Netty 是一個高性能事件驅動型的非阻塞的 IO(NIO) 框架。

3  通信協議

TCP 通信過程中會根據 TCP 緩衝區的實際情況進行包的劃分,所以在業務上認爲一個完整的包可能會被 TCP 拆分成多個包進行發送,也有可能把多個小的包封裝成一個大的數據包發送,這就是所謂的 TCP 粘包和拆包問題。所以需要對發送的數據包封裝到一種通信協議裏。

業界的主流協議的解決方案可以歸納如下:

  1. 消息定長,例如每個報文的大小爲固定長度 100 字節,如果不夠用空格補足。

  2. 在包尾特殊結束符進行分割。

  3. 將消息分爲消息頭和消息體,消息頭中包含表示消息總長度(或者消息體長度)的字段。

很明顯 1,2 都有些侷限性,本實現採用方案 3,具體協議設計如下:

+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+ 
|  BYTE  |        |        |        |        |        |        |             ........ 
+--------------------------------------------+--------+-----------------+--------+--------+--------+--------+--------+--------+-----------------+ 
|  magic | version|  type  |           content lenth           |                   content byte[]                                        |        | 
+--------+-----------------------------------------------------------------------------------------+--------------------------------------------+

4  序列化協議

本實現支持 3 種序列化協議,JavaSerializer、Protobuf 及 Hessian 可以根據配置靈活選擇。建議選用 Protobuf,其序列化後碼流小性能高,非常適合 RPC 調用,Google 自家的 gRPC 也是用其作爲通信協議。

5  負載均衡

本實現支持兩種主要負載均衡策略,隨機和輪詢,其中他們都支持帶權重的隨機和輪詢,其實也就是四種策略。

四  整體架構

五  實現

項目總體結構:

1  服務註冊發現

Zookeeper

Zookeeper 採用節點樹的數據模型,類似 linux 文件系統,/,/node1,/node2 比較簡單。

Zookeeper 節點類型是 Zookeeper 實現很多功能的核心原理,分爲持久節點臨時節點、順序節點三種類型的節點。

我們採用的是對每個服務名創建一個持久節點,服務註冊時實際上就是在 zookeeper 中該持久節點下創建了一個臨時節點,該臨時節點存儲了服務的 IP、端口、序列化方式等。

客戶端獲取服務時通過獲取持久節點下的臨時節點列表,解析服務地址數據:

客戶端監聽服務變化:

Nacos

Nacos 是阿里開源的微服務管理中間件,用來完成服務之間的註冊發現和配置中心,相當於 Spring Cloud 的 Eureka+Config。

不像 Zookeeper 需要利用提供的創建節點特性來實現註冊發現,Nacos 專門提供了註冊發現功能,所以其使用更加方便簡單。主要關注 NamingService 接口提供的三個方法 registerInstance、getAllInstances、subscribe;registerInstance 用來完成服務端服務註冊,getAllInstances 用來完成客戶端服務獲取,subscribe 用來完成客戶端服務變動監聽,這裏就不多做介紹,具體可參照實現源碼。

2  服務提供方 Serivce Provider

在自動配置類 OrcRpcAutoConfiguration 完成註冊中心和 RPC 啓動類(RpcBootStarter)的初始化:

服務端的啓動流程如下:

RPC 啓動(RpcBootStarter):

上面監聽 Spring 容器初始化事件時注意,由於 Spring 包含多個容器,如 web 容器和核心容器,他們還有父子關係,爲了避免重複執行註冊,只處理頂層的容器即可。

3  服務消費方 Servce Consumer

服務消費方需要在應用啓動完成前爲依賴的服務創建好代理對象,這裏有很多種方法,常見的有兩種:

本實現也採用第二種方式,處理流程如下:

BeanFactoryPostProcessor 的主要實現:

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory)
        throws BeansException {
        this.beanFactory = beanFactory;
        postProcessRpcConsumerBeanFactory(beanFactory, (BeanDefinitionRegistry)beanFactory);
    }
    private void postProcessRpcConsumerBeanFactory(ConfigurableListableBeanFactory beanFactory, BeanDefinitionRegistry beanDefinitionRegistry) {
        String[] beanDefinitionNames = beanFactory.getBeanDefinitionNames();
        int len = beanDefinitionNames.length;
        for (int i = 0; i < len; i++) {
            String beanDefinitionName = beanDefinitionNames[i];
            BeanDefinition beanDefinition = beanFactory.getBeanDefinition(beanDefinitionName);
            String beanClassName = beanDefinition.getBeanClassName();
            if (beanClassName != null) {
                Class<?> clazz = ClassUtils.resolveClassName(beanClassName, classLoader);
                ReflectionUtils.doWithFields(clazz, new FieldCallback() {
                    @Override
                    public void doWith(Field field) throws IllegalArgumentException, IllegalAccessException {
                        parseField(field);
                    }
                });
            }
        }
        Iterator<Entry<String, BeanDefinition>> it = beanDefinitions.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, BeanDefinition> entry = it.next();
            if (context.containsBean(entry.getKey())) {
                throw new IllegalArgumentException("Spring context already has a bean named " + entry.getKey());
            }
            beanDefinitionRegistry.registerBeanDefinition(entry.getKey(), entry.getValue());
            log.info("register OrcRpcConsumerBean definition: {}", entry.getKey());
        }
    }
    private void parseField(Field field) {
        // 獲取所有OrcRpcConsumer註解
        OrcRpcConsumer orcRpcConsumer = field.getAnnotation(OrcRpcConsumer.class);
        if (orcRpcConsumer != null) {
            // 使用field的類型和OrcRpcConsumer註解一起生成BeanDefinition
            OrcRpcConsumerBeanDefinitionBuilder beanDefinitionBuilder = new OrcRpcConsumerBeanDefinitionBuilder(field.getType(), orcRpcConsumer);
            BeanDefinition beanDefinition = beanDefinitionBuilder.build();
            beanDefinitions.put(field.getName(), beanDefinition);
        }
    }

ProxyFactory 的主要實現:

public class JdkProxyFactory implements ProxyFactory{
    @Override
    public Object getProxy(ServiceMetadata serviceMetadata) {
        return Proxy
            .newProxyInstance(serviceMetadata.getClazz().getClassLoader(), new Class[] {serviceMetadata.getClazz()},
                new ClientInvocationHandler(serviceMetadata));
    }
    private class ClientInvocationHandler implements InvocationHandler {
        private ServiceMetadata serviceMetadata;
        public ClientInvocationHandler(ServiceMetadata serviceMetadata) {
            this.serviceMetadata = serviceMetadata;
        }
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            String serviceId = ServiceUtils.getServiceId(serviceMetadata);
            // 通過負載均衡器選取一個服務提供方地址
            ServiceURL service = InvocationServiceSelector.select(serviceMetadata);
            OrcRpcRequest request = new OrcRpcRequest();
            request.setMethod(method.getName());
            request.setParameterTypes(method.getParameterTypes());
            request.setParameters(args);
            request.setRequestId(UUID.randomUUID().toString());
            request.setServiceId(serviceId);
            OrcRpcResponse response = InvocationClientContainer.getInvocationClient(service.getServerNet()).invoke(request, service);
            if (response.getStatus() == RpcStatusEnum.SUCCESS) {
                return response.getData();
            } else if (response.getException() != null) {
                throw new OrcRpcException(response.getException().getMessage());
            } else {
                throw new OrcRpcException(response.getStatus().name());
            }
        }
    }
}

本實現只使用 JDK 動態代理,也可以使用 cglib 或 Javassist 實現以獲得更好的性能,JdkProxyFactory 中。

4  IO 模塊

UML 圖如下:

結構比較清晰,分三大模塊:客戶端調用適配模塊、服務端請求響應適配模塊和 Netty IO 服務模塊。

客戶端調用適配模塊

此模塊比較簡單,主要是爲客戶端調用時建立服務端接,並將連接存入緩存,避免後續同服務調用重複建立連接,連接建立成功後發起調用。下面是 DefaultInvocationClient 的實現:

服務端請求響應適配模塊

服務請求響應模塊也比較簡單,是根據請求中的服務名,從緩存中獲取服務元數據,然後從請求中獲取調用的方法和參數類型信息,反射獲取調用方法信息。然後從 spring context 中獲取 bean 進行反射調用。

Netty IO 服務模塊

Netty IO 服務模塊是核心,稍複雜一些,客戶端和服務端主要處理流程如下:

其中,重點是這四個類的實現:NettyNetClient、NettyNetServer、NettyClientChannelRequestHandler 和 NettyServerChannelRequestHandler,上面的 UML 圖和下面流程圖基本上講清楚了它們的關係和一次請求的處理流程,這裏就不再展開了。

下面重點講一下編碼解碼器。

在技術選型章節中,提及了採用的通信協議,定義了私有的 RPC 協議:

+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+ 
|  BYTE  |        |        |        |        |        |        |             ........ 
+--------------------------------------------+--------+-----------------+--------+--------+--------+--------+--------+--------+-----------------+ 
|  magic | version|  type  |           content lenth           |                   content byte[]                                        |        | 
+--------+-----------------------------------------------------------------------------------------+--------------------------------------------+

編碼器的實現如下:

@Override
protected void encode(ChannelHandlerContext channelHandlerContext, ProtocolMsg protocolMsg, ByteBuf byteBuf)
    throws Exception {
    // 寫入協議頭
    byteBuf.writeByte(ProtocolConstant.MAGIC);
    // 寫入版本
    byteBuf.writeByte(ProtocolConstant.DEFAULT_VERSION);
    // 寫入請求類型
    byteBuf.writeByte(protocolMsg.getMsgType());
    // 寫入消息長度
    byteBuf.writeInt(protocolMsg.getContent().length);
    // 寫入消息內容
    byteBuf.writeBytes(protocolMsg.getContent());
}

解碼器的實現如下:

/**
 * 協議開始的標誌 magic + version + type + length 佔據7個字節
 */
public final int BASE_LENGTH = 7;
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list)
    throws Exception {
    // 可讀字節小於基本長度,無法解析出payload長度,返回
    if (byteBuf.readableBytes() < BASE_LENGTH) {
        return;
    }
    // 記錄包頭開始的index
    int beginIndex;
    while (true) {
        // 記錄包頭開始的index
        beginIndex = byteBuf.readerIndex();
        // 標記包頭開始的index
        byteBuf.markReaderIndex();
        // 讀到了協議頭魔數,結束循環
        if (byteBuf.readByte() == ProtocolConstant.MAGIC) {
            break;
        }
        // 未讀到包頭,略過一個字節
        // 每次略過一個字節,去讀取包頭信息的開始標記
        byteBuf.resetReaderIndex();
        byteBuf.readByte();
        /**
         * 當略過,一個字節之後,數據包的長度,又變得不滿足
         * 此時結束。等待後面的數據到達
         */
        if (byteBuf.readableBytes() < BASE_LENGTH) {
            return;
        }
    }
    // 讀取版本號
    byte version = byteBuf.readByte();
    // 讀取消息類型
    byte type = byteBuf.readByte();
    // 讀取消息長度
    int length = byteBuf.readInt();
    // 判斷本包是否完整
    if (byteBuf.readableBytes() < length) {
        // 還原讀指針
        byteBuf.readerIndex(beginIndex);
        return;
    }
    byte[] data = new byte[length];
    byteBuf.readBytes(data);
    ProtocolMsg msg = new ProtocolMsg();
    msg.setMsgType(type);
    msg.setContent(data);
    list.add(msg);
}

六  測試

在本人 MacBook Pro 13 寸,4 核 I5,16g 內存,使用 Nacos 註冊中心,啓動一個服務器,一個客戶端情況下,採用輪詢負載均衡策略的情況下,使用 Apache ab 測試。

在啓用 8 個線程發起 10000 個請求的情況下,可以做到 18 秒完成所有請求,qps550:

在啓用 100 個線程發起 10000 個請求的情況下,可以做到 13.8 秒完成所有請求,qps724:

七  總結

在實現這個 RPC 框架的過程中,我也重新學習了很多知識,比如通信協議、IO 框架等。也橫向學習了當前最熱的 gRPC,藉此又看了很多相關的源碼,收穫很大。後續我也會繼續維護升級這個框架,比如引入熔斷降級等機制,做到持續學習持續進步。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/cTDdIUU773kvxfabxTJQGQ