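I am 生动世界, a blogger on 靠谱客. This article covers learning Netty: building a server and a client on the WebSocket protocol for two-way communication. I am sharing it here in the hope that it is a useful reference.

While learning, I referred to how Redisson and RocketMQ use Netty.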

I. Server

1. Startup class

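```java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.springframework.beans.factory.annotation.Autowired;

// Class body fragment; the enclosing class declaration is omitted.
@Autowired
private WebsocketProperties properties;
@Autowired
private WebSocketChannelInitializer webSocketChannelInitializer;

private final NioEventLoopGroup bossGroup = new NioEventLoopGroup();
private final NioEventLoopGroup workerGroup = new NioEventLoopGroup();

public void start() throws InterruptedException {
    prepareExecutorService();
    try {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childHandler(webSocketChannelInitializer);
        Channel channel = serverBootstrap.bind(properties.getPort()).sync().channel();
        log.info("Websocket server run at port: " + properties.getPort());
        // Blocks the calling thread until the server channel is closed.
        channel.closeFuture().sync();
    } finally {
        // Release the event loop threads once the server channel has closed.
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}
```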

2. Implementing the ChannelInitializer (SSL is not covered for now)

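```java
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.ssl.SslContext;
import org.springframework.beans.factory.annotation.Autowired;

// Class body fragment; the enclosing class declaration is omitted.
@Autowired
private WebsocketProperties properties;
@Autowired
private HttpRequestHandler httpRequestHandler;
@Autowired
private TextWebSocketFrameHandler textWebSocketFrameHandler;

@Override
protected void initChannel(SocketChannel channel) throws Exception {
    ChannelPipeline pipeline = channel.pipeline();
    // SSL is optional: the handler is only added when a context is available.
    SslContext sslContext = configureSslContext();
    if (sslContext != null) {
        pipeline.addLast(sslContext.newHandler(channel.alloc()));
    }
    pipeline
            // Stateful, so a new instance is created for every channel.
            .addLast(new WebSocketIdleStateHandler())
            .addLast(new HttpServerCodec())
            .addLast(new HttpObjectAggregator(65536))
            // The injected and INSTANCE handlers are shared across channels,
            // so they must be annotated with @Sharable.
            .addLast(httpRequestHandler)
            .addLast(PingWebSocketFrameHandler.INSTANCE)
            .addLast(textWebSocketFrameHandler)
            .addLast(CloseWebSocketFrameHandler.INSTANCE);
}
```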

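The idle check is configured as a 15-second read timeout, and the client sends a heartbeat request every 10 seconds. Note that the idle-detection handler is stateful, so it must not be annotated with @Sharable; a new instance has to be created for each channel.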
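To make the "stateful" point concrete, here is a minimal plain-Java model of read-idle detection. It is not Netty's IdleStateHandler; `ReadIdleTracker` and its methods are hypothetical names used only for illustration. Each connection needs its own last-read timestamp, which is exactly why a single shared instance would not work. The demo also shows that a 10-second heartbeat keeps a healthy client below the 15-second timeout.

```java
import java.util.concurrent.TimeUnit;

/**
 * Simplified model of per-connection read-idle detection.
 * Hypothetical class for illustration; it is not part of Netty.
 */
final class ReadIdleTracker {
    private final long timeoutNanos;
    // Per-connection state: this field is why one instance cannot serve many channels.
    private long lastReadNanos;

    ReadIdleTracker(long timeout, TimeUnit unit, long nowNanos) {
        this.timeoutNanos = unit.toNanos(timeout);
        this.lastReadNanos = nowNanos;
    }

    /** Records that data (for example a ping) arrived at the given time. */
    void onRead(long nowNanos) {
        lastReadNanos = nowNanos;
    }

    /** True when nothing has been read for at least the configured timeout. */
    boolean isReadIdle(long nowNanos) {
        return nowNanos - lastReadNanos >= timeoutNanos;
    }

    public static void main(String[] args) {
        long second = TimeUnit.SECONDS.toNanos(1);
        ReadIdleTracker tracker = new ReadIdleTracker(15, TimeUnit.SECONDS, 0L);
        tracker.onRead(10 * second);                          // heartbeat arrives after 10 s
        System.out.println(tracker.isReadIdle(20 * second));  // 10 s since the last read
        System.out.println(tracker.isReadIdle(25 * second));  // 15 s since the last read
    }
}
```

If two connections shared one tracker, a heartbeat from one client would reset the timer for the other, and a dead connection could go unnoticed.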

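SimpleChannelInboundHandler matches the frame type automatically, so each kind of frame can be processed by its own handler, which keeps the code clearer.

To handle binary data, just add a BinaryWebSocketFrameHandler.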
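The idea of one handler per frame type can be sketched in plain Java. This is only a rough model of type-based dispatch, not how Netty implements SimpleChannelInboundHandler; `Frame`, `TextFrame`, `BinaryFrame`, `TypedHandler` and `FrameDispatchDemo` are all hypothetical names. A handler consumes a message only when it is an instance of its declared type and otherwise lets the next handler see it, so supporting a new frame type means adding one more handler.

```java
import java.util.ArrayList;
import java.util.List;

final class FrameDispatchDemo {
    /** Offers the message to each handler in order until one consumes it. */
    static List<String> dispatch(List<TypedHandler<?>> handlers, Object msg) {
        List<String> log = new ArrayList<>();
        for (TypedHandler<?> handler : handlers) {
            if (handler.tryHandle(msg, log)) {
                break;
            }
        }
        return log;
    }

    /** One handler per frame type; adding binary support is one extra entry. */
    static List<TypedHandler<?>> handlers() {
        List<TypedHandler<?>> handlers = new ArrayList<>();
        handlers.add(new TypedHandler<TextFrame>(TextFrame.class) {
            @Override
            void handle(TextFrame msg, List<String> log) {
                log.add("text:" + msg.text);
            }
        });
        handlers.add(new TypedHandler<BinaryFrame>(BinaryFrame.class) {
            @Override
            void handle(BinaryFrame msg, List<String> log) {
                log.add("binary:" + msg.data.length);
            }
        });
        return handlers;
    }

    public static void main(String[] args) {
        System.out.println(dispatch(handlers(), new TextFrame("hello")));
        System.out.println(dispatch(handlers(), new BinaryFrame(new byte[3])));
    }
}

/** Hypothetical stand-ins for WebSocket frame types; not Netty classes. */
interface Frame {}

final class TextFrame implements Frame {
    final String text;
    TextFrame(String text) { this.text = text; }
}

final class BinaryFrame implements Frame {
    final byte[] data;
    BinaryFrame(byte[] data) { this.data = data; }
}

/** Handles only messages of type T; anything else is left for the next handler. */
abstract class TypedHandler<T> {
    private final Class<T> type;

    TypedHandler(Class<T> type) { this.type = type; }

    /** Returns true if this handler consumed the message. */
    final boolean tryHandle(Object msg, List<String> log) {
        if (!type.isInstance(msg)) {
            return false;
        }
        handle(type.cast(msg), log);
        return true;
    }

    abstract void handle(T msg, List<String> log);
}
```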

II. Client

1. Creating the connection

```java
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.socket.nio.NioSocketChannel;

// Class body fragment; the enclosing class declaration is omitted.
public void start() throws InterruptedException {
    bootstrap = new Bootstrap();
    bootstrap.group(workGroup)
            .channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .handler(new ClientChannelInitializer(this));
    channel = bootstrap.connect(config.getHost(), config.getPort())
            .addListener(future -> {
                if (future.isSuccess()) {
                    // The TCP connection is up; the WebSocket handshake finishes later.
                    logger.info("WebSocket client connect success");
                    latch.countDown();
                }
            })
            .sync()
            .channel();
}
```

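2. Heartbeat sending and reconnection are implemented in the ChannelInitializer (both the delayed sending and the reconnection are based on a timing wheel)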

```java
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.Timer;
import java.net.URI;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;

// Body of ClientChannelInitializer; the enclosing class declaration is omitted.
private final ConnectionWatchDog connectionWatchDog;
private final PingConnectionHandler pingConnectionHandler;
private final ClientConfig config;

public ClientChannelInitializer(WebsocketClient client) {
    config = client.getConfig();
    if (config.getPingConnectionInterval() > 0) {
        this.pingConnectionHandler = new PingConnectionHandler(config);
    } else {
        this.pingConnectionHandler = null;
    }
    this.connectionWatchDog = new ConnectionWatchDog(client);
}

@Override
protected void initChannel(SocketChannel ch) {
    ChannelPipeline pipeline = ch.pipeline();
    // The ping handler is null when heartbeats are disabled, so add it only when present.
    if (pingConnectionHandler != null) {
        pipeline.addLast(pingConnectionHandler);
    }
    pipeline.addLast(connectionWatchDog)
            .addLast(new HttpClientCodec())
            .addLast(new HttpObjectAggregator(65536))
            .addLast(new WebsocketClientHandShakerHandler(config.getUri()))
            .addLast(new WebsocketMessageHandler());
}

// Holds per-connection handshake state, so it is created per channel and is not @Sharable.
private static class WebsocketClientHandShakerHandler extends ChannelInboundHandlerAdapter {
    private final WebSocketClientHandshaker handshaker;
    private ChannelPromise channelPromise;

    WebsocketClientHandShakerHandler(URI uri) {
        DefaultHttpHeaders headers = new DefaultHttpHeaders();
        handshaker = WebSocketClientHandshakerFactory.newHandshaker(
                uri, WebSocketVersion.V13, null, true, headers);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (!(msg instanceof FullHttpResponse)) {
            ctx.fireChannelRead(msg);
            return;
        }
        try {
            if (!handshaker.isHandshakeComplete()) {
                handshaker.finishHandshake(ctx.channel(), (FullHttpResponse) msg);
                // The handshake is done, so this handler is no longer needed.
                ctx.pipeline().remove(this);
                channelPromise.setSuccess();
            }
        } catch (Exception e) {
            channelPromise.setFailure(e);
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        channelPromise = ctx.newPromise();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        // Send the HTTP upgrade request as soon as the TCP connection is up.
        handshaker.handshake(ctx.channel());
        ctx.fireChannelActive();
    }
}

@Sharable
private static class PingConnectionHandler extends ChannelInboundHandlerAdapter {
    private static final Logger logger = LoggerFactory.getLogger(PingConnectionHandler.class);
    private final ClientConfig config;

    public PingConnectionHandler(ClientConfig config) {
        this.config = config;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        sendPing(ctx);
        ctx.fireChannelActive();
    }

    private void sendPing(ChannelHandlerContext ctx) {
        config.getTimer().newTimeout(t -> {
            // Stop rescheduling once the channel is no longer active.
            if (ctx.channel().isActive()) {
                ctx.channel().writeAndFlush(new PingWebSocketFrame());
                logger.info("[{}] ping.", ctx.channel().id());
                // Schedule the next delayed task.
                sendPing(ctx);
            }
        }, config.getPingConnectionInterval(), TimeUnit.MILLISECONDS);
    }
}

@Sharable
private static class WebsocketMessageHandler extends SimpleChannelInboundHandler<WebSocketFrame> {
    private static final Logger logger = LoggerFactory.getLogger(WebsocketMessageHandler.class);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame msg) {
        if (msg instanceof PongWebSocketFrame) {
            logger.info("[{}] websocket client received pong.", ctx.channel().id());
        } else if (msg instanceof CloseWebSocketFrame) {
            logger.info("[{}] websocket client received close.", ctx.channel().id());
        } else if (msg instanceof TextWebSocketFrame) {
            String text = ((TextWebSocketFrame) msg).text();
            logger.info("[{}] websocket client received message: {}", ctx.channel().id(), text);
            if (!StringUtils.hasLength(text)) {
                logger.error("received empty message");
                return;
            }
            // handle message
            System.out.println("[received]: " + text);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // The trailing Throwable is logged with its stack trace, so it needs no placeholder.
        logger.error("[{}] websocket client caught exception", ctx.channel().id(), cause);
        ctx.channel().close();
    }
}

@Sharable
private static class ConnectionWatchDog extends ChannelInboundHandlerAdapter {
    private static final Logger logger = LoggerFactory.getLogger(ConnectionWatchDog.class);
    private final WebsocketClient client;
    private final Bootstrap bootstrap;
    private final Timer timer;
    private final ClientConfig config;
    // Upper bound for the attempt counter, which caps the backoff delay.
    private final int max = 12;

    public ConnectionWatchDog(WebsocketClient client) {
        this.client = client;
        this.bootstrap = client.getBootstrap();
        this.config = client.getConfig();
        this.timer = config.getTimer();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        logger.info("[watchDog][{}] client active", ctx.channel().id());
        ctx.fireChannelActive();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        logger.info("[watchDog][{}] client inactive", ctx.channel().id());
        reconnect(1);
        ctx.fireChannelInactive();
    }

    private void reconnect(final int attempts) {
        // Exponential backoff: 4 s for the first attempt, doubling on every failure.
        int timeout = 2 << attempts;
        if (bootstrap.config().group().isShuttingDown()) {
            return;
        }
        try {
            timer.newTimeout(t -> tryReConnect(Math.min(max, attempts + 1)), timeout, TimeUnit.SECONDS);
        } catch (Exception e) {
            // do nothing
        }
    }

    private void tryReConnect(final int nextAttempts) {
        if (bootstrap.config().group().isShuttingDown()) {
            return;
        }
        bootstrap.connect(config.getHost(), config.getPort()).addListener((ChannelFutureListener) f -> {
            if (bootstrap.config().group().isShuttingDown()) {
                return;
            }
            if (f.isSuccess()) {
                Channel channel = f.channel();
                client.refreshChannel(channel);
                logger.info("[watchDog][{}] reconnect success", channel.id());
                return;
            }
            logger.warn("[watchDog] reconnect failed. try later...");
            reconnect(nextAttempts);
        });
    }
}
```
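The reconnect delay used by the watchdog is easier to reason about when it is computed in isolation. The sketch below reproduces only the arithmetic from `reconnect` and `tryReConnect` (`2 << attempts`, with the attempt counter capped at 12); the class `ReconnectBackoff` and its method names are hypothetical and are not part of the client code.

```java
import java.util.ArrayList;
import java.util.List;

final class ReconnectBackoff {
    // Same cap as the `max` field in ConnectionWatchDog.
    static final int MAX_ATTEMPTS = 12;

    /** Delay in seconds for the given attempt counter, mirroring `2 << attempts`. */
    static int delaySeconds(int attempts) {
        return 2 << attempts;
    }

    /** Next attempt counter, capped so that the delay stops growing. */
    static int nextAttempt(int attempts) {
        return Math.min(MAX_ATTEMPTS, attempts + 1);
    }

    /** Delays for a run of consecutive failed reconnects. */
    static List<Integer> schedule(int failures) {
        List<Integer> delays = new ArrayList<>();
        int attempts = 1; // channelInactive starts with reconnect(1)
        for (int i = 0; i < failures; i++) {
            delays.add(delaySeconds(attempts));
            attempts = nextAttempt(attempts);
        }
        return delays;
    }

    public static void main(String[] args) {
        System.out.println(schedule(5));                 // first five delays in seconds
        System.out.println(delaySeconds(MAX_ATTEMPTS));  // largest possible delay in seconds
    }
}
```

With these numbers the delay starts at 4 seconds and tops out at 8192 seconds, a little over two hours. If that ceiling is too long for a given deployment, lowering the cap on the attempt counter is one option.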

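The handlers for the individual frame types can also be split into separate files, as is done on the server side.

This gives two-way communication between the client and the server, with basic heartbeat keep-alive and reconnection after a dropped connection.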

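Notes:

1. If the application has to provide a web service and also expose a Netty service, starting Netty as part of the SpringApplication startup may leave the web service unable to accept connections, most likely because the Netty start method blocks the calling thread on closeFuture().sync(). Starting the Netty service from a CommandLineRunner or an ApplicationRunner avoids this.

2. SimpleChannelInboundHandler releases the message automatically once channelRead0 returns. If the ByteBuf has to be reused in channelRead and is passed further down the pipeline, call retain() to increase the reference count by one; otherwise an IllegalReferenceCountException is thrown.

3. Business logic is best handled in a separately defined thread pool; otherwise long-running operations block the EventLoop.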
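Note 3 can be illustrated with a small stdlib-only sketch. It does not use Netty: the caller stands in for an EventLoop thread, and `BusinessOffloadDemo`, `newBusinessPool` and `handle` are hypothetical names. The point is only that the slow work runs on a dedicated pool while the caller gets a future back immediately.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class BusinessOffloadDemo {

    /** Creates a fixed pool whose threads are named "business-N" (illustrative names). */
    static ExecutorService newBusinessPool(int threads) {
        AtomicInteger counter = new AtomicInteger();
        return Executors.newFixedThreadPool(threads, runnable -> {
            Thread thread = new Thread(runnable, "business-" + counter.incrementAndGet());
            thread.setDaemon(true);
            return thread;
        });
    }

    /** Hands the slow work to the pool and returns immediately with a future. */
    static CompletableFuture<String> handle(String message, ExecutorService businessPool) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(50); // simulated long-running business logic
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return Thread.currentThread().getName() + " processed " + message;
        }, businessPool);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = newBusinessPool(4);
        try {
            // The caller is not blocked while the 50 ms task runs on the pool.
            CompletableFuture<String> result = handle("hello", pool);
            System.out.println(result.get(2, TimeUnit.SECONDS));
        } finally {
            pool.shutdown();
        }
    }
}
```

In a Netty pipeline, a similar effect can be had by registering the business handler with its own EventExecutorGroup through the `addLast(EventExecutorGroup, ChannelHandler...)` overload, so that the handler's callbacks no longer run on the I/O threads.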

GitHub: netty-im-websocket
