概述
学习过程中有参考Redisson和RocketMq对于netty的使用
一、服务端
1、启动类
@Autowired
private WebsocketProperties properties;
@Autowired
private WebSocketChannelInitializer webSocketChannelInitializer;
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
public void start() throws InterruptedException {
prepareExecutorService();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(webSocketChannelInitializer);
Channel channel = serverBootstrap.bind(properties.getPort()).sync().channel();
log.info("Websocket server run at port: " + properties.getPort());
channel.closeFuture().sync();
}
2、实现ChannelInitializer(暂时未考虑ssl)
@Autowired
private WebsocketProperties properties;
@Autowired
private HttpRequestHandler httpRequestHandler;
@Autowired
private TextWebSocketFrameHandler textWebSocketFrameHandler;
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
SslContext sslContext = configureSslContext();
if (sslContext != null) {
pipeline.addLast(sslContext.newHandler(channel.alloc()));
}
pipeline
.addLast(new WebSocketIdleStateHandler())
.addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(65536))
.addLast(httpRequestHandler)
.addLast(PingWebSocketFrameHandler.INSTANCE)
.addLast(textWebSocketFrameHandler)
.addLast(CloseWebSocketFrameHandler.INSTANCE);
}
配置的空闲检测为读15秒钟,客户端心跳请求为10秒钟一次,需要注意的是,空闲检测handler是有状态的,不能使用@Sharable标注。
利用SimpleChannelInboundHandler实现自动帧类型判断,将不同的帧包数据分别处理,更加清晰
如果需要处理二进制数据,新增一个BinaryWebSocketFrameHandler即可。
二、客户端
1、创建连接
public void start() throws InterruptedException {
bootstrap = new Bootstrap();
bootstrap.group(workGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ClientChannelInitializer(this));
channel = bootstrap.connect(config.getHost(), config.getPort()).addListener(future -> {
if (future.isSuccess()) {
logger.info("WebSocket client connect success");
latch.countDown();
}
}).sync().channel();
}
2、在ChannelInitializer中实现了心跳发送和断线重连(基于时间轮延时发送和断线重连)
private final ConnectionWatchDog connectionWatchDog;
private final PingConnectionHandler pingConnectionHandler;
private final ClientConfig config;
public ClientChannelInitializer(WebsocketClient client) {
config = client.getConfig();
if (config.getPingConnectionInterval() > 0) {
this.pingConnectionHandler = new PingConnectionHandler(config);
} else {
this.pingConnectionHandler = null;
}
this.connectionWatchDog = new ConnectionWatchDog(client);
}
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline()
.addLast(pingConnectionHandler)
.addLast(connectionWatchDog)
.addLast(new HttpClientCodec())
.addLast(new HttpObjectAggregator(65536))
.addLast(new WebsocketClientHandShakerHandler(config.getUri()))
.addLast(new WebsocketMessageHandler());
}
@Sharable
private static class WebsocketClientHandShakerHandler extends ChannelInboundHandlerAdapter {
private final WebSocketClientHandshaker handshaker;
private ChannelPromise channelPromise;
WebsocketClientHandShakerHandler(URI uri) {
DefaultHttpHeaders headers = new DefaultHttpHeaders();
handshaker = WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, true, headers);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (!(msg instanceof FullHttpMessage)) {
ctx.fireChannelRead(msg);
return;
}
try {
try {
if (!handshaker.isHandshakeComplete()) {
handshaker.finishHandshake(ctx.channel(), (FullHttpResponse) msg);
ctx.pipeline().remove(this);
channelPromise.setSuccess();
}
} catch (Exception e) {
channelPromise.setFailure(e);
}
} finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
channelPromise = ctx.newPromise();
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
handshaker.handshake(ctx.channel());
}
}
@Sharable
private static class PingConnectionHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(PingConnectionHandler.class);
private final ClientConfig config;
public PingConnectionHandler(ClientConfig config) {
this.config = config;
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
sendPing(ctx);
ctx.fireChannelActive();
}
private void sendPing(ChannelHandlerContext ctx) {
config.getTimer().newTimeout(t -> {
if (ctx.channel().isActive()) {
ctx.channel().writeAndFlush(new PingWebSocketFrame());
logger.info("[{}] ping.", ctx.channel().id());
// 添加下一个延时任务
sendPing(ctx);
}
}, config.getPingConnectionInterval(), TimeUnit.MILLISECONDS);
}
}
@Sharable
private static class WebsocketMessageHandler extends SimpleChannelInboundHandler<WebSocketFrame> {
Logger logger = LoggerFactory.getLogger(WebsocketMessageHandler.class);
@Override
protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame msg) {
if (msg instanceof PongWebSocketFrame) {
logger.info("[{}] websocket client received pong.", ctx.channel().id());
} else if (msg instanceof CloseWebSocketFrame) {
logger.info("[{}] websocket client received close.", ctx.channel().id());
} else if (msg instanceof TextWebSocketFrame) {
String text = ((TextWebSocketFrame) msg).text();
logger.info("[{}] websocket client received message: {}", ctx.channel().id(), text);
if (!StringUtils.hasLength(text)) {
logger.error("received empty message");
return;
}
// handle message
System.out.println("[接收]:" + text);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
logger.error("[{}] websocket client caught exception: {}", ctx.channel().id(), cause);
ctx.channel().close();
}
}
@Sharable
private static class ConnectionWatchDog extends ChannelInboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(ConnectionWatchDog.class);
private final WebsocketClient client;
private final Bootstrap bootstrap;
private final Timer timer;
private final ClientConfig config;
private final int max = 12;
public ConnectionWatchDog(WebsocketClient client) {
this.client = client;
this.bootstrap = client.getBootstrap();
this.config = client.getConfig();
this.timer = config.getTimer();
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
logger.info("[watchDog][{}] client active", ctx.channel().id());
ctx.fireChannelActive();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
logger.info("[watchDog][{}] client inactive", ctx.channel().id());
reconnect(1);
ctx.fireChannelInactive();
}
private void reconnect(final int attempts) {
int timeout = 2 << attempts;
if (bootstrap.config().group().isShuttingDown()) {
return;
}
try {
timer.newTimeout(t -> tryReConnect(Math.min(max, attempts + 1)), timeout, TimeUnit.SECONDS);
} catch (Exception e) {
// do nothing
}
}
private void tryReConnect(final int nextAttempts) {
if (bootstrap.config().group().isShuttingDown()) {
return;
}
bootstrap.connect(config.getHost(), config.getPort()).addListener((ChannelFutureListener) f -> {
if (bootstrap.config().group().isShuttingDown()) {
return;
}
if (f.isSuccess()) {
Channel channel = f.channel();
client.refreshChannel(channel);
logger.info("[watchDog][{}] reconnect success", channel.id());
return;
}
logger.warn("[watchDog] reconnect failed. try later...");
reconnect(nextAttempts);
});
}
}
具体各类型的帧包处理器也可以拆分到单独文件,例如服务端
这样就实现了客户端和服务端的双向通信,并具有基本的心跳保活和断线重连机制。
注意事项:
1、如果既要提供web服务,又要开放netty服务,那么SpringApplication可能导致其中web服务无法建立连接,可以通过使用CommandLineRunnner或者ApplicationRunner中启动netty服务
2、在SimpleChannelInboundHandler中,channelRead时,如果需要复用ByteBuf,当继续向下传递时,需要retain增加一次引用计数,否则会导致IllegalReferenceCountException
3、业务处理最好单独定义个线程池处理,否则长耗时操作会阻塞EventLoop的执行
GitHub地址:netty-im-websocket
最后
以上就是生动世界为你收集整理的netty学习:基于WebSocket协议开发服务端及客户端,实现双向通信的全部内容,希望文章能够帮你解决netty学习:基于WebSocket协议开发服务端及客户端,实现双向通信所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复