大家好,欢迎来到IT知识分享网。
一、引言
- 简要介绍 Netty 作为高性能网络框架的应用场景。
- 解码器在 Netty 中的作用:从 字节流 解码为 消息对象,并解决粘包/拆包问题。
- 本文将重点分析解码器的调用时机、继承关系和LengthFieldBasedFrameDecoder、LineBasedFrameDecoder的实现源码。
二、Netty解码器概述
2.1 什么是Netty解码器?
- 定义:解码器是 Netty 中用来从 ByteBuf 中提取消息内容的组件。
- 作用:将字节流转化为 Java 对象,以供业务层处理。
- 常见解码器:LengthFieldBasedFrameDecoder、LineBasedFrameDecoder、DelimiterBasedFrameDecoder等。
2.2 解码器的作用
- 解析协议:比如基于长度、分隔符、固定长度等协议格式。
- 解决粘包/拆包问题:保证从字节流中解析出完整的一帧数据。
三、Netty解码器的继承结构
3.1 解码器的父类:ByteToMessageDecoder
- ByteToMessageDecoder是所有Netty解码器的基类,继承自ChannelInboundHandlerAdapter。
- 它主要实现了decode()方法,负责从ByteBuf中读取数据,并填充解码后的消息。
- decode() 的默认实现是将数据存入out。
3.2 类的继承结构图:

