手把手帶你寫一箇中高級程序員必會的分佈式 RPC 框架

一. 概述

什麼是 RPC?

那爲什麼要有 RPC,HTTP 不好麼?

項目總體結構

整體架構

接下來,分別解釋上述的過程

二. 自定義註解

服務的提供者和消費者公用一個接口,@ServiceExpose 是爲了暴露服務,放在生產者的某個實現類上;@ServiceReference 是爲了引用服務,放在消費者的需要注入的屬性上。

二. 啓動配置

主要是加載一些 rpc 相關的配置類,使用 SpringBoot 自動裝配。可以使用 SPI 機制加入一些自定義的類,放到指定文件夾中。

三. rpc 接口注入 / rpc 服務掃描

這裏主要就是通過反射獲得對應註解的屬性 / 類,進行服務暴露 / 服務引用。這裏需要關注的是什麼時候進行服務暴露 / 引用?如下:

那麼怎麼知道 Spring IOC 刷新完成,這裏就使用一個 Spring 提供的監聽器,當 Spring IOC 刷新完成,就會觸發監聽器。

四. 服務註冊到 ZK / 從 Zk 獲得服務

Zookeeper 採用節點樹的數據模型,類似 linux 文件系統,/,/node1,/node2 比較簡單。不懂 Zookeeper 請移步:Zookeeper 原理

我們採用的是對每個服務名創建一個持久節點,服務註冊時實際上就是在 zookeeper 中該持久節點下創建了一個臨時節點,該臨時節點存儲了服務的 IP、端口、序列化方式等。

客戶端獲取服務時通過獲取持久節點下的臨時節點列表,解析服務地址數據:

客戶端監聽服務變化:

五. 生成代理類對象

這裏使用 JDK 的動態代理,也可以使用 cglib 或者 Javassist(dobbo 使用)。

public class ClientProxyFactory {
    /**
     * 獲取代理對象,綁定 invoke 行爲
     *
     * @param clazz 接口 class 對象
     * @param <T>   類型
     * @return 代理對象
     */public <T> T getProxyInstance(Class<T> clazz) {
        return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, new InvocationHandler() {
            final Random random = new Random();

            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                // 第一步:通過服務發現機制選擇一個服務提供者暴露的服務
                String serviceName = clazz.getName();
                final List<ServiceInfo> serviceInfos = serviceDiscovery.listServices(serviceName);
                logger.info("Rpc server instance list: {}", serviceInfos);
                if (CollectionUtils.isEmpty(serviceInfos)) {
                    throw new RpcException("No rpc servers found.");
                }

                // TODO: 這裏模擬負載均衡,從多個服務提供者暴露的服務中隨機挑選一個,後期寫方法實現負載均衡
                final ServiceInfo serviceInfo = serviceInfos.get(random.nextInt(serviceInfos.size()));

                // 第二步:構造 rpc 請求對象
                final RpcRequest rpcRequest = new RpcRequest();
                rpcRequest.setServiceName(serviceName);
                rpcRequest.setMethod(method.getName());
                rpcRequest.setParameterTypes(method.getParameterTypes());
                rpcRequest.setParameters(args);

                // 第三步:編碼請求消息, TODO: 這裏可以配置多種編碼方式
                byte[] data = messageProtocol.marshallingReqMessage(rpcRequest);

                // 第四步:調用 rpc client 開始發送消息
                byte[] byteResponse = rpcClient.sendMessage(data, serviceInfo);

                // 第五步:解碼響應消息
                final RpcResponse rpcResponse = messageProtocol.unmarshallingRespMessage(byteResponse);

                // 第六步:解析返回結果進行處理
                if (rpcResponse.getException() != null) {
                    throw rpcResponse.getException();
                }
                return rpcResponse.getRetValue();
            }
        });
    }
}

六. 負載均衡

本實現支持兩種主要負載均衡策略,隨機和輪詢,其中他們都支持帶權重的隨機和輪詢,其實也就是四種策略。

七. Netty 通信

服務端和客戶端基本一樣,這裏只展示服務端的代碼。代理對象在 Spring 啓動的時候就生成了,但是沒有調用,每一個調用 (請求) 都會生成一個 Netty 的連接。

public class NettyRpcServer extends RpcServer {
   
