我是靠谱客的博主 迅速白开水,最近开发中收集的这篇文章主要介绍深入浅出Netty之四 Client请求处理,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

前2篇分析了echo server端的运行机制,本篇同样以echo client为例,分析netty的nio客户端的运行机制。

总体来说client端和server端的处理是类似的,NioWorker是重用的,也就意味着client和server的读写机制是一样的,都是通过worker线程来管理的。所不同的是Boss线程,server端的boss线程一个bind端口起一个,主要负责接收新请求,而client端的boss线程是一个可配置的数组,一个connect端口分配一个,主要负责connect过程,如果connect成功则将channle注册到worker线程中处理。在Client同样有PipelineSink,叫做NioClientSocketPipelineSink,也是负责底层IO和pipeline之间的交互。

EchoClient代码:

 


// 初始化Bootstrap和NioClientSocketChannelFactory,这一步将启动nioWorker线程,并初始化NioClientSocketPipelineSink,并将Boss线程创建
ClientBootstrap bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
// 用户自定义的pipeline工厂
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(
new EchoClientHandler(firstMessageSize));
}
});
// 异步创建连接
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
//等待连接关闭
future.getChannel().getCloseFuture().awaitUninterruptibly();
// 关闭资源,线程池等
bootstrap.releaseExternalResources();

具体connect过程:

一.创建client的channel

 

public ChannelFuture connect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
......
//拿用户自定义的pipeline
ChannelPipeline pipeline;
try {
pipeline = getPipelineFactory().getPipeline();
} catch (Exception e) {
throw new ChannelPipelineException("Failed to initialize a pipeline.", e);
}
// 从ChannelFactory中,创建Channel,对于client来说factory是NioClientSocketChannelFactory,Channel是NioClientSocketChannel
Channel ch = getFactory().newChannel(pipeline);
// 通过channel连接
return ch.connect(remoteAddress);
}

 二.创建channel时,分配worker

 

 public SocketChannel newChannel(ChannelPipeline pipeline) {
return new NioClientSocketChannel(this, pipeline, sink, sink.nextWorker());
}

 三.创建内部的SocketChannel,触发ChannelOpen事件,不过echo client的handler没有对channelOpen事件做处理

 

NioClientSocketChannel(
ChannelFactory factory, ChannelPipeline pipeline,
ChannelSink sink, NioWorker worker) {
//创建socketChannel,并配置成异步模式
super(null, factory, pipeline, sink, newSocket(), worker);
//触发open事件
fireChannelOpen(this);
}

 四.创建socketChannel过程

 

private static SocketChannel newSocket() {
SocketChannel socket;
//创建SocketChannel
try {
socket = SocketChannel.open();
} catch (IOException e) {
throw new ChannelException("Failed to open a socket.", e);
}
boolean success = false;
//使用异步模式
try {
socket.configureBlocking(false);
success = true;
} catch (IOException e) {
throw new ChannelException("Failed to enter non-blocking mode.", e);
} finally {
......
}
return socket;
}

 五.通过Channel进行connect

 

 public static ChannelFuture connect(Channel channel, SocketAddress remoteAddress) {
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
}
//是否连接成功的future
ChannelFuture future = future(channel, true);
//触发connected的downstream事件,对于echo client来说,由于没有downstream handler,所以直接被PipelineSink处理了
channel.getPipeline().sendDownstream(new DownstreamChannelStateEvent(
channel, future, ChannelState.CONNECTED, remoteAddress));
return future;
}

 六.NioClientSocketPipelineSink拿到CONNECTED的downstream事件,处理connect

 

case CONNECTED:
if (value != null) {
connect(channel, future, (SocketAddress) value);
} else {
channel.worker.close(channel, future);
}
break;
private void connect(
final NioClientSocketChannel channel, final ChannelFuture cf,
SocketAddress remoteAddress) {
try {
//尝试连接,如果成功,则直接将channel注册到worker线程中
if (channel.channel.connect(remoteAddress)) {
channel.worker.register(channel, cf);
}
//尝试连接失败,则将channel注册到某个boss线程中处理,该boss线程会接管该channel的connect过程
else {
channel.getCloseFuture().addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture f)
throws Exception {
if (!cf.isDone()) {
cf.setFailure(new ClosedChannelException());
}
}
});
cf.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
channel.connectFuture = cf;
nextBoss().register(channel);
}
} catch (Throwable t) {
cf.setFailure(t);
fireExceptionCaught(channel, t);
channel.worker.close(channel, succeededFuture(channel));
}
}

