我是靠谱客的博主 无心金针菇,最近开发中收集的这篇文章主要介绍Netty 使用异步线程池执行耗时任务一 任务队列二 在Handler中定义线程池三 在Context中指定线程池,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

当遇到在Handler需要执行耗时较高的操作时候,可以采用异步的方式来解决,多线程异步实现方式有两种:在Handler中添加线程池和在Context中添加线程池。任务队列并没有使用多线程,它是使用同一个线程执行IO操作和运行任务队列中的任务。

一 任务队列

        这种方式运行任务队列线程和事件循环线程是同一个线程,并没有使用新的线程。

       Netty的事件循环EventLoop是一个不断循环着执行读取就绪事件、处理事件、运行任务队列这三个操作的一个线程。事件循环关联了一个任务队列,用于存放耗时较长的业务处理操作。事件循环线程先读取就绪事件,如果任务队列为空则用阻塞一定时间的方式读取,如果任务队列非空,则使用非阻塞的方式读取,读取就绪事件。读取后进行处理事件,记录处理事件花费的时间,在任务队列中不断取出任务执行,默认花费在任务队列上取执行任务的时间和处理事件的时间相同,可以使用ioradio参数调整IO操作和非IO操作花费的时间比例,到时间后又去读取就绪事件。

在处理器中使用ChannelHandlerContext获取channel获取事件循环提交任务
package com.tech.netty.netty.source.async;


import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.util.HashSet;
import java.util.Set;

/**
 * @author lw
 * @since 2021/8/11
 **/
public class NettyServer {

    public static void main(String[] args) throws InterruptedException {

        //1 创建两个线程组bossGroup workerGroup
        //2 bossGroup处理客户端连接请求 workerGroup处理客户端数据读写请求
        //3 他们都是事件循环组 默认含有CPU核数*2个事件循环NioEventLoop
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            //创建服务器端启动对象,进行参数配置
            ServerBootstrap serverBootstrap = new ServerBootstrap();

            serverBootstrap.group(bossGroup, workerGroup)  //设置两个线程组
                    .channel(NioServerSocketChannel.class)  //设置服务器端通道为NioServerSocketChannel
                    .option(ChannelOption.SO_BACKLOG, 128) //设置连接队列允许的连接个数
                    .childOption(ChannelOption.SO_KEEPALIVE, true) //设置连接连接状态为活动连接
                    .childHandler(new ChannelInitializer<SocketChannel>() { //给workerGroup NioEventLoop通过管道添加处理器
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new NettyServerHandler());
                        }
                    });
            //启动服务器并绑定一个端口
            ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
            System.out.println("ok");
            //对通道的关闭事件进行监听
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
package com.tech.netty.netty.source.async;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;

/**
 * @author lw
 * @since 2021/8/11
 **/
//自定义一个Handler
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        log.info("channel={} | handlerAdded",ctx.channel().id());
    }

    //读取客户端发送的消息
    // ChannelHandlerContext 是一个上下文对象,可以获取通道和管道信息
    // msg 是客户端发送的消息
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("服务器读取线程 "+Thread.currentThread().getName());
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("客户端发送消息是:"+byteBuf.toString(CharsetUtil.UTF_8));

        ctx.channel().eventLoop().execute(() -> {
            try {
                log.info("第一个任务开始执行");
                Thread.sleep(5*1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            ctx.writeAndFlush(Unpooled.copiedBuffer("hello 客户端 1",CharsetUtil.UTF_8));
            log.info("第一个提交任务的线程 "+Thread.currentThread().getName());
        });

        ctx.channel().eventLoop().execute(() -> {
            try {
                log.info("第二个任务开始执行");
                Thread.sleep(5*1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            ctx.writeAndFlush(Unpooled.copiedBuffer("hello 客户端 1",CharsetUtil.UTF_8));
            log.info("第二个提交任务的线程 "+Thread.currentThread().getName());
        });

        ctx.writeAndFlush(Unpooled.copiedBuffer("go on....",CharsetUtil.UTF_8));
        log.info("go on...");
    }

    //数据读取完毕
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    }

    //处理异常 一般是需要关闭通道
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
package com.tech.netty.netty.source.async;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * @author lw
 * @since 2021/8/11
 **/
public class NettyClient {
    public static void main(String[] args) throws InterruptedException {
        //客户端需要一个事件循环组
        NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
        try {
            // 创建客户端启动对象 并配置参数

            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventExecutors)  //设置线程组
                    .channel(NioSocketChannel.class)  //设置客户端通道
                    .handler(new ChannelInitializer<SocketChannel>() {  //设置处理器
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new NettyClientHandler());
                        }
                    });
            System.out.println("...客户端 is ready...");
            //连接服务器
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 7000).sync();
            //对通道关闭事件进行监听
            channelFuture.channel().closeFuture().sync();
        } finally {
            eventExecutors.shutdownGracefully();
        }

    }
}
package com.tech.netty.netty.source.async;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * @author lw
 * @since 2021/8/11
 **/
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    //当通道就绪会触发该方法 给服务器发送数据
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello 服务器", CharsetUtil.UTF_8));
    }

    //当通道有读取事件时触发
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("服务器回复的消息:"+byteBuf.toString(CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
       cause.printStackTrace();
        ctx.close();
    }
}
通过结果可以看到

