概述
概述
Channel接口针对Channel的读入和写出IO事件的处理,定义了两个拓展接口:ChannelInboundHandler用于定义对读入IO事件的处理,ChannelOutboundHandler用于定义写出IO事件的处理。
ChannelInboundHandler
ChannelInboundHandler接口定义
/**
* {@link ChannelHandler} which adds callbacks for state changes. This allows the user
* to hook in to state changes easily.
*/
public interface ChannelInboundHandler extends ChannelHandler {
/**
* The {@link Channel} of the {@link ChannelHandlerContext} was registered with its {@link EventLoop}
*/
void channelRegistered(ChannelHandlerContext ctx) throws Exception;
/**
* The {@link Channel} of the {@link ChannelHandlerContext} was unregistered from its {@link EventLoop}
*/
void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
/**
* The {@link Channel} of the {@link ChannelHandlerContext} is now active
*/
void channelActive(ChannelHandlerContext ctx) throws Exception;
/**
* The {@link Channel} of the {@link ChannelHandlerContext} was registered is now inactive and reached its
* end of lifetime.
*/
void channelInactive(ChannelHandlerContext ctx) throws Exception;
/**
* Invoked when the current {@link Channel} has read a message from the peer.
*/
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
/**
* Invoked when the last message read by the current read operation has been consumed by
* {@link #channelRead(ChannelHandlerContext, Object)}. If {@link ChannelOption#AUTO_READ} is off, no further
* attempt to read an inbound data from the current {@link Channel} will be made until
* {@link ChannelHandlerContext#read()} is called.
*/
void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
/**
* Gets called if an user event was triggered.
*/
void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
/**
* Gets called once the writable state of a {@link Channel} changed. You can check the state with
* {@link Channel#isWritable()}.
*/
void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
/**
* Gets called if a {@link Throwable} was thrown.
*/
@Override
@SuppressWarnings("deprecation")
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}
ChannelInboundHandlerAdapter
-
ChannelInboundHandler的默认实现类,方便使用,用户通过拓展该类,只重写与自身关注的IO事件的处理的方法。ChannelInboundHandlerAdapter的每个方法的默认实现都是通过ChannelHandlerContext将IO事件或接收到的数据,传给所在的ChannelPipeline的下一个ChannelInboundHandler:
-
业务处理逻辑处理:用户可以通过拓展ChannelInboundHandlerAdapter,重写相应的方法来,生成新的子类的方式来定义业务需要的处理逻辑,Netty默认针对特定功能的处理,提供了一些ChannelInboundHandler的实现类,详见下面分析。
-
对于从Channel读入的数据,在调用channelRead方法处理时,默认实现也是传给下一个ChannelInboundHandler处理,不会销毁该数据对象,释放掉该数据所占用的空间的:
如果数据只需在当前ChannelInboundHandler处理,而不需要继续往下传输,则可以调用ReferenceCountUtil.release(msg)手动释放掉,或者拓展SimpleChannelInboundHandler类。 -
SimpleChannelInboundHandler
SimpleChannelInboundHandler为ChannelInboundHandlerAdapter的一个拓展实现,重写了channelRead方法,并提供了一个抽象方法channelRead0(在netty 5.0之后channelRead0方法名称变成了messageReceived)供用户实现自身的数据处理逻辑:
针对Channel读入的数据的处理,提供了两个功能:- 特定类型数据处理:用户通过拓展实现SimpleChannelInboundHandler来定义业务处理的ChannelInboundHandler时,通过指定泛型参数I,限制该ChannelInboundHandler只处理Channel读入的类型为I的数据;
- 自动释放数据:如果Channel读入的数据的类型与泛型参数I匹配,则在该ChannelInboundHandler处理掉了,在finally中,根据调用构造函数创建ChannelInboundHandler时,是否需要自动释放数据(默认为true),来自动进行数据空间的释放。
- 数据的保留:如果用户在实现channelRead0方法自定义数据处理逻辑时,需要将该数据传给下一个ChannelInboundHandler,则需要调用ReferenceCountUtil.retain(msg)方法,原理是将msg的引用计数加1,因为ReferenceCountUtil.release(msg)是将msg的引用计数减1,同时当引用计数变成0时,释放该数据:
public class StringHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String message)
throws Exception {
System.out.println(message);
// 不释放数据,交给下一个ChannelInboundHandler继续处理
ReferenceCountUtil.retain(message);
ctx.fireChannelRead(message);
}
}
ChannelOutboundHandler
ChannelOutboundHandler接口定义
/**
* {@link ChannelHandler} which will get notified for IO-outbound-operations.
*/
public interface ChannelOutboundHandler extends ChannelHandler {
/**
* Called once a bind operation is made.
*
* @param ctx the {@link ChannelHandlerContext} for which the bind operation is made
* @param localAddress the {@link SocketAddress} to which it should bound
* @param promise the {@link ChannelPromise} to notify once the operation completes
* @throws Exception thrown if an error occurs
*/
void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
/**
* Called once a connect operation is made.
*
* @param ctx the {@link ChannelHandlerContext} for which the connect operation is made
* @param remoteAddress the {@link SocketAddress} to which it should connect
* @param localAddress the {@link SocketAddress} which is used as source on connect
* @param promise the {@link ChannelPromise} to notify once the operation completes
* @throws Exception thrown if an error occurs
*/
void connect(
ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception;
/**
* Called once a disconnect operation is made.
*
* @param ctx the {@link ChannelHandlerContext} for which the disconnect operation is made
* @param promise the {@link ChannelPromise} to notify once the operation completes
* @throws Exception thrown if an error occurs
*/
void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
/**
* Called once a close operation is made.
*
* @param ctx the {@link ChannelHandlerContext} for which the close operation is made
* @param promise the {@link ChannelPromise} to notify once the operation completes
* @throws Exception thrown if an error occurs
*/
void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
/**
* Called once a deregister operation is made from the current registered {@link EventLoop}.
*
* @param ctx the {@link ChannelHandlerContext} for which the close operation is made
* @param promise the {@link ChannelPromise} to notify once the operation completes
* @throws Exception thrown if an error occurs
*/
void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
/**
* Intercepts {@link ChannelHandlerContext#read()}.
*/
void read(ChannelHandlerContext ctx) throws Exception;
/**
* Called once a write operation is made. The write operation will write the messages through the
* {@link ChannelPipeline}. Those are then ready to be flushed to the actual {@link Channel} once
* {@link Channel#flush()} is called
*
* @param ctx the {@link ChannelHandlerContext} for which the write operation is made
* @param msg the message to write
* @param promise the {@link ChannelPromise} to notify once the operation completes
* @throws Exception thrown if an error occurs
*/
void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
/**
* Called once a flush operation is made. The flush operation will try to flush out all previous written messages
* that are pending.
*
* @param ctx the {@link ChannelHandlerContext} for which the flush operation is made
* @throws Exception thrown if an error occurs
*/
void flush(ChannelHandlerContext ctx) throws Exception;
}
ChannelOutboundHandlerAdapter
与ChannelInboundHandlerAdapter的作用类似,方法默认实现也是通过ChannelHandlerContext将写出的数据,交给下一个ChannelOutboundHandler处理。
ChannelDuplexHandler
同时具有ChannelInboundHandler和ChannelInboundHandler的功能,可以实现相应方法,对Channel的读入数据和写出数据都进行处理,默认实现也是交给下一个ChannelHandler处理。
Netty提供的ChannelHandler实现
Netty在handler子模块,针对不同的功能,包括流量控制,数据flush,ip过滤,日志,ssl,大数据流处理,超时心跳检测,拥塞控制,提供了相应的ChannelHandler实现类,如图:
- 示例:心跳检测与长连接维持
Netty源码分析-基于Netty的心跳检测机制实现长连接
最后
以上就是老实舞蹈为你收集整理的Netty源码分析-数据处理器ChannelInboundHandler和ChannelOutboundHandler的全部内容,希望文章能够帮你解决Netty源码分析-数据处理器ChannelInboundHandler和ChannelOutboundHandler所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复