我是靠谱客的博主 无情大地,最近开发中收集的这篇文章主要介绍Netty的数据通信场景(实用),觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

又一个星期,上周一直忙着做项目,都没时间学习了。。。趁着周末女朋友和闺蜜去约会,我自己在家继续学习一下Netty了,感觉已经学了好久了哈哈哈。

之前关于Netty的基础基本已经学得差不多了。但是我们需要了解下在真正项目应用中如何去考虑Netty的使用,大体上对于一些参数都是根据服务器性能决定的。这个不是最主要的。我们需要考虑的问题是两台机器(甚至是多台)使用Netty是怎样进行通信的。

下面大概分为三种:

1、第一种,使用长连接通道不断开的形式进行通信,也就是服务器和客户端的通道一直处于开启的状态,当然这前提是服务器性能足够的好,并且我们客户端数量也是比较少的情况下,就是推荐这种方式的。

2、第二种,一次性批量提交数据,采用短连接的方式,也就是我们会根据数据保存在本地临时缓存区或者临时表里,当达到临界值时进行一次性批量提交,又或者根据定时任务轮询提交,这种情况的弊端就是做不到实时性传输,在对实时性要求不高的应用程序中可以推荐使用。

3、第三种,我们可以使用一种特殊的长连接:在指定某一时间内,服务器与某台客户端没有任何通信的话,则断开连接;下次连接则是客户端向服务器发送请求的时候,才再次建立连接。但是这种模式我们需要考虑两个因素:

      1)如何在超时(即服务端和客户端没有任何通信)后关闭通道?关闭通道后我们又如何再次建立连接?

      2)客户端宕机时,我们无需考虑,因为下次客户端重启之后我们就可以与服务器建立连接了。但是服务器宕机呢?我们的客户端如何与服务器进行连接呢?

 

下面我们先解决第一个问题咯,就是关于超时后关闭通道。

这个倒不用担心,Netty有自己的超时机制。简单介绍一下,因为我自己都没深入了解到。

Netty 的超时类型 IdleState 主要分为:

  • ALL_IDLE : 一段时间内没有数据接收或者发送
  • READER_IDLE : 一段时间内没有数据接收
  • WRITER_IDLE : 一段时间内没有数据发送

在 Netty 的 timeout 包下,主要类有:

  • IdleStateEvent : 超时的事件
  • IdleStateHandler : 超时状态处理
  • ReadTimeoutHandler : 读超时状态处理
  • WriteTimeoutHandler : 写超时状态处理

其中 IdleStateHandler 包含了读写超时状态处理,比如

private static final int READ_IDEL_TIME_OUT = 4; // 读超时 private static final int WRITE_IDEL_TIME_OUT = 5;// 写超时 private static final int ALL_IDEL_TIME_OUT = 7; // 所有超时 new IdleStateHandler(READ_IDEL_TIME_OUT, WRITE_IDEL_TIME_OUT, ALL_IDEL_TIME_OUT, TimeUnit.SECONDS));

上述例子,在 IdleStateHandler 中定义了读超时的时间是 4 秒, 写超时的时间是 5 秒,其他所有的超时时间是 7 秒。

既然 IdleStateHandler 包括了读写超时状态处理,那么很多时候 ReadTimeoutHandler 、 WriteTimeoutHandler 都可以不用使用。

但是我个人觉得,我喜欢更细腻(细粒度)地掌控我的代码,只要读超时就只有读超时,只要写超时就只有写超时。所以,下面的代码就简单用一下ReadTimeoutHandler读超时状态处理器。那先简单介绍一下ReadTimeoutHandler的两个构造函数。

1、new ReadTimeoutHandler(int timeoutSeconds):参数表示timeoutSeconds秒后如果没读到数据就断开连接通道

2、new ReadTimeoutHandler(long timeout,TimeUnit unit):这个有两个参数,第一个是表示数量,而第二个是表示单位,例如timeout为5,而unit为TimeUnit.SECONDS,那么就是5秒后没读到数据就判断为读超时,然后断开连接的通道。注意一下:TimeUnit是concurrent包下的。

 

那介绍玩就先上个测试代码玩玩咯。其实和之前的文章的代码都是差不多的,唯一的就是加了超时处理器,还有就是客户端改为单例的。

服务端Server:

public class Server{
	