handler线程和运行任务队列里任务的线程是同一个线程
任务队列里任务按序逐一取出执行

二 在Handler中定义线程池

通过在Handler中定义一个线程池,执行耗时较高的操作,IO线程执行IO操作,业务操作由业务线程来处理,提高IO效率,如果业务操作中需要进行IO操作,则在调用的时候Netty会判断当前线程是否为IO线程,如果不是则会则会将IO操作添加到对应事件循环的任务队列中,有事件循环的IO线程来运行。

package com.tech.netty.netty.source.async;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.DefaultEventExecutor;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.Callable;

/**
 * @author lw
 * @since 2021/8/11
 **/
//自定义一个Handler
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    static final EventExecutorGroup group=new DefaultEventExecutorGroup(16);

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        log.info("channel={} | handlerAdded",ctx.channel().id());
    }

    //读取客户端发送的消息
    // ChannelHandlerContext 是一个上下文对象,可以获取通道和管道信息
    // msg 是客户端发送的消息
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//        log.info("服务器读取线程 "+Thread.currentThread().getName());
//        ByteBuf byteBuf = (ByteBuf) msg;
//        log.info("客户端发送消息是:"+byteBuf.toString(CharsetUtil.UTF_8));
//
//        ctx.channel().eventLoop().execute(() -> {
//            try {
//                log.info("第一个任务开始执行");
//                Thread.sleep(5*1000);
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }
//            ctx.writeAndFlush(Unpooled.copiedBuffer("hello 客户端 1",CharsetUtil.UTF_8));
//            log.info("第一个提交任务的线程 "+Thread.currentThread().getName());
//        });
//
//        ctx.channel().eventLoop().execute(() -> {
//            try {
//                log.info("第二个任务开始执行");
//                Thread.sleep(5*1000);
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }
//            ctx.writeAndFlush(Unpooled.copiedBuffer("hello 客户端 1",CharsetUtil.UTF_8));
//            log.info("第二个提交任务的线程 "+Thread.currentThread().getName());
//        });
//
//        ctx.writeAndFlush(Unpooled.copiedBuffer("go on....",CharsetUtil.UTF_8));
        final Object msgCop=msg;
        final ChannelHandlerContext ctxCop=ctx;
        //在handler中添加业务线程池 执行耗时任务
        group.submit(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                ByteBuf buff = (ByteBuf) msgCop;
                byte[] bytes = new byte[buff.readableBytes()];
                buff.readBytes(bytes);
                String s = new String(bytes, CharsetUtil.UTF_8);
                log.info("读取到消息:{}",s);
                Thread.sleep(10*1000);
                ctxCop.writeAndFlush(Unpooled.copiedBuffer("hello client",CharsetUtil.UTF_8));
                log.info("发送完成");
                return null;
            }
        });
        group.submit(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                ByteBuf buff = (ByteBuf) msgCop;
                byte[] bytes = new byte[buff.readableBytes()];
                buff.readBytes(bytes);
                String s = new String(bytes, CharsetUtil.UTF_8);
                log.info("1读取到消息:{}",s);
                Thread.sleep(10*1000);
                ctxCop.writeAndFlush(Unpooled.copiedBuffer("hello client1",CharsetUtil.UTF_8));
                log.info("1发送完成");
                return null;
            }
        });
        log.info("go on...");
    }

    //数据读取完毕
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    //处理异常 一般是需要关闭通道
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

执行结果:

IO线程打印了go on
两个业务线程同时开始读取消息,休眠10s 发送消息

实现异步执行。

三 在Context中指定线程池

在pipeline添加handler时,指定业务线程池,则handler中的非IO操作都由业务线程来运行,遇到IO操作将操作封装为Task提交到任务队列,由事件循环的线程来运行任务队列中的任务时执行。

package com.tech.netty.netty.source.async;


import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;

import java.util.HashSet;
import java.util.Set;

/**
 * @author lw
 * @since 2021/8/11
 **/
public class NettyServer {

    static final EventExecutorGroup group=new DefaultEventExecutorGroup(16);

