Netty 的編碼-解碼案例
0x01:半包粘包
例如發送兩個數據包給服務器,由於服務端一次讀取到的字節數不一定的分
沒有半包和拆包:服務器分兩次讀取到兩個地理的數據包,這個情況沒有拆包和粘包的情況
-
粘包:服務器一次收到兩個數據包,在一起收到的
-
拆包:第一次讀取到完成的第一個包和第二個包的一部分內容,第二次讀取到第二個包的剩餘內容
-
整包:第一次讀取到第一包的部分內容,第二次讀取到第一個包的剩餘部分和第二個包的全部
-
多次拆包:如果接收滑窗非常小,數據量大的時候發生多次發送的接收的情況
爲什麼會出現半包和粘包
1、HTTP 中有一個 Nagle 算法,每個報文都是一段的,使用網絡發送發現網絡效率低,然後 HTTP 設置一個算法,設置到一定程度發,所以出現一些延時,提高銷量,所以形成了粘包
2、HTTP 緩衝區引起的,報文段大的時候的時候直接弄在一起發送過去。
怎麼解決
不斷的從 TCP 的緩衝區中讀取數據,每次讀取完成都需要判斷是否是一個完整的數據包
如果是讀取的數據不足以拼接成一個完整的業務數據包,那就保留該數據,繼續從 TCP 緩衝區中讀取,直到得到一個完整的數據包
定長
分隔符
基於長度的變長包
如果當前督導的數據加上已經讀取到的數據足以拼接成一個數據包,那就講已經讀取的數據拼接本次讀取的數據,構成一個完整的業務數據包傳遞到業務邏輯上,多餘的數據保留,方便下次的讀取或者數據鏈接。
0x02:Netty 常用的編碼器
- LineBasedFrameDecoder
回車換行編碼器
配合StringDecoder
- DelimiterBasedFrameDecoder
分隔符解碼器
- FixedLengthFrameDecoder
固定長度解碼器
- LengthFieldBasedFrameDecoder
不能超過1024個字節不然會報錯
基於'長度'解碼器(私有協議最常用)
0x03:拆包的類
- ByteToMessageDecoder
自解析
- LengthFieldPrepender
長度編碼器
- Netty 拆包的基類 - ByteToMessageDecoder
內部維護了一個數據累積器cumulation,每次讀取到數據都會不斷累加,然後嘗試對累加到
的數據進行拆包,拆成一個完整的業務數據包
每次都將讀取到的數據通過內存拷貝的方式, 累積到cumulation中
調用子類的 decode 方法對累積的數據嘗試進行拆包
- LengthFieldBasedFrameDecoder
參數說明
maxFrameLength:包的最大長度
lengthFieldOffset:長度屬性的起始位(偏移位),包中存放長度屬性字段的起始位置
lengthFieldLength:長度屬性的長度
lengthAdjustment:長度調節值,在總長被定義爲包含包頭長度時,修正信息長度
initialBytesToStrip:跳過的字節數,根據需要跳過lengthFieldLength個字節,以便接收端直接接受到不含“長度屬性”的內容
- LengthFieldPrepender 編碼器
參數說明
lengthFieldLength:長度屬性的字節長度
lengthIncludesLengthFieldLength:false,長度字節不算在總長度中,true,算到總長度中
編解碼器的作用就是講原始字節數據與自定義的消息對象進行互轉
Decoder(解碼器)
Encoder(編碼器)
支持業界主流的序列化框架
Protobuf
Jboss Marshalling
Java Serialization
解碼 1 拆包:把整個 ByteBuf 數據,分成一個個 ByteBuf,每個表示一個包
解碼 2 反序列化:把每個包的 ByteBuf 字節數組轉成 java object
package com.demo;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
public class StickyDemoClient {
public static void main(String[] args) throws Exception {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
}
}
new StickyDemoClient().connect(port, "127.0.0.1");
}
public void connect(int port, String host) throws Exception {
// 工作線程組
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
//ch.pipeline().addLast("framer", new FixedLengthFrameDecoder(139));
// ch.pipeline().addLast("framer", new StickyDemoDecodeHandlerV2(
// Unpooled.wrappedBuffer(new byte[] { '#' })));
ch.pipeline().addLast("framer", new DelimiterBasedFrameDecoder(8192,
Unpooled.wrappedBuffer(new byte[] { '#' })));
ch.pipeline().addLast(new StickyDemoClientHandler());
}
});
// 發起異步連接操作
ChannelFuture f = b.connect(host, port).sync();
// 等待客戶端鏈路關閉
f.channel().closeFuture().sync();
} finally {
// 優雅退出,釋放線程池資源
group.shutdownGracefully();
}
}
}
package com.demo;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
public class StickyDemoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
private static String[] alphabets = {"A", "B", "C", "D", "E", "F", "G", "H", "I",
"J", "K", "L", "M", "N", "O", "P"};
@Override
public void channelActive(ChannelHandlerContext ctx) {
for(int i=0; i<10; i++) {
StringBuilder builder = new StringBuilder();
builder.append("這是第");
builder.append(i);
builder.append("條消息, 內容是:");
for(int j=0; j<100; j++) {
builder.append(alphabets[i]);
}
builder.append("......");
builder.append("#");
System.out.println(builder.toString().getBytes().length);
ctx.writeAndFlush(Unpooled.copiedBuffer(builder.toString(),
CharsetUtil.UTF_8));
}
}
@Override
public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) {
System.out.println("客戶端接收到消息:" + in.toString(CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
package com.demo;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.ArrayList;
import java.util.List;
public class StickyDemoDecodeHandler extends ChannelInboundHandlerAdapter {
//存放待拆包數據的緩衝區
private ByteBuf cache;
private int frameLength;
public StickyDemoDecodeHandler(int length) {
this.frameLength = length;
}
static ByteBuf expandCache(ByteBufAllocator alloc, ByteBuf cache, int readable) {
ByteBuf oldCache = cache;
cache = alloc.buffer(oldCache.readableBytes() + readable);
cache.writeBytes(oldCache);
oldCache.release();
return cache;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf data = (ByteBuf) msg;
try {
//讀取每一個消息,創建緩衝區
if (cache == null) {
cache = ctx.alloc().buffer(1024);
} else {
//如果現有的緩衝區容量太小,無法容納原有數據+新讀入的數據,就擴容(重新創建一個大的,並把數據拷貝過去)
if (cache.writerIndex() > cache.maxCapacity() - data.readableBytes()) {
cache = expandCache(ctx.alloc(), cache, data.readableBytes());
}
}
//把新的數據讀入緩衝區
cache.writeBytes(data);
//每次讀取frameLength(定長)的數據,做爲一個包,存儲起來
List<ByteBuf> output = new ArrayList<>();
while (cache.readableBytes() >= frameLength) {
output.add(cache.readBytes(frameLength));
}
//還有部分數據不夠一個包,10, 15, 一個10個,還剩5個
if (cache.isReadable()) {
cache.discardReadBytes();
}
for (int i = 0; i < output.size(); i++) {
ctx.fireChannelRead(output.get(i));
}
} finally {
data.release();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
package com.demo;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.ArrayList;
import java.util.List;
public class StickyDemoDecodeHandlerV2 extends ChannelInboundHandlerAdapter {
private ByteBuf cache;
private byte delimiter; //包分隔符
public StickyDemoDecodeHandlerV2(ByteBuf delimiter) {
if (delimiter == null) {
throw new NullPointerException("delimiter");
}
if (!delimiter.isReadable()) {
throw new IllegalArgumentException("empty delimiter");
}
this.delimiter = delimiter.readByte();
;
}
static ByteBuf expandCache(ByteBufAllocator alloc, ByteBuf cache, int readable) {
ByteBuf oldCache = cache;
cache = alloc.buffer(oldCache.readableBytes() + readable);
cache.writeBytes(oldCache);
oldCache.release();
return cache;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf data = (ByteBuf) msg;
try {
if (cache == null) {
cache = ctx.alloc().buffer(1024);
} else {
if (cache.writerIndex() > cache.maxCapacity() - data.readableBytes()) {
cache = expandCache(ctx.alloc(), cache, data.readableBytes());
}
}
cache.writeBytes(data);
List<ByteBuf> output = new ArrayList<>();
int frameIndex = 0;
int frameEndIndex = 0;
int length = cache.readableBytes();
while (frameIndex <= length) {
frameEndIndex = cache.indexOf(frameIndex + 1, length, delimiter);
if (frameEndIndex == -1) {
cache.discardReadBytes();
break;
}
output.add(cache.readBytes(frameEndIndex - frameIndex));
cache.skipBytes(1);
frameIndex = frameEndIndex + 1;
}
if (cache.isReadable()) {
cache.discardReadBytes();
}
for (int i = 0; i < output.size(); i++) {
ctx.fireChannelRead(output.get(i));
}
} finally {
data.release();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
package com.demo;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
public class StickyDemoServer {
public static void main(String[] args) throws Exception {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 採用默認值
}
}
new StickyDemoServer().bind(port);
}
public void bind(int port) throws Exception {
// 第一步:
// 配置服務端的NIO線程組
// 主線程組, 用於接受客戶端的連接,但是不做任何具體業務處理,像老闆一樣,負責接待客戶,不具體服務客戶
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// 工作線程組, 老闆線程組會把任務丟給他,讓手下線程組去做任務,服務客戶
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 類ServerBootstrap用於配置Server相關參數,並啓動Server
ServerBootstrap b = new ServerBootstrap();
// 鏈式調用
// 配置parentGroup和childGroup
b.group(bossGroup, workerGroup)
// 配置Server通道
.channel(NioServerSocketChannel.class)
// 配置通道的ChannelPipeline
.childHandler(new ChildChannelHandler());
// 綁定端口,並啓動server,同時設置啓動方式爲同步
ChannelFuture f = b.bind(port).sync();
System.out.println(StickyDemoServer.class.getName() + " 啓動成功,在地址[" + f.channel().localAddress() + "]上等待客戶請求......");
// 等待服務端監聽端口關閉
f.channel().closeFuture().sync();
} finally {
// 優雅退出,釋放線程池資源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//ch.pipeline().addLast("framer", new FixedLengthFrameDecoder(139));
ch.pipeline().addLast("framer", new DelimiterBasedFrameDecoder(8192, Unpooled.wrappedBuffer(new byte[] { '#' })));
//ch.pipeline().addLast("framer", new StickyDemoDecodeHandler(139));
// ch.pipeline().addLast("framer", new StickyDemoDecodeHandlerV2(
// Unpooled.wrappedBuffer(new byte[] { '#' })));
ch.pipeline().addLast(new StickyDemoServerHandler());
}
}
}
package com.demo;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
public class StickyDemoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf) msg;
System.out.println(
"服務器接收到消息:" + in.toString(CharsetUtil.UTF_8));
ctx.write(in);
// ctx.write(Unpooled.copiedBuffer("#", CharsetUtil.UTF_8));
//compositeBuffer.addComponent(in);
// ByteBuf buf = ctx.alloc().directBuffer();
// buf.writeBytes("#".getBytes());
// CompositeByteBuf compositeBuffer = ctx.alloc().compositeBuffer();
// compositeBuffer.addComponents(true, in, buf);
// ctx.write(compositeBuffer);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx)
throws Exception {
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
.addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
source:https://www.yuque.com/yangxinlei/lodfss/nguvm0
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/sbbz1Esa6-yAkFRyOWUrvw