概述
关于ZeroMq(简称ZMQ)的定义、作用和强大这里就不再赘述了。总结一下它常见的几种经典模式,然后顺便提下我最近在高并发环境下使用它出现的一个异常及其解决过程吧!
(PS:图是盗的。。。。。)
一、REQ/REP模式
这是最常见的请求/响应模式。服务端作为发送方,客户端作为请求方。
服务端:
ZMQ.Context context = ZMQ.context(1);
ZMQ.Socket socket = context.socket(ZMQ.REP);
String url = "tcp://*:9999";
socket.bind(url);
boolean wait = true;
while (wait) {
byte[] request;
try {
request = socket.recv(0);
socket.send("OK".getBytes(), 1);
} catch (ZMQException e) {
}
}
客户端:
ZMQ.Context context = ZMQ.context(1);
ZMQ.Socket socket = context.socket(ZMQ.REQ);
System.out.println("Connecting to hello world server...");
socket.connect("tcp://localhost:9999");
String requestString = "Hello" + " ";
byte[] request = requestString.getBytes();
socket.send(request, ZMQ.NOBLOCK);
byte[] reply = socket.recv(0);
System.out.println("Received reply [" + new String(reply) + "]");
说明:服务端bind一个端口,客户端则connect一个ip地址(服务端所在的ip)和相应的端口。服务端通过context.socket(ZMQ.REP);表明是服务端,同理,客户端通过context.socket(ZMQ.REQ);表明是客户端。服务端为了持续监听,必须要把recv写在一个循环里。一般是while(true)!
二、PUB/SUB模式
发布/订阅模式,这种模式大家想必很熟悉,比如微博消息的推送等。
发布方作为服务端,多个订阅方作为客户端。代码略,总结一下:
发布端bind一个端口,订阅端则connect一个ip地址(服务端所在的ip)和相应的端口。服务端通过context.socket(ZMQ.PUB);表明是发布端,同理,订阅端通过context.socket(ZMQ.SUB);表明是客户端。服务端为了持续监听,必须要把recv写在一个循环里。一般是while(true)!——这里发布方作为服务端,订阅方作为客户端。
三、PUSH/PULL模式
总结:push端bind一个端口,pull端则connect一个ip地址(push端端所在的ip)和相应的端口。push端端通过context.socket(ZMQ.PUSH);表明是push端,同理,订阅端通过context.socket(ZMQ.PULL);表明是客户端。服务端为了持续监听,必须要把recv写在一个循环里。一般是while(true)!——这里push端作为服务端,pull端作为客户端。
后两种模式写法类似第一种。
---------------------------------------
高并发下,出现的一个异常:
zmq.ZError$IOException: java.io.IOException: Unable to establish loopback connection
at zmq.Signaler.make_fdpair(Signaler.java:87)
at zmq.Signaler.<init>(Signaler.java:48)
at zmq.Mailbox.<init>(Mailbox.java:55)
at zmq.Ctx.<init>(Ctx.java:132)
at zmq.ZMQ.zmq_ctx_new(ZMQ.java:225)
……………………
Caused by: java.io.IOException: Unable to establish loopback connection
at sun.nio.ch.PipeImpl$Initializer.run(PipeImpl.java:106)
at java.security.AccessController.doPrivileged(Native Method)
at sun.nio.ch.PipeImpl.<init>(PipeImpl.java:122)
at sun.nio.ch.SelectorProviderImpl.openPipe(SelectorProviderImpl.java:27)
at java.nio.channels.Pipe.open(Pipe.java:133)
at zmq.Signaler.make_fdpair(Signaler.java:85)
... 36 more
Caused by: java.net.SocketException: No buffer space available (maximum connections reached?): bind
at sun.nio.ch.Net.bind(Native Method)
at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52)
at sun.nio.ch.PipeImpl$Initializer.run(PipeImpl.java:72)
... 41 more
说明ZMQ的连接不够!
解决办法:发现自己的一个类,是并发情况下每笔数据都会经过的类,而我把
ZMQ.Context context = ZMQ.context(1);
ZMQ.Socket reg = context.socket(ZMQ.PUSH);
reg.connect("tcp://localhost:xxx");
写在这个类的方法里,造成每次经过该类的该方法,都会创建一次context和socket,也许在并发情况还不是很明显的情况下,这个异常不会出现。但在高并发的情形下,频繁地创建ZMQ.Context和ZMQ.Socket并且再connect时,会造成很大的网络开销。故把该代码块放在static{...}类的静态代码块中——无论并发情况如何,只创建和连接一次。while循环里面有的代码,只能是send操作和recv操作!避免把ZMQ的Context,Socket的创建和connect操作放在循环里面。
改完之后,压力测试48w+数据,ZMQ终于没有挂了。。
最后
以上就是儒雅小霸王为你收集整理的zeromq学习笔记和解决一个相关的异常的全部内容,希望文章能够帮你解决zeromq学习笔记和解决一个相关的异常所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复