概述
对于read方法,一般经过的ChannelInboundHandler:
ChannelPipeLine->ByteToMessageDecoder (首先字节流分割为完整的一个报文)-->MessageToMessageDecoder (报文转换为对象)
可以有多个ByteToMessageDecoder和MessageToMessageDecoder,进一步的细粒度的对象转换。
/**
* Handles an I/O event or intercepts an I/O operation, and forwards it to its next handler in
* its {@link ChannelPipeline}.
* 处理IO事件、拦截IO操作或跳转到下一个ChannelHandler处理
* <h3>子类型</h3>
* ChannelHandler 本身未提供IO操作的方法,你通常需要实现它的两个子接口:
* 1.ChannelInboundHandler 处理inbound IO事件
* 2.ChannelOutboundHandler 处理outbound IO操作。
* 或者使用方使的Adapter class:
* 1.ChannelInboundHandlerAdapter 处理inbound IO事件
* 2.ChannelOutboundHandlerAdapter 处理outbound IO操作。
* 3.ChannelDuplexHandler 处理inbound IO事件 和 outbound IO事件
*
* <h3>The context object</h3>
* 每一个ChannelHandler 都对应一个独立的ChannelHandlerContext(ChannelHandlerContext持有ChannelHandler)
* ChannelHandlerContext 是ChannelPipeline与ChannelHandler 的粘合剂。
* 事件来源于ChannelPipeline-》ChannelHandlerContext-》 ChannelHandler。io方法
* 上一个(或下一个)ChannelHandlerContext-》 ChannelHandler。io方法
* 上一个(或下一个)ChannelHandlerContext-》 ChannelHandler。io方法
* ChannelHandlerContext的attr(AttributeKey)可以保存状态信息(线程安全)
* <h3>使用AttributeKey 保存状态信息</h3>
*
*
* <h4>Using {@link AttributeKey}s</h4>
*
* public class DataServerHandler extends SimpleChannelInboundHandler<Message> {
* private final AttributeKey<Boolean> auth = AttributeKey.valueOf("auth")};
* public void channelRead(ChannelHandlerContext ctx, Message message) {
* Attribute<Boolean> attr = ctx.attr(auth);
* Channel ch = ctx.channel();
* if (message instanceof LoginMessage) {
* authenticate((LoginMessage) o);
* attr.set(true);
* } else (message instanceof GetDataMessage) {
* if (Boolean.TRUE.equals(attr.get())) {
* ch.write(fetchSecret((GetDataMessage) o));
* } else {
* fail();
* }
* }
* }
* ...
* }
*
* <h4>The {@code @Sharable} annotation</h4>
* 如果一个ChannelHandler被标记@Sharable,该ChannelHandler可以被添加到一个或多个ChannelPipeline多次。
* 该ChannelHandler必须是线程安全的。
* 如果一个ChannelHandler未标记@Sharable,每添加到ChannelPipeline之前,需要创建一个新的实例,再添到ChannelPipeline中
*
*/
public interface ChannelHandler {
/**
* 当ChannelHandler被添加到ChannelPipeline,并准备好处理IO事件时(register完成后),触发。
* 被调用时机:
* 1. Bootstrap b;b.handler(new ChannelInitializer<SocketChannel>() {//ignore} 当regisger操作(在selector上注
* 册channel)完成后触发
* 2.channel.pipeline.addxxxx(ChannelHandler)方法时,如果已注册的,则立即触发否则等注册后触发
**/
void handlerAdded(ChannelHandlerContext ctx) throws Exception;
/**
* 当ChannelHandler被从ChannelPipeline移除时(不处理IO事件)
*/
void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
/**
* 如果一个ChannelHandler被标记@Sharable,该ChannelHandler可以被添加到一个或多个ChannelPipeline多次。
* 该ChannelHandler必须是线程安全的。
* 如果一个ChannelHandler未标记@Sharable,每添加到ChannelPipeline之前,需要创建一个新的实例,再添到ChannelPipeline中
*
**/
@Inherited
@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface Sharable {
// no value
}
}
/**
* io事件状态改变的回调方法,用户可以实现接口来得到状态改变通知
*/
public interface ChannelInboundHandler extends ChannelHandler {
/**
* 当Channel注册到EventLoop时触发(只会触发一次)
* register事件:即 在jdk的 selector上注册channel , javaChannel().register(eventLoop().selector, 0, this(NioSocketChannel));
* 注册完成后触发channelRegistered
* 事件源:Bootstrap b; b.connect(serverIp, port)时异步触发register,register完成后,触发channelRegistered
*
*/
void channelRegistered(ChannelHandlerContext ctx) throws Exception;
/**
* 当Channel从EventLoop注销时触发
*/
void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
/**
* 当Channel是活动时触发
* 活动:网络connect ==true
*/
void channelActive(ChannelHandlerContext ctx) throws Exception;
/**
*
* 当已注册的Channel,不再活动,已到生命周期结束时触发
*/
void channelInactive(ChannelHandlerContext ctx) throws Exception;
/**
×
* 当Channel从对端接收到数据时触发
*/
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
/**
* Invoked when the last message read by the current read operation has been consumed by
* {@link #channelRead(ChannelHandlerContext, Object)}. If {@link ChannelOption#AUTO_READ} is off, no further
* attempt to read an inbound data from the current {@link Channel} will be made until
* {@link ChannelHandlerContext#read()} is called.
*
*/
void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
/**
*
* 当用户事件被触发时调用
*/
void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
/**
*
* 当channel的可写入状态改变时触发,可以通过 Channel.isWritable()检查是否可以写入
*/
void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
/**
*
* 当前生异常时触发
*/
@Override
@SuppressWarnings("deprecation")
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}
/**
*
* 一个抽像的类实现了ChannelInboundHandler的所有方法,
* 实现仅仅是把事件转发给下一个ChannelHandler,即本类不处理任务事件,可以在子类中
* 覆盖方法实现逻辑。目的:子类可以覆盖需要的方法,其它方法采用默认实现--简单。
* </p>
*/
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelRegistered();
}
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelUnregistered();
}
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
}
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelInactive();
}
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
}
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelWritabilityChanged();
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
ctx.fireExceptionCaught(cause);
}
}
/**
*
* 一个特殊的ChannelInboundHandler实现,目的:提供一个简单的方法,当Channel被注册到EventLoop时触发,用于初始化Channel
* 通常用于Bootstrap#handler(ChannelHandler)、ServerBootstrap#handler(ChannelHandler)、ServerBootstrap#childHandler(ChannelHandler)
* 用于设置Channel的ChannelPipeline,添加子ChannelHandler.
* 子类需要实现initChannel方法
*
*
* 示例 :
* public class MyChannelInitializer extends {@link ChannelInitializer} {
* public void initChannel({@link Channel} channel) {
* channel.pipeline().addLast("myHandler", new MyHandler());
* }
* }
* ServerBootstrap serverBootstrap = ...;
* serverBootstrap.childHandler(new MyChannelInitializer());
*/
@Sharable
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
/**
* 当Channel被注册到EventLoop时触发,initChannel方法中需要对Channel初始化,这时Channel.pipeline中只有一个
* ChannelInitializer(Bootstrap#handler(ChannelInitializer)时添加),
* initChannel方法执行完程后--(已经添加了N个ChannelHandler),自动的从Channel.pipeline移除这个ChannelHandler,
* 原因:初始化对于channel只需要调用一次
*/
protected abstract void initChannel(C ch) throws Exception;
/*
* ignore other methods
*/
}
/**
*
* 本类只处理指定类型的消息体,如果是指定类型的,则channelRead0方法被触发,否则忽略由后续的ChannelHandler处理。
*
* 例如:下面这个类只处理String类型的消息
*
* <pre>
* public class StringHandler extends SimpleChannelInboundHandler<String>{
* protected void channelRead0(ChannelHandlerContext ctx, String message)
* throws Exception{
* System.out.println(message);
* //如果需要触发后面ChannelHander接着处理,需要调用ctx.fireChannelRead
* }
* }
* </pre>
*
*
*/
public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
boolean release = true;
try {
if (acceptInboundMessage(msg)) {
@SuppressWarnings("unchecked")
I imsg = (I) msg;
channelRead0(ctx, imsg);
} else {
release = false;
ctx.fireChannelRead(msg);
}
} finally {
if (autoRelease && release) {
ReferenceCountUtil.release(msg);
}
}
}
/**
*
* 当消息类型为I时被调用
*/
protected abstract void channelRead0(ChannelHandlerContext ctx, I msg) throws Exception;
}
/**
* {@link ChannelInboundHandlerAdapter} which decodes from one message to an other message.
*
* 消息解码器:将消息由将指定类型的消息转换为另一种类型,如果不是指定类型忽略。
* 例如:下面实现将字符串类型转换为整型(转换为它的长度值)
*
* public class StringToIntegerDecoder extends MessageToMessageDecoder<String>{
* public void decode(ChannelHandlerContext ctx, String message,
* List<Object> out) throws Exception{
* out.add(message.length());
* }
* }
* </pre>
*
*
*/
public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
CodecOutputList out = CodecOutputList.newInstance();
try {
if (acceptInboundMessage(msg)) {//如果是指定类型
@SuppressWarnings("unchecked")
I cast = (I) msg;
try {
decode(ctx, cast, out);//解码
} finally {
ReferenceCountUtil.release(cast);
}
} else {
out.add(msg);
}
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
int size = out.size();
for (int i = 0; i < size; i ++) {//List中每一个对象由后续ChannelHandler处理
ctx.fireChannelRead(out.getUnsafe(i));
}
out.recycle();
}
}
/**
* 是指定类型的消息,则调用decode方法进行解码,解码后的对象需要添加到List<Object> out中,以使后续ChannelHandler处理。
*/
protected abstract void decode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;
}
/**
* 将Base64编码的ByteBuf类型--》解码--》ByteBuf
* 注意:此ChannelHandler之前需要使用适当的另一个ByteToMessageDecoder解码器。例如DelimiterBasedFrameDecoder
* 原因:
*
* // Decoders
* pipeline.addLast("frameDecoder", new DelimiterBasedFrameDecoder(80, Delimiters#nulDelimiter()));
* pipeline.addLast("base64Decoder", new Base64Decoder());
*
* // Encoder
* pipeline.addLast("base64Encoder", new Base64Encoder());
* </pre>
*/
@Sharable
public class Base64Decoder extends MessageToMessageDecoder<ByteBuf> {
private final Base64Dialect dialect;
public Base64Decoder() {
this(Base64Dialect.STANDARD);
}
public Base64Decoder(Base64Dialect dialect) {
if (dialect == null) {
throw new NullPointerException("dialect");
}
this.dialect = dialect;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
out.add(Base64.decode(msg, msg.readerIndex(), msg.readableBytes(), dialect));
}
}
/**
* 解码一个ByteBuf--》String
* 注意:此ChannelHandler之前需要使用适当的另一个ByteToMessageDecoder解码器。例如DelimiterBasedFrameDecodera或LineBasedFrameDecoder
* // Decoders
* pipeline.addLast("frameDecoder", new {@link LineBasedFrameDecoder}(80));
* pipeline.addLast("stringDecoder", new {@link StringDecoder}(CharsetUtil.UTF_8));
*
* // Encoder
* pipeline.addLast("stringEncoder", new {@link StringEncoder}(CharsetUtil.UTF_8));
* 之后的ChannelHandler可以使用String来代替ByteBuf 做为消息
* void channelRead(ChannelHandlerContext ctx, String msg) {
* ch.write("Did you say '" + msg + "'?n");
* }
* </pre>
*/
@Sharable
public class StringDecoder extends MessageToMessageDecoder<ByteBuf> {
/*
* ignore some methods
*/
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
out.add(msg.toString(charset));
}
}
/**
* 将接收到的ByteBuf转换为字节数组
*
* // Decoders
* pipeline.addLast("frameDecoder",
* new {@link LengthFieldBasedFrameDecoder}(1048576, 0, 4, 0, 4));
* pipeline.addLast("bytesDecoder",
* new {@link ByteArrayDecoder}());
*
* // Encoder
* pipeline.addLast("frameEncoder", new {@link LengthFieldPrepender}(4));
* pipeline.addLast("bytesEncoder", new {@link ByteArrayEncoder}());
*
*/
public class ByteArrayDecoder extends MessageToMessageDecoder<ByteBuf> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
// copy the ByteBuf content to a byte array
byte[] array = new byte[msg.readableBytes()];
msg.getBytes(0, array);
out.add(array);
}
}
/**
*
* 一个抽像的类:将消息 由ByteBuf类型 解码为另一个类型
*
* <h3>Frame detection</h3>
* <p>
* 一般情况下网络发来的的字节流,由pipeline中最早添加的DelimiterBasedFrameDecoder、FixedLengthFrameDecoder
* LengthFieldBasedFrameDecoder、LineBasedFrameDecoder 进行数据段分割。
*
* <p>
* 如果自定义的数据结构(报文),不在上面四种类型范围内,则需要新建一个报文解码器(可以实现ByteToMessageDecoder )。
* 通过检查ByteBuf.readableBytes()确认是否接收到完整的报文,如果是完整的则转换为另一个消息类型,否则
* 不要修改 reader index ,允许更多的字节接收,下次调用时再判断是否完整的数据段。
*
* 检查完整的报文时,不要修改reader index,可以使用 ByteBuf.getInt(int)
*
* 注意:子类一定不要标记为 @Sharable
*
* 一些 ByteBuf方法如ByteBuf.readBytes(int)的返回值没有释放或存入List中、可能引起内存泄漏。建议使用ByteBuf#readSlice(int)
*
*/
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
//积累的ByteBuf,保存上次未读完+本次新增的字节
ByteBuf cumulation;
/**
* 从ByteBuf in 中读取完整的报文,转换为另一的消息对象,添加到List<Object> out中
* 如果没有in中数据不完整 ,不需改变其read index .
* 这个方法可能被多次调用,直到有in中有完整的报文,所以在ByteToMessageDecoder内部有一个状态的ByteBuf cumulation
* 保存积累的字节,因此 ByteToMessageDecoder 不可以是共享的(@Sharable)
*/
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
}
/**
*
* 固定长度报文解码器 FixedLengthFrameDecoder 扩展了 ByteToMessageDecoder
* 例如:以下从网络上接收到4个数据包:
* +---+----+------+----+
* | A | BC | DEFG | HI |
* +---+----+------+----+
* 报文固定长度为3,则对数据包中的字节进行分割:
* +-----+-----+-----+
* | ABC | DEF | GHI |
* +-----+-----+-----+
* 就能获得完整的数据段。
*/
public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);
}
}
protected Object decode(
@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
if (in.readableBytes() < frameLength) {
return null;
} else {
return in.readRetainedSlice(frameLength);
}
}
}
/**
* 完整报文中,报文头有一个长度字段,占用几个字节,它指定了报文的字节长度,这样就可以
* 根据字节长度来判断完整的报文
* LengthFieldBasedFrameDecoder 就是此类报文的解码器。
*
* 报文第1,2个字节为长度字段,指定报文长度,长度值不包括这长度字段本身:
* lengthFieldOffset = 0 长度字段在报文中的偏移量
* lengthFieldLength = 2 长度字段本身长度
* lengthAdjustment = 0
* initialBytesToStrip = 0 获取报文时,丢弃前几个字节? 0:是不丢弃
*
* BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes)
* +--------+----------------+ +--------+----------------+
* | Length | Actual Content |----->| Length | Actual Content |
* | 0x000C | "HELLO, WORLD" | | 0x000C | "HELLO, WORLD" |
* +--------+----------------+ +--------+----------------+
*
*
* 报文第1,2个字节为长度字段,指定报文长度,长度值不包括这长度字段本身:
*
* lengthFieldOffset = 0 长度字段在报文中的偏移量
* lengthFieldLength = 2 长度字段本身长度
* lengthAdjustment = 0
* initialBytesToStrip = 2 获取报文时,丢弃前几个字节? 2:即丢度长度字段
*
* BEFORE DECODE (14 bytes) AFTER DECODE (12 bytes)
* +--------+----------------+ +----------------+
* | Length | Actual Content |----->| Actual Content |
* | 0x000C | "HELLO, WORLD" | | "HELLO, WORLD" |
* +--------+----------------+ +----------------+
*
*
* 大多数情况下,长度字段值为报文长度(不含长度字段本身),有一些协议,长度字段
* 是报文长度+长度字段本身长度,这种情况下,就需要为lengthAdjustment设置非0值。
* 因为报文长度 = 长度字段值- 长度字段本身长度
*
*
* lengthFieldOffset = 0 长度字段在报文中的偏移量
* lengthFieldLength = 2 长度字段本身长度
* lengthAdjustment = -2 长度字段-2=报文长度
* initialBytesToStrip = 0
*
* BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes)
* +--------+----------------+ +--------+----------------+
* | Length | Actual Content |----->| Length | Actual Content |
* | 0x000E | "HELLO, WORLD" | | 0x000E | "HELLO, WORLD" |
* +--------+----------------+ +--------+----------------+
* </pre>
*
*
* 下面示例报文头部多了一个header ,长度字段从第3个字节开始
*
* lengthFieldOffset = 长度字段在报文中的偏移量 2
* lengthFieldLength = 3 长度字段本身长度
* lengthAdjustment = 0
* initialBytesToStrip = 0 需要保留 header 所以不丢弃任务字节
*
* BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes)
* +----------+----------+----------------+ +----------+----------+----------------+
* | Header 1 | Length | Actual Content |----->| Header 1 | Length | Actual Content |
* | 0xCAFE | 0x00000C | "HELLO, WORLD" | | 0xCAFE | 0x00000C | "HELLO, WORLD" |
* +----------+----------+----------------+ +----------+----------+----------------+
* </pre>
*
*
* 下面示例中在长度字段和报文之间有一个header ,但长度字段值不包括header
* 报文长度= 长度字段值+ header 长度
* lengthFieldOffset = 0 长度字段在报文中的偏移量
* lengthFieldLength = 3 长度字段本身长度
* lengthAdjustment = 2 长度字段值+2 (包括header)
* initialBytesToStrip = 0
*
* BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes)
* +----------+----------+----------------+ +----------+----------+----------------+
* | Length | Header 1 | Actual Content |----->| Length | Header 1 | Actual Content |
* | 0x00000C | 0xCAFE | "HELLO, WORLD" | | 0x00000C | 0xCAFE | "HELLO, WORLD" |
* +----------+----------+----------------+ +----------+----------+----------------+
*
*/
public class LengthFieldBasedFrameDecoder extends ByteToMessageDecoder {
/**
* 构建器
*
* @param maxFrameLength 报文的最大长度,如果超过长度,则抛出TooLongFrameException
* @param lengthFieldOffset 长度字段在报文中的偏移量
*
* @param lengthFieldLength 长度字段长度(字节数)
*
* @param lengthAdjustment 调整报文长度 在长度字段上增加或减少数
*
* @param initialBytesToStrip 丢弃前几个字节? 一般用于移除长度字段 0 不移除
*
*/
public LengthFieldBasedFrameDecoder(
int maxFrameLength,
int lengthFieldOffset, int lengthFieldLength,
int lengthAdjustment, int initialBytesToStrip) {
this(
maxFrameLength,
lengthFieldOffset, lengthFieldLength, lengthAdjustment,
initialBytesToStrip, true);
}
}
/**
* 以换行分割的报文解码器
* 换行符:n 或 rn 都处理
*/
public class LineBasedFrameDecoder extends ByteToMessageDecoder {
/**
* Creates a new decoder.
* @param maxLength the maximum length of the decoded frame.
* A {@link TooLongFrameException} is thrown if
* the length of the frame exceeds this value.
* @param stripDelimiter whether the decoded frame should strip out the
* delimiter or not
* @param failFast If <tt>true</tt>, a {@link TooLongFrameException} is
* thrown as soon as the decoder notices the length of the
* frame will exceed <tt>maxFrameLength</tt> regardless of
* whether the entire frame has been read.
* If <tt>false</tt>, a {@link TooLongFrameException} is
* thrown after the entire frame that exceeds
* <tt>maxFrameLength</tt> has been read.
*/
public LineBasedFrameDecoder(final int maxLength, final boolean stripDelimiter, final boolean failFast) {
this.maxLength = maxLength;
this.failFast = failFast;
this.stripDelimiter = stripDelimiter;
}
}
最后
以上就是谦让外套为你收集整理的Netty4.1源码 :ChannelInboundHandler的全部内容,希望文章能够帮你解决Netty4.1源码 :ChannelInboundHandler所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复