问题与背景
使用传统的socket编程写代码难免有这样那样的bug,在一个netty项目中,想要做socket客户端,在引入别的项目就有点花蛇添足,这时候,就需要基于netty封装一个通用的socket客户端,只需要改变一下解码器编码器,就能使用。
解决方案
这是一个基于固定字符结尾的socket报文的案例。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import lombok.Data; import lombok.extern.slf4j.Slf4j; import java.nio.charset.StandardCharsets; @Data @Slf4j public class BaseNettyMVBoxClient { private String remoteIp; private Integer remotePort; //局部变量化关键变量,方便使用 private EventLoopGroup group; private Bootstrap b; private ChannelFuture cf; private class PrintHandler extends SimpleChannelInboundHandler<String>{ @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, String o) throws Exception { log.debug("收到下位机信息:" + o); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } } public BaseNettyMVBoxClient(String ip, Integer port) { this.remoteIp = ip; this.remotePort = port; group = new NioEventLoopGroup(); b = new Bootstrap(); b.group(group) .option(ChannelOption.SO_KEEPALIVE, true) .channel(NioSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO))//开启日志 .handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel sc) throws Exception { ByteBuf byteBuf = Unpooled.copiedBuffer("$===+++end+++===$".getBytes()); sc.pipeline().addLast(new DelimiterBasedFrameDecoder(4096,byteBuf)); // 设置字符串解码器,将buf转化为字符串,默认是u8。handler接收到的就是string类型的 sc.pipeline().addLast(new StringDecoder(StandardCharsets.UTF_8)); sc.pipeline().addLast(new PrintHandler()); } } /** * 客户端对象执行连接的逻辑 */ public void connect(){ try { this.cf = b.connect(this.remoteIp, this.remotePort).sync(); log.debug("远程服务器已经连接, 可以进行数据交换.."); } catch (Exception e) { //e.printStackTrace(); log.error("连接下位机出现异常!"); } } /** * 实现断线重连,发送数据通过cf对象,就可以启用重连 */ public ChannelFuture getChannelFuture(){ if(this.cf == null){ this.connect(); } if(!this.cf.channel().isActive()){ this.connect(); } return this.cf; } public void sendMsg(ByteBuf msg){ log.debug("BaseNettyMVBoxClient执行sendMsg方法,byte数组大小:" + msg.array().length); try { this.getChannelFuture().channel().writeAndFlush(msg).sync(); //当通道关闭了,就继续往下走 用这个就永久阻塞了 //this.getChannelFuture().channel().closeFuture().sync(); }catch (Exception e){ log.error("sendMsg发生了异常:" + e.getMessage()); } } public void shutdown(){ if (this.cf != null){ this.cf.channel().close(); } if (this.group!=null){ this.group.shutdownGracefully(); } if(this.b != null){ this.b = null; } log.debug("释放BaseNettyMVBoxClient资源"); } }
使用过程中的注意事项
1.channel(NioSocketChannel.class)和handler(new ChannelInitializer()中泛型用的类要严格对应,不对应的话,用socket工具当server接收收不到数据。
2.针对收不到数据的情况,如果是channel(NioSocketChannel.class)和handler(new ChannelInitializer<’SocketChannel‘>())这种组合的话,需要在调用sendMsg之后,线程暂停几秒,才能成功发送。感觉就是如果使用SocketChannel去初始化,非阻塞的特性就发挥不出来了,必须等待着发送完毕,所以需要等几秒,如果直接往下执行,SocketChannel是阻塞的,直接就关闭了,数据发不出去,也就报错了。
后续改动
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import lombok.Data; import lombok.extern.slf4j.Slf4j; import java.nio.charset.StandardCharsets; @Data @Slf4j public class BaseNettyMVBoxClient { private String remoteIp; private Integer remotePort; //局部变量化关键变量,方便使用 private EventLoopGroup group; private Bootstrap b; private ChannelFuture cf; private class PrintHandler extends SimpleChannelInboundHandler<String>{ @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, String o) throws Exception { log.debug("收到下位机信息:" + o); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } } public BaseNettyMVBoxClient(String ip, Integer port) { this.remoteIp = ip; this.remotePort = port; group = new NioEventLoopGroup(); b = new Bootstrap(); b.group(group) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,3000) .channel(NioSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO))//开启日志 .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { ByteBuf byteBuf = Unpooled.copiedBuffer("$===+++end+++===$".getBytes()); sc.pipeline().addLast(new DelimiterBasedFrameDecoder(4096,byteBuf)); // 设置字符串解码器,将buf转化为字符串,默认是u8。handler接收到的就是string类型的 sc.pipeline().addLast(new StringDecoder(StandardCharsets.UTF_8)); sc.pipeline().addLast(new PrintHandler()); } // @Override // protected void initChannel(SocketChannel sc) throws Exception { // ByteBuf byteBuf = Unpooled.copiedBuffer("$===+++end+++===$".getBytes()); // sc.pipeline().addLast(new DelimiterBasedFrameDecoder(4096,byteBuf)); 设置字符串解码器,将buf转化为字符串,默认是u8。handler接收到的就是string类型的 // sc.pipeline().addLast(new StringDecoder(StandardCharsets.UTF_8)); // sc.pipeline().addLast(new PrintHandler()); // } }); } /** * 客户端对象执行连接的逻辑 */ private void connect(){ try { this.cf = b.connect(this.remoteIp, this.remotePort).sync(); log.debug("远程服务器已经连接, 可以进行数据交换.."); } catch (Exception e) { //e.printStackTrace(); log.error("连接下位机出现异常!"); } } /** * 实现断线重连,发送数据通过cf对象,就可以启用重连 */ private ChannelFuture getChannelFuture(){ if(this.cf == null){ this.connect(); } if(!this.cf.channel().isActive()){ this.connect(); } return this.cf; } public void sendMsg(ByteBuf msg){ log.debug("BaseNettyMVBoxClient执行sendMsg方法,byte数组大小:" + msg.array().length); try { this.getChannelFuture().channel().writeAndFlush(msg); //当通道关闭了,就继续往下走 用这个就永久阻塞了 //this.getChannelFuture().channel().closeFuture().sync(); }catch (Exception e){ log.error("sendMsg发生了异常:" + e.getMessage()); } } public void shutdown(){ if (this.cf != null){ this.cf.channel().close(); } if (this.group!=null){ this.group.shutdownGracefully(); } if(this.b != null){ this.b = null; } log.debug("释放BaseNettyMVBoxClient资源"); } }
代码后续进行了一些改动,包括加入超时时间以及初始化时改回SocketChannel。 翻了一下官网几乎都是这个组合(nio.class + socketchannel),仔细思考了一下出现这种情况(用tcp工具当server,然后junit测试发送,不加线程睡眠就发不过去)的原因,因为junit执行完毕之后,就把jvm关了,但是实际上channel中的数据没有发送过去,毕竟时nio非阻塞,所以只要保证发送完之后,不马上关闭jvm,就能保证数据的送达。
如果想测试,就加NioSocketChannel的初始化+send时候的sync就好了。
最后
以上就是细腻服饰最近收集整理的关于基于netty封装socket客户端问题与背景解决方案使用过程中的注意事项后续改动的全部内容,更多相关基于netty封装socket客户端问题与背景解决方案使用过程中内容请搜索靠谱客的其他文章。
发表评论 取消回复