七.具体register过程

void register(NioClientSocketChannel channel) {
//client的注册任务,让boss线程监听connect事件
Runnable registerTask = new RegisterTask(this, channel);
Selector selector;
synchronized (startStopLock) {
//初始化boss线程,并启动之
if (!started) {
//
try {
//创建一个Selector
this.selector = selector =
Selector.open();
} catch (Throwable t) {
throw new ChannelException(
"Failed to create a selector.", t);
}
// 启动boss线程
boolean success = false;
try {
DeadLockProofWorker.start(bossExecutor,
new ThreadRenamingRunnable(this,
"New I/O client boss #" + id + '-' + subId));
success = true;
} finally {
......
started = true;
//异步提交注册任务
boolean offered = registerTaskQueue.offer(registerTask);
assert offered;
}
//根据超时时间,启动一个超时提醒任务
int timeout = channel.getConfig().getConnectTimeoutMillis();
if (timeout > 0) {
if (!channel.isConnected()) {
channel.timoutTimer = timer.newTimeout(wakeupTask,
timeout, TimeUnit.MILLISECONDS);
}
}
}

八.register之后,主线程就返回了,Boss线程异步执行

 


public void run() {
boolean shutdown = false;
int selectReturnsImmediately = 0;
Selector selector = this.selector;
// use 80% of the timeout for measure
final long minSelectTimeout = SelectorUtil.SELECT_TIMEOUT_NANOS * 80 / 100;
boolean wakenupFromLoop = false;
for (;;) {
wakenUp.set(false);
try {
long beforeSelect = System.nanoTime();
//select,500ms超时
int selected = SelectorUtil.select(selector);
.......
//处理注册任务,将channel注册到自己的selector范围中
processRegisterTaskQueue();
//处理IO就绪事件,这里是connect事件
processSelectedKeys(selector.selectedKeys());
// 处理超时,如果超时,则设置future失败
long currentTimeNanos = System.nanoTime();
processConnectTimeout(selector.keys(), currentTimeNanos);
......
}

 

 九.注册任务执行

 

public void run() {
try {
//注册一个connect key到boss的selector上
channel.channel.register(
boss.selector, SelectionKey.OP_CONNECT, channel);
} catch (ClosedChannelException e) {
channel.worker.close(channel, succeededFuture(channel));
}
//设置超时点
int connectTimeout = channel.getConfig().getConnectTimeoutMillis();
if (connectTimeout > 0) {
channel.connectDeadlineNanos = System.nanoTime() + connectTimeout * 1000000L;
}
}

 

 十.boss处理IO,client是connect就绪

 

private void processSelectedKeys(Set<SelectionKey> selectedKeys) {
......
for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
SelectionKey k = i.next();
i.remove();
if (!k.isValid()) {
close(k);
continue;
}
try {
//如果connect就位,判断connect是否确实完成
if (k.isConnectable()) {
connect(k);
}
} catch (Throwable t) {
......
}
}
}
private void connect(SelectionKey k) throws IOException {
NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
//如果连接成功,则将channel注册上worker线程中处理读写
if (ch.channel.finishConnect()) {
k.cancel();
if (ch.timoutTimer != null) {
ch.timoutTimer.cancel();
}
ch.worker.register(ch, ch.connectFuture);
}
}

 

 十一.注册过程

 

public void run() {
......
//默认监听read事件
synchronized (channel.interestOpsLock) {
channel.channel.register(
selector, channel.getRawInterestOps(), channel);
}
//通知等待线程成功
if (future != null) {
channel.setConnected();
future.setSuccess();
}
......
//触发ChannelConnected事件
fireChannelConnected(channel, remoteAddress);
......
}
}

 十二.echo client handler处理channelConnected事件,开始发送数据

 

 public void channelConnected(
ChannelHandlerContext ctx, ChannelStateEvent e) {
// Send the first message.
Server will not send anything here
// because the firstMessage's capacity is 0.
e.getChannel().write(firstMessage);
}

 十三.write之后的过程client端和server是一样的,可以参考前一篇server端的读写

 

 

最后

以上就是迅速白开水为你收集整理的深入浅出Netty之四 Client请求处理的全部内容,希望文章能够帮你解决深入浅出Netty之四 Client请求处理所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(38)

评论列表共有 0 条评论

立即
投稿
返回
顶部