概述
发布/订阅模式的特点:
1.一个发布者,多个订阅者的关系,1:n;
2.当发布者数据变化时发布数据,所有订阅者均能够接收到数据并处理。
这就是发布/订阅模式。
- 使用SUB设置一个订阅时,必须使用zmq_setsockopt()对消息进行过滤,例如:
服务端代码
import zmq
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5555") #多个客户端连接同样的地址
socket.setsockopt(zmq.SUBSCRIBE,'123'.encode('utf-8')) # 消息过滤 只接受123开头的信息
#socket.setsockopt(zmq.SUBSCRIBE,''.encode('utf-8')) # 当zmq_setsockopt()的第二个参数设置为空时,表示不过滤任何消息
while True:
response = socket.recv().decode('utf-8');
print("response: %s" % response)
这是另外一种消息的过滤方法 (订阅邮编默认10001)
# Subscribe to zipcode, default is NYC, 10001
zip_filter = sys.argv[1] if len(sys.argv) > 1 else "10001"
# Python 2 - ascii bytes to unicode str
if isinstance(zip_filter, bytes):
zip_filter = zip_filter.decode('ascii')
socket.setsockopt_string(zmq.SUBSCRIBE, zip_filter)
setsockopt的详细使用
http://api.zeromq.org/3-2:zmq-setsockopt
客户端代码
import zmq
import sys
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5555")
while True:
msg = input("请输入要发布的信息:").strip()
if msg == 'b': #关闭连接的字符
sys.exit()
socket.send(msg.encode('utf-8'))
time.sleep(1)
-
PUB-SUB模式是异步的
订阅者调用zmq.send()来发送消息是会报错的,同样发布者使用zmq.recv()来接收消息也会报错。 -
PUB和SUB谁bind谁connect并无严格要求(虽本质并无区别),但仍建议PUB使用bind,SUB使用connect
-
一个订阅者(subcriber)可以链接超过一个发布者(publisher)。数据到达后将交叉存取(公平队列),以保证发布者之间的数据不会淹没。
连接多个发布者来接收需要的消息:
import sys
import zmq
# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)
print("Collecting updates from weather server...")
socket.connect("tcp://localhost:5556")
socket.connect("tcp://localhost:5557")
# Subscribe to zipcode, default is NYC, 10001
zip_filter = sys.argv[1] if len(sys.argv) > 1 else "10001"
# Python 2 - ascii bytes to unicode str
if isinstance(zip_filter, bytes):
zip_filter = zip_filter.decode('ascii')
socket.setsockopt_string(zmq.SUBSCRIBE, zip_filter)
while True:
string = socket.recv_string()
#zipcode, temperature, relhumidity = string.split()
print(string)
可以在发布者发送信息的开头来区分不同发布者发送的信息,客户端通过
subscriber.setsockopt(zmq.SUBSCRIBE, b"A")
来判断接收指定的发布者的数据
当订阅者订阅了不同类型的发布信息,为了保证互相不阻塞 有两种方法
import zmq
import time
# Prepare our context and sockets
context = zmq.Context()
# Connect to task ventilator
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")
# Connect to weather server
subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://localhost:5556")
subscriber.setsockopt(zmq.SUBSCRIBE, b"10001")
# Process messages from both sockets
# We prioritize traffic from the task ventilator
while True:
# Process any waiting tasks
while True:
try:
msg = receiver.recv(zmq.DONTWAIT)
except zmq.Again:
break
# process task
# Process any waiting weather updates
while True:
try:
msg = subscriber.recv(zmq.DONTWAIT)
except zmq.Again:
break
# process weather update
time.sleep(0.01)
import zmq
# Prepare our context and sockets
context = zmq.Context()
# Connect to task ventilator
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")
# Connect to weather server
subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://localhost:5556")
subscriber.setsockopt(zmq.SUBSCRIBE, b"10001")
# Initialize poll set
poller = zmq.Poller()
poller.register(receiver, zmq.POLLIN)
poller.register(subscriber, zmq.POLLIN)
# Process messages from both sockets
while True:
try:
socks = dict(poller.poll())
except KeyboardInterrupt:
break
if receiver in socks:
message = receiver.recv()
# process task
if subscriber in socks:
message = subscriber.recv()
# process weather update
- 如果一个发布者(publisher)没有任何订阅者(subcriber)连接,则发布者会简单的丢弃所有的消息。
- 如果使用TCP同时订阅者(subcriber)很慢,这会导致消息在发布者(publisher)端排队,造成消息堆积影响程序性能,为了解决这个问题,需要合理设置high-water mark(高水位线)。
定义:
ZeroMQ使用HWM(高水位标志)的概念来定义其内部管道的容量。从套接字或进入套接字的每个连接都有自己的管道和用于发送和/或接收的HWM,具体取决于套接字类型。一些套接字(发布、推送)只有发送缓冲区。一些(SUB, PULL, REQ, REP)只有接收缓冲区。一些(经销商,路由器,对)有发送和接收缓冲区。
当您的套接字到达其HWM时,它将根据套接字类型阻塞或删除数据。如果PUB和ROUTER套接字到达它们的HWM,它们将丢弃数据,而其他套接字类型将阻塞。
要注意的是,PUB、ROUTER套接字在到达HWM后会丢弃数据,其他的会阻塞。另外,inproc的transport,发送端和接收端共享同一个缓存,因此实际HWM是两者HWM之和。
下面是对必须处理未知订阅方的发布方来说的一个更明智的“最佳实践”:
总是给套接字设置一个基于期望的订阅方数量的最大值,你打算用于队列的内存的数量,和一个消息平均大小的高水位线。例如,如果你希望有5000个订阅方,有1G的内存可有,消息平均200字节,那么一个安全的高水位线应该是(1000000000/200/5000)=1000.
- 从ZeroMQ v3.x版本开始,使用tcp://或者ipc://协议连接时会在发布者进行消息过滤,使用epgm://协议仍在订阅者过滤;在ZeroMQ v2.x,所有消息过滤都发生在订阅者。
最后
以上就是魁梧水壶为你收集整理的zmq 发布/订阅模式的详解 python代码的全部内容,希望文章能够帮你解决zmq 发布/订阅模式的详解 python代码所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复