大家好,欢迎来到IT知识分享网。
前言
本章重点:
- ByteBuf零拷贝
- netty拆包粘包
- LengthFieldBasedFrameDecoder解码器
一、ByteBuf详解
下面这段代码演示了ByteBuf的创建以及内容的打印,这里显示出了和普通ByteBuffer最大的区别之一,就是ByteBuf可以自动扩容,默认长度是256,如果内容长度超过阈值时,会自动触发扩容。
public class NettyByteBufExample { public static void main(String[] args) { ByteBuf buf=ByteBufAllocator.DEFAULT.buffer(); //构建一个ByteBuf System.out.println("=======before ======"); log(buf); //构建一个字符串 StringBuilder stringBuilder=new StringBuilder(); for (int i = 0; i < 400; i++) { stringBuilder.append("-"+i); } buf.writeBytes(stringBuilder.toString().getBytes()); System.out.println("=======after ======"); buf.readShort(); //读取2个字节 buf.readByte(); //读取一个字节 log(buf); } private static void log(ByteBuf buf){ StringBuilder sb=new StringBuilder(); sb.append(" read index:").append(buf.readerIndex()); //读索引 sb.append(" write index:").append(buf.writerIndex()); //写索引 sb.append(" capacity :").append(buf.capacity()) ; //容量 ByteBufUtil.appendPrettyHexDump(sb,buf); System.out.println(sb.toString()); } }
1.1 ByteBuf创建的方法有两种
1.第一种,创建基于堆内存的ByteBuf:
ByteBuf buffer=ByteBufAllocator.DEFAULT.heapBuffer(10);
2.第二种,创建基于直接内存(堆外内存)的ByteBuf(默认情况下用的是这种):
Java中的内存分为两个部分,一部分是不需要jvm管理的直接内存,也被称为堆外内存。堆外内存就是把内存对象分配在JVM堆意外的内存区域,这部分内存不是虚拟机管理,而是由操作系统来管理,这样可以减少垃圾回收对应用程序的影响。
ByteBufAllocator.DEFAULT.directBuffer(10);
直接内存的好处是读写性能会高一些:
- 如果数据放在堆中,此时需要把java堆空间中的数据放到远程服务器,首先要把堆中的数据拷贝到直接内存(堆外内存)然后再发送。
- 如果是把数据直接存储到堆外内存中,发送的时候就少了一个复制步骤。
但是它也有缺点,由于缺少了JVM的内存管理,所以需要我们自己来维护堆外内存,防止内存溢出。
另外,需要注意的是,ByteBuf默认采用了池化技术来创建。关于池化技术在前面的课程中已经重复讲过,它的核心思想是实现对象的复用,从而减少对象频繁创建销毁带来的性能开销。
池化功能是否开启,可以通过下面的环境变量来控制,其中unpooled表示不开启。
-Dio.netty.allocator.type={unpooled|pooled}
1.1 ByteBuf的存储结构

在ByteBuf中,有两个指针:
- readerIndex: 读指针,每读取一个字节,readerIndex自增加1。ByteBuf里面总共有witeIndex-readerIndex个字节可读,当readerIndex和writeIndex相等的时候,ByteBuf不可读
- writeIndex: 写指针,每写入一个字节,writeIndex自增加1,直到增加到capacity后,可以触发扩容后继续写入。
- ByteBuf中还有一个maxCapacity最大容量,默认的值是 Integer.MAX_VALUE ,当ByteBuf写入数据时,如果容量不足时,会触发扩容,直到capacity扩容到maxCapacity。
1.2 ByteBuf中常用的方法
对于ByteBuf来说,常见的方法就是写入和读取。
1.2.1 Write相关方法
对于write方法来说,ByteBuf提供了针对各种不同数据类型的写入,比如:
- writeChar,写入char类型
- writeInt,写入int类型
- writeFloat,写入float类型
- writeBytes, 写入nio的ByteBuffer
- writeCharSequence, 写入字符串
public class NettyByteBufExample { public static void main(String[] args) { ByteBuf buf=ByteBufAllocator.DEFAULT.buffer(); //构建一个ByteBuf System.out.println("=======before ======"); log(buf); //构建一个字符串 StringBuilder stringBuilder=new StringBuilder(); for (int i = 0; i < 400; i++) { stringBuilder.append("-"+i); } buf.writeBytes(stringBuilder.toString().getBytes()); System.out.println("=======after ======"); buf.readShort(); //读取2个字节 buf.readByte(); //读取一个字节 log(buf); } private static void log(ByteBuf buf){ StringBuilder sb=new StringBuilder(); sb.append(" read index:").append(buf.readerIndex()); //读索引 sb.append(" write index:").append(buf.writerIndex()); //写索引 sb.append(" capacity :").append(buf.capacity()) ; //容量 ByteBufUtil.appendPrettyHexDump(sb,buf); System.out.println(sb.toString()); } }
1.2.2 扩容
当向ByteBuf写入数据时,发现容量不足时,会触发扩容,而具体的扩容规则是:
构建ByteBuf时,如不传入容量参数,默认容量大小为256
假设ByteBuf初始容量是10:
- 如果写入后数据大小未超过512个字节,则选择下一个16的整数倍进行库容。 比如写入数据后大小为12,则扩容后的capacity是16。
- 如果写入后数据大小超过512个字节,则选择下一个2n。 比如写入后大小是512字节,则扩容后的capacity是210=1024 。(因为29=512,长度已经不够了)
- 扩容不能超过max capacity,否则会报错。
1.3 Reader相关方法
reader方法也同样针对不同数据类型提供了不同的操作方法:
- readByte ,读取单个字节
- readInt , 读取一个int类型
- readFloat ,读取一个float类型
1.2.1处代码打印出来的结果如下:
从下面结果中可以看到,读完一个字节后,这个字节就变成了废弃部分,再次读取的时候只能读取 未读取的部分数据。
另外,如果想重复读取哪些已经读完的数据,这里提供了两个方法来实现标记和重置:
public static void main(String[] args) { //由JVM来管理内存 ByteBuf buf=ByteBufAllocator.DEFAULT.heapBuffer(); //堆内存 buf.writeBytes(new byte[]{1,2,3,4}); log(buf); buf.writeInt(5); log(buf); System.out.println("开始进行读取操作"); buf.markReaderIndex(); //标记索引位置. markWriterIndex() byte b=buf.readByte(); System.out.println(b); buf.resetReaderIndex(); //重新回到标记位置 log(buf); }
另外,如果想不改变读指针位置来获得数据,在ByteBuf中提供了 get 开头的方法,这个方法基于索引位置读取,并且允许重复读取的功能。
1.4 ByteBuf的零拷贝机制
需要说明一下,ByteBuf的零拷贝机制和我们之前提到的操作系统层面的零拷贝不同,操作系统层面的零拷贝:是我们要把一个文件发送到远程服务器时,需要从内核空间拷贝到用户空间,再从用户空间拷贝到内核空间的网卡缓冲区发送,导致拷贝次数增加。
而ByteBuf中的零拷贝思想也是相同,都是减少数据复制提升性能。如下图所示,假设有一个原始ByteBuf,我们想对这个ByteBuf其中的两个部分的数据进行操作。按照正常的思路,我们会创建两个新的ByteBuf,然后把原始ByteBuf中的部分数据拷贝到两个新的ByteBuf中,但是这种会涉及到数据拷贝,在并发量较大的情况下,会影响到性能。
ByteBuf中提供了一个slice方法,这个方法可以在不做数据拷贝的情况下对原始ByteBuf进行拆分,使用方法如下:
public static void main(String[] args) { ByteBuf buf= ByteBufAllocator.DEFAULT.buffer(); buf.writeBytes(new byte[]{1,2,3,4,5,6,7,8,9,10}); log(buf); //从buf这个总的数据中,分别拆分5个字节保存到两个ByteBuf中 //零拷贝机制 (浅克隆) ByteBuf bb1=buf.slice(0,5); ByteBuf bb2=buf.slice(5,5); log(bb1); log(bb2); System.out.println("修改原始数据"); buf.setByte(2,8); log(bb1); }
打印结果:
1.5 ByteBuf拷贝的几种方法
在前面的案例中我们经常用到Unpooled工具类,它是同了非池化的ByteBuf的创建、组合、复制等操作。
假设有一个协议数据,它有头部和消息体组成,这两个部分分别放在两个ByteBuf中:
下面截图中包含了三种拷贝方式:
1.5.1 Unpooled
我们希望把header和body合并成一个ByteBuf,通常的做法是:
ByteBuf allBuf=Unpooled.buffer(header.readableBytes()+body.readableBytes()); allBuf.writeBytes(header); allBuf.writeBytes(body);
在这个过程中,我们把header和body拷贝到了新的allBuf中,这个过程在无形中增加了两次数据拷贝操作。那有没有更高效的方法减少拷贝次数来达到相同目的呢?
1.5.2 CompositeByteBuf组件
在Netty中,提供了一个CompositeByteBuf组件,它提供了这个功能:
public static void main(String[] args) { ByteBuf header= ByteBufAllocator.DEFAULT.buffer(); header.writeBytes(new byte[]{1,2,3,4,5}); ByteBuf body=ByteBufAllocator.DEFAULT.buffer(); body.writeBytes(new byte[]{6,7,8,9,10}); /* ByteBuf total= Unpooled.buffer(header.readableBytes()+body.readableBytes()); total.writeBytes(header); total.writeBytes(body);*/ //从逻辑成面构建了一个总的buf数据。 //第二个零拷贝实现 /* CompositeByteBuf compositeByteBuf=Unpooled.compositeBuffer(); compositeByteBuf.addComponents(true,header,body); log(compositeByteBuf);*/ //Unpooled ByteBuf total=Unpooled.wrappedBuffer(header,body); log(total); header.setByte(2,9); log(total); }
之所以CompositeByteBuf能够实现零拷贝,是因为在组合header和body时,并没有对这两个数据进行复制,而是通过CompositeByteBuf构建了一个逻辑整体,里面仍然是两个真实对象,也就是有一个指针指向了同一个对象,所以这里类似于浅拷贝的实现。
1.5.3 wrappedBuffer
在Unpooled工具类中,提供了一个wrappedBuffer方法,来实现CompositeByteBuf零拷贝功能。使用方法如下。
1.5.4 copiedBuffer
1.6 内存释放(可以不讲)
针对不同的ByteBuf创建,内存释放的方法不同。
- UnpooledHeapByteBuf,使用JVM内存,只需要等待GC回收即可
- UnpooledDirectByteBuf,使用对外内存,需要特殊方法来回收内存
- PooledByteBuf和它的之类使用了池化机制,需要更复杂的规则来回收内存
- 每个ByteBuf对象的初始计数为1
- 调用release方法时,计数器减一,如果计数器为0,ByteBuf被回收
- 调用retain方法时,计数器加一,表示调用者没用完之前,其他handler即时调用了release也不会造成回收。
- 当计数器为0时,底层内存会被回收,这时即使ByteBuf对象还存在,但是它的各个方法都无法正常使用
二、Netty中的拆包粘包问题
TCP传输协议是基于数据流传输的,而基于流化的数据是没有界限的,当客户端向服务端发送数据时,可能会把一个完整的数据报文拆分成多个小报文进行发送,也可能将多个报文合并成一个大报文进行发送。
在这样的情况下,有可能会出现图3-1所示的情况。
- 服务端恰巧读到了两个完整的数据包 A 和 B,没有出现拆包/粘包问题;
- 服务端接收到 A 和 B 粘在一起的数据包,服务端需要解析出 A 和 B;
- 服务端收到完整的 A 和 B 的一部分数据包 B-1,服务端需要解析出完整的 A,并等待读取完整的 B数据包;
- 服务端接收到 A 的一部分数据包 A-1,此时需要等待接收到完整的 A 数据包;
- 数据包 A 较大,服务端需要多次才可以接收完数据包 A。
由于存在拆包/粘包问题,接收方很难界定数据包的边界在哪里,所以可能会读取到不完整的数据导致数据解析出现问题。
三、应用层定义通信协议
如何解决拆包和粘包问题呢?
一般我们会在应用层定义通信协议。其实思想也很简单,就是通信双方约定一个通信报文协议,服务端收到报文之后,按照约定的协议进行解码,从而避免出现粘包和拆包问题。
其实大家把这个问题往深度思考一下就不难发现,之所以在拆包粘包之后导致收到消息端的内容解析出现错误,是因为程序无法识别一个完整消息,也就是不知道如何把拆包之后的消息组合成一个完整消息,以及将粘包的数据按照某个规则拆分形成多个完整消息。所以基于这个角度思考,我们只需要针对消息做一个通信双方约定的识别规则即可。
3.1 消息长度固定
每个数据报文都需要一个固定的长度,当接收方累计读取到固定长度的报文后,就认为已经获得了一个完整的消息,当发送方的数据小于固定长度时,则需要空位补齐。
如下图所示,假设我们固定消息长度是4,那么没有达到长度的报文,需要通过一个空位来补齐,从而使得消息能够形成一个整体。
这种方式很简单,但是缺点也很明显,对于没有固定长度的消息,不清楚如何设置长度,而且如果长度设置过大会造成字节浪费,长度太小又会影响消息传输,所以一般情况下不会采用这种方式。
3.2 特定分隔符
既然没办法通过固定长度来分割消息,那能不能在消息报文中增加一个分割符呢?然后接收方根据特定的分隔符来进行消息拆分。比如我们采用\r\n来进行分割,如图3-3所示。

3.3 消息长度加消息内容加分隔符
基于消息长度+消息内容+分隔符的方式进行数据通信,这个之前大家在Redis中学习过,redis的报文协议定义如下。
*3\r\n$3\r\nSET\r\n$4\r\nname\r\n$3\r\nmic
可以发现消息报文包含三个维度:
- 消息长度
- 消息分隔符
- 消息内容
这种方式在项目中是非常常见的协议,首先通过消息头中的总长度来判断当前一个完整消息所携带的参数个数。然后在消息体中,再通过消息内容长度以及消息体作为一个组合,最后通过\r\n进行分割。服务端收到这个消息后,就可以按照该规则进行解析得到一个完整的命令进行执行。
3.4 Zookeeper中的消息协议
协议的响应头中的xid和上文中提到的请求头中的xid是一致的,响应中只是将请求中的xid原值返回。zxid代表ZooKeeper服务器上当前最新的事务ID。err则是一个错误码,当请求处理过程中出现异常情况时,会在这个错误码中标识出来。协议的响应体部分是指响应的主体内容部分,包含了响应的所有返回数据。不同的响应类型,其响应体部分的结构是不同的。
四、Netty中的编解码器
在Netty中,默认帮我们提供了一些常用的编解码器用来解决拆包粘包的问题。下面简单演示几种解码器的使用。
4.1 FixedLengthFrameDecoder解码器
固定长度解码器FixedLengthFrameDecoder的原理很简单,就是通过构造方法设置一个固定消息大小frameLength,无论接收方一次收到多大的数据,都会严格按照frameLength进行解码。
如果累计读取的长度大小为frameLength的消息,那么解码器会认为已经获取到了一个完整的消息,如果消息长度小于frameLength,那么该解码器会一直等待后续数据包的达到,知道获得指定长度后返回。
使用方法如下,在3.3节中演示的代码的Server端,增加一个FixedLengthFrameDecoder,长度为10:
ServerBootstrap serverBootstrap=new ServerBootstrap(); serverBootstrap.group(bossGroup,workGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(new FixedLengthFrameDecoder(10)) //增加解码器 .addLast(new SimpleServerHandler()); } });
4.2 DelimiterBasedFrameDecoder解码器
特殊分隔符解码器: DelimiterBasedFrameDecoder,它有以下几个属性:
- delimiters,delimiters指定特殊分隔符,参数类型是ByteBuf,ByteBuf可以传递一个数组,意味着我们可以同时指定多个分隔符,但最终会选择长度最短的分隔符进行拆分。
比如接收方收到的消息体为:
hello\nworld\r\n
此时指定多个分隔符 \n 和 \r\n ,那么最终会选择最短的分隔符解码,得到如下数据
hello | world |
- maxLength,表示报文的最大长度限制,如果超过maxLength还没检测到指定分隔符,将会抛出TooLongFrameException。
- failFast,表示容错机制,它与maxLength配合使用。如果failFast=true,当超过maxLength后会立刻抛出TooLongFrameException,不再进行解码。如果failFast=false,那么会等到解码出一个完整的消息后才会抛出TooLongFrameException
- stripDelimiter,它的作用是判断解码后的消息是否去除分隔符,如果stripDelimiter=false,而
制定的特定分隔符是 \n ,那么数据解码的方式如下。
hello\nworld\r\n
当stripDelimiter=false时,解码后得到:
hello\n | world\r\n
4.3 LengthFieldBasedFrameDecoder解码器
LengthFieldBasedFrameDecoder是长度域解码器,它是解决拆包粘包最常用的解码器,基本上能覆盖大部分基于长度拆包的场景。其中开源的消息中间件RocketMQ就是使用该解码器进行解码的。
首先来说明一下该解码器的核心参数:
- lengthFieldOffset,长度字段的偏移量,也就是存放长度数据的起始位置
- lengthFieldLength,长度字段锁占用的字节数
- lengthAdjustment,在一些较为复杂的协议设计中,长度域不仅仅包含消息的长度,还包含其他数据比如版本号、数据类型、数据状态等,这个时候我们可以使用lengthAdjustment进行修正,它的值=包体的长度值-长度域的值
- initialBytesToStrip,解码后需要跳过的初始字节数,也就是消息内容字段的起始位置
- lengthFieldEndOffset,长度字段结束的偏移量, 该属性的值=lengthFieldOffset+lengthFieldLength
上面这些参数理解起来比较难,我们通过几个案例来说明一下。
4.3.1 消息长度+消息内容的解码
假设存在下图所示的由长度和消息内容组成的数据包,其中length表示报文长度,用16进制表示,共占用2个字节,那么该协议对应的编解码器参数设置如下。
- lengthFieldOffset=0, 因为Length字段就在报文的开始位置
- lengthFieldLength=2,协议设计的固定长度为2个字节
- lengthAdjustment=0,Length字段质保函消息长度,不需要做修正
- initialBytesToStrip=0,解码内容是Length+content,不需要跳过任何初始字节。

4.3.2 截断解码结果
如果我们希望解码后的结果中只包含消息内容,其他部分不变,如图3-7所示。对应解码器参数组合如下:
- lengthFieldOffset=0,因为Length字段就在报文开始位置
- lengthFieldLength=2 , 协议设计的固定长度
- lengthAdjustment=0, Length字段只包含消息长度,不需要做任何修正
- initialBytesToStrip=2, 跳过length字段的字节长度,解码后ByteBuf只包含Content字段。
4.3.3 长度字段包含消息内容
如下图所示,如果Length字段中包含Length字段自身的长度以及Content字段所占用的字节数,那么Length的值为0x00d(2+11=13字节),在这种情况下解码器的参数组合如下:
- lengthFieldOffset=0,因为Length字段就在报文开始的位置
- lengthFieldLength=2,协议设计的固定长度
- lengthAdjustment=-2,长度字段为13字节,需要减2才是拆包所需要的长度。
- initialBytesToStrip=0,解码后内容依然是Length+Content,不需要跳过任何初始字节

4.3.4 基于长度字段偏移的解码
如下图所示,Length字段已经不再是报文的起始位置,Length字段的值是0x000b,表示content字段占11个字节,那么此时解码器的参数配置如下:
- lengthFieldOffset=2,需要跳过Header所占用的2个字节,才是Length的起始位置
- lengthFieldLength=2,协议设计的固定长度
- lengthAdjustment=0,Length字段只包含消息长度,不需要做任何修正
- initialBytesToStrip=0,解码后内容依然是Length+Content,不需要跳过任何初始字节

4.3.5 基于长度偏移和长度修正解码
- lengthFieldOffset=1,需要跳过hdr1所占用的1个字节,才是Length的起始位置
- lengthFieldLength=2,协议设计的固定长度
- lengthAdjustment=1,由于hdr2+content一共占了1+11=12字节,所以Length字段值(11字节)加上lengthAdjustment(1)才能得到hdr2+Content的内容(12字节)
- initialBytesToStrip=3,解码后跳过hdr1和length字段,共3个字节

4.4 解码器实战
比如我们定义如下消息头,客户端通过该消息协议发送数据,服务端收到该消息后需要进行解码。

先定义客户端,其中Length部分,可以使用Netty自带的LengthFieldPrepender来实现,它可以计算当前发送消息的二进制字节长度,然后把该长度添加到ByteBuf的缓冲区头中。
public class LengthFieldBasedFrameDecoderClient { public static void main(String[] args) { EventLoopGroup workGroup=new NioEventLoopGroup(); Bootstrap b=new Bootstrap(); b.group(workGroup) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() //如果协议中的第一个字段为长度字段, // netty提供了LengthFieldPrepender编码器, // 它可以计算当前待发送消息的二进制字节长度,将该长度添加到ByteBuf的缓冲区头中 .addLast(new LengthFieldPrepender(2,0,false)) //使用StringEncoder,在通过writeAndFlush时,不需要自己转化成ByteBuf //StringEncoder会自动做这个事情 .addLast(new StringEncoder()) .addLast(new ChannelInboundHandlerAdapter(){ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush("i am request!"); ctx.writeAndFlush("i am a another request!"); } }); } }); try { ChannelFuture channelFuture=b.connect("localhost",8080).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally { workGroup.shutdownGracefully(); } } }
上述代码运行时,会得到两个报文。
下面是Server端的代码,增加了LengthFieldBasedFrameDecoder解码器,其中有两个参数的值如下:
- lengthFieldLength:2 , 表示length所占用的字节数为2
- initialBytesToStrip: 2 , 表示解码后跳过length的2个字节,得到content内容
public class LengthFieldBasedFrameDecoderServer { public static void main(String[] args) { EventLoopGroup bossGroup=new NioEventLoopGroup(); EventLoopGroup workGroup=new NioEventLoopGroup(); try{ ServerBootstrap serverBootstrap=new ServerBootstrap(); serverBootstrap.group(bossGroup,workGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,2,0,2)) .addLast(new StringDecoder()) .addLast(new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("receive message:"+msg); } }); } }); ChannelFuture channelFuture=serverBootstrap.bind(8080).sync(); //绑定端口 channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } }
4.4.1 我的实践
客户端:
服务端:
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/112561.html