	public static void main(String[] args) throws InterruptedException {
		//1、创建一个线程组来处理服务器接收客户端连接
		EventLoopGroup bossGroup = new NioEventLoopGroup();
		//2、创建一个线程组来进行网络通信
		EventLoopGroup workGroup = new NioEventLoopGroup();
		//3、创建辅助工具类Bootstrap,用于服务端通道的配置
		ServerBootstrap sb = new ServerBootstrap();
		sb.group(bossGroup, workGroup)
		.handler(new LoggingHandler())  //设置日志
		.channel(NioServerSocketChannel.class)  //指定NIO模式
		.option(ChannelOption.SO_BACKLOG, 1014) //设置tcp缓冲区大小
		.option(ChannelOption.SO_SNDBUF, 32*1024) //设置发送缓冲区大小
		.option(ChannelOption.SO_RCVBUF,32*1024) //设置接收缓冲区大小
		.option(ChannelOption.SO_KEEPALIVE, true) //保持连接(长连接)
		.childHandler(new ChannelInitializer<SocketChannel>() {

			@Override
			protected void initChannel(SocketChannel ch) throws Exception {
				//添加mashalling的编解码
				ch.pipeline().addLast(MarshallingCodeFactory.buildMarshallingEncoder());
				ch.pipeline().addLast(MarshallingCodeFactory.buildMarshallingDecoder());
				ch.pipeline().addLast(new ServerHandler()); //配置具体的数据处理类
			}
		});
		
		ChannelFuture f = sb.bind(9876).sync(); //绑定端口,客户端连接需要知道端口
		//异步等待关闭(记得不是close()方法)
		f.channel().closeFuture().sync();
		
		//当通道关闭后,将线程组也关闭
		bossGroup.shutdownGracefully();
		workGroup.shutdownGracefully();
		
	}

}

 服务端Handler:

public class ServerHandler extends ChannelHandlerAdapter{

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		//因为有marshalling编解码器,所以可以直接将msg强转为RequestEntity
		RequestEntity entity = (RequestEntity)msg;
		System.out.println(entity.getId()+" "+entity.getName()+" "+entity.getMessageContent());
		ResponseEntity response = new ResponseEntity(entity.getId(), "服务端响应"+entity.getId());
		response.setMessageContent("这是服务端给客户端的响应消息");
		ctx.writeAndFlush(response);
	}
	
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		cause.printStackTrace();
		ctx.close();  //出现异常的话就关闭
	}
	
}

到了改头换面的客户端了,今次的客户端,我们做成了单例,因为一般场景做单例是最合适的了。

public class Client {
	private static class SingletonHolder{
		static final Client instance = new Client();
	}
	
	private static Client getInstance(){
		return SingletonHolder.instance;
	}
	private EventLoopGroup workGroup;
	private Bootstrap bs;
	private ChannelFuture cf;

	private Client(){
		//和服务端不一样,客户端只需要一个线程组来处理网络通信即可
		workGroup = new NioEventLoopGroup();
		//创建辅助类,和服务器的不一样,服务端的是ServerBootStrap,而客户端的是BootStrap
		bs = new Bootstrap();
		bs.group(workGroup)
		.channel(NioSocketChannel.class) //设置tcp缓冲区大小
		.handler(new ChannelInitializer<SocketChannel>() {

			@Override
			protected void initChannel(SocketChannel ch) throws Exception {
				//添加mashalling的编解码
				ch.pipeline().addLast(MarshallingCodeFactory.buildMarshallingEncoder());
				ch.pipeline().addLast(MarshallingCodeFactory.buildMarshallingDecoder());
				ch.pipeline().addLast(new ClientHandler());
			}
		});
	}
	
	public void connect(){  //连接服务端
		try {
			this.cf = bs.connect("127.0.0.1", 9876).sync();
			System.out.println("客户端已远程连接到服务端,可以进行数据的交换....");
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
	
	public ChannelFuture getChannelFuture(){ //获取连接通道
		return this.cf;
	}
	
	public static void main(String[] args) throws Exception {
		final Client c =  Client.getInstance();
		c.connect();
		ChannelFuture cf = c.getChannelFuture();
		//给服务端写数据
		for(int i=1;i<=4;i++){
			RequestEntity request = new RequestEntity(Integer.toString(i),"客户端请求"+i);
			request.setMessageContent("这是客户端向服务端的第"+i+"次请求");
			cf.channel().writeAndFlush(request);
			TimeUnit.SECONDS.sleep(4); //休眠四秒
		}
		
		cf.channel().closeFuture().sync(); //异步等待关闭通道

		System.out.println("断开连接,主线程结束");
	}
}

客户端Handler:

public class ClientHandler extends ChannelHandlerAdapter{

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		//直接将msg强转为ResponseEntity
		ResponseEntity entity = (ResponseEntity) msg;
		System.out.println(entity.getId()+" "+entity.getName()+" "+entity.getMessageContent());
		//最后如果没有回写记得释放
		ReferenceCountUtil.release(msg);
	}
	
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		cause.printStackTrace();
		ctx.close();
	}
	
}