3.3 解码器的调用顺序
- ChannelPipeline的工作流程:
- 数据通过 ChannelPipeline 传递,每个 ChannelHandler 会处理接收到的数据。
- 解码器作为 ChannelInboundHandler 的一部分,被放置在 ChannelPipeline 中来处理入站数据。
- 顺序:数据从 ChannelPipeline 中的每个 handler 顺序传递,解码器的执行会在数据流到达时被调用。
四、解码器的调用时机
4.1 ChannelPipeline的调用流程
- 数据进入ChannelPipeline:每个 ChannelHandler(包括解码器)依次处理入站数据。
- 解码器的作用时机:
- 在数据传输到达某个 handler 时,解码器会被触发。
- 一旦解码器读取到足够的数据,它会触发 channelRead(),将解码后的数据传递到下一个handler。
- 4.2 decode()方法的执行时机
- decode() 方法在解码器中被自动调用,Netty 会根据当前的 ByteBuf 数据调用 decode() 处理。
- decode() 会读取传入的 ByteBuf 并解码,只处理可读取的部分,并将解码后的对象添加到 out中。
4.3什么时候调用 decode()?
- 解码器会在channelRead() 被触发时调用 decode() 方法。
- 当接收到的ByteBuf数据足够解析时,decode()会被自动执行。
- 解码器也会在cumulation(内部缓存区)数据积累完毕后调用 decode()。
五、LineBasedFrameDecoder源码分析
5.1 类定义与构造方法
- 类定义:LineBasedFrameDecoder是一个基于行分隔符的帧解码器,通常用于处理类似HTTP、IRC 等协议。
- 构造方法:maxLength:设置消息的最大长度,用于防止无限制读取数据。delimiter:行分隔符(通常为 \n 或 \r\n)。
5.2 成员属性
1.maxLength:代表一个消息的最大允许的长度。
2.failFast:布尔类型的属性,如果设置为 true,当解码器发现一个消息超过了最大允许的长度(例如,消息太长,无法在缓冲区中存储),它会立刻抛出异常,并放弃该消息的解码处理。如果设置为 false,解码器会等到处理完整个消息或者收到足够的数据后再进行判断,不会立即抛出异常。
3.stripDelimiter: 是一个布尔类型的属性,通常在基于分隔符的解码器中使用,用来决定是否从解码出的消息中移除分隔符(例如,\n或 \r\n)。它的作用是:如果stripDelimiter = true,解码器会去掉消息中的分隔符,只返回有效的数据部分。例如,LineBasedFrameDecoder会去掉每行的换行符(\n 或 \r\n), 如果stripDelimiter = false,解码器会保留分隔符,解码出的消息将包括分隔符。
4.discarding: 是一个布尔值属性,通常在处理缓冲区时使用,表示是否正在丢弃数据。如果一个缓冲区正在丢弃数据(比如数据已经超出了处理范围,或者暂时不需要处理),discarding 的值会被设置为true。
5.discardedBytes: discardedBytes是一个整数属性,表示已经丢弃的字节数。它用来记录自上次丢弃数据以来,已经丢弃了多少字节。
6.offset: offset是一个整数属性,表示数据在缓冲区中的偏移量。它通常指示从数据的起始位置到某个指定位置的距离。
5.3 源码分析
当我们在一个ChannelPipeline中添加LineBasedFrameDecoder时候会调用其channelRead方法,这个方法是其父类ByteToMessageDecoder的方法,该方法源码如下:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) { CodecOutputList out = CodecOutputList.newInstance(); try { first = cumulation == null; cumulation = cumulator.cumulate(ctx.alloc(), first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg); callDecode(ctx, cumulation, out); } catch (DecoderException e) { throw e; } catch (Exception e) { throw new DecoderException(e); } finally { try { if (cumulation != null && !cumulation.isReadable()) { numReads = 0; cumulation.release(); cumulation = null; } else if (++numReads >= discardAfterReads) { numReads = 0; discardSomeReadBytes(); } int size = out.size(); firedChannelRead |= out.insertSinceRecycled(); fireChannelRead(ctx, out, size); } finally { out.recycle(); } } } else { ctx.fireChannelRead(msg); } }
从对象池中借出一个CodecOutputList实例,如果没有可用的旧对象就新建一个,调用cumulator.cumulate方法,cumulator是一个匿名类,它的源码是这样的:
public static final Cumulator MERGE_CUMULATOR = new Cumulator() { @Override public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { if (!cumulation.isReadable() && in.isContiguous()) { cumulation.release(); return in; } try { final int required = in.readableBytes(); if (required > cumulation.maxWritableBytes() || (required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1) || cumulation.isReadOnly()) { return expandCumulation(alloc, cumulation, in); } cumulation.writeBytes(in, in.readerIndex(), required); in.readerIndex(in.writerIndex()); return cumulation; } finally { in.release(); } } }
MERGE_CUMULATOR是字节缓冲累加器逻辑(Cumulator),Netty使用它在处理半包、拆包(如 TCP 粘包)时,把多次接收到的ByteBuf数据累积到一个连续的缓冲区中。为后续的解码器提供一个连续的数据区域。
if (!cumulation.isReadable() && in.isContiguous())
- cumulation.isReadable():判断旧缓冲区是否还有数据,如果没有说明可以丢掉。
- in.isContiguous():判断in是否是连续内存(如非 CompositeBuffer)。
如果旧缓冲区是空的并且新的缓冲区是连续内存,使用新数据的in作为cumulation,cumulation.isReadable()为false说明上次的数据已经全部消费掉了,没有剩余数据,也就是当前解码起点是新的包头,不需要拼接历史数据了。
final int required = in.readableBytes(); if (required > cumulation.maxWritableBytes() || (required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1) || cumulation.isReadOnly()) { return expandCumulation(alloc, cumulation, in); }
这段代码说明了三种情况说明旧缓冲区无法继续写入新数据,需要替换或扩容:
- maxWritableBytes:写空间不足;
- refCnt() > 1:被多个地方引用,不安全,不能原地扩容;
- isReadOnly():是只读缓冲区(如 Unpooled.wrappedBuffer(byte[]);
cumulation.writeBytes(in, in.readerIndex(), required); 将 in的数据写入cumulation,此时cumulation 成为数据总汇。
设置 in.readerIndex = writerIndex:表示in的数据已经全部读完了,下次不能再读,最后释放in,调用的是in.release()。
接着调用callDecode方法,传入ChannelHandlerContext ctx、上面计算得出的ByteBuf cumulation、Object msg,该方法源码如下:
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { try { while (in.isReadable()) { final int outSize = out.size(); if (outSize > 0) { fireChannelRead(ctx, out, outSize); out.clear(); if (ctx.isRemoved()) { break; } } int oldInputLength = in.readableBytes(); decodeRemovalReentryProtection(ctx, in, out); if (ctx.isRemoved()) { break; } if (out.isEmpty()) { if (oldInputLength == in.readableBytes()) { break; } else { continue; } } if (oldInputLength == in.readableBytes()) { throw new DecoderException( StringUtil.simpleClassName(getClass()) + ".decode() did not read anything but decoded a message."); } if (isSingleDecode()) { break; } } } catch (DecoderException e) { throw e; } catch (Exception cause) { throw new DecoderException(cause); } }
调用in.readableBytes()读取出字节数,然后调用
decodeRemovalReentryProtection方法,该方法源码如下:
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { decodeState = STATE_CALLING_CHILD_DECODE; try { decode(ctx, in, out); } finally { boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING; decodeState = STATE_INIT; if (removePending) { fireChannelRead(ctx, out, out.size()); out.clear(); handlerRemoved(ctx); } } }
这个方法调用了decode方法,该方法源码如下:
@Override protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { Object decoded = decode(ctx, in); if (decoded != null) { out.add(decoded); } }
又调用重载方法decode方法,该方法源码如下:
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { final int eol = findEndOfLine(buffer); if (!discarding) { if (eol >= 0) { final ByteBuf frame; final int length = eol - buffer.readerIndex(); final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1; if (length > maxLength) { buffer.readerIndex(eol + delimLength); fail(ctx, length); return null; } if (stripDelimiter) { frame = buffer.readRetainedSlice(length); buffer.skipBytes(delimLength); } else { frame = buffer.readRetainedSlice(length + delimLength); } return frame; } else { final int length = buffer.readableBytes(); if (length > maxLength) { discardedBytes = length; buffer.readerIndex(buffer.writerIndex()); discarding = true; offset = 0; if (failFast) { fail(ctx, "over " + discardedBytes); } } return null; } } else { if (eol >= 0) { final int length = discardedBytes + eol - buffer.readerIndex(); final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1; buffer.readerIndex(eol + delimLength); discardedBytes = 0; discarding = false; if (!failFast) { fail(ctx, length); } } else { discardedBytes += buffer.readableBytes(); buffer.readerIndex(buffer.writerIndex()); offset = 0; } return null; } }
这里面又调用了findEndOfLine方法,该方法主要是找是否有以\n结尾或者\r结尾的字符,该方法的源码如下:
private int findEndOfLine(final ByteBuf buffer) { int totalLength = buffer.readableBytes(); int i = buffer.forEachByte(buffer.readerIndex() + offset, totalLength - offset, ByteProcessor.FIND_LF); if (i >= 0) { offset = 0; if (i > 0 && buffer.getByte(i - 1) == '\r') { i--; } } else { offset = totalLength; } return i; }
如果找到了i就会是大于或者等于0,把offset=0,如果没有找到i是小于0的,offset等于读取到的字符长度,最终返回i。
- 如果不是丢弃状态并且 i < 0的情况会执行下面的逻辑:
- final int length = buffer.readableBytes(); if (length > maxLength) { discardedBytes = length; buffer.readerIndex(buffer.writerIndex()); discarding = true; offset = 0; if (failFast) { fail(ctx, “over “ + discardedBytes); } } return null;
- 读取字节如果大于maxLength,记录被丢弃的字节数(discardedBytes = length),将读指针跳到写指针,跳过这段数据(readerIndex = writerIndex),进入丢弃状态discarding = true,表示接下来数据都不要了,如果 failFast == true,则立刻触发异常(调用 fail()),最后返回 null,表示这一帧数据不继续往下传。
- 如果小于maxLength直接返回null。
- 如果不是丢弃状态并且 i > 0的情况会执行下面的逻辑:
- if (eol >= 0) { final ByteBuf frame; final int length = eol – buffer.readerIndex(); final int delimLength = buffer.getByte(eol) == ‘\r’? 2 : 1; if (length > maxLength) { buffer.readerIndex(eol + delimLength); fail(ctx, length); return null; } if (stripDelimiter) { frame = buffer.readRetainedSlice(length); buffer.skipBytes(delimLength); } else { frame = buffer.readRetainedSlice(length + delimLength); } return frame; }
- 计算length的值,它是当前读取位置到换行符之间的字节长度(即这一行的正文内容长度),判断换行符的长度是 1 字节(\n)还是 2 字节(\r\n),如果是 \r\n,总长度是2,即delimLength=2,否则即delimLength=1,判断这行数据是否超过最大长度,如果是 跳过整行(包括分隔符),调用 fail(ctx, length) 抛出异常或处理,返回 null 表示这帧数据无效。
判断stripDelimiter如果是true,表示不要分隔符数据,只读取正文,如果是false则表示连同换行符也作为数据的一部分读出,返回成功解析的一帧数据并添加到List
六、LengthFieldBasedFrameDecoder源码分析
6.1 类定义与构造方法
- 类定义:LengthFieldBasedFrameDecoder是Netty提供的一个非常常用的分包/粘包处理解码器。它通过读取报文中某个长度字段来决定每一帧(frame)的边界,解决了TCP粘包/拆包问题,它会读取设置的某个长度字段,根据这个字段的值,截取出一帧完整的消息,然后交给后续的处理器。
- 构造方法:
public LengthFieldBasedFrameDecoder( int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip )
6.2 成员属性
变量名 |
说明 |
maxFrameLength |
最大允许帧长度,防止攻击 |
lengthFieldOffset |
长度字段的起始偏移 |
lengthFieldLength |
长度字段的字节长度 |
lengthAdjustment |
从长度字段值到帧末尾需要调整的偏移量 |
initialBytesToStrip |
解码后要丢弃的字节数量(一般用于跳过头部) |
failFast |
是否在超长帧读到长度字段时就立即抛异常(否则等读完整) |
discardingTooLongFrame |
是否正在丢弃超长帧(状态标记) |
1. maxFrameLength:最大帧长度(防止攻击或异常)
- 类型:int
- 作用:控制每一帧最大允许的长度。超出就认为数据异常(如恶意客户端粘包攻击),会抛 TooLongFrameException。
- 示例:你设为 1024,表示最多每帧 1024 字节,超出丢弃。
2.lengthFieldOffset:长度字段的起始偏移
- 类型:int
- 作用:告诉 Netty 从第几个字节开始是长度字段。
- 示例:如果前 2 字节是“魔数”或“标志位”,长度字段从第 3 字节开始,那就设置为 2。
3. lengthFieldLength:长度字段的长度
- 类型:int
- 作用:告诉 Netty,长度字段占几个字节(1/2/4/8)。
- 常见设置:
- 1:byte
- 2:short
- 4:int(最常见)
- 8:long
4. lengthAdjustment:长度补偿(纠偏)
- 类型:int
- 作用:长度字段是否包含其他字段(如头部)?
- 如果长度字段的值只表示数据体长度(不含头),那这个参数设置为 0;
- 如果长度字段的值包含了头部,比如“总长度 = 头部 + 数据”,那需要减去头部长度(比如 -6)。
示例: 协议结构:
魔数(2) |
长度(4) |
数据(N bytes) |
如果长度字段(4)表示的数据长度 不包含前面的6个字节,那 adjustment 就是 0。
如果长度字段包含整个包(比如长度字段 = N+6),那你就要设置 adjustment = -6。
5. initialBytesToStrip:解码后跳过的字节数
- 类型:int
- 作用:解码成功后,是否跳过前面的某些字节?比如魔数、版本号之类你不想处理,可以 strip 掉。
- 示例:
- 如果只关心业务体,头部不需要,就设置为头部长度,比如跳过前 6 字节。
- 如果想完整保留头部,就设为 0。
最常见的协议是这样的:
长度字段(4) |
实际数据(N bytes) |
表示长度字段表示的是 “数据体的长度”,不包含自己。
new LengthFieldBasedFrameDecoder( 1024, // 最大帧长度 0, // 长度字段从第0字节开始 4, // 长度字段占4字节 0, // 长度字段就代表数据体,不需要补偿 4 // 跳过长度字段,只保留数据体 );
6.3 decode源码分析
它的源码是这样的:
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { if (discardingTooLongFrame) { discardingTooLongFrame(in); } if (in.readableBytes() < lengthFieldEndOffset) { return null; } int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset; long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder); if (frameLength < 0) { failOnNegativeLengthField(in, frameLength, lengthFieldEndOffset); } frameLength += lengthAdjustment + lengthFieldEndOffset; if (frameLength < lengthFieldEndOffset) { failOnFrameLengthLessThanLengthFieldEndOffset(in, frameLength, lengthFieldEndOffset); } if (frameLength > maxFrameLength) { exceededFrameLength(in, frameLength); return null; } int frameLengthInt = (int) frameLength; if (in.readableBytes() < frameLengthInt) { return null; } if (initialBytesToStrip > frameLengthInt) { failOnFrameLengthLessThanInitialBytesToStrip(in, frameLength, initialBytesToStrip); } in.skipBytes(initialBytesToStrip); int readerIndex = in.readerIndex(); int actualFrameLength = frameLengthInt - initialBytesToStrip; ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength); in.readerIndex(readerIndex + actualFrameLength); return frame; }
读取ByteBuf中数据,判断是否小于长度字段,如果小于说明一条完整消息还没来,先返回 null,等待下次调用,从指定偏移读取“未调整的帧长度”,也就是字节流中原始写入的那个长度字段的值, 长度不能是负数,非法协议,抛异常或关闭连接,计算完整帧的总长度frameLength,如果调整后的总长度都比长度字段末尾还小,明显出错,协议格式错误,超过最大允许的帧长,丢弃或报错处理。
if (in.readableBytes() < frameLengthInt) { return null; }
这段代码代表读取的数据如果小于当前数据还不够整帧,就返回null等下一次数据到达再处理。
int readerIndex = in.readerIndex(); int actualFrameLength = frameLengthInt - initialBytesToStrip; ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength); in.readerIndex(readerIndex + actualFrameLength); return frame;
- ByteBuf中提取一段完整的帧(消息体)并返回。
- Netty会把返回的这个frame继续往下传给下游handler。
之后的逻辑就又回到ByteToMessageDecoder中了和上面的逻辑一样。
七、总结
- 解码器的调用时机:解码器在ChannelPipeline的数据流中起到重要作用,解码的调用时机是由 decode() 方法自动触发的。
- 继承关系:Netty解码器通过继承ByteToMessageDecoder类实现,解码器通过多态处理不同类型的数据流。
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/185063.html