Learning Netty: developing a WebSocket server and client with two-way communication

Overview

While working through this, I referred to how Redisson and RocketMQ use Netty.

I. Server

1. Startup class

    @Autowired
    private WebsocketProperties properties;
    @Autowired
    private WebSocketChannelInitializer webSocketChannelInitializer;

    NioEventLoopGroup bossGroup = new NioEventLoopGroup();
    NioEventLoopGroup workerGroup = new NioEventLoopGroup();

    public void start() throws InterruptedException {
        prepareExecutorService();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childHandler(webSocketChannelInitializer);

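        // bind() is asynchronous; sync() waits until the port is actually bound.
        Channel channel = serverBootstrap.bind(properties.getPort()).sync().channel();
        log.info("Websocket server run at port: " + properties.getPort());
        // Blocks the calling thread until the server channel is closed.
        channel.closeFuture().sync();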
    }

2. Implementing ChannelInitializer (SSL is not covered for now)

    @Autowired
    private WebsocketProperties properties;
    @Autowired
    private HttpRequestHandler httpRequestHandler;
    @Autowired
    private TextWebSocketFrameHandler textWebSocketFrameHandler;

    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        ChannelPipeline pipeline = channel.pipeline();
        SslContext sslContext = configureSslContext();
        if (sslContext != null) {
            pipeline.addLast(sslContext.newHandler(channel.alloc()));
        }
        pipeline
                .addLast(new WebSocketIdleStateHandler())
                .addLast(new HttpServerCodec())
                .addLast(new HttpObjectAggregator(65536))
                .addLast(httpRequestHandler)
                .addLast(PingWebSocketFrameHandler.INSTANCE)
                .addLast(textWebSocketFrameHandler)
                .addLast(CloseWebSocketFrameHandler.INSTANCE);
    }

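The idle check is configured with a 15-second read timeout, and the client sends a heartbeat every 10 seconds. Note that the idle-detection handler is stateful, so it must not be annotated with @Sharable.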
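
As a rough illustration of the idle check described above, the sketch below pairs a per-channel IdleStateHandler (15-second read timeout) with a stateless handler that closes the connection when a reader-idle event arrives. The names IdleCloseSketch, ReaderIdleCloseHandler and addIdleHandlers are invented for this example; the article's own WebSocketIdleStateHandler is not shown and may well be structured differently.

```java
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;

public class IdleCloseSketch {

    /** Keeps no per-connection state, so a single instance can be shared by every channel. */
    @Sharable
    public static class ReaderIdleCloseHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent
                    && ((IdleStateEvent) evt).state() == IdleState.READER_IDLE) {
                // Nothing was read within the timeout: treat the peer as gone.
                ctx.close();
            } else {
                // Not our event: hand it to the next handler.
                super.userEventTriggered(ctx, evt);
            }
        }
    }

    private static final ReaderIdleCloseHandler CLOSER = new ReaderIdleCloseHandler();

    /**
     * Intended to be called from initChannel(). IdleStateHandler tracks timers and
     * timestamps per connection, so a new instance is created for each channel.
     */
    public static void addIdleHandlers(ChannelPipeline pipeline) {
        pipeline.addLast(new IdleStateHandler(15, 0, 0, TimeUnit.SECONDS));
        pipeline.addLast(CLOSER);
    }
}
```

With a 10-second client heartbeat, a healthy connection always produces a read inside the 15-second window, so the close path should only fire when at least one ping has gone missing.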

SimpleChannelInboundHandler checks the message type automatically, so each kind of frame can be handled by its own handler, which keeps the code clearer.

To handle binary data, just add a BinaryWebSocketFrameHandler.
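
One possible shape for such a handler is sketched below. The class name comes from the sentence above, but the body is an assumption: it simply echoes the payload back, standing in for whatever business logic is needed. Because the type parameter is BinaryWebSocketFrame, other frame types never reach channelRead0 and are forwarded to the next handler untouched.

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;

/** Hypothetical handler: only BinaryWebSocketFrame instances reach channelRead0. */
@Sharable
public class BinaryWebSocketFrameHandler extends SimpleChannelInboundHandler<BinaryWebSocketFrame> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, BinaryWebSocketFrame frame) {
        // Copy the payload out, because the frame is released automatically
        // once this method returns.
        ByteBuf content = frame.content();
        byte[] payload = new byte[content.readableBytes()];
        content.getBytes(content.readerIndex(), payload);

        // Placeholder business logic: echo the bytes back to the sender.
        ctx.writeAndFlush(new BinaryWebSocketFrame(Unpooled.wrappedBuffer(payload)));
    }
}
```

In the server pipeline it would sit next to textWebSocketFrameHandler, for example directly after it.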

II. Client

1. Creating the connection

    public void start() throws InterruptedException {
        bootstrap = new Bootstrap();
        bootstrap.group(workGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .handler(new ClientChannelInitializer(this));

        channel = bootstrap.connect(config.getHost(), config.getPort()).addListener(future -> {
            if (future.isSuccess()) {
                logger.info("WebSocket client connect success");
                latch.countDown();
            }
        }).sync().channel();
    }

2. Heartbeat sending and reconnection in the ChannelInitializer (both scheduled as delayed tasks on a timing wheel)

    private final ConnectionWatchDog connectionWatchDog;
    private final PingConnectionHandler pingConnectionHandler;
    private final ClientConfig config;

    public ClientChannelInitializer(WebsocketClient client) {
        config = client.getConfig();
        if (config.getPingConnectionInterval() > 0) {
            this.pingConnectionHandler = new PingConnectionHandler(config);
        } else {
            this.pingConnectionHandler = null;
        }
        this.connectionWatchDog = new ConnectionWatchDog(client);
    }

    @Override
    protected void initChannel(SocketChannel ch) {
        ch.pipeline()
                .addLast(pingConnectionHandler)
                .addLast(connectionWatchDog)
                .addLast(new HttpClientCodec())
                .addLast(new HttpObjectAggregator(65536))
                .addLast(new WebsocketClientHandShakerHandler(config.getUri()))
                .addLast(new WebsocketMessageHandler());
    }

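    // Not @Sharable: this handler keeps per-connection handshake state, and initChannel creates a new instance for every channel.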
    private static class WebsocketClientHandShakerHandler extends ChannelInboundHandlerAdapter {
        private final WebSocketClientHandshaker handshaker;
        private ChannelPromise channelPromise;

        WebsocketClientHandShakerHandler(URI uri) {
            DefaultHttpHeaders headers = new DefaultHttpHeaders();
            handshaker = WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, true, headers);
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            if (!(msg instanceof FullHttpMessage)) {
                ctx.fireChannelRead(msg);
                return;
            }
            try {
                try {
                    if (!handshaker.isHandshakeComplete()) {
                        handshaker.finishHandshake(ctx.channel(), (FullHttpResponse) msg);
                        ctx.pipeline().remove(this);
                        channelPromise.setSuccess();
                    }
                } catch (Exception e) {
                    channelPromise.setFailure(e);
                }
            } finally {
                ReferenceCountUtil.release(msg);
            }
        }

        @Override
        public void handlerAdded(ChannelHandlerContext ctx) {
            channelPromise = ctx.newPromise();
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            handshaker.handshake(ctx.channel());
        }
    }

    @Sharable
    private static class PingConnectionHandler extends ChannelInboundHandlerAdapter {
        private static final Logger logger = LoggerFactory.getLogger(PingConnectionHandler.class);
        private final ClientConfig config;

        public PingConnectionHandler(ClientConfig config) {
            this.config = config;
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            sendPing(ctx);
            ctx.fireChannelActive();
        }

        private void sendPing(ChannelHandlerContext ctx) {
            config.getTimer().newTimeout(t -> {
                if (ctx.channel().isActive()) {
                    ctx.channel().writeAndFlush(new PingWebSocketFrame());
                    logger.info("[{}] ping.", ctx.channel().id());
                    // schedule the next delayed task
                    sendPing(ctx);
                }
            }, config.getPingConnectionInterval(), TimeUnit.MILLISECONDS);
        }
    }

    @Sharable
    private static class WebsocketMessageHandler extends SimpleChannelInboundHandler<WebSocketFrame> {

        private static final Logger logger = LoggerFactory.getLogger(WebsocketMessageHandler.class);

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame msg) {
            if (msg instanceof PongWebSocketFrame) {
                logger.info("[{}] websocket client received pong.", ctx.channel().id());
            } else if (msg instanceof CloseWebSocketFrame) {
                logger.info("[{}] websocket client received close.", ctx.channel().id());
            } else if (msg instanceof TextWebSocketFrame) {
                String text = ((TextWebSocketFrame) msg).text();
                logger.info("[{}] websocket client received message: {}", ctx.channel().id(), text);
                if (!StringUtils.hasLength(text)) {
                    logger.error("received empty message");
                    return;
                }
                // handle message
                System.out.println("[received]: " + text);
            }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            // Pass the Throwable as the last argument without a placeholder so the stack trace is logged.
            logger.error("[{}] websocket client caught exception", ctx.channel().id(), cause);
            ctx.channel().close();
        }
    }

    @Sharable
    private static class ConnectionWatchDog extends ChannelInboundHandlerAdapter {
        private static final Logger logger = LoggerFactory.getLogger(ConnectionWatchDog.class);

        private final WebsocketClient client;
        private final Bootstrap bootstrap;
        private final Timer timer;
        private final ClientConfig config;
        private final int max = 12;

        public ConnectionWatchDog(WebsocketClient client) {
            this.client = client;
            this.bootstrap = client.getBootstrap();
            this.config = client.getConfig();
            this.timer = config.getTimer();
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            logger.info("[watchDog][{}] client active", ctx.channel().id());
            ctx.fireChannelActive();
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) {
            logger.info("[watchDog][{}] client inactive", ctx.channel().id());
            reconnect(1);
            ctx.fireChannelInactive();
        }

        private void reconnect(final int attempts) {
            int timeout = 2 << attempts;
            if (bootstrap.config().group().isShuttingDown()) {
                return;
            }
            try {
                timer.newTimeout(t -> tryReConnect(Math.min(max, attempts + 1)), timeout, TimeUnit.SECONDS);
            } catch (Exception e) {
                // do nothing
            }
        }

        private void tryReConnect(final int nextAttempts) {
            if (bootstrap.config().group().isShuttingDown()) {
                return;
            }
            bootstrap.connect(config.getHost(), config.getPort()).addListener((ChannelFutureListener) f -> {
                if (bootstrap.config().group().isShuttingDown()) {
                    return;
                }
                if (f.isSuccess()) {
                    Channel channel = f.channel();
                    client.refreshChannel(channel);
                    logger.info("[watchDog][{}] reconnect success", channel.id());
                    return;
                }
                logger.warn("[watchDog] reconnect failed. try later...");
                reconnect(nextAttempts);
            });
        }
    }
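
To make the reconnect timing in ConnectionWatchDog easier to see, the following sketch replays the same arithmetic (`2 << attempts` seconds, with the attempt counter capped at 12) without any networking. The class and method names are made up for this example.

```java
public class ReconnectBackoff {

    /** Same cap as the `max` field in ConnectionWatchDog. */
    public static final int MAX_ATTEMPTS = 12;

    /** Delay in seconds before a reconnect, mirroring `int timeout = 2 << attempts`. */
    public static int delaySeconds(int attempts) {
        return 2 << attempts;
    }

    /** Next attempt counter, mirroring `Math.min(max, attempts + 1)`. */
    public static int nextAttempt(int attempts) {
        return Math.min(MAX_ATTEMPTS, attempts + 1);
    }

    public static void main(String[] args) {
        int attempts = 1; // channelInactive starts with reconnect(1)
        for (int i = 0; i < 14; i++) {
            System.out.println("attempts=" + attempts + " -> wait " + delaySeconds(attempts) + "s");
            attempts = nextAttempt(attempts);
        }
    }
}
```

The first retry therefore waits 4 seconds and each failure doubles the wait, up to 8192 seconds (roughly 2.3 hours) once the counter reaches 12. If that ceiling is too long for a given deployment, a smaller cap is an easy adjustment.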

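The handlers for the individual frame types can also be split into separate files, as on the server side.

This gives two-way communication between the client and the server, with basic heartbeat keep-alive and automatic reconnection.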

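Notes:

1. If the application has to provide both a web service and the Netty service, starting Netty directly during SpringApplication startup may leave the web service unable to accept connections, most likely because start() blocks its calling thread on closeFuture().sync(). Starting the Netty server from a CommandLineRunner or ApplicationRunner avoids this.

2. In a SimpleChannelInboundHandler, if the ByteBuf needs to be reused and the message is passed further down the pipeline during channelRead, call retain() to increase the reference count by one; otherwise an IllegalReferenceCountException will be thrown.

3. Business logic is best run on a separately defined thread pool; otherwise long-running operations will block the EventLoop.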
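
For note 2, a minimal sketch of forwarding a frame from a SimpleChannelInboundHandler is shown below. The class name ForwardingTextFrameHandler is hypothetical. SimpleChannelInboundHandler releases the message after channelRead0 returns, so the handler takes one extra reference before passing the frame on.

```java
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

/** Hypothetical handler that looks at a text frame and then passes it downstream. */
@Sharable
public class ForwardingTextFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame frame) {
        // ... inspect or log frame.text() here ...

        // The base class releases `frame` once this method returns, so the next
        // handler needs its own reference: retain() raises the count by one.
        ctx.fireChannelRead(frame.retain());
    }
}
```

Without the retain() call, the downstream handler and the base class would both release the same single reference, and the second release is what raises IllegalReferenceCountException.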
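
For note 3, the hand-off to a dedicated pool can be sketched with plain JDK classes. Everything here is illustrative: the class name, the pool size of 4 and the 50 ms sleep that stands in for a slow database or RPC call are all assumptions, and no Netty types are involved.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class BusinessPoolSketch {

    private static final AtomicInteger SEQ = new AtomicInteger();

    /** Dedicated pool for slow business logic; the size of 4 is an arbitrary choice. */
    static final ExecutorService BUSINESS_POOL = Executors.newFixedThreadPool(4, r -> {
        Thread t = new Thread(r, "biz-" + SEQ.incrementAndGet());
        t.setDaemon(true); // do not keep the JVM alive just for this demo pool
        return t;
    });

    /** Runs the slow part on the business pool and reports which thread did the work. */
    public static CompletableFuture<String> handle(String text) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(50); // stand-in for a slow database or RPC call
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return Thread.currentThread().getName() + " handled " + text;
        }, BUSINESS_POOL);
    }

    public static void main(String[] args) {
        // The caller (the EventLoop thread in a real handler) returns immediately;
        // join() is used here only so the demo prints before exiting.
        System.out.println(handle("hello").join());
    }
}
```

Inside a real channelRead0, the handler would call something like handle(text) and write the reply from the future's callback with ctx.writeAndFlush(...). Netty allows writes from other threads and hands them over to the channel's EventLoop.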

GitHub repository: netty-im-websocket