请求和响应实体类:

public class RequestEntity implements Serializable{

	private static final long serialVersionUID = 1L;
	private String id;
	private String name;
	private String messageContent;
	
	public RequestEntity(String id, String name) {
		super();
		this.id = id;
		this.name = name;
	}
	public String getId() {
		return id;
	}
	public void setId(String id) {
		this.id = id;
	}
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	public String getMessageContent() {
		return messageContent;
	}
	public void setMessageContent(String messageContent) {
		this.messageContent = messageContent;
	}
}
public class ResponseEntity implements Serializable{

	private static final long serialVersionUID = 1L;
	private String id;
	private String name;
	private String messageContent;
	
	public ResponseEntity(String id, String name) {
		super();
		this.id = id;
		this.name = name;
	}
	public String getId() {
		return id;
	}
	public void setId(String id) {
		this.id = id;
	}
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	public String getMessageContent() {
		return messageContent;
	}
	public void setMessageContent(String messageContent) {
		this.messageContent = messageContent;
	}
}

最后记得我们自己写的Marshalling工厂,这个就不放出来了,可以到前面的文章去找。

1、都说了加超时处理器,但是上面的代码并没有加,而且是不是一定要服务器和客户端都加呢,还是只加其中一个就好了。我们尝试一下吧。先是客户端加一个。

在initChannel方法中加:

//主要为了减少服务端资源占用
ch.pipeline().addLast(new ReadTimeoutHandler(5));  //此构造函数的参数是多少秒

先启动服务器,再启动客户端,结果:

客户端:
客户端已远程连接到服务端,可以进行数据的交换....
0 服务端响应0 这是服务端给客户端的响应消息
1 服务端响应1 这是服务端给客户端的响应消息
2 服务端响应2 这是服务端给客户端的响应消息
3 服务端响应3 这是服务端给客户端的响应消息
4 服务端响应4 这是服务端给客户端的响应消息
断开连接,主线程结束   //五秒后,因为客户端没读到服务端返回的响应,所以关闭了通道
服务端:
0 客户端请求0 这是客户端向服务端的第0次请求
1 客户端请求1 这是客户端向服务端的第1次请求
2 客户端请求2 这是客户端向服务端的第2次请求
3 客户端请求3 这是客户端向服务端的第3次请求
4 客户端请求4 这是客户端向服务端的第4次请求
io.netty.handler.timeout.ReadTimeoutException

 我们可以看到,最后客户端在写完第四次数据后,五秒内没读到服务器有返回的响应,所以五秒后就断开了连接了。

2、那么如果我们只设置服务器的读超时呢,记得把上面的客户端的读超时先注释掉:

同理在initChannel方法里面添加超时处理器:

 

ch.pipeline().addLast(new ReadTimeoutHandler(5));

也是记得先执行服务器,再运行客户端,结果:

客户端:
客户端已远程连接到服务端,可以进行数据的交换....
0 服务端响应0 这是服务端给客户端的响应消息
1 服务端响应1 这是服务端给客户端的响应消息
2 服务端响应2 这是服务端给客户端的响应消息
3 服务端响应3 这是服务端给客户端的响应消息
4 服务端响应4 这是服务端给客户端的响应消息
断开连接,主线程结束 //五秒后,因为服务端没读到客户端的消息,所以客户端被服务端关闭了通道
服务端:
0 客户端请求0 这是客户端向服务端的第0次请求
1 客户端请求1 这是客户端向服务端的第1次请求
2 客户端请求2 这是客户端向服务端的第2次请求
3 客户端请求3 这是客户端向服务端的第3次请求
4 客户端请求4 这是客户端向服务端的第4次请求
io.netty.handler.timeout.ReadTimeoutException

我们可以看到,最后客户端在写完第四次数据后还是被断开连接了,这是五秒内服务器没读到有客户端写的数据,所以五秒后就断开了连接了。

所以说,其实在哪里设置超时处理器都是没问题的,但是一般建议两方都设置一下咯,最后还是主要要看业务和场景需求。

 

2、那么超时处理已经没问题了,但是通道关闭后,客户端如何再次和服务器建立连接呢。

下面我们在关闭通道后加个子线程测试一下通道的状态。

new Thread(new Runnable(){

			@Override
			public void run() {
				try {
					ChannelFuture cf = c.getChannelFuture();
					System.out.println("通道是否活跃:"+cf.channel().isActive());
					System.out.println("通道是否打开:"+cf.channel().isOpen());
					RequestEntity request  = new RequestEntity("5", "客户端的请求5");
					request.setMessageContent("这是客户端向服务端的第5次请求");
					cf.channel().writeAndFlush(request);
					cf.channel().closeFuture().sync();
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				} //异步等待关闭通道
				System.out.println("子线程结束");
			}
			
		}).start();  //尝试再启动一个子线程去给服务端写数据

