手把手帶你寫一箇中高級程序員必會的分佈式 RPC 框架
◆ 一. 概述
什麼是 RPC?
-
遠程服務調用
-
官方:一種通過網絡從遠程計算機程序上請求服務,而不需要了解底層網絡技術的思想
-
通俗一點:客戶端在不知道調用細節的情況下,調用存在於遠程計算機上的某個對象,就像調用本地應用程序中的對象一樣。
-
市面上常見的 rpc 框架:dobbo,springCloud,gRPC...
那爲什麼要有 RPC,HTTP 不好麼?
-
因爲 RPC 和 HTTP 就不是一個層級的東西,所以嚴格意義上這兩個沒有可比性,也不應該來作比較。
-
HTTP 只是傳輸協議,協議只是規範了一定的交流格式
-
RPC 對比的是本地過程調用,是用來作爲分佈式系統之間的通信,它可以用 HTTP 來傳輸,也可以基於 TCP 自定義協議傳輸。
-
HTTP 協議比較冗餘,所以 RPC 大多都是基於 TCP 自定義協議,定製化的纔是最適合自己的。
項目總體結構
整體架構
接下來,分別解釋上述的過程
◆二. 自定義註解
服務的提供者和消費者公用一個接口,@ServiceExpose 是爲了暴露服務,放在生產者的某個實現類上;@ServiceReference 是爲了引用服務,放在消費者的需要注入的屬性上。
-
Target:指定被修飾的 Annotation 可以放置的位置 (被修飾的目標)
-
@Target(ElementType.TYPE) // 接口、類
-
@Target(ElementType.FIELD) // 屬性
-
@Target(ElementType.METHOD) // 方法
-
@Target(ElementType.PARAMETER) // 方法參數
-
@Target(ElementType.CONSTRUCTOR) // 構造函數
-
@Target(ElementType.LOCAL_VARIABLE) // 局部變量
-
@Target(ElementType.ANNOTATION_TYPE) // 註解
-
@Target(ElementType.PACKAGE) // 包
-
Retention:定義註解的保留策略
-
@Retention(RetentionPolicy.SOURCE) // 註解僅存在於源碼中,在 class 字節碼文件中不包含
-
@Retention(RetentionPolicy.CLASS) // 默認的保留策略,註解會在 class 字節碼文件中存在,但運行時無法獲得
-
@Retention(RetentionPolicy.RUNTIME) // 註解會在 class 字節碼文件中存在,在運行時可以通過反射獲取到
-
Documented:指定被修飾的該 Annotation 可以被 javadoc 工具提取成文檔
-
Inherited:指定被修飾的 Annotation 將具有繼承性
◆二. 啓動配置
主要是加載一些 rpc 相關的配置類,使用 SpringBoot 自動裝配。可以使用 SPI 機制加入一些自定義的類,放到指定文件夾中。
◆三. rpc 接口注入 / rpc 服務掃描
這裏主要就是通過反射獲得對應註解的屬性 / 類,進行服務暴露 / 服務引用。這裏需要關注的是什麼時候進行服務暴露 / 引用?如下:
-
客戶端:一般有倆種方案
-
餓漢式:餓漢式是通過實現 Spring 的 InitializingBean 接口中的 afterPropertiesSet 方法,容器通過調用 ReferenceBean 的 afterPropertiesSet 方法時引入服務。(在 Spring 啓動時,給所有的屬性注入實現類,包含遠程和本地的實現類) 懶漢式:只有當這個服務被注入到其他類中時啓動引入流程,也就是說用到了纔會開始服務引入。在應用的 Spring IOC 容器刷新完畢 (spring Context 初始化) 之後,掃描所有的 Bean,將 Bean 中帶有 @ServiceExpose/@ServiceReference 註解的 field 獲取到,然後創建 field 類型的代理對象,創建完成後,將代理對象 set 給此 field。後續就通過該代理對象創建服務端連接,併發起調用。(dubbo 默認)
-
服務端:與懶漢式一樣。
那麼怎麼知道 Spring IOC 刷新完成,這裏就使用一個 Spring 提供的監聽器,當 Spring IOC 刷新完成,就會觸發監聽器。
◆四. 服務註冊到 ZK / 從 Zk 獲得服務
Zookeeper 採用節點樹的數據模型,類似 linux 文件系統,/,/node1,/node2 比較簡單。不懂 Zookeeper 請移步:Zookeeper 原理
我們採用的是對每個服務名創建一個持久節點,服務註冊時實際上就是在 zookeeper 中該持久節點下創建了一個臨時節點,該臨時節點存儲了服務的 IP、端口、序列化方式等。
客戶端獲取服務時通過獲取持久節點下的臨時節點列表,解析服務地址數據:
客戶端監聽服務變化:
◆五. 生成代理類對象
這裏使用 JDK 的動態代理,也可以使用 cglib 或者 Javassist(dobbo 使用)。
public class ClientProxyFactory {
/**
* 獲取代理對象,綁定 invoke 行爲
*
* @param clazz 接口 class 對象
* @param <T> 類型
* @return 代理對象
*/public <T> T getProxyInstance(Class<T> clazz) {
return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, new InvocationHandler() {
final Random random = new Random();
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 第一步:通過服務發現機制選擇一個服務提供者暴露的服務
String serviceName = clazz.getName();
final List<ServiceInfo> serviceInfos = serviceDiscovery.listServices(serviceName);
logger.info("Rpc server instance list: {}", serviceInfos);
if (CollectionUtils.isEmpty(serviceInfos)) {
throw new RpcException("No rpc servers found.");
}
// TODO: 這裏模擬負載均衡,從多個服務提供者暴露的服務中隨機挑選一個,後期寫方法實現負載均衡
final ServiceInfo serviceInfo = serviceInfos.get(random.nextInt(serviceInfos.size()));
// 第二步:構造 rpc 請求對象
final RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setServiceName(serviceName);
rpcRequest.setMethod(method.getName());
rpcRequest.setParameterTypes(method.getParameterTypes());
rpcRequest.setParameters(args);
// 第三步:編碼請求消息, TODO: 這裏可以配置多種編碼方式
byte[] data = messageProtocol.marshallingReqMessage(rpcRequest);
// 第四步:調用 rpc client 開始發送消息
byte[] byteResponse = rpcClient.sendMessage(data, serviceInfo);
// 第五步:解碼響應消息
final RpcResponse rpcResponse = messageProtocol.unmarshallingRespMessage(byteResponse);
// 第六步:解析返回結果進行處理
if (rpcResponse.getException() != null) {
throw rpcResponse.getException();
}
return rpcResponse.getRetValue();
}
});
}
}
◆六. 負載均衡
本實現支持兩種主要負載均衡策略,隨機和輪詢,其中他們都支持帶權重的隨機和輪詢,其實也就是四種策略。
◆七. Netty 通信
服務端和客戶端基本一樣,這裏只展示服務端的代碼。代理對象在 Spring 啓動的時候就生成了,但是沒有調用,每一個調用 (請求) 都會生成一個 Netty 的連接。
public class NettyRpcServer extends RpcServer {
@Override
public void start() {
// 創建兩個線程組
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 創建服務端的啓動對象
ServerBootstrap serverBootstrap = new ServerBootstrap()
// 設置兩個線程組
.group(bossGroup, workerGroup)
// 設置服務端通道實現類型
.channel(NioServerSocketChannel.class)
// 服務端用於接收進來的連接,也就是boosGroup線程, 線程隊列大小
.option(ChannelOption.SO_BACKLOG, 100)
.childOption(ChannelOption.SO_KEEPALIVE, true)
// child 通道,worker 線程處理器
.childHandler(new ChannelInitializer<SocketChannel>() {
// 給 pipeline 管道設置自定義的處理器
@Override
public void initChannel(SocketChannel channel) {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new NettyServerHandler());
}
});
// 綁定端口號,同步啓動服務
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
channel = channelFuture.channel();
// 對關閉通道進行監聽,變爲同步
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
logger.error("server error.", e);
} finally {
// 釋放線程組資源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
實現具體 handler
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
//當通道就緒就會觸發該方法
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//進行記錄
logger.info("channel active: {}", ctx);
}
//讀取數據實際(這裏我們可以讀取客戶端發送的消息)
@Override
public void channelRead(ChannelHandlerContext ctx, MyDataInfo.MyMessage msg) throws Exception {
//將數據讀到buffer中
final ByteBuf msgBuf = (ByteBuf) msg;
final byte[] reqBytes = new byte[msgBuf.readableBytes()];
msgBuf.readBytes(reqBytes);
}
//數據讀取完畢
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//使用反射獲找到目標方法進行返回
final byte[] respBytes = requestHandler.handleRequest(reqBytes);
ctx.writeAndFlush(respBytes);
}
//處理異常, 一般是需要關閉通道
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
◆八. 序列化協議
對計算機網絡稍微有一點了解的同學都知道,數據在網絡中傳輸是二進制的:01010101010101010,類似這種,只有二進制數據才能在網絡中傳輸。但是在編碼之前我們一般先進行序列化,目的是爲了優化傳輸的數據量。因爲有的數據太大,需要進行空間優化。
那麼我們來區分一下序列化和編碼:我畫一張圖大家都全明白了
定義一個序列化協議,放入作爲一個 handler 放入 pipeline 中。
Netty 支持多種序列化,比如 jdk,Json,ProtoBuf 等,這裏使用 ProtoBuf,其序列化後碼流小性能高,非常適合 RPC 調用。接下來看怎麼使用 ProtoBuf?
-
- 編寫需要序列化的類 xxx.proto:ProtoBuf 有自己的語法規則 (自行百度)
-
- 通過官網提供的 protoc.exe 生成對應的 Java 代碼
-
- 前面通過工具生成的代碼(AnimalProto)已經幫我們封裝好了序列化和反序列化的方法,我們只需要調用對應方法即可
引入 Protobuf 的依賴
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.4.1</version>
</dependency>
序列化:
/**
* 調用對象構造好的Builder,完成屬性賦值和序列化操作
* @return
*/
public static byte[] protobufSerializer(){
AnimalProto.Animal.Builder builder = AnimalProto.Animal.newBuilder();
builder.setId(1L);
builder.setName("小豬");
List<String> actions = new ArrayList<>();
actions.add("eat");
actions.add("run");
builder.addAllActions(actions);
return builder.build().toByteArray();
}
反序列化:
/**
* 通過調用parseFrom則完成反序列化
* @param bytes
* @return
* @throws InvalidProtocolBufferException
*/
public static Animal deserialize(byte[] bytes) throws Exception {
AnimalProto.Animal pAnimal = AnimalProto.Animal.parseFrom(bytes);
Animal animal = new Animal();
animal.setId(pAnimal.getId());
animal.setName(pAnimal.getName());
animal.setActions(pAnimal.getActionsList());
return animal;
}
測試:
public static void main(String[] args) throws Exception {
byte[] bytes = serializer();
Animal animal = deserialize(bytes);
System.out.println(animal);
}
以下看到是能正常序列化和反序列化的:
◆九. 通信協議
通信協議主要是解決網絡傳輸問題,比如 TCP 拆包粘包問題。
TCP 問題:
-
TCP 拆包粘包主要就是把一些數據合併或者分割開進行發送,這時候有的數據就不完整,有的數據就多出一部分,就會造成問題。一般使用 TCP 協議都需要考慮拆包粘包問題
-
tcp 粘包和半包問題就是因爲滑動窗口。因爲不管你的數據是多少長度, 怎麼分割每一條數據。但是 tcp 只按照我滑動窗口的長度發送。
-
本質是因爲 TCP 是流式協議,消息無邊界。
解決方案:業界的主流協議的解決方案可以歸納如下
-
消息定長:例如每個報文的大小爲固定長度 100 字節,如果不夠用空格補足。(定長解碼器)
-
在包尾加特殊結束符進行分割。(分隔符編碼器)
- 消息長度 + 消息:將消息分爲消息頭和消息體,消息頭中包含表示消息總長度(或者消息體長度)的字段。Netty 自帶:
- 自定義編解碼器
這裏只是列舉出來編碼過程,解碼是逆過程。(說白了,編碼就是找着固定的格式進行寫入,解碼就是照着固定的格式讀)
恭喜你,已經學會寫 RPC 框架了,想深入瞭解的朋友可以參照源碼。進行學習,升級。
來源:https://www.cnblogs.com/monkey-xuan/p/15893604.html
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/r_zzTFS883EA3xe3eizc1g