    @Override
    public void start() {
        // 創建兩個線程組
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // 創建服務端的啓動對象
            ServerBootstrap serverBootstrap = new ServerBootstrap()
                    // 設置兩個線程組
                    .group(bossGroup, workerGroup)
                    // 設置服務端通道實現類型
                    .channel(NioServerSocketChannel.class)
                    // 服務端用於接收進來的連接,也就是boosGroup線程, 線程隊列大小
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    // child 通道,worker 線程處理器
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        // 給 pipeline 管道設置自定義的處理器
                        @Override
                        public void initChannel(SocketChannel channel) {
                            ChannelPipeline pipeline = channel.pipeline();
                            pipeline.addLast(new NettyServerHandler());
                        }
                    });

            // 綁定端口號,同步啓動服務
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            channel = channelFuture.channel();
            // 對關閉通道進行監聽,變爲同步
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            logger.error("server error.", e);
        } finally {
            // 釋放線程組資源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

實現具體 handler

public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    //當通道就緒就會觸發該方法
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //進行記錄
        logger.info("channel active: {}", ctx);
    }

    //讀取數據實際(這裏我們可以讀取客戶端發送的消息)
    @Override
    public void channelRead(ChannelHandlerContext ctx, MyDataInfo.MyMessage msg) throws Exception {
        //將數據讀到buffer中
        final ByteBuf msgBuf = (ByteBuf) msg;
        final byte[] reqBytes = new byte[msgBuf.readableBytes()];
        msgBuf.readBytes(reqBytes);
    }

    //數據讀取完畢
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //使用反射獲找到目標方法進行返回
        final byte[] respBytes = requestHandler.handleRequest(reqBytes);
        ctx.writeAndFlush(respBytes);
    }

    //處理異常, 一般是需要關閉通道
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

八. 序列化協議

對計算機網絡稍微有一點了解的同學都知道,數據在網絡中傳輸是二進制的:01010101010101010,類似這種,只有二進制數據才能在網絡中傳輸。但是在編碼之前我們一般先進行序列化,目的是爲了優化傳輸的數據量。因爲有的數據太大,需要進行空間優化。

那麼我們來區分一下序列化和編碼:我畫一張圖大家都全明白了

定義一個序列化協議,放入作爲一個 handler 放入 pipeline 中。

Netty 支持多種序列化,比如 jdk,Json,ProtoBuf 等,這裏使用 ProtoBuf,其序列化後碼流小性能高,非常適合 RPC 調用。接下來看怎麼使用 ProtoBuf?

引入 Protobuf 的依賴

<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>2.4.1</version>
</dependency>

序列化:

/**
 * 調用對象構造好的Builder,完成屬性賦值和序列化操作
 * @return
 */
public static byte[] protobufSerializer(){
    AnimalProto.Animal.Builder builder = AnimalProto.Animal.newBuilder();
    builder.setId(1L);
    builder.setName("小豬");
    List<String> actions = new ArrayList<>();
    actions.add("eat");
    actions.add("run");
    builder.addAllActions(actions);
    return builder.build().toByteArray();
}

反序列化:

/**
 * 通過調用parseFrom則完成反序列化
 * @param bytes
 * @return
 * @throws InvalidProtocolBufferException
 */
public static Animal deserialize(byte[] bytes) throws Exception {
    AnimalProto.Animal pAnimal = AnimalProto.Animal.parseFrom(bytes);
    Animal animal = new Animal();
    animal.setId(pAnimal.getId());
    animal.setName(pAnimal.getName());
    animal.setActions(pAnimal.getActionsList());
    return animal;
}

測試:

public static void main(String[] args) throws Exception {
    byte[] bytes = serializer();
    Animal animal = deserialize(bytes);
    System.out.println(animal);
}

以下看到是能正常序列化和反序列化的:

九. 通信協議

通信協議主要是解決網絡傳輸問題,比如 TCP 拆包粘包問題。

TCP 問題:

解決方案:業界的主流協議的解決方案可以歸納如下

這裏只是列舉出來編碼過程,解碼是逆過程。(說白了,編碼就是找着固定的格式進行寫入,解碼就是照着固定的格式讀)

恭喜你,已經學會寫 RPC 框架了,想深入瞭解的朋友可以參照源碼。進行學習,升級。

來源:https://www.cnblogs.com/monkey-xuan/p/15893604.html

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/r_zzTFS883EA3xe3eizc1g