我是靠谱客的博主 和谐奇异果,这篇文章主要介绍Netty(十三)——ChannelHandler之意,现在分享给大家,希望可以做个参考。

       上篇我们讲述了消息容器ChannelPipeline的相关知识,这篇我们来看下由它管理的,负责对I/O事件或者I/O操作进行拦截和处理,可以选择性的拦截和处理自己感兴趣的事件,也可以透传和终止事件的传递,也是由我们亲自实现的ChannelHandler业务处理。而Netty也帮我们实现一些通用的Handler,我们直接用就可以,前边写的例子中应该都有体会。我们还是来先看知识思维导图吧:

        一,类关系继承图:由于Netty提供了很多Handler,例如:对I/O操作和事件进行预处理的,编解码的,流量整形、读写超时、日志等等,这里给出几个典型的编解码Handler的讲解,也是前边写的示例中我们用的。先看下类的关系图吧:

       二,ChannelHandlerAdapter功能说明:看上边关系图,接口ChannelHandler提供了I/O操作和事件触发的一些方法,而ChannelHandlerAdapter类,则对所有接口实现都是事件透传,如果我们某个Handler相对某个或者某些事进行处理,只需要覆盖ChannelHandlerAdapter对应的方法即可,其它的直接继承父类ChannelHandlerAdapter即可。看下ChannelHandlerAdapter的实现事件怎么透传:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
//说明:被@Skip注解的方法不会被调用,直接忽略; //@Sharable:注解后多个ChannelPipeline公用一个ChannelHandler. /** * Return {@code true} if the implementation is {@link Sharable} and so can be added * to different {@link ChannelPipeline}s. */ public boolean isSharable() { /** * Cache the result of {@link Sharable} annotation detection to workaround a condition. We use a * {@link ThreadLocal} and {@link WeakHashMap} to eliminate the volatile write/reads. Using different * {@link WeakHashMap} instances per {@link Thread} is good enough for us and the number of * {@link Thread}s are quite limited anyway. * * See <a href="See https://github.com/netty/netty/issues/2289">#2289</a>. */ Class<?> clazz = getClass(); Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache(); Boolean sharable = cache.get(clazz); if (sharable == null) { sharable = clazz.isAnnotationPresent(Sharable.class); cache.put(clazz, sharable); } return sharable; } /** * Calls {@link ChannelHandlerContext#read()} to forward * to the next {@link ChannelHandler} in the {@link ChannelPipeline}. * * Sub-classes may override this method to change behavior. */ @Skip @Override public void read(ChannelHandlerContext ctx) throws Exception { ctx.read(); } /** * Calls {@link ChannelHandlerContext#write(Object)} to forward * to the next {@link ChannelHandler} in the {@link ChannelPipeline}. * * Sub-classes may override this method to change behavior. */ @Skip @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ctx.write(msg, promise); }

       三,具体的实现类:

      1,ByteToMessageDecoder:通过名字也很容易理解,就是为了方便我们将ByteBuf解码成业务POJO对象,为一个抽象的工具类,我们只需要继承它并实现decode方法即可。但是ByteToMessageDecoder并没有考虑TCP的粘包和拆包等问题,读半包需要我们自己处理。所以一般会继承它的子类来处理。看下源码吧:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
