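Overview
Chapter 4: Reliable Request-Reply Patterns
Lazy Pirate pattern: reliable request-reply from the client side.
Simple Pirate pattern: reliable request-reply using load balancing.
Paranoid Pirate pattern: reliable request-reply using heartbeating.
Majordomo pattern: service-oriented reliable queuing.
Titanic pattern: disk-based/disconnected reliable queuing.
Binary Star pattern: primary-backup server failover.
Freelance pattern: brokerless reliable request-reply.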
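I. What Is "Reliability"?
1. Definition:
We define reliability in terms of failure. If we can handle a specific set of well-defined and well-understood failures, then we are reliable with respect to those failures. Put simply, reliability means "keeping things working properly when code freezes or crashes".
2. Failure types (in descending order of likelihood)
1. Application code can crash and exit, freeze and stop responding, run too slowly for its input, or exhaust memory.
2. System code (such as brokers built on ZeroMQ) can fail for the same reasons.
3. Message queues can overflow.
4. Networks can fail.
5. Hardware can fail.
6. Networks can fail in exotic ways, for example when some ports on a switch die.
7. Entire data centers can be damaged by lightning, earthquakes, fire, or more mundane power or cooling failures.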
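II. Designing for Reliability
1. REQ-REP (request-reply):
If the client gets no reply from the server, it can conclude that the server handling the request has failed. The client can then give up, wait, try again later, or look for another server. The basic REQ-REP pattern copes with failure poorly: if the server crashes while processing a request, or the network loses the request or the reply, the client hangs forever.
2. There are roughly three ways to connect clients to servers:
- Multiple clients talk directly to a single server.
  Failure types: the server crashes and restarts; the network disconnects.
- Multiple clients talk to a broker proxy that distributes work to multiple workers.
  Failure types: workers crash and restart, busy-loop, or become overloaded; the queue crashes and restarts; the network disconnects.
- Multiple clients talk to multiple servers with no intermediary broker.
  Failure types: services crash and restart, busy-loop, or become overloaded; the network disconnects.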
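3. Three ways to improve reliability:
1. Client-side reliability (Lazy Pirate pattern):
Instead of doing a blocking receive, the client:
- Polls the REQ socket and receives from it only when it is sure a reply has arrived.
- Resends the request if no reply arrives within a timeout period.
- Abandons the transaction if there is still no reply after several requests.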
Example: Lazy Pirate client
# Lazy Pirate client
# Uses zmq_poll to do a safe request-reply
# To run, start lpserver and then randomly kill/restart it
#
import itertools
import logging
import sys

import zmq

logging.basicConfig(format="%(levelname)s: %(message)s", level=logging.INFO)

REQUEST_TIMEOUT = 2500  # msecs
# Number of retries before giving up
REQUEST_RETRIES = 3
SERVER_ENDPOINT = "tcp://localhost:5555"

context = zmq.Context()

logging.info("Connecting to server…")
client = context.socket(zmq.REQ)
client.connect(SERVER_ENDPOINT)

for sequence in itertools.count():
    request = str(sequence).encode()
    logging.info("Sending (%s)", request)
    # Send a request, then work hard to get a reply
    client.send(request)

    retries_left = REQUEST_RETRIES
    while True:
        # Poll the socket for a reply, with timeout
        if (client.poll(REQUEST_TIMEOUT) & zmq.POLLIN) != 0:
            reply = client.recv()
            if int(reply) == sequence:
                logging.info("Server replied OK (%s)", reply)
                break
            else:
                logging.error("Malformed reply from server: %s", reply)
                continue

        retries_left -= 1
        logging.warning("No response from server")
        # The socket is confused: close and remove it
        client.setsockopt(zmq.LINGER, 0)
        client.close()
        if retries_left == 0:
            logging.error("Server seems to be offline, abandoning")
            sys.exit()

        logging.info("Reconnecting to server…")
        # Create a new socket and send the request again
        client = context.socket(zmq.REQ)
        client.connect(SERVER_ENDPOINT)
        logging.info("Resending (%s)", request)
        client.send(request)
Example: Lazy Pirate server
# Lazy Pirate server
# Binds a REP socket to tcp://*:5555
# Like hwserver, except:
#  - echoes the request as-is
#  - randomly runs slowly, or exits to simulate a server crash
#
from random import randint
import itertools
import logging
import time

import zmq

logging.basicConfig(format="%(levelname)s: %(message)s", level=logging.INFO)

context = zmq.Context()
server = context.socket(zmq.REP)
server.bind("tcp://*:5555")

for cycles in itertools.count():
    request = server.recv()

    # Simulate various problems, after a few cycles
    if cycles > 3 and randint(0, 3) == 0:
        logging.info("Simulating a crash")
        break
    elif cycles > 3 and randint(0, 3) == 0:
        logging.info("Simulating CPU overload")
        time.sleep(2)

    logging.info("Normal request (%s)", request)
    time.sleep(1)  # Do some heavy work
    server.send(request)
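Advantages of the Lazy Pirate pattern:
• Simple to understand and implement.
• Works easily with existing client and server application code.
• ZeroMQ automatically retries the actual reconnection until it works.
Disadvantages of the Lazy Pirate pattern:
• It does not fail over to backup or alternate servers.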
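2. Basic reliable queuing (Simple Pirate pattern):
We extend the Lazy Pirate pattern with a queue broker, which lets us talk transparently to multiple servers; these are more accurately called workers. We start with the most minimal version, the Simple Pirate pattern.
In all these Pirate patterns, workers are stateless. If the application needs some shared state, such as a shared database, that state lives outside the messaging framework we are designing. Having a queue broker means workers can come and go without clients knowing anything about it. If one worker crashes, another takes over.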
Example: Simple Pirate queue
#
# Simple Pirate queue
# This is identical to the load-balancing pattern, with no reliability
# mechanisms. It depends on the client for recovery. Runs forever.
#
import zmq

# Signal sent by a worker when it is ready (a single 0x01 byte)
LRU_READY = b"\x01"

context = zmq.Context(1)

frontend = context.socket(zmq.ROUTER)  # ROUTER
backend = context.socket(zmq.ROUTER)   # ROUTER
frontend.bind("tcp://*:5555")  # For clients
backend.bind("tcp://*:5556")   # For workers

poll_workers = zmq.Poller()
poll_workers.register(backend, zmq.POLLIN)

poll_both = zmq.Poller()
poll_both.register(frontend, zmq.POLLIN)
poll_both.register(backend, zmq.POLLIN)

workers = []

while True:
    if workers:
        socks = dict(poll_both.poll())
    else:
        socks = dict(poll_workers.poll())

    # Handle worker activity on the backend
    if socks.get(backend) == zmq.POLLIN:
        # Use the worker address for LRU routing
        msg = backend.recv_multipart()
        if not msg:
            break
        address = msg[0]
        workers.append(address)

        # Everything after the second (delimiter) frame is the reply
        reply = msg[2:]

        # Forward the message to the client if it is not a READY signal
        if reply[0] != LRU_READY:
            frontend.send_multipart(reply)

    if socks.get(frontend) == zmq.POLLIN:
        # Get a client request and route it to the first available worker
        msg = frontend.recv_multipart()
        request = [workers.pop(0), b''] + msg
        backend.send_multipart(request)
Example: Simple Pirate worker
# Simple Pirate worker
# Connects a REQ socket to tcp://localhost:5556
# Implements the worker part of LRU queuing
from random import randint
import time

import zmq

# Signal sent by a worker when it is ready (a single 0x01 byte)
LRU_READY = b"\x01"

context = zmq.Context(1)
worker = context.socket(zmq.REQ)

# Set a random identity to make tracing easier
identity = "%04X-%04X" % (randint(0, 0x10000), randint(0, 0x10000))
worker.setsockopt_string(zmq.IDENTITY, identity)
worker.connect("tcp://localhost:5556")

# Tell the broker we are ready for work
print("I: (%s) worker ready" % identity)
worker.send(LRU_READY)

cycles = 0
while True:
    msg = worker.recv_multipart()
    if not msg:
        break

    # Simulate various problems, after a few cycles
    cycles += 1
    if cycles > 3 and randint(0, 5) == 0:
        print("I: (%s) simulating a crash" % identity)
        break
    elif cycles > 3 and randint(0, 5) == 0:
        print("I: (%s) simulating CPU overload" % identity)
        time.sleep(3)
    print("I: (%s) normal reply" % identity)
    time.sleep(1)  # Do some heavy work
    worker.send_multipart(msg)
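This pattern works with any number of clients and workers. However, it has some drawbacks:
• It is not robust when the queue crashes and restarts. The clients recover, but the workers do not: although ZeroMQ reconnects the workers' sockets automatically, the workers do not resend their ready signal, so the newly started queue does not know they exist. To fix this, we need heartbeating from the queue to the workers so that a worker can detect that the queue has died.
• The queue does not detect worker failure. If a worker dies while idle, the queue cannot remove it from its worker queue until it sends that worker a request. The client then waits and retries for nothing. We need heartbeating from the workers to the queue so that the queue can detect a lost worker at any stage.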
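3. Robust reliable queuing (Paranoid Pirate pattern):
For the Paranoid Pirate worker, we switch to a DEALER socket. This lets us send and receive messages at any time, instead of following the lock-step send/receive sequence that REQ enforces.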
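The lock-step rule can be illustrated without any sockets. The sketch below is a hypothetical pure-Python model (ReqModel, DealerModel and LockStepError are invented names, not pyzmq classes): a REQ-style object refuses a second send before a reply has been received, which is exactly what a busy worker sending an unsolicited heartbeat would attempt, while a DEALER-style object accepts sends in any order. A real libzmq REQ socket reports the same situation as an EFSM error.

```python
# Hypothetical model of the send/recv alternation a ZeroMQ REQ socket
# enforces. This is NOT the pyzmq API; it only illustrates why the
# Paranoid Pirate worker switches from REQ to DEALER.


class LockStepError(Exception):
    """Raised when an operation is attempted out of turn."""


class ReqModel:
    """Models REQ: strict alternation of send and recv."""

    def __init__(self):
        self.awaiting_reply = False

    def send(self, msg):
        if self.awaiting_reply:
            raise LockStepError("REQ must recv before it can send again")
        self.awaiting_reply = True

    def recv(self):
        if not self.awaiting_reply:
            raise LockStepError("REQ must send before it can recv")
        self.awaiting_reply = False


class DealerModel:
    """Models DEALER: sends are accepted in any order."""

    def __init__(self):
        self.sent = []

    def send(self, msg):
        self.sent.append(msg)


def try_heartbeat_while_busy(sock):
    """Send a message, then a heartbeat before any reply has arrived."""
    try:
        sock.send(b"reply")
        sock.send(b"\x02")  # heartbeat, sent out of turn
        return True
    except LockStepError:
        return False


print(try_heartbeat_while_busy(ReqModel()))     # False
print(try_heartbeat_while_busy(DealerModel()))  # True
```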
Example: Paranoid Pirate queue
# Paranoid Pirate queue
#
from collections import OrderedDict
import time

import zmq

HEARTBEAT_LIVENESS = 3    # 3-5 is reasonable
HEARTBEAT_INTERVAL = 1.0  # Seconds

# Paranoid Pirate Protocol constants
PPP_READY = b"\x01"      # Signals worker is ready
PPP_HEARTBEAT = b"\x02"  # Signals worker heartbeat


class Worker(object):
    def __init__(self, address):
        self.address = address
        self.expiry = time.time() + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS


class WorkerQueue(object):
    def __init__(self):
        self.queue = OrderedDict()

    def ready(self, worker):
        # Move the worker to the end of the queue (most recently ready)
        self.queue.pop(worker.address, None)
        self.queue[worker.address] = worker

    def purge(self):
        """Look for and kill expired workers."""
        t = time.time()
        expired = []
        for address, worker in self.queue.items():
            if t > worker.expiry:  # Worker expired
                expired.append(address)
        for address in expired:
            print("W: Idle worker expired: %s" % address)
            self.queue.pop(address, None)

    def next(self):
        address, worker = self.queue.popitem(False)
        return address


context = zmq.Context(1)

frontend = context.socket(zmq.ROUTER)  # ROUTER
backend = context.socket(zmq.ROUTER)   # ROUTER
frontend.bind("tcp://*:5555")  # For clients
backend.bind("tcp://*:5556")   # For workers

poll_workers = zmq.Poller()
poll_workers.register(backend, zmq.POLLIN)

poll_both = zmq.Poller()
poll_both.register(frontend, zmq.POLLIN)
poll_both.register(backend, zmq.POLLIN)

# List of available workers
workers = WorkerQueue()

# Send out heartbeats at regular intervals
heartbeat_at = time.time() + HEARTBEAT_INTERVAL

while True:
    if len(workers.queue) > 0:
        poller = poll_both
    else:
        poller = poll_workers
    socks = dict(poller.poll(HEARTBEAT_INTERVAL * 1000))

    # Handle worker activity on the backend
    if socks.get(backend) == zmq.POLLIN:
        # Use the worker address for load balancing
        frames = backend.recv_multipart()
        if not frames:
            break

        # Any sign of life from a worker means it is ready
        address = frames[0]
        workers.ready(Worker(address))

        # Validate a control message, or return the reply to the client
        msg = frames[1:]
        if len(msg) == 1:
            if msg[0] not in (PPP_READY, PPP_HEARTBEAT):
                print("E: Invalid message from worker: %s" % msg)
        else:
            frontend.send_multipart(msg)

    if socks.get(frontend) == zmq.POLLIN:
        frames = frontend.recv_multipart()
        if not frames:
            break

        frames.insert(0, workers.next())
        backend.send_multipart(frames)

    # Send heartbeats to idle workers if it is time
    if time.time() >= heartbeat_at:
        for worker in workers.queue:
            msg = [worker, PPP_HEARTBEAT]
            backend.send_multipart(msg)
        heartbeat_at = time.time() + HEARTBEAT_INTERVAL

    workers.purge()
Example: Paranoid Pirate worker
#
# Paranoid Pirate worker
#
from random import randint
import time

import zmq

HEARTBEAT_LIVENESS = 3
HEARTBEAT_INTERVAL = 1  # Seconds
INTERVAL_INIT = 1       # Initial reconnect interval, seconds
INTERVAL_MAX = 32       # Maximum reconnect interval, seconds

# Paranoid Pirate Protocol constants
PPP_READY = b"\x01"      # Signals worker is ready
PPP_HEARTBEAT = b"\x02"  # Signals worker heartbeat


def worker_socket(context, poller):
    """Helper that returns a new, configured socket
    connected to the Paranoid Pirate queue."""
    worker = context.socket(zmq.DEALER)  # DEALER
    identity = b"%04X-%04X" % (randint(0, 0x10000), randint(0, 0x10000))
    worker.setsockopt(zmq.IDENTITY, identity)
    poller.register(worker, zmq.POLLIN)
    worker.connect("tcp://localhost:5556")
    worker.send(PPP_READY)
    return worker


context = zmq.Context(1)
poller = zmq.Poller()

liveness = HEARTBEAT_LIVENESS
interval = INTERVAL_INIT

heartbeat_at = time.time() + HEARTBEAT_INTERVAL

worker = worker_socket(context, poller)
cycles = 0
while True:
    socks = dict(poller.poll(HEARTBEAT_INTERVAL * 1000))

    # Handle messages from the queue
    if socks.get(worker) == zmq.POLLIN:
        # Get message
        # - 3-part envelope + content -> request
        # - 1-part HEARTBEAT -> heartbeat
        frames = worker.recv_multipart()
        if not frames:
            break  # Interrupted

        if len(frames) == 3:
            # Simulate various problems, after a few cycles
            cycles += 1
            if cycles > 3 and randint(0, 5) == 0:
                print("I: Simulating a crash")
                break
            if cycles > 3 and randint(0, 5) == 0:
                print("I: Simulating CPU overload")
                time.sleep(3)
            print("I: Normal reply")
            worker.send_multipart(frames)
            liveness = HEARTBEAT_LIVENESS
            time.sleep(1)  # Do some heavy work
        elif len(frames) == 1 and frames[0] == PPP_HEARTBEAT:
            print("I: Queue heartbeat")
            liveness = HEARTBEAT_LIVENESS
        else:
            print("E: Invalid message: %s" % frames)
        interval = INTERVAL_INIT
    else:
        liveness -= 1
        if liveness == 0:
            print("W: Heartbeat failure, can't reach queue")
            print("W: Reconnecting in %0.2fs..." % interval)
            time.sleep(interval)

            if interval < INTERVAL_MAX:
                interval *= 2
            poller.unregister(worker)
            worker.setsockopt(zmq.LINGER, 0)
            worker.close()
            worker = worker_socket(context, poller)
            liveness = HEARTBEAT_LIVENESS

    if time.time() > heartbeat_at:
        heartbeat_at = time.time() + HEARTBEAT_INTERVAL
        print("I: Worker heartbeat")
        worker.send(PPP_HEARTBEAT)
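III. Heartbeating
Heartbeating is used to determine whether a peer is alive or dead. There are three main approaches to heartbeating with ZeroMQ:
1. Do nothing (shrugging it off)
The most common approach is to do no heartbeating at all and hope for the best.
2. One-way heartbeats
Each node sends a heartbeat message to its peers every second. When a node hears nothing from another node within some timeout (typically several seconds), it treats that peer as dead.
3. Ping-pong heartbeats
Use a ping-pong dialog: one node sends a ping command to the other, which replies with a pong command.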
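A minimal sketch of the one-way approach, assuming the caller reports every message it receives: each message refreshes the sender's deadline, and a peer that stays silent for longer than interval * liveness is reported dead. PeerTracker is a hypothetical name, and the injectable clock exists only so the behaviour can be checked without waiting.

```python
import time


class PeerTracker:
    """One-way heartbeating, sketched without sockets.

    Any message from a peer (heartbeat or real data) refreshes its
    deadline; a peer silent for longer than interval * liveness is dead.
    """

    def __init__(self, interval=1.0, liveness=3, clock=time.monotonic):
        self.timeout = interval * liveness
        self.clock = clock
        self.deadline = {}  # peer id -> time after which it counts as dead

    def seen(self, peer):
        """Call this for every message received from `peer`."""
        self.deadline[peer] = self.clock() + self.timeout

    def dead_peers(self):
        """Return peers whose deadline has passed, and forget them."""
        now = self.clock()
        dead = [p for p, t in self.deadline.items() if now > t]
        for p in dead:
            del self.deadline[p]
        return dead


# Usage with a fake clock: A goes silent, B keeps talking.
now = [0.0]
tracker = PeerTracker(interval=1.0, liveness=3, clock=lambda: now[0])
tracker.seen("A")
tracker.seen("B")
now[0] = 2.0
tracker.seen("B")
now[0] = 3.5
print(tracker.dead_peers())  # ['A']
```

For ping-pong heartbeating, the same tracker works if `seen()` is called only when a pong arrives in answer to a ping we sent.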
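Example: heartbeating for Paranoid Pirate (approach 2)
Handling heartbeats from the queue in the worker:
- Keep a liveness counter: the number of heartbeats we may miss before deciding the queue is dead. It starts at 3 and is decremented each time a heartbeat is missed.
- In the zmq_poll() loop, wait one second each time (the heartbeat interval).
- If any message arrives from the queue during that time, reset liveness to 3.
- If no message arrives during that time, decrement liveness.
- If liveness reaches zero, we consider the queue dead.
- If the queue is dead, destroy the socket, create a new one, and reconnect.
- To avoid opening and closing too many sockets, wait for an interval before reconnecting, doubling the interval each time until it reaches 32 seconds.
Handling heartbeats to the queue:
- Calculate when to send the next heartbeat; this is a single variable because we talk to only one peer, the queue.
- In the zmq_poll() loop, whenever that time has passed, send a heartbeat to the queue.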
Example code: basic heartbeating in the worker
#define HEARTBEAT_LIVENESS  3       //  3-5 is reasonable
#define HEARTBEAT_INTERVAL  1000    //  msecs
#define INTERVAL_INIT       1000    //  Initial reconnect interval
#define INTERVAL_MAX       32000    //  Maximum interval for exponential backoff

...
//  If liveness hits zero, the queue is considered disconnected
size_t liveness = HEARTBEAT_LIVENESS;
size_t interval = INTERVAL_INIT;

//  Send out heartbeats at regular intervals
uint64_t heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;

while (true) {
    zmq_pollitem_t items [] = { { worker, 0, ZMQ_POLLIN, 0 } };
    int rc = zmq_poll (items, 1, HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);

    if (items [0].revents & ZMQ_POLLIN) {
        //  Received any message from the queue
        liveness = HEARTBEAT_LIVENESS;
        interval = INTERVAL_INIT;
    }
    else if (--liveness == 0) {
        zclock_sleep (interval);
        if (interval < INTERVAL_MAX)
            interval *= 2;
        zsocket_destroy (ctx, worker);
        ...
        liveness = HEARTBEAT_LIVENESS;
    }
    //  Send a heartbeat to the queue if it is time
    if (zclock_time () > heartbeat_at) {
        heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
        //  Send heartbeat message to the queue
    }
}
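The liveness and backoff rules in the C fragment above can be replayed without sockets. This sketch (the `simulate` function is hypothetical) takes one boolean per poll interval, True when a message arrived from the queue, and records how long the worker would sleep before each reconnect.

```python
HEARTBEAT_LIVENESS = 3
INTERVAL_INIT = 1000   # msecs
INTERVAL_MAX = 32000   # msecs


def simulate(polls):
    """Replay a worker's poll results (True = got a message from the queue).

    Returns the reconnect delays (msecs) the worker would sleep, following
    the liveness and exponential-backoff rules of the worker loop.
    """
    liveness = HEARTBEAT_LIVENESS
    interval = INTERVAL_INIT
    delays = []
    for got_message in polls:
        if got_message:
            # Any message from the queue resets both counters
            liveness = HEARTBEAT_LIVENESS
            interval = INTERVAL_INIT
        else:
            liveness -= 1
            if liveness == 0:
                delays.append(interval)
                if interval < INTERVAL_MAX:
                    interval *= 2
                # (the real worker destroys and recreates its socket here)
                liveness = HEARTBEAT_LIVENESS
    return delays


print(simulate([False] * 21))
# [1000, 2000, 4000, 8000, 16000, 32000, 32000]
print(simulate([False] * 3 + [True] + [False] * 3))
# [1000, 1000]
```

With a queue that never answers, the delay doubles until it reaches the 32-second cap and then stays there; a single message from the queue resets the backoff.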
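IV. Contracts and Protocols
Definition: to guarantee interoperability we need a kind of contract, an agreement that lets different teams working at different times and places write code that is guaranteed to work together. We call this a "protocol".
1. Service-oriented reliable queuing (Majordomo pattern)
The Majordomo Protocol (MDP) extends and improves on the Paranoid Pirate Protocol (PPP) in one interesting way: it adds a "service name" to the requests that the client sends, and asks workers to register for specific services. Adding service names turns our Paranoid Pirate queue into a service-oriented broker. MDP is split into two halves: the client side and the worker side.
Client API: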
mdcli_t *mdcli_new (char *broker);
void mdcli_destroy (mdcli_t **self_p);
zmsg_t *mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p);
worker API:
mdwrk_t *mdwrk_new (char *broker,char *service);
void mdwrk_destroy (mdwrk_t **self_p);
zmsg_t *mdwrk_recv (mdwrk_t *self, zmsg_t *reply);
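The Python examples below `import MDP`, a constants module that this section does not show. A minimal version, assuming the MDP/0.1 frame values from http://rfc.zeromq.org/spec:7 and saved as MDP.py next to the other scripts; the two helper functions are hypothetical and only illustrate how the client and worker APIs lay out their frames.

```python
"""Majordomo Protocol constants (MDP/0.1), as assumed by the examples."""

# Version of MDP/Client we implement
C_CLIENT = b"MDPC01"

# Version of MDP/Worker we implement
W_WORKER = b"MDPW01"

# MDP/Worker commands, each sent as a single-byte frame
W_READY = b"\x01"
W_REQUEST = b"\x02"
W_REPLY = b"\x03"
W_HEARTBEAT = b"\x04"
W_DISCONNECT = b"\x05"


def client_request(service, body):
    """Hypothetical helper: frames a client hands to its REQ socket.

    The REQ socket itself prepends the empty delimiter frame on the wire.
    """
    return [C_CLIENT, service, body]


def worker_ready(service):
    """Hypothetical helper: frames a DEALER-based worker sends to register."""
    return [b"", W_WORKER, W_READY, service]
```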
Example: Majordomo Protocol client API
"""Majordomo Protocol client API, Python version.

Implements the client side of the MDP spec at http://rfc.zeromq.org/spec:7.
"""

import logging

import zmq

import MDP
from zhelpers import dump


class MajorDomoClient(object):
    """Majordomo Protocol client API, Python version.

    Implements the client side of the MDP spec at http://rfc.zeromq.org/spec:7.
    """
    broker = None
    ctx = None
    client = None
    poller = None
    timeout = 2500
    retries = 3
    verbose = False

    def __init__(self, broker, verbose=False):
        self.broker = broker
        self.verbose = verbose
        self.ctx = zmq.Context()
        self.poller = zmq.Poller()
        logging.basicConfig(format="%(asctime)s %(message)s",
                            datefmt="%Y-%m-%d %H:%M:%S",
                            level=logging.INFO)
        self.reconnect_to_broker()

    def reconnect_to_broker(self):
        """Connect or reconnect to the broker."""
        if self.client:
            self.poller.unregister(self.client)
            self.client.close()
        self.client = self.ctx.socket(zmq.REQ)
        self.client.linger = 0
        self.client.connect(self.broker)
        self.poller.register(self.client, zmq.POLLIN)
        if self.verbose:
            logging.info("I: connecting to broker at %s...", self.broker)

    def send(self, service, request):
        """Send a request to the broker and get the reply.

        Returns the reply message, or None if there was no reply.
        """
        if not isinstance(request, list):
            request = [request]
        request = [MDP.C_CLIENT, service] + request
        if self.verbose:
            logging.info("I: send request to '%s' service: ", service)
            dump(request)
        reply = None

        retries = self.retries
        while retries > 0:
            self.client.send_multipart(request)
            try:
                items = self.poller.poll(self.timeout)
            except KeyboardInterrupt:
                break  # interrupted

            if items:
                msg = self.client.recv_multipart()
                if self.verbose:
                    logging.info("I: received reply:")
                    dump(msg)

                # Don't try to handle errors, just assert noisily
                assert len(msg) >= 3

                header = msg.pop(0)
                assert MDP.C_CLIENT == header

                reply_service = msg.pop(0)
                assert service == reply_service

                reply = msg
                break
            else:
                retries -= 1
                if retries:
                    logging.warning("W: no reply, reconnecting...")
                    self.reconnect_to_broker()
                else:
                    logging.warning("W: permanent error, abandoning")
                    break
        return reply

    def destroy(self):
        self.ctx.destroy()
Example: Majordomo Protocol client
"""
Majordomo Protocol client example. Uses the mdcli API to hide all MDP aspects.

Author : Min RK <benjaminrk@gmail.com>
"""

import sys

from mdcliapi import MajorDomoClient


def main():
    verbose = '-v' in sys.argv
    client = MajorDomoClient("tcp://localhost:5555", verbose)
    count = 0
    while count < 100000:
        request = b"Hello world"
        try:
            reply = client.send(b"echo", request)
        except KeyboardInterrupt:
            break
        else:
            # Also stop if there was no reply:
            if reply is None:
                break
        count += 1
    print("%i requests/replies processed" % count)


if __name__ == '__main__':
    main()
Example: Majordomo Protocol worker API
"""Majordomo Protocol worker API, Python version.

Implements the worker side of the MDP spec at http://rfc.zeromq.org/spec:7.

Author: Min RK <benjaminrk@gmail.com>
Based on Java example by Arkadiusz Orzechowski
"""

import logging
import time

import zmq

from zhelpers import dump
# MDP protocol constants:
import MDP


class MajorDomoWorker(object):
    """Majordomo Protocol worker API, Python version.

    Implements the worker side of the MDP spec at http://rfc.zeromq.org/spec:7.
    """

    HEARTBEAT_LIVENESS = 3  # 3-5 is reasonable
    broker = None
    ctx = None
    service = None

    worker = None      # Socket to broker
    heartbeat_at = 0   # When to send HEARTBEAT (relative to time.time(), so in seconds)
    liveness = 0       # How many attempts left
    heartbeat = 2500   # Heartbeat delay, msecs
    reconnect = 2500   # Reconnect delay, msecs

    # Internal state
    expect_reply = False  # False only at start

    timeout = 2500   # Poller timeout, msecs
    verbose = False  # Print activity to stdout

    # Return address, if any
    reply_to = None

    def __init__(self, broker, service, verbose=False):
        self.broker = broker
        self.service = service
        self.verbose = verbose
        self.ctx = zmq.Context()
        self.poller = zmq.Poller()
        logging.basicConfig(format="%(asctime)s %(message)s",
                            datefmt="%Y-%m-%d %H:%M:%S",
                            level=logging.INFO)
        self.reconnect_to_broker()

    def reconnect_to_broker(self):
        """Connect or reconnect to broker"""
        if self.worker:
            self.poller.unregister(self.worker)
            self.worker.close()
        self.worker = self.ctx.socket(zmq.DEALER)
        self.worker.linger = 0
        self.worker.connect(self.broker)
        self.poller.register(self.worker, zmq.POLLIN)
        if self.verbose:
            logging.info("I: connecting to broker at %s...", self.broker)

        # Register service with broker
        self.send_to_broker(MDP.W_READY, self.service, [])

        # If liveness hits zero, queue is considered disconnected
        self.liveness = self.HEARTBEAT_LIVENESS
        self.heartbeat_at = time.time() + 1e-3 * self.heartbeat

    def send_to_broker(self, command, option=None, msg=None):
        """Send message to broker.

        If no msg is provided, creates one internally
        """
        if msg is None:
            msg = []
        elif not isinstance(msg, list):
            msg = [msg]

        if option:
            msg = [option] + msg

        msg = [b'', MDP.W_WORKER, command] + msg
        if self.verbose:
            logging.info("I: sending %s to broker", command)
            dump(msg)
        self.worker.send_multipart(msg)

    def recv(self, reply=None):
        """Send reply, if any, to broker and wait for next request."""
        # Format and send the reply if we were provided one
        assert reply is not None or not self.expect_reply

        if reply is not None:
            assert self.reply_to is not None
            reply = [self.reply_to, b''] + reply
            self.send_to_broker(MDP.W_REPLY, msg=reply)

        self.expect_reply = True

        while True:
            # Poll socket for a reply, with timeout
            try:
                items = self.poller.poll(self.timeout)
            except KeyboardInterrupt:
                break  # Interrupted

            if items:
                msg = self.worker.recv_multipart()
                if self.verbose:
                    logging.info("I: received message from broker: ")
                    dump(msg)

                self.liveness = self.HEARTBEAT_LIVENESS
                # Don't try to handle errors, just assert noisily
                assert len(msg) >= 3

                empty = msg.pop(0)
                assert empty == b''

                header = msg.pop(0)
                assert header == MDP.W_WORKER

                command = msg.pop(0)
                if command == MDP.W_REQUEST:
                    # We should pop and save as many addresses as there are
                    # up to a null part, but for now, just save one...
                    self.reply_to = msg.pop(0)
                    # pop empty
                    empty = msg.pop(0)
                    assert empty == b''

                    return msg  # We have a request to process
                elif command == MDP.W_HEARTBEAT:
                    # Do nothing for heartbeats
                    pass
                elif command == MDP.W_DISCONNECT:
                    self.reconnect_to_broker()
                else:
                    logging.error("E: invalid input message: ")
                    dump(msg)

            else:
                self.liveness -= 1
                if self.liveness == 0:
                    if self.verbose:
                        logging.warning("W: disconnected from broker - retrying...")
                    try:
                        time.sleep(1e-3 * self.reconnect)
                    except KeyboardInterrupt:
                        break
                    self.reconnect_to_broker()

            # Send HEARTBEAT if it's time
            if time.time() > self.heartbeat_at:
                self.send_to_broker(MDP.W_HEARTBEAT)
                self.heartbeat_at = time.time() + 1e-3 * self.heartbeat

        logging.warning("W: interrupt received, killing worker...")
        return None

    def destroy(self):
        # context.destroy depends on pyzmq >= 2.1.10
        self.ctx.destroy(0)
Example: Majordomo Protocol worker
"""Majordomo Protocol worker example.

Uses the mdwrk API to hide all MDP aspects.

Author: Min RK <benjaminrk@gmail.com>
"""

import sys

from mdwrkapi import MajorDomoWorker


def main():
    verbose = '-v' in sys.argv
    worker = MajorDomoWorker("tcp://localhost:5555", b"echo", verbose)
    reply = None
    while True:
        request = worker.recv(reply)
        if request is None:
            break  # Worker was interrupted
        reply = request  # Echo is complex... :-)


if __name__ == '__main__':
    main()
Example: Majordomo Protocol broker
"""
Majordomo Protocol broker
A minimal implementation of http://rfc.zeromq.org/spec:7 and spec:8

Author: Min RK <benjaminrk@gmail.com>
Based on Java example by Arkadiusz Orzechowski
"""

import logging
import sys
import time
from binascii import hexlify

import zmq

# local
import MDP
from zhelpers import dump


class Service(object):
    """a single Service"""
    name = None      # Service name
    requests = None  # List of client requests
    waiting = None   # List of waiting workers

    def __init__(self, name):
        self.name = name
        self.requests = []
        self.waiting = []


class Worker(object):
    """a Worker, idle or active"""
    identity = None  # hex Identity of worker
    address = None   # Address to route to
    service = None   # Owning service, if known
    expiry = None    # expires at this point, unless heartbeat

    def __init__(self, identity, address, lifetime):
        self.identity = identity
        self.address = address
        self.expiry = time.time() + 1e-3 * lifetime


class MajorDomoBroker(object):
    """
    Majordomo Protocol broker
    A minimal implementation of http://rfc.zeromq.org/spec:7 and spec:8
    """

    # We'd normally pull these from config data
    INTERNAL_SERVICE_PREFIX = b"mmi."
    HEARTBEAT_LIVENESS = 3     # 3-5 is reasonable
    HEARTBEAT_INTERVAL = 2500  # msecs
    HEARTBEAT_EXPIRY = HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS

    # ---------------------------------------------------------------------

    ctx = None     # Our context
    socket = None  # Socket for clients & workers
    poller = None  # our Poller

    heartbeat_at = None  # When to send HEARTBEAT
    services = None      # known services
    workers = None       # known workers
    waiting = None       # idle workers

    verbose = False  # Print activity to stdout

    # ---------------------------------------------------------------------

    def __init__(self, verbose=False):
        """Initialize broker state."""
        self.verbose = verbose
        self.services = {}
        self.workers = {}
        self.waiting = []
        self.heartbeat_at = time.time() + 1e-3 * self.HEARTBEAT_INTERVAL
        self.ctx = zmq.Context()
        self.socket = self.ctx.socket(zmq.ROUTER)
        self.socket.linger = 0
        self.poller = zmq.Poller()
        self.poller.register(self.socket, zmq.POLLIN)
        logging.basicConfig(format="%(asctime)s %(message)s",
                            datefmt="%Y-%m-%d %H:%M:%S",
                            level=logging.INFO)

    # ---------------------------------------------------------------------

    def mediate(self):
        """Main broker work happens here"""
        while True:
            try:
                items = self.poller.poll(self.HEARTBEAT_INTERVAL)
            except KeyboardInterrupt:
                break  # Interrupted
            if items:
                msg = self.socket.recv_multipart()
                if self.verbose:
                    logging.info("I: received message:")
                    dump(msg)

                sender = msg.pop(0)
                empty = msg.pop(0)
                assert empty == b''
                header = msg.pop(0)

                if MDP.C_CLIENT == header:
                    self.process_client(sender, msg)
                elif MDP.W_WORKER == header:
                    self.process_worker(sender, msg)
                else:
                    logging.error("E: invalid message:")
                    dump(msg)

            self.purge_workers()
            self.send_heartbeats()

    def destroy(self):
        """Disconnect all workers, destroy context."""
        while self.workers:
            # dict views are not indexable in Python 3
            self.delete_worker(next(iter(self.workers.values())), True)
        self.ctx.destroy(0)

    def process_client(self, sender, msg):
        """Process a request coming from a client."""
        assert len(msg) >= 2  # Service name + body
        service = msg.pop(0)
        # Set reply return address to client sender
        msg = [sender, b''] + msg
        if service.startswith(self.INTERNAL_SERVICE_PREFIX):
            self.service_internal(service, msg)
        else:
            self.dispatch(self.require_service(service), msg)

    def process_worker(self, sender, msg):
        """Process message sent to us by a worker."""
        assert len(msg) >= 1  # At least, command

        command = msg.pop(0)

        worker_ready = hexlify(sender) in self.workers

        worker = self.require_worker(sender)

        if MDP.W_READY == command:
            assert len(msg) >= 1  # At least, a service name
            service = msg.pop(0)
            # Not first command in session or Reserved service name
            if worker_ready or service.startswith(self.INTERNAL_SERVICE_PREFIX):
                self.delete_worker(worker, True)
            else:
                # Attach worker to service and mark as idle
                worker.service = self.require_service(service)
                self.worker_waiting(worker)

        elif MDP.W_REPLY == command:
            if worker_ready:
                # Remove & save client return envelope and insert the
                # protocol header and service name, then rewrap envelope.
                client = msg.pop(0)
                empty = msg.pop(0)  # empty delimiter frame
                msg = [client, b'', MDP.C_CLIENT, worker.service.name] + msg
                self.socket.send_multipart(msg)
                self.worker_waiting(worker)
            else:
                self.delete_worker(worker, True)

        elif MDP.W_HEARTBEAT == command:
            if worker_ready:
                worker.expiry = time.time() + 1e-3 * self.HEARTBEAT_EXPIRY
            else:
                self.delete_worker(worker, True)

        elif MDP.W_DISCONNECT == command:
            self.delete_worker(worker, False)
        else:
            logging.error("E: invalid message:")
            dump(msg)

    def delete_worker(self, worker, disconnect):
        """Deletes worker from all data structures, and deletes worker."""
        assert worker is not None
        if disconnect:
            self.send_to_worker(worker, MDP.W_DISCONNECT, None, None)

        # A busy worker is in neither waiting list, so check before removing
        if worker.service is not None and worker in worker.service.waiting:
            worker.service.waiting.remove(worker)
        if worker in self.waiting:
            self.waiting.remove(worker)
        self.workers.pop(worker.identity, None)

    def require_worker(self, address):
        """Finds the worker (creates if necessary)."""
        assert address is not None
        identity = hexlify(address)
        worker = self.workers.get(identity)
        if worker is None:
            worker = Worker(identity, address, self.HEARTBEAT_EXPIRY)
            self.workers[identity] = worker
            if self.verbose:
                logging.info("I: registering new worker: %s", identity)

        return worker

    def require_service(self, name):
        """Locates the service (creates if necessary)."""
        assert name is not None
        service = self.services.get(name)
        if service is None:
            service = Service(name)
            self.services[name] = service

        return service

    def bind(self, endpoint):
        """Bind broker to endpoint, can call this multiple times.

        We use a single socket for both clients and workers.
        """
        self.socket.bind(endpoint)
        logging.info("I: MDP broker/0.1.1 is active at %s", endpoint)

    def service_internal(self, service, msg):
        """Handle internal service according to 8/MMI specification"""
        returncode = b"501"
        if b"mmi.service" == service:
            name = msg[-1]
            returncode = b"200" if name in self.services else b"404"
        msg[-1] = returncode

        # insert the protocol header and service name after the routing envelope ([client, ''])
        msg = msg[:2] + [MDP.C_CLIENT, service] + msg[2:]
        self.socket.send_multipart(msg)

    def send_heartbeats(self):
        """Send heartbeats to idle workers if it's time"""
        if time.time() > self.heartbeat_at:
            for worker in self.waiting:
                self.send_to_worker(worker, MDP.W_HEARTBEAT, None, None)

            self.heartbeat_at = time.time() + 1e-3 * self.HEARTBEAT_INTERVAL

    def purge_workers(self):
        """Look for & kill expired workers.

        Workers are oldest to most recent, so we stop at the first alive worker.
        """
        while self.waiting:
            w = self.waiting[0]
            if w.expiry < time.time():
                logging.info("I: deleting expired worker: %s", w.identity)
                self.delete_worker(w, False)  # also removes w from self.waiting
            else:
                break

    def worker_waiting(self, worker):
        """This worker is now waiting for work."""
        # Queue to broker and service waiting lists
        self.waiting.append(worker)
        worker.service.waiting.append(worker)
        worker.expiry = time.time() + 1e-3 * self.HEARTBEAT_EXPIRY
        self.dispatch(worker.service, None)

    def dispatch(self, service, msg):
        """Dispatch requests to waiting workers as possible"""
        assert service is not None
        if msg is not None:  # Queue message if any
            service.requests.append(msg)
        self.purge_workers()
        while service.waiting and service.requests:
            msg = service.requests.pop(0)
            worker = service.waiting.pop(0)
            self.waiting.remove(worker)
            self.send_to_worker(worker, MDP.W_REQUEST, None, msg)

    def send_to_worker(self, worker, command, option, msg=None):
        """Send message to worker.

        If message is provided, sends that message.
        """
        if msg is None:
            msg = []
        elif not isinstance(msg, list):
            msg = [msg]

        # Stack routing and protocol envelopes to start of message
        # and routing envelope
        if option is not None:
            msg = [option] + msg
        msg = [worker.address, b'', MDP.W_WORKER, command] + msg

        if self.verbose:
            logging.info("I: sending %r to worker", command)
            dump(msg)

        self.socket.send_multipart(msg)


def main():
    """create and start new broker"""
    verbose = '-v' in sys.argv
    broker = MajorDomoBroker(verbose)
    broker.bind("tcp://*:5555")
    broker.mediate()


if __name__ == '__main__':
    main()
2. Asynchronous Majordomo pattern
Example 1: Round-trip demonstrator
"""Round-trip demonstrator

While this example runs in a single process, that is just to make
it easier to start and stop the example. Client thread signals to
main when it's ready.
"""

import sys
import threading
import time

import zmq

from zhelpers import zpipe


def client_task(ctx, pipe):
    client = ctx.socket(zmq.DEALER)
    client.identity = b'C'
    client.connect("tcp://localhost:5555")

    print("Setting up test...")
    time.sleep(0.1)

    print("Synchronous round-trip test...")
    start = time.time()
    requests = 10000
    for r in range(requests):
        client.send(b"hello")
        client.recv()
    print(" %d calls/second" % (requests / (time.time() - start)))

    print("Asynchronous round-trip test...")
    start = time.time()
    for r in range(requests):
        client.send(b"hello")
    for r in range(requests):
        client.recv()
    print(" %d calls/second" % (requests / (time.time() - start)))

    # signal done:
    pipe.send(b"done")


def worker_task():
    ctx = zmq.Context()
    worker = ctx.socket(zmq.DEALER)
    worker.identity = b'W'
    worker.connect("tcp://localhost:5556")

    while True:
        msg = worker.recv_multipart()
        worker.send_multipart(msg)
    ctx.destroy(0)


def broker_task():
    # Prepare our context and sockets
    ctx = zmq.Context()
    frontend = ctx.socket(zmq.ROUTER)
    backend = ctx.socket(zmq.ROUTER)
    frontend.bind("tcp://*:5555")
    backend.bind("tcp://*:5556")

    # Initialize poll set
    poller = zmq.Poller()
    poller.register(backend, zmq.POLLIN)
    poller.register(frontend, zmq.POLLIN)

    while True:
        try:
            items = dict(poller.poll())
        except:
            break  # Interrupted

        if frontend in items:
            msg = frontend.recv_multipart()
            msg[0] = b'W'
            backend.send_multipart(msg)
        if backend in items:
            msg = backend.recv_multipart()
            msg[0] = b'C'
            frontend.send_multipart(msg)


def main():
    # Create threads
    ctx = zmq.Context()
    client, pipe = zpipe(ctx)

    client_thread = threading.Thread(target=client_task, args=(ctx, pipe))
    worker_thread = threading.Thread(target=worker_task)
    worker_thread.daemon = True
    broker_thread = threading.Thread(target=broker_task)
    broker_thread.daemon = True

    worker_thread.start()
    broker_thread.start()
    client_thread.start()

    # Wait for signal on client pipe
    client.recv()


if __name__ == '__main__':
    main()
Example 2: Majordomo Protocol client API (asynchronous)
"""Majordomo Protocol client API, Python version.

Implements the client side of the MDP spec at http://rfc.zeromq.org/spec:7.

Author: Min RK <benjaminrk@gmail.com>
Based on Java example by Arkadiusz Orzechowski
"""

import logging

import zmq

import MDP
from zhelpers import dump


class MajorDomoClient(object):
    """Majordomo Protocol client API, Python version.

    Implements the client side of the MDP spec at http://rfc.zeromq.org/spec:7.
    """
    broker = None
    ctx = None
    client = None
    poller = None
    timeout = 2500
    verbose = False

    def __init__(self, broker, verbose=False):
        self.broker = broker
        self.verbose = verbose
        self.ctx = zmq.Context()
        self.poller = zmq.Poller()
        logging.basicConfig(format="%(asctime)s %(message)s",
                            datefmt="%Y-%m-%d %H:%M:%S",
                            level=logging.INFO)
        self.reconnect_to_broker()

    def reconnect_to_broker(self):
        """Connect or reconnect to broker"""
        if self.client:
            self.poller.unregister(self.client)
            self.client.close()
        self.client = self.ctx.socket(zmq.DEALER)
        self.client.linger = 0
        self.client.connect(self.broker)
        self.poller.register(self.client, zmq.POLLIN)
        if self.verbose:
            logging.info("I: connecting to broker at %s...", self.broker)

    def send(self, service, request):
        """Send request to broker"""
        if not isinstance(request, list):
            request = [request]

        # Prefix request with protocol frames
        # Frame 0: empty (REQ emulation)
        # Frame 1: "MDPCxy" (six bytes, MDP/Client x.y)
        # Frame 2: Service name (printable string)
        request = [b'', MDP.C_CLIENT, service] + request
        if self.verbose:
            logging.info("I: send request to '%s' service: ", service)
            dump(request)
        self.client.send_multipart(request)

    def recv(self):
        """Returns the reply message or None if there was no reply."""
        try:
            items = self.poller.poll(self.timeout)
        except KeyboardInterrupt:
            return None  # interrupted

        if items:
            # if we got a reply, process it
            msg = self.client.recv_multipart()
            if self.verbose:
                logging.info("I: received reply:")
                dump(msg)

            # Don't try to handle errors, just assert noisily
            assert len(msg) >= 4

            empty = msg.pop(0)
            header = msg.pop(0)
            assert MDP.C_CLIENT == header

            service = msg.pop(0)
            return msg
        else:
            logging.warning("W: permanent error, abandoning request")
            return None
Example 3: Majordomo Protocol client (asynchronous)
"""
Majordomo Protocol client example. Uses the mdcli API to hide all MDP aspects.

Author : Min RK <benjaminrk@gmail.com>
"""

import sys

from mdcliapi2 import MajorDomoClient


def main():
    verbose = '-v' in sys.argv
    client = MajorDomoClient("tcp://localhost:5555", verbose)
    requests = 100000
    for i in range(requests):
        request = b"Hello world"
        try:
            client.send(b"echo", request)
        except KeyboardInterrupt:
            print("send interrupted, aborting")
            return

    count = 0
    while count < requests:
        try:
            reply = client.recv()
        except KeyboardInterrupt:
            break
        else:
            # also break on failure to reply:
            if reply is None:
                break
        count += 1
    print("%i requests/replies processed" % count)


if __name__ == '__main__':
    main()
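We use the test programs above to measure the real cost of a round trip. The test sends a batch of messages in two ways. First it waits for each reply one at a time. Then it sends all the messages and reads every reply back as a single batch. Both approaches do the same work, but they give very different results.
Example 1: in the simplest case, a synchronous round trip is about 20 times slower than sending asynchronously.
Example 3: over 100,000 request-reply cycles through a single worker, the asynchronous client is twice as fast as the synchronous client.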
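A crude cost model helps explain why the gap is so large. Assume, purely for illustration, a fixed network round-trip time `rtt` and a fixed per-message processing cost `cost`. A synchronous client pays the full round trip on every request. A pipelining client pays it roughly once. The numbers below are invented and are not the measurements quoted above.

```python
def sync_time(n: int, rtt: float, cost: float) -> float:
    """Each request waits for its reply before the next one is sent."""
    return n * (rtt + cost)


def pipelined_time(n: int, rtt: float, cost: float) -> float:
    """Requests go out back to back, so only one round trip is exposed."""
    return rtt + n * cost


# Hypothetical numbers: 100 microsecond round trip, 5 microseconds per message
n, rtt, cost = 100_000, 100e-6, 5e-6
print("sync:      %.2f s" % sync_time(n, rtt, cost))
print("pipelined: %.2f s" % pipelined_time(n, rtt, cost))
print("ratio:     %.1fx" % (sync_time(n, rtt, cost) / pipelined_time(n, rtt, cost)))
```

The model deliberately leaves out broker and worker processing time. It should therefore not be expected to reproduce the 2x figure measured through the Majordomo broker.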
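3. Service discovery
• When a client requests a service whose name starts with mmi., the broker does not route the request to a worker. It handles the request internally.
• The broker handles only one such service: mmi.service, the service discovery service.
• The request payload is the name of an external service, meaning a real service provided by a worker.
• The broker returns "200" (OK) or "404" (Not found), depending on whether any worker has registered that service.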
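The broker-side rule above can be sketched without sockets. The sketch assumes the broker keeps a dict that maps each service name to the workers registered for it. `handle_mmi`, `route` and the `services` dict are hypothetical names, not part of the Majordomo API. The "501" answer for any other mmi.* name is also an assumption, borrowed from the zguide's C broker.

```python
from typing import Dict, List, Tuple, Union


def handle_mmi(services: Dict[bytes, List[bytes]], service: bytes,
               body: List[bytes]) -> List[bytes]:
    """Answers an internal mmi.* request the way the text describes."""
    if service == b"mmi.service":
        name = body[0] if body else b""
        # "200" if at least one worker has registered the named service
        return [b"200" if services.get(name) else b"404"]
    return [b"501"]  # assumption: other mmi.* services are not implemented


def route(services: Dict[bytes, List[bytes]], service: bytes,
          body: List[bytes]) -> Tuple[str, Union[List[bytes], bytes]]:
    """Returns ('internal', reply) for mmi.* names, else ('worker', service)."""
    if service.startswith(b"mmi."):
        return "internal", handle_mmi(services, service, body)
    return "worker", service
```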
Example: MMI echo query
"""
MMI echo query example
Author : Min RK <benjaminrk@gmail.com>
"""
import sys

from mdcliapi import MajorDomoClient


def main():
    verbose = '-v' in sys.argv
    client = MajorDomoClient("tcp://localhost:5555", verbose)
    # Ask the broker whether any worker provides the "echo" service
    request = b"echo"
    reply = client.send(b"mmi.service", request)

    if reply:
        replycode = reply[0]
        print("Lookup echo service:", replycode)
    else:
        print("E: no response from broker, make sure it's running")


if __name__ == '__main__':
    main()
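4. Idempotent services
Idempotency means that it is safe to repeat an operation. Checking the clock is idempotent; lending your credit card to your children is not.
When a server application is not idempotent, we have to think more carefully about exactly when it might crash. To handle non-idempotent operations, we detect and reject duplicate requests:
- The client stamps every request with a unique client identifier and a unique message number.
- Before sending back a reply, the server stores it, using the combination of client ID and message number as the key.
- When the server gets a request from a given client, it first checks whether it already has a reply for that client ID and message number. If it does, it does not process the request again and simply resends the stored reply.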
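The three steps above can be sketched as a small in-memory reply cache. `DedupServer` and `charge` are hypothetical names. The sketch keeps replies in a dict forever. A real server would also need to persist the cache and expire old entries, which the text does not specify.

```python
from typing import Callable, Dict, List, Tuple


class DedupServer:
    """Wraps a non-idempotent handler so repeated requests are answered from a cache."""

    def __init__(self, handler: Callable[[bytes], bytes]) -> None:
        self.handler = handler
        # (client_id, message_number) -> reply that was already produced
        self.replies: Dict[Tuple[bytes, int], bytes] = {}

    def handle(self, client_id: bytes, msg_no: int, request: bytes) -> bytes:
        key = (client_id, msg_no)
        if key in self.replies:
            # Duplicate, e.g. a client retry after a lost reply: resend, don't re-run
            return self.replies[key]
        reply = self.handler(request)
        # Store the reply before it is sent, so that a retry can find it
        self.replies[key] = reply
        return reply


calls: List[bytes] = []


def charge(request: bytes) -> bytes:
    calls.append(request)  # the side effect that must not happen twice
    return b"charged"


server = DedupServer(charge)
first = server.handle(b"client-A", 1, b"charge 10")
retry = server.handle(b"client-A", 1, b"charge 10")  # same ID and number: a retry
other = server.handle(b"client-A", 2, b"charge 10")  # new message number
```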
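5. Disconnected reliability (Titanic pattern)
In this pattern, messages are written to disk so that they are not lost, however intermittently clients and workers are connected. Rather than extending MDP, we layer Titanic on top of it.
This has several advantages:
- Divide and conquer: the broker handles message routing, and the worker handles reliability.
- It lets us mix a broker written in one language with workers written in another.
- It lets us evolve the fire-and-forget technology independently.
Titanic is both a worker and a client. It is a worker because it serves the titanic.* services, and a client because it forwards stored requests to the real services.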
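The titanic.reply service below answers with one of three statuses, depending only on which files exist on disk. It returns "200" plus the stored reply, "300" while the request is still pending, and "400" if the UUID is unknown. The stdlib-only sketch below walks through that lifecycle in a temporary directory. It uses the same ".req"/".rep" file naming as the service code. The helper names (`store_request`, `store_reply`, `reply_status`, `close_request`) are hypothetical.

```python
import os
import pickle
import tempfile
import uuid


def store_request(root, frames):
    """titanic.request: persist the request and return its UUID."""
    suuid = uuid.uuid4().hex
    with open(os.path.join(root, suuid + ".req"), "wb") as f:
        pickle.dump(frames, f)
    return suuid


def store_reply(root, suuid, frames):
    """Dispatcher side: persist the worker's reply once it arrives."""
    with open(os.path.join(root, suuid + ".rep"), "wb") as f:
        pickle.dump(frames, f)


def reply_status(root, suuid):
    """titanic.reply: 200 + reply, 300 if pending, 400 if unknown."""
    rep = os.path.join(root, suuid + ".rep")
    if os.path.exists(rep):
        with open(rep, "rb") as f:
            return [b"200"] + pickle.load(f)
    if os.path.exists(os.path.join(root, suuid + ".req")):
        return [b"300"]
    return [b"400"]


def close_request(root, suuid):
    """titanic.close: forget the request and reply, ignoring missing files."""
    for ext in (".req", ".rep"):
        path = os.path.join(root, suuid + ext)
        if os.path.exists(path):
            os.remove(path)
    return [b"200"]


root = tempfile.mkdtemp()
rid = store_request(root, [b"echo", b"Hello world"])
pending = reply_status(root, rid)
store_reply(root, rid, [b"Hello world"])
done = reply_status(root, rid)
close_request(root, rid)
gone = reply_status(root, rid)
```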
Example: Titanic client
"""
Titanic client example
Implements client side of http://rfc.zeromq.org/spec:9
Author : Min RK <benjaminrk@gmail.com>
"""
import sys
import time

from mdcliapi import MajorDomoClient


def service_call(session, service, request):
    """Calls a TSP service
    Returns response if successful (status code 200 OK), else None
    """
    reply = session.send(service, request)
    if reply:
        status = reply.pop(0)
        if status == b"200":
            return reply
        elif status == b"400":
            print("E: client fatal error 400, aborting")
            sys.exit(1)
        elif status == b"500":
            print("E: server fatal error 500, aborting")
            sys.exit(1)
        # Any other status (e.g. 300, still pending) returns None
    else:
        sys.exit(0)  # Interrupted or failed


def main():
    verbose = '-v' in sys.argv
    session = MajorDomoClient("tcp://localhost:5555", verbose)

    # 1. Send 'echo' request to Titanic
    request = [b"echo", b"Hello world"]
    reply = service_call(session, b"titanic.request", request)
    if not reply:
        print("E: titanic.request returned no UUID, aborting")
        return 1
    uuid = reply.pop(0)
    print("I: request UUID ", uuid)

    # 2. Wait until we get a reply
    while True:
        time.sleep(.1)
        request = [uuid]
        reply = service_call(session, b"titanic.reply", request)

        if reply:
            reply_string = reply[-1]
            print("I: reply:", reply_string)

            # 3. Close request
            request = [uuid]
            reply = service_call(session, b"titanic.close", request)
            break
        else:
            print("I: no reply yet, trying again...")
            time.sleep(5)  # Try again in 5 seconds

    return 0


if __name__ == '__main__':
    main()
Example: Titanic service
"""
Titanic service
Implements server side of http://rfc.zeromq.org/spec:9
Author: Min RK <benjaminrk@gmail.com>
"""
import pickle
import os
import sys
import threading
import time
from uuid import uuid4

import zmq

from mdwrkapi import MajorDomoWorker
from mdcliapi import MajorDomoClient

from zhelpers import zpipe

TITANIC_DIR = ".titanic"


def request_filename(suuid):
    """Returns freshly allocated request filename for given UUID str"""
    return os.path.join(TITANIC_DIR, "%s.req" % suuid)


def reply_filename(suuid):
    """Returns freshly allocated reply filename for given UUID str"""
    return os.path.join(TITANIC_DIR, "%s.rep" % suuid)


# ---------------------------------------------------------------------
# Titanic request service

def titanic_request(pipe):
    worker = MajorDomoWorker("tcp://localhost:5555", b"titanic.request")

    reply = None

    while True:
        # Send reply if it's not null
        # And then get next request from broker
        request = worker.recv(reply)
        if not request:
            break  # Interrupted, exit

        # Ensure message directory exists (the main thread may create it too)
        os.makedirs(TITANIC_DIR, exist_ok=True)

        # Generate UUID and save message to disk
        suuid = uuid4().hex
        filename = request_filename(suuid)
        with open(filename, 'wb') as f:
            pickle.dump(request, f)

        # Send UUID through to message queue
        pipe.send_string(suuid)

        # Now send UUID back to client
        # Done by the worker.recv() at the top of the loop
        reply = [b"200", suuid.encode('utf-8')]


# ---------------------------------------------------------------------
# Titanic reply service

def titanic_reply():
    worker = MajorDomoWorker("tcp://localhost:5555", b"titanic.reply")
    reply = None

    while True:
        request = worker.recv(reply)
        if not request:
            break  # Interrupted, exit

        suuid = request.pop(0).decode('utf-8')
        req_filename = request_filename(suuid)
        rep_filename = reply_filename(suuid)
        if os.path.exists(rep_filename):
            with open(rep_filename, 'rb') as f:
                reply = pickle.load(f)
            reply = [b"200"] + reply
        else:
            if os.path.exists(req_filename):
                reply = [b"300"]  # pending
            else:
                reply = [b"400"]  # unknown


# ---------------------------------------------------------------------
# Titanic close service

def titanic_close():
    worker = MajorDomoWorker("tcp://localhost:5555", b"titanic.close")
    reply = None

    while True:
        request = worker.recv(reply)
        if not request:
            break  # Interrupted, exit

        suuid = request.pop(0).decode('utf-8')
        req_filename = request_filename(suuid)
        rep_filename = reply_filename(suuid)
        # should these be protected? Does zfile_delete ignore files
        # that have already been removed? That's what we are doing here.
        if os.path.exists(req_filename):
            os.remove(req_filename)
        if os.path.exists(rep_filename):
            os.remove(rep_filename)
        reply = [b"200"]


def service_success(client, suuid):
    """Attempt to process a single request, return True if successful"""
    # Load request message, service will be first frame
    filename = request_filename(suuid)

    # If the client already closed request, treat as successful
    if not os.path.exists(filename):
        return True

    with open(filename, 'rb') as f:
        request = pickle.load(f)
    service = request.pop(0)
    # Use MMI protocol to check if service is available
    mmi_request = [service]
    mmi_reply = client.send(b"mmi.service", mmi_request)
    service_ok = mmi_reply and mmi_reply[0] == b"200"

    if service_ok:
        reply = client.send(service, request)
        if reply:
            filename = reply_filename(suuid)
            with open(filename, "wb") as f:
                pickle.dump(reply, f)
            return True

    return False


def main():
    verbose = '-v' in sys.argv
    ctx = zmq.Context()

    # Create MDP client session with short timeout
    client = MajorDomoClient("tcp://localhost:5555", verbose)
    client.timeout = 1000  # 1 sec
    client.retries = 1  # only 1 retry

    request_pipe, peer = zpipe(ctx)
    request_thread = threading.Thread(target=titanic_request, args=(peer,))
    request_thread.daemon = True
    request_thread.start()
    reply_thread = threading.Thread(target=titanic_reply)
    reply_thread.daemon = True
    reply_thread.start()
    close_thread = threading.Thread(target=titanic_close)
    close_thread.daemon = True
    close_thread.start()

    poller = zmq.Poller()
    poller.register(request_pipe, zmq.POLLIN)

    queue_filename = os.path.join(TITANIC_DIR, 'queue')

    # Main dispatcher loop
    while True:
        # Ensure message directory and (empty) queue file exist
        os.makedirs(TITANIC_DIR, exist_ok=True)
        if not os.path.exists(queue_filename):
            open(queue_filename, 'wb').close()

        # We'll dispatch once per second, if there's no activity
        try:
            items = poller.poll(1000)
        except KeyboardInterrupt:
            break  # Interrupted

        if items:
            # Append UUID to queue, prefixed with '-' for pending
            suuid = request_pipe.recv().decode('utf-8')
            with open(queue_filename, 'ab') as f:
                line = "-%s\n" % suuid
                f.write(line.encode('utf-8'))

        # Brute-force dispatcher
        with open(queue_filename, 'rb+') as f:
            offset = 0  # byte offset of the current line within the file
            for raw in f.readlines():
                entry = raw.decode('utf-8')
                # UUID is prefixed with '-' if still waiting
                if entry[0] == '-':
                    suuid = entry[1:].rstrip()  # rstrip '\n' etc.
                    print("I: processing request %s" % suuid)
                    if service_success(client, suuid):
                        # mark queue entry as processed, in place
                        f.seek(offset)
                        f.write(b'+')
                offset += len(raw)


if __name__ == '__main__':
    main()
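The dispatcher keeps its work list in a plain text file with one UUID per line. Each line is prefixed with '-' while the request is pending, and that byte is overwritten in place with '+' once the request has been served. The standalone sketch below isolates the marking step. It tracks each line's byte offset explicitly, because after readlines() the file position is already at the end of the file. The helper names `append_pending`, `mark_done` and `pending` are hypothetical.

```python
import os
import tempfile


def append_pending(path, suuid):
    with open(path, "ab") as f:
        f.write(b"-" + suuid.encode("ascii") + b"\n")


def mark_done(path, is_done):
    """Flips '-' to '+' for every pending UUID for which is_done(uuid) is true."""
    with open(path, "rb+") as f:
        lines = f.readlines()
        offset = 0
        for raw in lines:
            if raw.startswith(b"-"):
                suuid = raw[1:].rstrip().decode("ascii")
                if is_done(suuid):
                    f.seek(offset)
                    f.write(b"+")  # same length, so later offsets stay valid
            offset += len(raw)


def pending(path):
    with open(path, "rb") as f:
        return [line[1:].rstrip().decode("ascii") for line in f if line.startswith(b"-")]


path = os.path.join(tempfile.mkdtemp(), "queue")
for s in ("aaa", "bbb", "ccc"):
    append_pending(path, s)
mark_done(path, lambda s: s != "bbb")
```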
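6. High-availability pair (Binary Star pattern)
The Binary Star pattern configures two servers as a primary/backup high-availability pair. At any given time, one of them (the active server) accepts connections from client applications. The other (the passive server) does no work, but the two servers monitor each other. If the active server disappears from the network, the passive server takes over as the active server after a certain time.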
(Figure: high-availability pair in normal operation)
(Figure: high-availability pair during failover)
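Recovery from failover works as follows:
• The operators fix the problem that caused the primary server to disappear from the network, and restart it.
• The operators stop the backup server at a moment when doing so will cause minimal disruption to applications.
• Once applications have reconnected to the primary server, the operators restart the backup server.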
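The server below enforces one key safety rule. A passive server takes over only when two conditions hold at once. First, it has not heard the peer's state heartbeat for more than two heartbeat intervals. Second, a client has actually sent it a request. The pure-function sketch below captures that decision. It reuses the 1000 ms HEARTBEAT from the server code, and `should_become_active` is a hypothetical name.

```python
HEARTBEAT = 1000  # ms, same interval as the server example


def peer_expiry(last_heard_ms: int) -> int:
    """The peer is presumed dead if it has been silent for two heartbeat intervals."""
    return last_heard_ms + 2 * HEARTBEAT


def should_become_active(now_ms: int, last_heard_ms: int, client_request: bool) -> bool:
    # Requiring a client request as well guards against split brain when only
    # the link between the two servers fails while clients still reach the primary
    return client_request and now_ms > peer_expiry(last_heard_ms)
```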
Example: Binary Star server
# Binary Star Server
#
# Author: Dan Colish <dcolish@gmail.com>

from argparse import ArgumentParser
import time

import zmq

STATE_PRIMARY = 1
STATE_BACKUP = 2
STATE_ACTIVE = 3
STATE_PASSIVE = 4

PEER_PRIMARY = 1
PEER_BACKUP = 2
PEER_ACTIVE = 3
PEER_PASSIVE = 4
CLIENT_REQUEST = 5

HEARTBEAT = 1000


class BStarState(object):
    def __init__(self, state, event, peer_expiry):
        self.state = state
        self.event = event
        self.peer_expiry = peer_expiry


class BStarException(Exception):
    pass


fsm_states = {
    STATE_PRIMARY: {
        PEER_BACKUP: ("I: connected to backup (slave), ready as master",
                      STATE_ACTIVE),
        PEER_ACTIVE: ("I: connected to backup (master), ready as slave",
                      STATE_PASSIVE)
    },
    STATE_BACKUP: {
        PEER_ACTIVE: ("I: connected to primary (master), ready as slave",
                      STATE_PASSIVE),
        CLIENT_REQUEST: ("", False)
    },
    STATE_ACTIVE: {
        PEER_ACTIVE: ("E: fatal error - dual masters, aborting", False)
    },
    STATE_PASSIVE: {
        PEER_PRIMARY: ("I: primary (slave) is restarting, ready as master",
                       STATE_ACTIVE),
        PEER_BACKUP: ("I: backup (slave) is restarting, ready as master",
                      STATE_ACTIVE),
        PEER_PASSIVE: ("E: fatal error - dual slaves, aborting", False),
        CLIENT_REQUEST: (CLIENT_REQUEST, True)  # Say true, check peer later
    }
}


def run_fsm(fsm):
    # There are some transitional states we do not want to handle
    state_dict = fsm_states.get(fsm.state, {})
    res = state_dict.get(fsm.event)
    if res:
        msg, state = res
    else:
        return
    if state is False:
        raise BStarException(msg)
    elif msg == CLIENT_REQUEST:
        assert fsm.peer_expiry > 0
        if int(time.time() * 1000) > fsm.peer_expiry:
            fsm.state = STATE_ACTIVE
        else:
            raise BStarException()
    else:
        print(msg)
        fsm.state = state


def main():
    parser = ArgumentParser()
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("-p", "--primary", action="store_true", default=False)
    group.add_argument("-b", "--backup", action="store_true", default=False)
    args = parser.parse_args()

    ctx = zmq.Context()
    statepub = ctx.socket(zmq.PUB)
    statesub = ctx.socket(zmq.SUB)
    statesub.setsockopt_string(zmq.SUBSCRIBE, u"")
    frontend = ctx.socket(zmq.ROUTER)

    fsm = BStarState(0, 0, 0)

    if args.primary:
        print("I: Primary master, waiting for backup (slave)")
        frontend.bind("tcp://*:5001")
        statepub.bind("tcp://*:5003")
        statesub.connect("tcp://localhost:5004")
        fsm.state = STATE_PRIMARY
    elif args.backup:
        print("I: Backup slave, waiting for primary (master)")
        frontend.bind("tcp://*:5002")
        statepub.bind("tcp://*:5004")
        statesub.connect("tcp://localhost:5003")
        fsm.state = STATE_BACKUP

    send_state_at = int(time.time() * 1000 + HEARTBEAT)
    poller = zmq.Poller()
    poller.register(frontend, zmq.POLLIN)
    poller.register(statesub, zmq.POLLIN)

    while True:
        time_left = send_state_at - int(time.time() * 1000)
        if time_left < 0:
            time_left = 0
        socks = dict(poller.poll(time_left))
        if socks.get(frontend) == zmq.POLLIN:
            msg = frontend.recv_multipart()
            fsm.event = CLIENT_REQUEST
            try:
                run_fsm(fsm)
                frontend.send_multipart(msg)
            except BStarException:
                del msg

        if socks.get(statesub) == zmq.POLLIN:
            msg = statesub.recv()
            fsm.event = int(msg)
            del msg
            try:
                run_fsm(fsm)
                fsm.peer_expiry = int(time.time() * 1000) + (2 * HEARTBEAT)
            except BStarException:
                break
        if int(time.time() * 1000) >= send_state_at:
            statepub.send_string("%d" % fsm.state)
            send_state_at = int(time.time() * 1000) + HEARTBEAT


if __name__ == '__main__':
    main()
Example: Binary Star client
from time import sleep

import zmq

REQUEST_TIMEOUT = 1000  # msecs
SETTLE_DELAY = 2000  # before failing over


def main():
    server = ['tcp://localhost:5001', 'tcp://localhost:5002']
    server_nbr = 0
    ctx = zmq.Context()
    client = ctx.socket(zmq.REQ)
    client.connect(server[server_nbr])
    poller = zmq.Poller()
    poller.register(client, zmq.POLLIN)

    sequence = 0
    while True:
        client.send_string("%s" % sequence)

        expect_reply = True
        while expect_reply:
            socks = dict(poller.poll(REQUEST_TIMEOUT))
            if socks.get(client) == zmq.POLLIN:
                reply = client.recv_string()
                if int(reply) == sequence:
                    print("I: server replied OK (%s)" % reply)
                    expect_reply = False
                    sequence += 1
                    sleep(1)
                else:
                    print("E: malformed reply from server: %s" % reply)
            else:
                print("W: no response from server, failing over")
                sleep(SETTLE_DELAY / 1000)
                poller.unregister(client)
                client.setsockopt(zmq.LINGER, 0)  # drop the unanswered request
                client.close()
                server_nbr = (server_nbr + 1) % 2
                print("I: connecting to server at %s.." % server[server_nbr])
                client = ctx.socket(zmq.REQ)
                poller.register(client, zmq.POLLIN)
                # reconnect and resend request
                client.connect(server[server_nbr])
                client.send_string("%s" % sequence)


if __name__ == '__main__':
    main()
(Figure: Binary Star finite state machine)
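V. Brokerless reliability (Freelance pattern)
ZeroMQ does not impose a broker-centric architecture on you, although it does give you the tools to build brokers. Here we take apart the broker-based reliability we have built so far and turn it into a distributed peer-to-peer architecture, which we call the Freelance pattern.
Model one: simple retry and failover
Example: Freelance server - Model 1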
#
# Freelance server - Model 1
# Trivial echo service
#
# Author: Daniel Lundin <dln(at)eintr(dot)org>
#
import sys

import zmq

if len(sys.argv) < 2:
    print("I: Syntax: %s <endpoint>" % sys.argv[0])
    sys.exit(0)

endpoint = sys.argv[1]
context = zmq.Context()
server = context.socket(zmq.REP)
server.bind(endpoint)

print("I: Echo service is ready at %s" % endpoint)
while True:
    msg = server.recv_multipart()
    if not msg:
        break  # Interrupted
    server.send_multipart(msg)

server.setsockopt(zmq.LINGER, 0)  # Terminate immediately
Example: Freelance client - Model 1
#
# Freelance Client - Model 1
# Uses REQ socket to query one or more services
#
# Author: Daniel Lundin <dln(at)eintr(dot)org>
#
import sys
import time

import zmq

REQUEST_TIMEOUT = 1000  # ms
MAX_RETRIES = 3  # Before we abandon


def try_request(ctx, endpoint, request):
    print("I: Trying echo service at %s..." % endpoint)
    # A fresh REQ socket per attempt, since a REQ socket cannot resend
    client = ctx.socket(zmq.REQ)
    client.setsockopt(zmq.LINGER, 0)  # Terminate early
    client.connect(endpoint)
    client.send(request)
    poll = zmq.Poller()
    poll.register(client, zmq.POLLIN)
    socks = dict(poll.poll(REQUEST_TIMEOUT))
    if socks.get(client) == zmq.POLLIN:
        reply = client.recv_multipart()
    else:
        reply = None
    poll.unregister(client)
    client.close()
    return reply


context = zmq.Context()
request = b"Hello world"
reply = None

endpoints = len(sys.argv) - 1
if endpoints == 0:
    print("I: syntax: %s <endpoint> ..." % sys.argv[0])
elif endpoints == 1:
    # For one endpoint, we retry N times
    endpoint = sys.argv[1]
    for retries in range(MAX_RETRIES):
        reply = try_request(context, endpoint, request)
        if reply:
            break  # Success
        print("W: No response from %s, retrying" % endpoint)
else:
    # For multiple endpoints, try each at most once
    for endpoint in sys.argv[1:]:
        reply = try_request(context, endpoint, request)
        if reply:
            break  # Success
        print("W: No response from %s" % endpoint)

if reply:
    print("Service is running OK")
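The Model 1 retry policy can be pulled apart from the sockets, which makes it easy to test. In this sketch `attempt` is a hypothetical stand-in for `try_request` that returns a reply or None. The `query` and `flaky` helpers are illustrative names only.

```python
from typing import Callable, List, Optional

MAX_RETRIES = 3


def query(endpoints: List[str],
          attempt: Callable[[str], Optional[bytes]]) -> Optional[bytes]:
    """One endpoint: retry up to MAX_RETRIES times. Several: try each once."""
    if len(endpoints) == 1:
        schedule = endpoints * MAX_RETRIES
    else:
        schedule = list(endpoints)
    for endpoint in schedule:
        reply = attempt(endpoint)
        if reply:
            return reply
    return None


tried: List[str] = []


def flaky(endpoint: str) -> Optional[bytes]:
    tried.append(endpoint)
    return b"Hello world" if endpoint == "tcp://localhost:5556" else None


result = query(["tcp://localhost:5555", "tcp://localhost:5556"], flaky)
```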
Model two: brutal shotgun massacre
Example: Freelance server - Model 2
#
# Freelance server - Model 2
# Does some work, replies OK, with message sequencing
#
# Author: Daniel Lundin <dln(at)eintr(dot)org>
#
import sys

import zmq

if len(sys.argv) < 2:
    print("I: Syntax: %s <endpoint>" % sys.argv[0])
    sys.exit(0)

endpoint = sys.argv[1]
context = zmq.Context()
server = context.socket(zmq.REP)
server.bind(endpoint)

print("I: Service is ready at %s" % endpoint)
while True:
    request = server.recv_multipart()
    if not request:
        break  # Interrupted
    # Fail nastily if run against wrong client
    assert len(request) == 2

    # Frame 0 is the client's sequence number; echo it back with "OK"
    address = request[0]
    reply = [address, b"OK"]

    server.send_multipart(reply)

server.setsockopt(zmq.LINGER, 0)  # Terminate early
Example: Freelance client - Model 2
#
# Freelance Client - Model 2
# Uses DEALER socket to blast one or more services
#
# Author: Daniel Lundin <dln(at)eintr(dot)org>
#
import sys
import time

import zmq

GLOBAL_TIMEOUT = 2500  # ms


class FLClient(object):
    def __init__(self):
        self.servers = 0
        self.sequence = 0
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.DEALER)  # DEALER

    def destroy(self):
        self.socket.setsockopt(zmq.LINGER, 0)  # Terminate early
        self.socket.close()
        self.context.term()

    def connect(self, endpoint):
        self.socket.connect(endpoint)
        self.servers += 1
        print("I: Connected to %s" % endpoint)

    def request(self, *request):
        # Prefix request with sequence number and empty envelope
        self.sequence += 1
        msg = [b'', str(self.sequence).encode()] + list(request)

        # Blast the request to all connected servers
        for server in range(self.servers):
            self.socket.send_multipart(msg)

        # Wait for a matching reply to arrive from anywhere
        # Since we can poll several times, calculate each one
        poll = zmq.Poller()
        poll.register(self.socket, zmq.POLLIN)

        endtime = time.time() + GLOBAL_TIMEOUT / 1000
        while time.time() < endtime:
            # Clamp at 0: a negative timeout would make poll() wait forever
            timeout = max(0, (endtime - time.time()) * 1000)
            socks = dict(poll.poll(timeout))
            if socks.get(self.socket) == zmq.POLLIN:
                reply = self.socket.recv_multipart()
                assert len(reply) == 3
                sequence = int(reply[1])
                if sequence == self.sequence:
                    return reply
                # Otherwise it is a late reply to an earlier request: ignore it
        return None


if len(sys.argv) == 1:
    print("I: Usage: %s <endpoint> ..." % sys.argv[0])
    sys.exit(0)

# Create new freelance client object
client = FLClient()

for endpoint in sys.argv[1:]:
    client.connect(endpoint)

requests = 10000
start = time.time()
for i in range(requests):
    request = b"random name"
    reply = client.request(request)
    if not reply:
        print("E: Name service not available, aborting")
        break
print("Average round trip cost: %i usec" % (1e6 * (time.time() - start) / requests))

client.destroy()
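Model 2 sends every request to every server. As a result the client receives several replies per request, and late replies to earlier requests may still be in flight. The sequence-number check is what discards them. The socket-free sketch below shows that filter. `first_matching` is a hypothetical helper that works on [empty, sequence, body] frames the client has already received.

```python
from typing import Iterable, List, Optional


def first_matching(replies: Iterable[List[bytes]],
                   sequence: int) -> Optional[List[bytes]]:
    """Returns the first reply whose sequence frame equals the current request's."""
    wanted = str(sequence).encode()
    for reply in replies:
        if len(reply) == 3 and reply[1] == wanted:
            return reply
        # Anything else is a duplicate or a stale reply to an earlier request
    return None


# Replies as they might arrive for request 7: a stale one for 6, then two for 7
arrivals = [[b"", b"6", b"OK"], [b"", b"7", b"OK"], [b"", b"7", b"OK"]]
```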
Model three: complex and nasty
Example: Freelance server - Model 3
"""Freelance server - Model 3

Uses a ROUTER/ROUTER socket but just one thread

Author: Min RK <benjaminrk@gmail.com>
"""

import sys

import zmq

from zhelpers import dump


def main():
    verbose = '-v' in sys.argv

    ctx = zmq.Context()
    # Prepare server socket with predictable identity
    bind_endpoint = "tcp://*:5555"
    connect_endpoint = "tcp://localhost:5555"
    server = ctx.socket(zmq.ROUTER)
    # The identity must equal the endpoint clients connect to (as bytes)
    server.identity = connect_endpoint.encode()
    server.bind(bind_endpoint)
    print("I: service is ready at", bind_endpoint)

    while True:
        try:
            request = server.recv_multipart()
        except KeyboardInterrupt:
            break  # Interrupted
        # Frame 0: identity of client
        # Frame 1: PING, or client control frame
        # Frame 2: request body
        address, control = request[:2]
        reply = [address, control]
        if control == b"PING":
            reply[1] = b"PONG"
        else:
            reply.append(b"OK")
        if verbose:
            dump(reply)
        server.send_multipart(reply)
    print("W: interrupted")


if __name__ == '__main__':
    main()
Example: Freelance client - Model 3
"""
Freelance client - Model 3

Uses flcliapi class to encapsulate Freelance pattern

Author : Min RK <benjaminrk@gmail.com>
"""

import time

from flcliapi import FreelanceClient


def main():
    # Create new freelance client object
    client = FreelanceClient()

    # Connect to several endpoints
    client.connect("tcp://localhost:5555")
    client.connect("tcp://localhost:5556")
    client.connect("tcp://localhost:5557")

    # Send a bunch of name resolution 'requests', measure time
    requests = 10000
    start = time.time()
    for i in range(requests):
        request = [b"random name"]
        reply = client.request(request)
        if not reply:
            print("E: name service not available, aborting")
            return

    print("Average round trip cost: %d usec" % (1e6 * (time.time() - start) / requests))


if __name__ == '__main__':
    main()
Example: Freelance pattern agent class (flcliapi)
"""
flcliapi - Freelance Pattern agent class
Model 3: uses ROUTER socket to address specific services

Author: Min RK <benjaminrk@gmail.com>
"""

import threading
import time

import zmq

from zhelpers import zpipe

# If no server replies within this time, abandon request
GLOBAL_TIMEOUT = 3000  # msecs
# PING interval for servers we think are alive
PING_INTERVAL = 2000  # msecs
# Server considered dead if silent for this long
SERVER_TTL = 6000  # msecs


def flciapi_agent(peer):
    """This is the thread that handles our real flcliapi class
    """
    pass


# =====================================================================
# Synchronous part, works in our application thread

class FreelanceClient(object):
    ctx = None    # Our Context
    pipe = None   # Pipe through to flciapi agent
    agent = None  # agent in a thread

    def __init__(self):
        self.ctx = zmq.Context()
        self.pipe, peer = zpipe(self.ctx)
        self.agent = threading.Thread(target=agent_task, args=(self.ctx, peer))
        self.agent.daemon = True
        self.agent.start()

    def connect(self, endpoint):
        """Connect to new server endpoint
        Sends [CONNECT][endpoint] to the agent
        """
        self.pipe.send_multipart([b"CONNECT", endpoint.encode()])
        time.sleep(0.1)  # Allow connection to come up

    def request(self, msg):
        "Send request, get reply"
        request = [b"REQUEST"] + msg
        self.pipe.send_multipart(request)
        reply = self.pipe.recv_multipart()
        status = reply.pop(0)
        if status != b"FAILED":
            return reply
        return None


# =====================================================================
# Asynchronous part, works in the background

# ---------------------------------------------------------------------
# Simple class for one server we talk to

class FreelanceServer(object):
    endpoint = None  # Server identity/endpoint (bytes)
    alive = True     # True if known to be alive
    ping_at = 0      # Next ping at this time
    expires = 0      # Expires at this time

    def __init__(self, endpoint):
        self.endpoint = endpoint
        self.alive = True
        self.ping_at = time.time() + 1e-3 * PING_INTERVAL
        self.expires = time.time() + 1e-3 * SERVER_TTL

    def ping(self, socket):
        if time.time() > self.ping_at:
            socket.send_multipart([self.endpoint, b'PING'])
            self.ping_at = time.time() + 1e-3 * PING_INTERVAL

    def tickless(self, tickless):
        if tickless > self.ping_at:
            tickless = self.ping_at
        return tickless


# ---------------------------------------------------------------------
# Simple class for one background agent

class FreelanceAgent(object):
    ctx = None      # Own context
    pipe = None     # Socket to talk back to application
    router = None   # Socket to talk to servers
    servers = None  # Servers we've connected to
    actives = None  # Servers we know are alive
    sequence = 0    # Number of requests ever sent
    request = None  # Current request if any
    reply = None    # Current reply if any
    expires = 0     # Timeout for request/reply

    def __init__(self, ctx, pipe):
        self.ctx = ctx
        self.pipe = pipe
        self.router = ctx.socket(zmq.ROUTER)
        self.servers = {}
        self.actives = []

    def control_message(self):
        msg = self.pipe.recv_multipart()
        command = msg.pop(0)

        if command == b"CONNECT":
            endpoint = msg.pop(0)
            print("I: connecting to %s..." % endpoint.decode())
            self.router.connect(endpoint.decode())
            server = FreelanceServer(endpoint)
            self.servers[endpoint] = server
            self.actives.append(server)
            # these are in the C case, but seem redundant:
            server.ping_at = time.time() + 1e-3 * PING_INTERVAL
            server.expires = time.time() + 1e-3 * SERVER_TTL
        elif command == b"REQUEST":
            assert not self.request  # Strict request-reply cycle
            # Prefix request with sequence number and empty envelope
            self.request = [str(self.sequence).encode(), b''] + msg

            # Request expires after global timeout
            self.expires = time.time() + 1e-3 * GLOBAL_TIMEOUT

    def router_message(self):
        reply = self.router.recv_multipart()
        # Frame 0 is server that replied
        endpoint = reply.pop(0)
        server = self.servers[endpoint]
        if not server.alive:
            self.actives.append(server)
            server.alive = True

        server.ping_at = time.time() + 1e-3 * PING_INTERVAL
        server.expires = time.time() + 1e-3 * SERVER_TTL

        # Frame 1 is the sequence number of a reply, or PONG for a ping;
        # compare as bytes so that PONG does not crash int()
        sequence = reply.pop(0)
        if self.request is not None and sequence == str(self.sequence).encode():
            self.sequence += 1
            reply = [b"OK"] + reply
            self.pipe.send_multipart(reply)
            self.request = None


# ---------------------------------------------------------------------
# Asynchronous agent manages server pool and handles request/reply
# dialog when the application asks for it.

def agent_task(ctx, pipe):
    agent = FreelanceAgent(ctx, pipe)
    poller = zmq.Poller()
    poller.register(agent.pipe, zmq.POLLIN)
    poller.register(agent.router, zmq.POLLIN)

    while True:
        # Calculate tickless timer, up to 1 hour
        tickless = time.time() + 3600
        if agent.request and tickless > agent.expires:
            tickless = agent.expires
        for server in agent.servers.values():
            tickless = server.tickless(tickless)
        try:
            # Clamp at 0: a negative timeout would make poll() wait forever
            timeout = max(0, 1000 * (tickless - time.time()))
            items = dict(poller.poll(timeout))
        except zmq.ContextTerminated:
            break  # Context has been shut down

        if agent.pipe in items:
            agent.control_message()

        if agent.router in items:
            agent.router_message()

        # If we're processing a request, dispatch to next server
        if agent.request:
            if time.time() >= agent.expires:
                # Request expired, kill it
                agent.pipe.send(b"FAILED")
                agent.request = None
            else:
                # Find server to talk to, remove any expired ones
                while agent.actives:
                    server = agent.actives[0]
                    if time.time() >= server.expires:
                        server.alive = False
                        agent.actives.pop(0)
                    else:
                        request = [server.endpoint] + agent.request
                        agent.router.send_multipart(request)
                        break

        # Disconnect and delete any expired servers
        # Send heartbeats to idle servers if needed
        for server in agent.servers.values():
            server.ping(agent.router)
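The agent computes its poll timeout in a "tickless" way. Instead of waking on a fixed tick, it sleeps until the earliest of two kinds of deadline: the current request's expiry and each server's next ping time. The standalone sketch below shows that calculation, with times in seconds as in the agent code. `next_wakeup` and `poll_timeout_ms` are hypothetical names. The clamp at zero matters because pyzmq treats a negative poll timeout as "wait forever".

```python
from typing import Iterable, Optional


def next_wakeup(now: float, request_expires: Optional[float],
                ping_times: Iterable[float]) -> float:
    """Earliest moment the agent has work to do, capped at one hour ahead."""
    wakeup = now + 3600
    if request_expires is not None:
        wakeup = min(wakeup, request_expires)
    for ping_at in ping_times:
        wakeup = min(wakeup, ping_at)
    return wakeup


def poll_timeout_ms(now: float, wakeup: float) -> int:
    # Never negative: a deadline in the past means "poll without waiting"
    return max(0, int(1000 * (wakeup - now)))
```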