我们看一下执行结果:

客户端:
客户端已远程连接到服务端,可以进行数据的交换....
1 服务端响应1 这是服务端给客户端的响应消息
2 服务端响应2 这是服务端给客户端的响应消息
3 服务端响应3 这是服务端给客户端的响应消息
4 服务端响应4 这是服务端给客户端的响应消息
进入子线程....
断开连接,主线程结束
通道是否活跃:false
通道是否打开:false
子线程结束
服务端:
1 客户端请求1 这是客户端向服务端的第1次请求
2 客户端请求2 这是客户端向服务端的第2次请求
3 客户端请求3 这是客户端向服务端的第3次请求
4 客户端请求4 这是客户端向服务端的第4次请求
io.netty.handler.timeout.ReadTimeoutException

小结:很明显,上面客户端再次连接服务端是失败的,因为通道的状态都是不活跃和没打开,因为在那四次循环发完数据的五秒后,通道就已经被关闭了,所以客户端肯定是不能连接到服务端的。那么我们究竟怎么才能再次连接服务端呢,其实很简单,不是有一个获取通道的方法吗,其实我们在获取的时候先判断通道结果cf是否为null的,如果是null的话就再次调用connect进行连接,然后也判断一下通道cf.channel()是否是活跃的,如果不是活跃的也要重新调用connect方法进行连接。那么,我们下面对getChannelFuture()方法进行改造吧。

public ChannelFuture getChannelFuture(){
	if(this.cf == null){
		this.connect();
	}
	if(!this.cf.channel().isActive()){
		this.connect();
	}
	return this.cf;
}

改造完我们再次进行测试,先启动服务器,再启动客户端:

客户端:
客户端已远程连接到服务端,可以进行数据的交换....
1 服务端响应1 这是服务端给客户端的响应消息
2 服务端响应2 这是服务端给客户端的响应消息
3 服务端响应3 这是服务端给客户端的响应消息
4 服务端响应4 这是服务端给客户端的响应消息
断开连接,主线程结束
客户端已远程连接到服务端,可以进行数据的交换....  //很明显,客户端确实再次连接服务端了
通道是否活跃:true
通道是否打开:true
5 服务端响应5 这是服务端给客户端的响应消息
子线程结束  //细心点可以发现,五秒后这个子连接再次断开。
服务端:
1 客户端请求1 这是客户端向服务端的第1次请求
2 客户端请求2 这是客户端向服务端的第2次请求
3 客户端请求3 这是客户端向服务端的第3次请求
4 客户端请求4 这是客户端向服务端的第4次请求
io.netty.handler.timeout.ReadTimeoutException
5 客户端的请求5 这是客户端向服务端的第5次请求  //这里也能发现,客户端再次连接服务端了
io.netty.handler.timeout.ReadTimeoutException

 上面结果就知道再次连接的问题已经得到解决了。

那么第二个和第三个问题呢?

好了,再到第二个问题,客户端宕机后怎么办,这个不用说了,其实就是不用管,他宕机了那么就等他重启再重新去连接服务端咯。

第三个问题,服务端宕机,这个问题就大了,这就不能不管咯。老师说过一个思路,就是如果服务端代码是打成jar文件的话,在window系统(Linux系统应该是shell脚本吧)写一个bat脚本,定时让客户端连接一下服务端。如果是netty放到Tomcat应用的话,用Spring写一个定时器。总之就是用自动化解决宕机问题。

那么好了,其实老师还简单地介绍了一下实际场景二-心跳监控,下面是老师的原话。

我们使用Socket通信一般经常会处理多个服务器之间的心跳检测,一般来讲我们去维护服务器集群,肯定要有一台或者N台服务器主机(Master),然后还应该有N台(Slave),那么我们的主机肯定要时时刻刻知道自己下面的从服务器的各方面情况,然后进行实时监控的功能,这个在分布式架构里叫做心跳检测或者是心跳监控。最佳处理方案还是觉得是使用一些通信框架进行实现,我们的Netty就可以去做这样一件事情。

不知不觉已经两点半了,要开始干活了。。

 

 

最后

以上就是无情大地为你收集整理的Netty的数据通信场景(实用)的全部内容,希望文章能够帮你解决Netty的数据通信场景(实用)所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(45)

评论列表共有 0 条评论

立即
投稿
返回
顶部