    public static void main(String[] args) throws InterruptedException {

        //1 创建两个线程组bossGroup workerGroup
        //2 bossGroup处理客户端连接请求 workerGroup处理客户端数据读写请求
        //3 他们都是事件循环组 默认含有CPU核数*2个事件循环NioEventLoop
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            //创建服务器端启动对象,进行参数配置
            ServerBootstrap serverBootstrap = new ServerBootstrap();

            serverBootstrap.group(bossGroup, workerGroup)  //设置两个线程组
                    .channel(NioServerSocketChannel.class)  //设置服务器端通道为NioServerSocketChannel
                    .option(ChannelOption.SO_BACKLOG, 128) //设置连接队列允许的连接个数
                    .childOption(ChannelOption.SO_KEEPALIVE, true) //设置连接连接状态为活动连接
                    .childHandler(new ChannelInitializer<SocketChannel>() { //给workerGroup NioEventLoop通过管道添加处理器
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline()
                                    .addLast(group,new NettyServerHandler());
                        }
                    });
            //启动服务器并绑定一个端口
            ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
            System.out.println("ok");
            //对通道的关闭事件进行监听
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
package com.tech.netty.netty.source.async;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.Callable;

/**
 * @author lw
 * @since 2021/8/11
 **/
//自定义一个Handler
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        log.info("channel={} | handlerAdded", ctx.channel().id());
    }

    //读取客户端发送的消息
    // ChannelHandlerContext 是一个上下文对象,可以获取通道和管道信息
    // msg 是客户端发送的消息
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//        log.info("服务器读取线程 "+Thread.currentThread().getName());
//        ByteBuf byteBuf = (ByteBuf) msg;
//        log.info("客户端发送消息是:"+byteBuf.toString(CharsetUtil.UTF_8));
//
//        ctx.channel().eventLoop().execute(() -> {
//            try {
//                log.info("第一个任务开始执行");
//                Thread.sleep(5*1000);
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }
//            ctx.writeAndFlush(Unpooled.copiedBuffer("hello 客户端 1",CharsetUtil.UTF_8));
//            log.info("第一个提交任务的线程 "+Thread.currentThread().getName());
//        });
//
//        ctx.channel().eventLoop().execute(() -> {
//            try {
//                log.info("第二个任务开始执行");
//                Thread.sleep(5*1000);
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }
//            ctx.writeAndFlush(Unpooled.copiedBuffer("hello 客户端 1",CharsetUtil.UTF_8));
//            log.info("第二个提交任务的线程 "+Thread.currentThread().getName());
//        });
//
//        ctx.writeAndFlush(Unpooled.copiedBuffer("go on....",CharsetUtil.UTF_8));
        final Object msgCop = msg;
        final ChannelHandlerContext ctxCop = ctx;
        //在handler中添加业务线程池 执行耗时任务
//        group.submit(new Callable<Object>() {
//            @Override
//            public Object call() throws Exception {
//                ByteBuf buff = (ByteBuf) msgCop;
//                byte[] bytes = new byte[buff.readableBytes()];
//                buff.readBytes(bytes);
//                String s = new String(bytes, CharsetUtil.UTF_8);
//                log.info("读取到消息:{}",s);
//                Thread.sleep(10*1000);
//                ctxCop.writeAndFlush(Unpooled.copiedBuffer("hello client",CharsetUtil.UTF_8));
//                log.info("发送完成");
//                return null;
//            }
//        });
//        group.submit(new Callable<Object>() {
//            @Override
//            public Object call() throws Exception {
//                ByteBuf buff = (ByteBuf) msgCop;
//                byte[] bytes = new byte[buff.readableBytes()];
//                buff.readBytes(bytes);
//                String s = new String(bytes, CharsetUtil.UTF_8);
//                log.info("1读取到消息:{}",s);
//                Thread.sleep(10*1000);
//                ctxCop.writeAndFlush(Unpooled.copiedBuffer("hello client1",CharsetUtil.UTF_8));
//                log.info("1发送完成");
//                return null;
//            }
//        });

        ByteBuf buff = (ByteBuf) msgCop;
        byte[] bytes = new byte[buff.readableBytes()];
        buff.readBytes(bytes);
        String s = new String(bytes, CharsetUtil.UTF_8);
        log.info("读取到消息:{}", s);
        Thread.sleep(10 * 1000);
        ctxCop.writeAndFlush(Unpooled.copiedBuffer("hello client", CharsetUtil.UTF_8));
        log.info("发送完成");

        log.info("go on...");
    }

    //数据读取完毕
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    //处理异常 一般是需要关闭通道
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

这样NettyServerHandler中的操作将会使用业务线程运行,在调用IO操作时(读写数据)会提交任务队列由时间循环线程执行。

当给handler指定线程池后,会发现将任务提交到了队列

于此同时日志前缀输出不再是nioEventLoopGroup,而是 DefaultEventExecutorGroup,说明配置的线程池生效。

 

最后

以上就是无心金针菇为你收集整理的Netty 使用异步线程池执行耗时任务一 任务队列二 在Handler中定义线程池三 在Context中指定线程池的全部内容,希望文章能够帮你解决Netty 使用异步线程池执行耗时任务一 任务队列二 在Handler中定义线程池三 在Context中指定线程池所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(66)

评论列表共有 0 条评论

立即
投稿
返回
顶部