概述
当遇到在Handler需要执行耗时较高的操作时候,可以采用异步的方式来解决,多线程异步实现方式有两种:在Handler中添加线程池和在Context中添加线程池。任务队列并没有使用多线程,它是使用同一个线程执行IO操作和运行任务队列中的任务。
一 任务队列
这种方式运行任务队列线程和事件循环线程是同一个线程,并没有使用新的线程。
Netty的事件循环EventLoop是一个不断循环着执行读取就绪事件、处理事件、运行任务队列这三个操作的一个线程。事件循环关联了一个任务队列,用于存放耗时较长的业务处理操作。事件循环线程先读取就绪事件,如果任务队列为空则用阻塞一定时间的方式读取,如果任务队列非空,则使用非阻塞的方式读取,读取就绪事件。读取后进行处理事件,记录处理事件花费的时间,在任务队列中不断取出任务执行,默认花费在任务队列上取执行任务的时间和处理事件的时间相同,可以使用ioradio参数调整IO操作和非IO操作花费的时间比例,到时间后又去读取就绪事件。
在处理器中使用ChannelHandlerContext获取channel获取事件循环提交任务
package com.tech.netty.netty.source.async;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.util.HashSet;
import java.util.Set;
/**
* @author lw
* @since 2021/8/11
**/
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
//1 创建两个线程组bossGroup workerGroup
//2 bossGroup处理客户端连接请求 workerGroup处理客户端数据读写请求
//3 他们都是事件循环组 默认含有CPU核数*2个事件循环NioEventLoop
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//创建服务器端启动对象,进行参数配置
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup) //设置两个线程组
.channel(NioServerSocketChannel.class) //设置服务器端通道为NioServerSocketChannel
.option(ChannelOption.SO_BACKLOG, 128) //设置连接队列允许的连接个数
.childOption(ChannelOption.SO_KEEPALIVE, true) //设置连接连接状态为活动连接
.childHandler(new ChannelInitializer<SocketChannel>() { //给workerGroup NioEventLoop通过管道添加处理器
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new NettyServerHandler());
}
});
//启动服务器并绑定一个端口
ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
System.out.println("ok");
//对通道的关闭事件进行监听
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
package com.tech.netty.netty.source.async;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
/**
* @author lw
* @since 2021/8/11
**/
//自定义一个Handler
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
log.info("channel={} | handlerAdded",ctx.channel().id());
}
//读取客户端发送的消息
// ChannelHandlerContext 是一个上下文对象,可以获取通道和管道信息
// msg 是客户端发送的消息
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("服务器读取线程 "+Thread.currentThread().getName());
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("客户端发送消息是:"+byteBuf.toString(CharsetUtil.UTF_8));
ctx.channel().eventLoop().execute(() -> {
try {
log.info("第一个任务开始执行");
Thread.sleep(5*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
ctx.writeAndFlush(Unpooled.copiedBuffer("hello 客户端 1",CharsetUtil.UTF_8));
log.info("第一个提交任务的线程 "+Thread.currentThread().getName());
});
ctx.channel().eventLoop().execute(() -> {
try {
log.info("第二个任务开始执行");
Thread.sleep(5*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
ctx.writeAndFlush(Unpooled.copiedBuffer("hello 客户端 1",CharsetUtil.UTF_8));
log.info("第二个提交任务的线程 "+Thread.currentThread().getName());
});
ctx.writeAndFlush(Unpooled.copiedBuffer("go on....",CharsetUtil.UTF_8));
log.info("go on...");
}
//数据读取完毕
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
}
//处理异常 一般是需要关闭通道
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
package com.tech.netty.netty.source.async;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* @author lw
* @since 2021/8/11
**/
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
//客户端需要一个事件循环组
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
try {
// 创建客户端启动对象 并配置参数
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventExecutors) //设置线程组
.channel(NioSocketChannel.class) //设置客户端通道
.handler(new ChannelInitializer<SocketChannel>() { //设置处理器
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new NettyClientHandler());
}
});
System.out.println("...客户端 is ready...");
//连接服务器
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 7000).sync();
//对通道关闭事件进行监听
channelFuture.channel().closeFuture().sync();
} finally {
eventExecutors.shutdownGracefully();
}
}
}
package com.tech.netty.netty.source.async;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
* @author lw
* @since 2021/8/11
**/
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
//当通道就绪会触发该方法 给服务器发送数据
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("hello 服务器", CharsetUtil.UTF_8));
}
//当通道有读取事件时触发
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("服务器回复的消息:"+byteBuf.toString(CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
通过结果可以看到
handler线程和运行任务队列里任务的线程是同一个线程
任务队列里任务按序逐一取出执行
二 在Handler中定义线程池
通过在Handler中定义一个线程池,执行耗时较高的操作,IO线程执行IO操作,业务操作由业务线程来处理,提高IO效率,如果业务操作中需要进行IO操作,则在调用的时候Netty会判断当前线程是否为IO线程,如果不是则会则会将IO操作添加到对应事件循环的任务队列中,有事件循环的IO线程来运行。
package com.tech.netty.netty.source.async;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.DefaultEventExecutor;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Callable;
/**
* @author lw
* @since 2021/8/11
**/
//自定义一个Handler
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
static final EventExecutorGroup group=new DefaultEventExecutorGroup(16);
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
log.info("channel={} | handlerAdded",ctx.channel().id());
}
//读取客户端发送的消息
// ChannelHandlerContext 是一个上下文对象,可以获取通道和管道信息
// msg 是客户端发送的消息
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// log.info("服务器读取线程 "+Thread.currentThread().getName());
// ByteBuf byteBuf = (ByteBuf) msg;
// log.info("客户端发送消息是:"+byteBuf.toString(CharsetUtil.UTF_8));
//
// ctx.channel().eventLoop().execute(() -> {
// try {
// log.info("第一个任务开始执行");
// Thread.sleep(5*1000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// ctx.writeAndFlush(Unpooled.copiedBuffer("hello 客户端 1",CharsetUtil.UTF_8));
// log.info("第一个提交任务的线程 "+Thread.currentThread().getName());
// });
//
// ctx.channel().eventLoop().execute(() -> {
// try {
// log.info("第二个任务开始执行");
// Thread.sleep(5*1000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// ctx.writeAndFlush(Unpooled.copiedBuffer("hello 客户端 1",CharsetUtil.UTF_8));
// log.info("第二个提交任务的线程 "+Thread.currentThread().getName());
// });
//
// ctx.writeAndFlush(Unpooled.copiedBuffer("go on....",CharsetUtil.UTF_8));
final Object msgCop=msg;
final ChannelHandlerContext ctxCop=ctx;
//在handler中添加业务线程池 执行耗时任务
group.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
ByteBuf buff = (ByteBuf) msgCop;
byte[] bytes = new byte[buff.readableBytes()];
buff.readBytes(bytes);
String s = new String(bytes, CharsetUtil.UTF_8);
log.info("读取到消息:{}",s);
Thread.sleep(10*1000);
ctxCop.writeAndFlush(Unpooled.copiedBuffer("hello client",CharsetUtil.UTF_8));
log.info("发送完成");
return null;
}
});
group.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
ByteBuf buff = (ByteBuf) msgCop;
byte[] bytes = new byte[buff.readableBytes()];
buff.readBytes(bytes);
String s = new String(bytes, CharsetUtil.UTF_8);
log.info("1读取到消息:{}",s);
Thread.sleep(10*1000);
ctxCop.writeAndFlush(Unpooled.copiedBuffer("hello client1",CharsetUtil.UTF_8));
log.info("1发送完成");
return null;
}
});
log.info("go on...");
}
//数据读取完毕
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
//处理异常 一般是需要关闭通道
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
执行结果:
IO线程打印了go on
两个业务线程同时开始读取消息,休眠10s 发送消息
实现异步执行。
三 在Context中指定线程池
在pipeline添加handler时,指定业务线程池,则handler中的非IO操作都由业务线程来运行,遇到IO操作将操作封装为Task提交到任务队列,由事件循环的线程来运行任务队列中的任务时执行。
package com.tech.netty.netty.source.async;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import java.util.HashSet;
import java.util.Set;
/**
* @author lw
* @since 2021/8/11
**/
public class NettyServer {
static final EventExecutorGroup group=new DefaultEventExecutorGroup(16);
public static void main(String[] args) throws InterruptedException {
//1 创建两个线程组bossGroup workerGroup
//2 bossGroup处理客户端连接请求 workerGroup处理客户端数据读写请求
//3 他们都是事件循环组 默认含有CPU核数*2个事件循环NioEventLoop
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//创建服务器端启动对象,进行参数配置
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup) //设置两个线程组
.channel(NioServerSocketChannel.class) //设置服务器端通道为NioServerSocketChannel
.option(ChannelOption.SO_BACKLOG, 128) //设置连接队列允许的连接个数
.childOption(ChannelOption.SO_KEEPALIVE, true) //设置连接连接状态为活动连接
.childHandler(new ChannelInitializer<SocketChannel>() { //给workerGroup NioEventLoop通过管道添加处理器
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast(group,new NettyServerHandler());
}
});
//启动服务器并绑定一个端口
ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
System.out.println("ok");
//对通道的关闭事件进行监听
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
package com.tech.netty.netty.source.async;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Callable;
/**
* @author lw
* @since 2021/8/11
**/
//自定义一个Handler
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
log.info("channel={} | handlerAdded", ctx.channel().id());
}
//读取客户端发送的消息
// ChannelHandlerContext 是一个上下文对象,可以获取通道和管道信息
// msg 是客户端发送的消息
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// log.info("服务器读取线程 "+Thread.currentThread().getName());
// ByteBuf byteBuf = (ByteBuf) msg;
// log.info("客户端发送消息是:"+byteBuf.toString(CharsetUtil.UTF_8));
//
// ctx.channel().eventLoop().execute(() -> {
// try {
// log.info("第一个任务开始执行");
// Thread.sleep(5*1000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// ctx.writeAndFlush(Unpooled.copiedBuffer("hello 客户端 1",CharsetUtil.UTF_8));
// log.info("第一个提交任务的线程 "+Thread.currentThread().getName());
// });
//
// ctx.channel().eventLoop().execute(() -> {
// try {
// log.info("第二个任务开始执行");
// Thread.sleep(5*1000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// ctx.writeAndFlush(Unpooled.copiedBuffer("hello 客户端 1",CharsetUtil.UTF_8));
// log.info("第二个提交任务的线程 "+Thread.currentThread().getName());
// });
//
// ctx.writeAndFlush(Unpooled.copiedBuffer("go on....",CharsetUtil.UTF_8));
final Object msgCop = msg;
final ChannelHandlerContext ctxCop = ctx;
//在handler中添加业务线程池 执行耗时任务
// group.submit(new Callable<Object>() {
// @Override
// public Object call() throws Exception {
// ByteBuf buff = (ByteBuf) msgCop;
// byte[] bytes = new byte[buff.readableBytes()];
// buff.readBytes(bytes);
// String s = new String(bytes, CharsetUtil.UTF_8);
// log.info("读取到消息:{}",s);
// Thread.sleep(10*1000);
// ctxCop.writeAndFlush(Unpooled.copiedBuffer("hello client",CharsetUtil.UTF_8));
// log.info("发送完成");
// return null;
// }
// });
// group.submit(new Callable<Object>() {
// @Override
// public Object call() throws Exception {
// ByteBuf buff = (ByteBuf) msgCop;
// byte[] bytes = new byte[buff.readableBytes()];
// buff.readBytes(bytes);
// String s = new String(bytes, CharsetUtil.UTF_8);
// log.info("1读取到消息:{}",s);
// Thread.sleep(10*1000);
// ctxCop.writeAndFlush(Unpooled.copiedBuffer("hello client1",CharsetUtil.UTF_8));
// log.info("1发送完成");
// return null;
// }
// });
ByteBuf buff = (ByteBuf) msgCop;
byte[] bytes = new byte[buff.readableBytes()];
buff.readBytes(bytes);
String s = new String(bytes, CharsetUtil.UTF_8);
log.info("读取到消息:{}", s);
Thread.sleep(10 * 1000);
ctxCop.writeAndFlush(Unpooled.copiedBuffer("hello client", CharsetUtil.UTF_8));
log.info("发送完成");
log.info("go on...");
}
//数据读取完毕
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
//处理异常 一般是需要关闭通道
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
这样NettyServerHandler中的操作将会使用业务线程运行,在调用IO操作时(读写数据)会提交任务队列由时间循环线程执行。
当给handler指定线程池后,会发现将任务提交到了队列
于此同时日志前缀输出不再是nioEventLoopGroup,而是 DefaultEventExecutorGroup,说明配置的线程池生效。
最后
以上就是无心金针菇为你收集整理的Netty 使用异步线程池执行耗时任务一 任务队列二 在Handler中定义线程池三 在Context中指定线程池的全部内容,希望文章能够帮你解决Netty 使用异步线程池执行耗时任务一 任务队列二 在Handler中定义线程池三 在Context中指定线程池所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复