//1,读信息 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //如果msg为ByteBuf,则进行解码,否则直接透传 if (msg instanceof ByteBuf) { RecyclableArrayList out = RecyclableArrayList.newInstance(); try { ByteBuf data = (ByteBuf) msg; //判断cumulation是否为空,来判断是否缓存了半包消息。 first = cumulation == null; if (first) { //为空则是没有半包消息,直接复制 cumulation = data; } else { //有半包消息,则需要将data复制进行,进行组合。 cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data); } //进行解码 callDecode(ctx, cumulation, out); } catch (DecoderException e) { throw e; } catch (Throwable t) { throw new DecoderException(t); } finally { if (cumulation != null && !cumulation.isReadable()) { cumulation.release(); cumulation = null; } int size = out.size(); for (int i = 0; i < size; i ++) { ctx.fireChannelRead(out.get(i)); } out.recycle(); } } else { ctx.fireChannelRead(msg); } } /** * Called once data should be decoded from the given {@link ByteBuf}. This method will call * {@link #decode(ChannelHandlerContext, ByteBuf, List)} as long as decoding should take place. * * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to * @param in the {@link ByteBuf} from which to read data * @param out the {@link List} to which decoded messages should be added */ protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { try { while (in.isReadable()) { int outSize = out.size(); int oldInputLength = in.readableBytes(); //调用用户实现的decode方法 decode(ctx, in, out); // Check if this handler was removed before continuing the loop. // If it was removed, it is not safe to continue to operate on the buffer. // // See https://github.com/netty/netty/issues/1664 if (ctx.isRemoved()) { break; } //1,如果用户解码器没有消费ByteBuf,则说明是半包消息,需要继续读取后续的数据,直接退出循环;2,如果用户解码器消费了ByteBuf,说明可以继续进行; if (outSize == out.size()) { if (oldInputLength == in.readableBytes()) { break; } else { continue; } } //3,如果用户解码器没有消费ByteBuf,但是却多解码出一个或多个对象,则为异常 if (oldInputLength == in.readableBytes()) { throw new DecoderException( StringUtil.simpleClassName(getClass()) + ".decode() did not read anything but decoded a message."); } //4,如果是单条消息解码器,第一次解码完成之后直接退出循环。 if (isSingleDecode()) { break; } } } catch (DecoderException e) { throw e; } catch (Throwable cause) { throw new DecoderException(cause); } }

        2,MessageToMessageDecoder:通过名字可以知道它是将一个对象解码为其他对象,称作为二次解码器。ByteBuffer——>Java对象(httpRquest对象)——>其他Java(POJO)对象。相当于第二个环节。用户还是只需要实现decode方法即可。由于是对象到另个对象处理,没有半包问题,相对来说简单些。看下源码:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { RecyclableArrayList out = RecyclableArrayList.newInstance(); try { //进行类型判断 if (acceptInboundMessage(msg)) { @SuppressWarnings("unchecked") I cast = (I) msg; try { //调用我们写的decode方法 decode(ctx, cast, out); } finally { //释放msg对象 ReferenceCountUtil.release(cast); } } else { out.add(msg); } } catch (DecoderException e) { throw e; } catch (Exception e) { throw new DecoderException(e); } finally { int size = out.size(); for (int i = 0; i < size; i ++) { ctx.fireChannelRead(out.get(i)); } out.recycle(); } } /** * Returns {@code true} if the given message should be handled. If {@code false} it will be passed to the next * {@link ChannelHandler} in the {@link ChannelPipeline}. */ public boolean acceptInboundMessage(Object msg) throws Exception { return matcher.match(msg); }

       3,LengthFieldBasedFrameDecoder:为了解决粘包拆包导致的拆包问题,在以前的例子中 Netty(二)——粘包、拆包解决之道我们用过LineBasedFramDecoder和DelimiterBasedFrameDecoder来处理半包。这里说下LengthFieldBasedFrameDecoder。回顾一下前边如何区分一个整包信息:a,固定长度,不足的前边补零,解码器只需要读取固定长度的字节处理即可;b,通过回车、换行符区分消息;c,通过分隔符区分整包消息;d,通过指定长度来标识整包消息等 。  而LengthFieldBasedFrameDecoder就是通过长度进行区分的。适用通常由消息长度字段+消息体组成的协议。看下源码吧:

可以通过四个字段的不同值来指定切字段的方式
lengthFieldOffset长度字段的偏移量
lengthFieldLength字段的长度量
lengthAdjustment要添加到长度字段值的补偿值
initialBytesToStrip从解码帧中去除的第一段的字节数
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
/** * 核心构造方法 * Creates a new instance. * * @param byteOrder * the {@link ByteOrder} of the length field * @param maxFrameLength * the maximum length of the frame. If the length of the frame is * greater than this value, {@link TooLongFrameException} will be * thrown. * @param lengthFieldOffset * the offset of the length field * @param lengthFieldLength * the length of the length field * @param lengthAdjustment * the compensation value to add to the value of the length field * @param initialBytesToStrip * the number of first bytes to strip out from the decoded frame * @param failFast * If <tt>true</tt>, a {@link TooLongFrameException} is thrown as * soon as the decoder notices the length of the frame will exceed * <tt>maxFrameLength</tt> regardless of whether the entire frame * has been read. If <tt>false</tt>, a {@link TooLongFrameException} * is thrown after the entire frame that exceeds <tt>maxFrameLength</tt> * has been read. */ public LengthFieldBasedFrameDecoder( ByteOrder byteOrder, int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast) { if (byteOrder == null) { throw new NullPointerException("byteOrder"); } if (maxFrameLength <= 0) { throw new IllegalArgumentException( "maxFrameLength must be a positive integer: " + maxFrameLength); } if (lengthFieldOffset < 0) { throw new IllegalArgumentException( "lengthFieldOffset must be a non-negative integer: " + lengthFieldOffset); } if (initialBytesToStrip < 0) { throw new IllegalArgumentException( "initialBytesToStrip must be a non-negative integer: " + initialBytesToStrip); } if (lengthFieldOffset > maxFrameLength - lengthFieldLength) { throw new IllegalArgumentException( "maxFrameLength (" + maxFrameLength + ") " + "must be equal to or greater than " + "lengthFieldOffset (" + lengthFieldOffset + ") + " + "lengthFieldLength (" + lengthFieldLength + ")."); } this.byteOrder = byteOrder; this.maxFrameLength = maxFrameLength; this.lengthFieldOffset = lengthFieldOffset; this.lengthFieldLength = lengthFieldLength; this.lengthAdjustment = lengthAdjustment; lengthFieldEndOffset = lengthFieldOffset + lengthFieldLength; this.initialBytesToStrip = initialBytesToStrip; this.failFast = failFast; } //进行解码,解码成功加入到list中 @Override protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { Object decoded = decode(ctx, in); if (decoded != null) { out.add(decoded); } } /** * 解码的具体实现,根据指定长度进行相应的切割,来保证整包 * Create a frame out of the {@link ByteBuf} and return it. * * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to * @param in the {@link ByteBuf} from which to read data * @return frame the {@link ByteBuf} which represent the frame or {@code null} if no frame could * be created. */ protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { if (discardingTooLongFrame) { long bytesToDiscard = this.bytesToDiscard; int localBytesToDiscard = (int) Math.min(bytesToDiscard, in.readableBytes()); in.skipBytes(localBytesToDiscard); bytesToDiscard -= localBytesToDiscard; this.bytesToDiscard = bytesToDiscard; failIfNecessary(false); } if (in.readableBytes() < lengthFieldEndOffset) { return null; } int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset; long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder); if (frameLength < 0) { in.skipBytes(lengthFieldEndOffset); throw new CorruptedFrameException( "negative pre-adjustment length field: " + frameLength); } frameLength += lengthAdjustment + lengthFieldEndOffset; if (frameLength < lengthFieldEndOffset) { in.skipBytes(lengthFieldEndOffset); throw new CorruptedFrameException( "Adjusted frame length (" + frameLength + ") is less " + "than lengthFieldEndOffset: " + lengthFieldEndOffset); } if (frameLength > maxFrameLength) { long discard = frameLength - in.readableBytes(); tooLongFrameLength = frameLength; if (discard < 0) { // buffer contains more bytes then the frameLength so we can discard all now in.skipBytes((int) frameLength); } else { // Enter the discard mode and discard everything received so far. discardingTooLongFrame = true; bytesToDiscard = discard; in.skipBytes(in.readableBytes()); } failIfNecessary(true); return null; } // never overflows because it's less than maxFrameLength int frameLengthInt = (int) frameLength; if (in.readableBytes() < frameLengthInt) { return null; } if (initialBytesToStrip > frameLengthInt) { in.skipBytes(frameLengthInt); throw new CorruptedFrameException( "Adjusted frame length (" + frameLength + ") is less " + "than initialBytesToStrip: " + initialBytesToStrip); } in.skipBytes(initialBytesToStrip); // extract frame int readerIndex = in.readerIndex(); int actualFrameLength = frameLengthInt - initialBytesToStrip; ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength); in.readerIndex(readerIndex + actualFrameLength); return frame; }

        4,MessageToByteEncoder:负责将POJO对象编码为ByteBuf,用户只需继承此类,实现encode方法即可。相对容易理解,看源码:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
@Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ByteBuf buf = null; try { //判断当前编码器是否支持要发送的消息类型 if (acceptOutboundMessage(msg)) { @SuppressWarnings("unchecked") I cast = (I) msg; buf = allocateBuffer(ctx, cast, preferDirect); try { //调用实现直接编码 encode(ctx, cast, buf); } finally { ReferenceCountUtil.release(cast); } if (buf.isReadable()) { //发送 ctx.write(buf, promise); } else { buf.release(); ctx.write(Unpooled.EMPTY_BUFFER, promise); } buf = null; } else { ctx.write(msg, promise); } } catch (EncoderException e) { throw e; } catch (Throwable e) { throw new EncoderException(e); } finally { if (buf != null) { //释放缓冲区 buf.release(); } } } /**对于直接内存分配ioBuffer(堆外内存),对于堆内存通过heapBuffer进行分配 * Allocate a {@link ByteBuf} which will be used as argument of {@link #encode(ChannelHandlerContext, I, ByteBuf)}. * Sub-classes may override this method to returna {@link ByteBuf} with a perfect matching {@code initialCapacity}. */ protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, @SuppressWarnings("unused") I msg, boolean preferDirect) throws Exception { if (preferDirect) { return ctx.alloc().ioBuffer(); } else { return ctx.alloc().heapBuffer(); } }

        5,MessageToMessageEncoder:将一个POJO对象编码成另一个对象,例如:POJO Java对象——>HttpResponse对象——>ByteBuf。我们也是继承此类,实现encode接口方法即可。看源码:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
@Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { RecyclableArrayList out = null; try { if (acceptOutboundMessage(msg)) { out = RecyclableArrayList.newInstance(); @SuppressWarnings("unchecked") I cast = (I) msg; try { encode(ctx, cast, out); } finally { ReferenceCountUtil.release(cast); } if (out.isEmpty()) { out.recycle(); out = null; throw new EncoderException( StringUtil.simpleClassName(this) + " must produce at least one message."); } } else { ctx.write(msg, promise); } } catch (EncoderException e) { throw e; } catch (Throwable t) { throw new EncoderException(t); } finally { if (out != null) { final int sizeMinusOne = out.size() - 1; if (sizeMinusOne == 0) { ctx.write(out.get(0), promise); } else if (sizeMinusOne > 0) { // Check if we can use a voidPromise for our extra writes to reduce GC-Pressure // See https://github.com/netty/netty/issues/2525 ChannelPromise voidPromise = ctx.voidPromise(); boolean isVoidPromise = promise == voidPromise; for (int i = 0; i < sizeMinusOne; i ++) { ChannelPromise p; if (isVoidPromise) { p = voidPromise; } else { p = ctx.newPromise(); } ctx.write(out.get(i), p); } ctx.write(out.get(sizeMinusOne), promise); } out.recycle(); } } }

      6,LengthFieldPrePender:如果协议中第一个字段为长度字段,此编码器可以计算当前待发送消息的二进制字节长度,添加到ByteBuf的缓冲区头中,编码后的消息组成:长度字段+原消息。也是和上边相对应的,看下源码:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
/** * 构造方法 * Creates a new instance. * * @param lengthFieldLength the length of the prepended length field. * Only 1, 2, 3, 4, and 8 are allowed. * @param lengthAdjustment the compensation value to add to the value * of the length field * @param lengthIncludesLengthFieldLength * if {@code true}, the length of the prepended * length field is added to the value of the * prepended length field. * * @throws IllegalArgumentException * if {@code lengthFieldLength} is not 1, 2, 3, 4, or 8 */ public LengthFieldPrepender(int lengthFieldLength, int lengthAdjustment, boolean lengthIncludesLengthFieldLength) { if (lengthFieldLength != 1 && lengthFieldLength != 2 && lengthFieldLength != 3 && lengthFieldLength != 4 && lengthFieldLength != 8) { throw new IllegalArgumentException( "lengthFieldLength must be either 1, 2, 3, 4, or 8: " + lengthFieldLength); } this.lengthFieldLength = lengthFieldLength; this.lengthIncludesLengthFieldLength = lengthIncludesLengthFieldLength; this.lengthAdjustment = lengthAdjustment; } //编码实现 @Override protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception { int length = msg.readableBytes() + lengthAdjustment; if (lengthIncludesLengthFieldLength) { length += lengthFieldLength; } //调整后的长度<0,剖出异常 if (length < 0) { throw new IllegalArgumentException( "Adjusted frame length (" + length + ") is less than zero"); } //对消息长度所占字节进行判断: switch (lengthFieldLength) { case 1: //1最大可以为255个字节2^8,writeByte if (length >= 256) { throw new IllegalArgumentException( "length does not fit into a byte: " + length); } out.add(ctx.alloc().buffer(1).writeByte((byte) length)); break; case 2: //2最大为2^16字节,writeShort if (length >= 65536) { throw new IllegalArgumentException( "length does not fit into a short integer: " + length); } out.add(ctx.alloc().buffer(2).writeShort((short) length)); break; case 3: //3最大为2^24字节,writeMedium if (length >= 16777216) { throw new IllegalArgumentException( "length does not fit into a medium integer: " + length); } out.add(ctx.alloc().buffer(3).writeMedium(length)); break; case 4: //4,writeInt out.add(ctx.alloc().buffer(4).writeInt(length)); break; case 8: //8,writeLong out.add(ctx.alloc().buffer(8).writeLong(length)); break; default: throw new Error("should not reach here"); } //添加到List<Object> out中 out.add(msg.retain()); }

       好,这篇讲了ChannelHandler的相关知识,主要讲述了三个解码器和对应的编码器,对比着看这些源码其实会很容易的。我们不仅可以使用netty提供的一些默认的Handler,还可以根据这些Handler的源码学习,来写出适合自己业务更加合理的Handler。好,继续中……

最后

以上就是和谐奇异果最近收集整理的关于Netty(十三)——ChannelHandler之意的全部内容,更多相关Netty(十三)——ChannelHandler之意内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(82)

评论列表共有 0 条评论

立即
投稿
返回
顶部