ZeroMQ Study Notes (4): Reliable Request-Reply Patterns

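Overview

Chapter 4: Reliable Request-Reply Patterns

Lazy Pirate pattern: reliable request-reply from the client side.
Simple Pirate pattern: reliable request-reply using load balancing.
Paranoid Pirate pattern: reliable request-reply with heartbeating.
Majordomo pattern: service-oriented reliable queuing.
Titanic pattern: disk-based/disconnected reliable queuing.
Binary Star pattern: primary-backup server failover.
Freelance pattern: brokerless reliable request-reply.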
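I. What is "reliability"?
1. Definition:
Reliability is defined in terms of failure: if we can handle a specific set of well-defined, well-understood failures, then we are reliable with respect to those failures. Put simply, reliability is "keeping things working properly when code freezes or crashes".
2. Failures, ranked in descending order of probability
1. Application code.
2. System code.
3. Message queues can overflow.
4. Networks can fail.
5. Hardware can fail.
6. Networks can fail in exotic ways, for example when some ports on a switch die.
7. Entire data centers can be hit by lightning, earthquakes, fire, or more mundane power or cooling failures.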
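II. Designing reliability
1. REQ-REP (request-reply):
If the client gets no reply from the server, it can conclude that the server handling the request has failed. The client can then give up, wait, retry later, or look for another server. The basic REQ-REP pattern copes poorly with failure: if the server crashes while processing a request, or the network loses the request or the reply, the client hangs forever.
2. There are roughly three ways to connect clients to servers:

  1. Multiple clients talking directly to a single server.
    Failure types: server crashes and restarts, network disconnects.
  2. Multiple clients talking to a broker proxy that distributes work to multiple workers.
    Failure types: worker crashes and restarts, worker overload, workers busy looping, queue crashes and restarts, and network disconnects.
  3. Multiple clients talking to multiple servers with no intermediary proxy.
    Failure types: service crashes and restarts, service busy looping, service overload, and network disconnects.
3. Three ways to improve reliability:
1. Client-side reliability (Lazy Pirate pattern):
Rather than doing a blocking receive, we:
• Poll the REQ socket and receive from it only when a reply has definitely arrived.
• Resend the request if no reply arrives within the timeout period.
• Abandon the transaction if there is still no reply after several requests.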

Example: Lazy Pirate client

#  Lazy Pirate client
#  Uses zmq_poll to do a safe request-reply
#  To run, start lpserver and then randomly kill/restart it
#

import itertools
import logging
import sys
import zmq

logging.basicConfig(format="%(levelname)s: %(message)s", level=logging.INFO)

REQUEST_TIMEOUT = 2500  # milliseconds
# Number of retries before giving up
REQUEST_RETRIES = 3
SERVER_ENDPOINT = "tcp://localhost:5555"

context = zmq.Context()

logging.info("Connecting to server…")
client = context.socket(zmq.REQ)
client.connect(SERVER_ENDPOINT)

for sequence in itertools.count():
    request = str(sequence).encode()
    logging.info("Sending (%s)", request)
    # Send a request, then work to get a reply
    client.send(request)

    retries_left = REQUEST_RETRIES
    while True:
        # Poll the socket for a reply, with a timeout
        if (client.poll(REQUEST_TIMEOUT) & zmq.POLLIN) != 0:
            reply = client.recv()
            if int(reply) == sequence:
                logging.info("Server replied OK (%s)", reply)
                break
            else:
                logging.error("Malformed reply from server: %s", reply)
                continue

        retries_left -= 1
        logging.warning("No response from server")
        # Close and discard the old socket.
        client.setsockopt(zmq.LINGER, 0)
        client.close()
        if retries_left == 0:
            logging.error("Server seems to be offline, abandoning")
            sys.exit()

        logging.info("Reconnecting to server…")
        # Create a new socket and send the request again
        client = context.socket(zmq.REQ)
        client.connect(SERVER_ENDPOINT)
        logging.info("Resending (%s)", request)
        client.send(request)

Example: Lazy Pirate server

#  Lazy Pirate server
#  Binds a REP socket to tcp://*:5555
#  Like hwserver, except that it:
#   - echoes the request as-is
#   - randomly runs slowly, or exits, to simulate a server crash
#
from random import randint
import itertools
import logging
import time
import zmq

logging.basicConfig(format="%(levelname)s: %(message)s", level=logging.INFO)

context = zmq.Context()
server = context.socket(zmq.REP)
server.bind("tcp://*:5555")

for cycles in itertools.count():
    request = server.recv()

    # Simulate various problems, after a few cycles
    if cycles > 3 and randint(0, 3) == 0:
        logging.info("Simulating a crash")
        break
    elif cycles > 3 and randint(0, 3) == 0:
        logging.info("Simulating CPU overload")
        time.sleep(2)

    logging.info("Normal request (%s)", request)
    time.sleep(1)  # Do some heavy work
    server.send(request)

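Lazy Pirate pattern, advantages:
• Simple to understand and implement.
• Works easily with existing client and server application code.
• ZMQ automatically retries the actual reconnection until it works.
Lazy Pirate pattern, disadvantages:
• Cannot fail over to backup or alternate servers.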
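The retry policy at the heart of the Lazy Pirate client can be separated from the socket handling. The following is a minimal sketch, not part of the original notes: `request_with_retries` and `flaky_attempt` are hypothetical names, and `attempt` stands in for "send on a fresh REQ socket, poll with a timeout, return the reply or None".

```python
from typing import Callable, Optional


def request_with_retries(
    attempt: Callable[[bytes], Optional[bytes]],
    request: bytes,
    retries: int = 3,
) -> Optional[bytes]:
    """Call attempt(request) up to `retries` times.

    `attempt` is a hypothetical callable that sends the request on a
    fresh socket and returns the reply, or None if the poll timed out.
    """
    for _ in range(retries):
        reply = attempt(request)
        if reply is not None:
            return reply  # got an answer, stop retrying
    return None  # every attempt timed out: abandon the transaction


# Fake transport for illustration: the first two attempts time out.
calls = []


def flaky_attempt(request: bytes) -> Optional[bytes]:
    calls.append(request)
    return request if len(calls) >= 3 else None


result = request_with_retries(flaky_attempt, b"42", retries=3)
print(result, len(calls))
```

Keeping the policy in a plain function like this makes the "give up after N attempts" rule easy to check without a running server.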
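2. Basic reliable queuing (Simple Pirate pattern):
The second approach extends the Lazy Pirate pattern with a queue proxy that lets us talk, transparently, to multiple servers, which we can more accurately call workers. We start with the most minimal model, the Simple Pirate pattern.
In all these Pirate patterns, workers are stateless. If the application needs shared state, such as a shared database, that lies outside the messaging framework described here. Having a queue proxy means workers can come and go without clients knowing anything about it. If one worker dies, another takes over.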

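Example: Simple Pirate queue

#
#  Simple Pirate queue
#  This is identical to the load-balancing pattern, with no reliability
#  mechanisms. It relies on the client for recovery. Runs forever.
#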
#

import zmq

# Signals that a worker is ready (a single 0x01 byte)
LRU_READY = b"\x01"

context = zmq.Context(1)

frontend = context.socket(zmq.ROUTER)  # ROUTER
backend = context.socket(zmq.ROUTER)  # ROUTER
frontend.bind("tcp://*:5555")  # For clients
backend.bind("tcp://*:5556")  # For workers

poll_workers = zmq.Poller()
poll_workers.register(backend, zmq.POLLIN)

poll_both = zmq.Poller()
poll_both.register(frontend, zmq.POLLIN)
poll_both.register(backend, zmq.POLLIN)

workers = []

while True:
    if workers:
        socks = dict(poll_both.poll())
    else:
        socks = dict(poll_workers.poll())

    # Handle worker activity on the backend
    if socks.get(backend) == zmq.POLLIN:
        # Use the worker address for LRU routing
        msg = backend.recv_multipart()
        if not msg:
            break
        address = msg[0]
        workers.append(address)

        # Everything after the second (delimiter) frame is the reply
        reply = msg[2:]

        # Forward the message to the client if it is not a READY signal
        if reply[0] != LRU_READY:
            frontend.send_multipart(reply)

    if socks.get(frontend) == zmq.POLLIN:
        # Get the client request and route it to the first available worker
        msg = frontend.recv_multipart()
        request = [workers.pop(0), b""] + msg
        backend.send_multipart(request)
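
The queue above works purely by adding and stripping envelope frames. The sketch below is an illustration only, with made-up identities (`client-1`, `worker-A`) and hypothetical helper names; it shows the frame lists involved without opening any sockets, assuming REQ peers on both sides of the two ROUTER sockets as in the listing.

```python
LRU_READY = b"\x01"


def wrap_for_worker(worker_id: bytes, client_frames: list) -> list:
    """Frames the queue sends on the backend ROUTER socket."""
    return [worker_id, b""] + client_frames


def unwrap_from_worker(frames: list):
    """Split a backend message into (worker_id, payload frames)."""
    return frames[0], frames[2:]


# A client REQ socket sending b"hello" arrives on the frontend ROUTER as
# [client identity, empty delimiter, body].
from_client = [b"client-1", b"", b"hello"]
to_worker = wrap_for_worker(b"worker-A", from_client)

# The worker echoes what it received; the backend ROUTER prepends the
# worker identity and the REQ socket's delimiter again.
from_worker = [b"worker-A", b""] + from_client
worker_id, payload = unwrap_from_worker(from_worker)

# A payload whose first frame is LRU_READY is a control message,
# anything else is a reply addressed to a client.
is_ready_signal = payload[0] == LRU_READY
print(to_worker, worker_id, payload, is_ready_signal)
```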

Example: Simple Pirate worker
#  Simple Pirate worker
#  Connects a REQ socket to tcp://localhost:5556
#  Implements the worker part of LRU queuing

from random import randint
import time
import zmq

# Signals that a worker is ready (a single 0x01 byte)
LRU_READY = b"\x01"

context = zmq.Context(1)
worker = context.socket(zmq.REQ)
# Set a random identity to make tracing easier
identity = "%04X-%04X" % (randint(0, 0x10000), randint(0, 0x10000))
worker.setsockopt_string(zmq.IDENTITY, identity)
worker.connect("tcp://localhost:5556")
# Tell the broker we are ready for work
print("I: (%s) worker ready" % identity)
worker.send(LRU_READY)

cycles = 0
while True:
    msg = worker.recv_multipart()
    if not msg:
        break
    # Simulate various problems, after a few cycles
    cycles += 1
    if cycles > 3 and randint(0, 5) == 0:
        print("I: (%s) simulating a crash" % identity)
        break
    elif cycles > 3 and randint(0, 5) == 0:
        print("I: (%s) simulating CPU overload" % identity)
        time.sleep(3)
    print("I: (%s) normal reply" % identity)
    time.sleep(1)  # Do some heavy work
    worker.send_multipart(msg)

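This pattern works with any number of clients and workers. It does, however, have some weaknesses:
• It is not robust against a queue crash and restart. Clients recover, but workers do not: although ZMQ reconnects the workers' sockets automatically, as far as the newly started queue is concerned the workers have not signaled ready, so they do not exist. Fixing this requires heartbeating from queue to worker, so that a worker can detect that the queue has died.
• The queue does not detect worker failure. If a worker dies while idle, the queue cannot remove it from its worker list until it sends that worker a request. The client then waits and retries for nothing. Fixing this requires heartbeating from worker to queue, so that the queue can detect a lost worker at any stage.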
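3. Robust reliable queuing (Paranoid Pirate pattern):
For the Paranoid Pirate worker we switch to a DEALER socket. This lets us send and receive messages at any time, instead of the lock-step send/receive that REQ imposes.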

Example: Paranoid Pirate queue

#
#  Paranoid Pirate queue
#
from collections import OrderedDict
import time
import zmq

HEARTBEAT_LIVENESS = 3  # 3-5 is reasonable
HEARTBEAT_INTERVAL = 1.0  # seconds

#  Paranoid Pirate Protocol constants
PPP_READY = b"\x01"  # Signals that a worker is ready
PPP_HEARTBEAT = b"\x02"  # Signals a worker heartbeat

# Worker class
class Worker(object):
    def __init__(self, address):
        self.address = address
        self.expiry = time.time() + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS

class WorkerQueue(object):
    def __init__(self):
        self.queue = OrderedDict()

    def ready(self, worker):
        self.queue.pop(worker.address, None)
        self.queue[worker.address] = worker

    def purge(self):
        """Look for and kill expired workers."""
        t = time.time()
        expired = []
        for address, worker in self.queue.items():
            if t > worker.expiry:  # Worker expired
                expired.append(address)
        for address in expired:
            print("W: Idle worker expired: %s" % address)
            self.queue.pop(address, None)

    def next(self):
        address, worker = self.queue.popitem(False)
        return address

context = zmq.Context(1)

frontend = context.socket(zmq.ROUTER)  # ROUTER
backend = context.socket(zmq.ROUTER)  # ROUTER
frontend.bind("tcp://*:5555")  # For clients
backend.bind("tcp://*:5556")  # For workers

poll_workers = zmq.Poller()
poll_workers.register(backend, zmq.POLLIN)

poll_both = zmq.Poller()
poll_both.register(frontend, zmq.POLLIN)
poll_both.register(backend, zmq.POLLIN)
# Queue of available workers
workers = WorkerQueue()
# Send out heartbeats at regular intervals
heartbeat_at = time.time() + HEARTBEAT_INTERVAL

while True:
    if len(workers.queue) > 0:
        poller = poll_both
    else:
        poller = poll_workers
    socks = dict(poller.poll(HEARTBEAT_INTERVAL * 1000))

    # Handle worker activity on the backend
    if socks.get(backend) == zmq.POLLIN:
        # Use the worker address for load balancing
        frames = backend.recv_multipart()
        if not frames:
            break
        # Any sign of life from a worker means it is ready
        address = frames[0]
        workers.ready(Worker(address))

        # Validate the control message, or return the reply to the client
        msg = frames[1:]
        if len(msg) == 1:
            if msg[0] not in (PPP_READY, PPP_HEARTBEAT):
                print("E: Invalid message from worker: %s" % msg)
        else:
            frontend.send_multipart(msg)

        # Send heartbeats to idle workers if it's time
        if time.time() >= heartbeat_at:
            for worker in workers.queue:
                msg = [worker, PPP_HEARTBEAT]
                backend.send_multipart(msg)
            heartbeat_at = time.time() + HEARTBEAT_INTERVAL
    if socks.get(frontend) == zmq.POLLIN:
        frames = frontend.recv_multipart()
        if not frames:
            break

        frames.insert(0, workers.next())
        backend.send_multipart(frames)

    workers.purge()
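
The expiry bookkeeping in the queue can be exercised without sockets. Below is a minimal sketch modeled on the WorkerQueue class above; the class name `ExpiringWorkers` and the injected `now` argument are assumptions made here so that the behavior can be checked deterministically instead of depending on time.time().

```python
from collections import OrderedDict

HEARTBEAT_LIVENESS = 3
HEARTBEAT_INTERVAL = 1.0  # seconds


class ExpiringWorkers:
    """Idle workers ordered from least to most recently seen."""

    def __init__(self):
        self.queue = OrderedDict()  # address -> expiry time

    def ready(self, address, now):
        # Re-inserting moves the worker to the back and refreshes its expiry.
        self.queue.pop(address, None)
        self.queue[address] = now + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS

    def purge(self, now):
        """Remove and return the addresses whose expiry has passed."""
        expired = [a for a, expiry in self.queue.items() if now > expiry]
        for address in expired:
            del self.queue[address]
        return expired

    def next(self):
        """Pop the least recently used worker."""
        address, _ = self.queue.popitem(last=False)
        return address


workers = ExpiringWorkers()
workers.ready(b"A", now=0.0)  # A expires at t=3.0
workers.ready(b"B", now=2.0)  # B expires at t=5.0
workers.ready(b"A", now=2.5)  # heartbeat from A: now expires at t=5.5

expired_at_4 = workers.purge(now=4.0)    # nobody has expired yet
expired_at_5_2 = workers.purge(now=5.2)  # B missed its deadline
next_worker = workers.next()             # only A is left
print(expired_at_4, expired_at_5_2, next_worker)
```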

Example: Paranoid Pirate worker

#
#  Paranoid Pirate worker
#
from random import randint
import time

import zmq

HEARTBEAT_LIVENESS = 3
HEARTBEAT_INTERVAL = 1
INTERVAL_INIT = 1
INTERVAL_MAX = 32

#  Paranoid Pirate Protocol constants
PPP_READY = b"\x01"  # Signals that a worker is ready
PPP_HEARTBEAT = b"\x02"  # Signals a worker heartbeat

def worker_socket(context, poller):
    """Helper function that returns a new configured socket
    connected to the Paranoid Pirate queue"""
    worker = context.socket(zmq.DEALER)  # DEALER
    identity = b"%04X-%04X" % (randint(0, 0x10000), randint(0, 0x10000))
    worker.setsockopt(zmq.IDENTITY, identity)
    poller.register(worker, zmq.POLLIN)
    worker.connect("tcp://localhost:5556")
    worker.send(PPP_READY)
    return worker

context = zmq.Context(1)
poller = zmq.Poller()

liveness = HEARTBEAT_LIVENESS
interval = INTERVAL_INIT

heartbeat_at = time.time() + HEARTBEAT_INTERVAL

worker = worker_socket(context, poller)
cycles = 0
while True:
    socks = dict(poller.poll(HEARTBEAT_INTERVAL * 1000))

    # Handle activity on the socket connected to the queue
    if socks.get(worker) == zmq.POLLIN:
        #  Get message
        #  - 3-part envelope + content -> request
        #  - 1-part HEARTBEAT -> heartbeat
        frames = worker.recv_multipart()
        if not frames:
            break  # Interrupted

        if len(frames) == 3:
            # Simulate various problems, after a few cycles
            cycles += 1
            if cycles > 3 and randint(0, 5) == 0:
                print("I: Simulating a crash")
                break
            if cycles > 3 and randint(0, 5) == 0:
                print("I: Simulating CPU overload")
                time.sleep(3)
            print("I: Normal reply")
            worker.send_multipart(frames)
            liveness = HEARTBEAT_LIVENESS
            time.sleep(1)  # Do some heavy work
        elif len(frames) == 1 and frames[0] == PPP_HEARTBEAT:
            print("I: Queue heartbeat")
            liveness = HEARTBEAT_LIVENESS
        else:
            print("E: Invalid message: %s" % frames)
        interval = INTERVAL_INIT
    else:
        liveness -= 1
        if liveness == 0:
            print("W: Heartbeat failure, can't reach queue")
            print("W: Reconnecting in %0.2fs..." % interval)
            time.sleep(interval)

            if interval < INTERVAL_MAX:
                interval *= 2
            poller.unregister(worker)
            worker.setsockopt(zmq.LINGER, 0)
            worker.close()
            worker = worker_socket(context, poller)
            liveness = HEARTBEAT_LIVENESS
    if time.time() > heartbeat_at:
        heartbeat_at = time.time() + HEARTBEAT_INTERVAL
        print("I: Worker heartbeat")
        worker.send(PPP_HEARTBEAT)

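III. Heartbeating
Heartbeating is used to determine whether a peer is alive or dead. There are three main options for heartbeating with ZMQ:
1. Do nothing
The most common approach is to do no heartbeating at all and hope for the best.
2. One-way heartbeats
Every node sends a heartbeat message to its peers about once a second. When a node hears nothing from a peer for several seconds, it treats that peer as dead.
3. Ping-pong heartbeats
Use a ping-pong dialog: one peer sends a ping command to the other, which replies with a pong command.
Example: heartbeating for Paranoid Pirate (option 2)
Steps for handling heartbeats from the queue, in the worker:

  1. Keep a liveness value: the number of heartbeats we can miss before deciding that the queue is dead. It starts at 3 and is decremented each time a heartbeat is missed.
  2. In the zmq_poll() loop, wait one second each time (the heartbeat interval).
  3. If any message arrives from the queue in that time, reset liveness to 3.
  4. If no message arrives from the queue in that time, decrement liveness.
  5. If liveness reaches zero, consider the queue dead.
  6. If the queue is dead, destroy the socket, create a new one, and reconnect.
  7. To avoid opening and closing too many sockets, wait for a certain interval before reconnecting, doubling the interval each time until it reaches 32 seconds.

Steps for sending heartbeats to the queue:

  1. Calculate when to send the next heartbeat. This is a single variable because we are talking to only one peer, the queue.
  2. In the zmq_poll() loop, whenever that time has passed, send a heartbeat to the queue.

Example code: basic heartbeating in the worker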
#define HEARTBEAT_LIVENESS  3       //  3-5 is reasonable
#define HEARTBEAT_INTERVAL  1000    //  msecs
#define INTERVAL_INIT       1000    //  Initial reconnect interval
#define INTERVAL_MAX       32000    //  Interval after exponential backoff

...
//  If liveness hits zero, the queue is considered disconnected
size_t liveness = HEARTBEAT_LIVENESS;
size_t interval = INTERVAL_INIT;

//  Send out heartbeats at regular intervals
uint64_t heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;

while (true) {
    zmq_pollitem_t items [] = { { worker,  0, ZMQ_POLLIN, 0 } };
    int rc = zmq_poll (items, 1, HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);

    if (items [0].revents & ZMQ_POLLIN) {
        //  Received any message from the queue
        liveness = HEARTBEAT_LIVENESS;
        interval = INTERVAL_INIT;
    }
    else
    if (--liveness == 0) {
        zclock_sleep (interval);
        if (interval < INTERVAL_MAX)
            interval *= 2;
        zsocket_destroy (ctx, worker);
        ...
        liveness = HEARTBEAT_LIVENESS;
    }
    //  Send a heartbeat to the queue if it's time
    if (zclock_time () > heartbeat_at) {
        heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
        //  Send heartbeat message to queue
    }
}
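
The liveness counter and the exponential backoff in the loop above can be isolated as a small state machine. This is a sketch under stated assumptions: `QueueMonitor` is a hypothetical name, intervals are in seconds as in the Python worker earlier in these notes, and the caller is expected to do the actual sleeping and socket re-creation.

```python
HEARTBEAT_LIVENESS = 3
INTERVAL_INIT = 1   # seconds
INTERVAL_MAX = 32   # seconds


class QueueMonitor:
    """Tracks missed heartbeats from the queue and the reconnect delay."""

    def __init__(self):
        self.liveness = HEARTBEAT_LIVENESS
        self.interval = INTERVAL_INIT

    def on_message(self):
        # Any message from the queue proves it is alive.
        self.liveness = HEARTBEAT_LIVENESS
        self.interval = INTERVAL_INIT

    def on_timeout(self):
        """Return the delay to wait before reconnecting, or None if the
        queue is still considered alive."""
        self.liveness -= 1
        if self.liveness > 0:
            return None
        delay = self.interval
        if self.interval < INTERVAL_MAX:
            self.interval *= 2  # back off exponentially, capped at the maximum
        self.liveness = HEARTBEAT_LIVENESS
        return delay


monitor = QueueMonitor()
delays = []
for _ in range(HEARTBEAT_LIVENESS * 7):  # 21 silent poll intervals
    delay = monitor.on_timeout()
    if delay is not None:
        delays.append(delay)
print(delays)
```

Every third silent interval triggers a reconnect, and the delay doubles from 1 second until it stays at 32 seconds; a single call to on_message() resets both counters.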


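IV. Contracts and Protocols
Definition: to guarantee interoperability we need a kind of contract, an agreement that lets different teams, at different times and places, write code that is guaranteed to work together. We call this a "protocol".

1. Service-oriented reliable queuing (Majordomo pattern)
The Majordomo Protocol (MDP) extends and improves on the Paranoid Pirate Protocol (PPP) in one interesting way: it adds a "service name" to the requests that clients send, and asks workers to register for specific services. Adding service names turns the Paranoid Pirate queue into a service-oriented broker. MDP comes in two halves, a client side and a worker side.
Client API: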
mdcli_t *mdcli_new (char *broker);
void mdcli_destroy (mdcli_t **self_p);
zmsg_t *mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p);
worker API:
mdwrk_t *mdwrk_new (char *broker,char *service);
void mdwrk_destroy (mdwrk_t **self_p);
zmsg_t *mdwrk_recv (mdwrk_t *self, zmsg_t *reply);
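The Python listings in this section import a module named MDP that these notes do not reproduce. The sketch below shows what such a module would plausibly contain, assuming the protocol identifiers and command bytes of MDP version 0.1 (spec 7); the two helper functions are hypothetical and only illustrate the frame layouts used by the client and worker code in these notes.

```python
# Protocol identifiers (assumed: MDP/Client and MDP/Worker version 0.1)
C_CLIENT = b"MDPC01"
W_WORKER = b"MDPW01"

# MDP/Worker commands, one byte each (assumed values from the spec)
W_READY = b"\x01"
W_REQUEST = b"\x02"
W_REPLY = b"\x03"
W_HEARTBEAT = b"\x04"
W_DISCONNECT = b"\x05"


def client_request(service: bytes, body: bytes) -> list:
    """Frames a client hands to its REQ socket.

    The REQ socket itself adds the empty delimiter frame on the wire.
    """
    return [C_CLIENT, service, body]


def worker_ready(service: bytes) -> list:
    """Frames a worker sends on its DEALER socket to register a service.

    A DEALER socket adds no delimiter, so the empty frame is explicit.
    """
    return [b"", W_WORKER, W_READY, service]


request_frames = client_request(b"echo", b"Hello world")
ready_frames = worker_ready(b"echo")
print(request_frames)
print(ready_frames)
```

The service name travels as an ordinary frame in both directions, which is what lets the broker route requests by service instead of by worker.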
Example: Majordomo Protocol client API

"""Majordomo Protocol Client API, Python version.

Implements the client side of the MDP spec at http://rfc.zeromq.org/spec:7.
"""

import logging

import zmq

import MDP
from zhelpers import dump

class MajorDomoClient(object):
    """Majordomo Protocol Client API, Python version.

    Implements the client side of the MDP spec at http://rfc.zeromq.org/spec:7.
    """
    broker = None
    ctx = None
    client = None
    poller = None
    timeout = 2500  # poll timeout, msecs
    retries = 3  # attempts before abandoning a request
    verbose = False

    def __init__(self, broker, verbose=False):
        self.broker = broker
        self.verbose = verbose
        self.ctx = zmq.Context()
        self.poller = zmq.Poller()
        logging.basicConfig(format="%(asctime)s %(message)s",
                            datefmt="%Y-%m-%d %H:%M:%S",
                            level=logging.INFO)
        self.reconnect_to_broker()

    def reconnect_to_broker(self):
        """Connect or reconnect to the broker"""
        if self.client:
            self.poller.unregister(self.client)
            self.client.close()
        self.client = self.ctx.socket(zmq.REQ)
        self.client.linger = 0
        self.client.connect(self.broker)
        self.poller.register(self.client, zmq.POLLIN)
        if self.verbose:
            logging.info("I: connecting to broker at %s...", self.broker)

    def send(self, service, request):
        """Send a request to the broker and get a reply.

        Takes ownership of the request message and destroys it when sent.
        Returns the reply message, or None if there was no reply.
        """
        if not isinstance(request, list):
            request = [request]
        request = [MDP.C_CLIENT, service] + request
        if self.verbose:
            logging.info("I: send request to '%s' service: ", service)
            dump(request)
        reply = None

        retries = self.retries
        while retries > 0:
            self.client.send_multipart(request)
            try:
                items = self.poller.poll(self.timeout)
            except KeyboardInterrupt:
                break # interrupted

            if items:
                msg = self.client.recv_multipart()
                if self.verbose:
                    logging.info("I: received reply:")
                    dump(msg)

                # Don't try to handle errors, just assert noisily
                assert len(msg) >= 3

                header = msg.pop(0)
                assert MDP.C_CLIENT == header

                reply_service = msg.pop(0)
                assert service == reply_service

                reply = msg
                break
            else:
                # Count this attempt before deciding whether to retry
                retries -= 1
                if retries:
                    logging.warning("W: no reply, reconnecting...")
                    self.reconnect_to_broker()
                else:
                    logging.warning("W: permanent error, abandoning")
                    break

        return reply

    def destroy(self):
        self.ctx.destroy()

Example: Majordomo Protocol client

"""
Majordomo Protocol client example. Uses the mdcli API to hide all MDP aspects

Author : Min RK <benjaminrk@gmail.com>

"""

import sys
from mdcliapi import MajorDomoClient

def main():
    verbose = '-v' in sys.argv
    client = MajorDomoClient("tcp://localhost:5555", verbose)
    count = 0
    while count < 100000:
        request = b"Hello world"
        try:
            reply = client.send(b"echo", request)
        except KeyboardInterrupt:
            break
        else:
            # Also stop if there was no reply:
            if reply is None:
                break
        count += 1
    print ("%i requests/replies processed" % count)

if __name__ == '__main__':
    main()

Example: Majordomo Protocol worker API

"""Majordomo Protocol Worker API, Python version

Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7.

Author: Min RK <benjaminrk@gmail.com>
Based on Java example by Arkadiusz Orzechowski
"""

import logging
import time
import zmq

from zhelpers import dump
# Majordomo Protocol constants:
import MDP

class MajorDomoWorker(object):
    """Majordomo Protocol Worker API, Python version

    Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7.
    """

    HEARTBEAT_LIVENESS = 3 # 3-5 is reasonable
    broker = None
    ctx = None
    service = None

    worker = None # Socket to broker
    heartbeat_at = 0 # When to send HEARTBEAT (relative to time.time(), so in seconds)
    liveness = 0 # How many attempts left
    heartbeat = 2500 # Heartbeat delay, msecs
    reconnect = 2500 # Reconnect delay, msecs

    # Internal state
    expect_reply = False # False only at start

    timeout = 2500 # Poller timeout, msecs
    verbose = False # Print activity to stdout

    # Return address, if any
    reply_to = None

    def __init__(self, broker, service, verbose=False):
        self.broker = broker
        self.service = service
        self.verbose = verbose
        self.ctx = zmq.Context()
        self.poller = zmq.Poller()
        logging.basicConfig(format="%(asctime)s %(message)s",
                            datefmt="%Y-%m-%d %H:%M:%S",
                            level=logging.INFO)
        self.reconnect_to_broker()


    def reconnect_to_broker(self):
        """Connect or reconnect to broker"""
        if self.worker:
            self.poller.unregister(self.worker)
            self.worker.close()
        self.worker = self.ctx.socket(zmq.DEALER)
        self.worker.linger = 0
        self.worker.connect(self.broker)
        self.poller.register(self.worker, zmq.POLLIN)
        if self.verbose:
            logging.info("I: connecting to broker at %s...", self.broker)

        # Register service with broker
        self.send_to_broker(MDP.W_READY, self.service, [])

        # If liveness hits zero, queue is considered disconnected
        self.liveness = self.HEARTBEAT_LIVENESS
        self.heartbeat_at = time.time() + 1e-3 * self.heartbeat


    def send_to_broker(self, command, option=None, msg=None):
        """Send message to broker.

        If no msg is provided, creates one internally
        """
        if msg is None:
            msg = []
        elif not isinstance(msg, list):
            msg = [msg]

        if option:
            msg = [option] + msg

        msg = [b'', MDP.W_WORKER, command] + msg
        if self.verbose:
            logging.info("I: sending %s to broker", command)
            dump(msg)
        self.worker.send_multipart(msg)


    def recv(self, reply=None):
        """Send reply, if any, to broker and wait for next request."""
        # Format and send the reply if we were provided one
        assert reply is not None or not self.expect_reply

        if reply is not None:
            assert self.reply_to is not None
            reply = [self.reply_to, b''] + reply
            self.send_to_broker(MDP.W_REPLY, msg=reply)

        self.expect_reply = True

        while True:
            # Poll socket for a reply, with timeout
            try:
                items = self.poller.poll(self.timeout)
            except KeyboardInterrupt:
                break # Interrupted

            if items:
                msg = self.worker.recv_multipart()
                if self.verbose:
                    logging.info("I: received message from broker: ")
                    dump(msg)

                self.liveness = self.HEARTBEAT_LIVENESS
                # Don't try to handle errors, just assert noisily
                assert len(msg) >= 3

                empty = msg.pop(0)
                assert empty == b''

                header = msg.pop(0)
                assert header == MDP.W_WORKER

                command = msg.pop(0)
                if command == MDP.W_REQUEST:
                    # We should pop and save as many addresses as there are
                    # up to a null part, but for now, just save one...
                    self.reply_to = msg.pop(0)
                    # pop empty
                    empty = msg.pop(0)
                    assert empty == b''

                    return msg # We have a request to process
                elif command == MDP.W_HEARTBEAT:
                    # Do nothing for heartbeats
                    pass
                elif command == MDP.W_DISCONNECT:
                    self.reconnect_to_broker()
                else :
                    logging.error("E: invalid input message: ")
                    dump(msg)

            else:
                self.liveness -= 1
                if self.liveness == 0:
                    if self.verbose:
                        logging.warning("W: disconnected from broker - retrying...")
                    try:
                        time.sleep(1e-3*self.reconnect)
                    except KeyboardInterrupt:
                        break
                    self.reconnect_to_broker()

            # Send HEARTBEAT if it's time
            if time.time() > self.heartbeat_at:
                self.send_to_broker(MDP.W_HEARTBEAT)
                self.heartbeat_at = time.time() + 1e-3*self.heartbeat

        logging.warning("W: interrupt received, killing worker...")
        return None


    def destroy(self):
        # context.destroy depends on pyzmq >= 2.1.10
        self.ctx.destroy(0)

示例Majordomo Protocol worker example

"""Majordomo Protocol worker example.

Uses the mdwrk API to hide all MDP aspects

Author: Min RK <benjaminrk@gmail.com>
"""

import sys
from mdwrkapi import MajorDomoWorker

def main():
    verbose = '-v' in sys.argv
    worker = MajorDomoWorker("tcp://localhost:5555", b"echo", verbose)
    reply = None
    while True:
        request = worker.recv(reply)
        if request is None:
            break # Worker was interrupted
        reply = request # Echo is complex... :-)


if __name__ == '__main__':
    main()

Example: Majordomo Protocol broker

"""
Majordomo Protocol broker
A minimal implementation of http://rfc.zeromq.org/spec:7 and spec:8

Author: Min RK <benjaminrk@gmail.com>
Based on Java example by Arkadiusz Orzechowski
"""

import logging
import sys
import time
from binascii import hexlify

import zmq

# local
import MDP
from zhelpers import dump

class Service(object):
    """a single Service"""
    name = None # Service name
    requests = None # List of client requests
    waiting = None # List of waiting workers

    def __init__(self, name):
        self.name = name
        self.requests = []
        self.waiting = []

class Worker(object):
    """a Worker, idle or active"""
    identity = None # hex Identity of worker
    address = None # Address to route to
    service = None # Owning service, if known
    expiry = None # expires at this point, unless heartbeat

    def __init__(self, identity, address, lifetime):
        self.identity = identity
        self.address = address
        self.expiry = time.time() + 1e-3*lifetime

class MajorDomoBroker(object):
    """
    Majordomo Protocol broker
    A minimal implementation of http://rfc.zeromq.org/spec:7 and spec:8
    """

    # We'd normally pull these from config data
    INTERNAL_SERVICE_PREFIX = b"mmi."
    HEARTBEAT_LIVENESS = 3 # 3-5 is reasonable
    HEARTBEAT_INTERVAL = 2500 # msecs
    HEARTBEAT_EXPIRY = HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS

    # ---------------------------------------------------------------------

    ctx = None # Our context
    socket = None # Socket for clients & workers
    poller = None # our Poller

    heartbeat_at = None# When to send HEARTBEAT
    services = None # known services
    workers = None # known workers
    waiting = None # idle workers

    verbose = False # Print activity to stdout

    # ---------------------------------------------------------------------


    def __init__(self, verbose=False):
        """Initialize broker state."""
        self.verbose = verbose
        self.services = {}
        self.workers = {}
        self.waiting = []
        self.heartbeat_at = time.time() + 1e-3*self.HEARTBEAT_INTERVAL
        self.ctx = zmq.Context()
        self.socket = self.ctx.socket(zmq.ROUTER)
        self.socket.linger = 0
        self.poller = zmq.Poller()
        self.poller.register(self.socket, zmq.POLLIN)
        logging.basicConfig(format="%(asctime)s %(message)s",
                            datefmt="%Y-%m-%d %H:%M:%S",
                            level=logging.INFO)



    # ---------------------------------------------------------------------

    def mediate(self):
        """Main broker work happens here"""
        while True:
            try:
                items = self.poller.poll(self.HEARTBEAT_INTERVAL)
            except KeyboardInterrupt:
                break # Interrupted
            if items:
                msg = self.socket.recv_multipart()
                if self.verbose:
                    logging.info("I: received message:")
                    dump(msg)

                sender = msg.pop(0)
                empty = msg.pop(0)
                assert empty == b''
                header = msg.pop(0)

                if (MDP.C_CLIENT == header):
                    self.process_client(sender, msg)
                elif (MDP.W_WORKER == header):
                    self.process_worker(sender, msg)
                else:
                    logging.error("E: invalid message:")
                    dump(msg)

            self.purge_workers()
            self.send_heartbeats()

    def destroy(self):
        """Disconnect all workers, destroy context."""
        while self.workers:
            # dict views are not indexable in Python 3, so take the first value via an iterator
            self.delete_worker(next(iter(self.workers.values())), True)
        self.ctx.destroy(0)


    def process_client(self, sender, msg):
        """Process a request coming from a client."""
        assert len(msg) >= 2 # Service name + body
        service = msg.pop(0)
        # Set reply return address to client sender
        msg = [sender, b''] + msg
        if service.startswith(self.INTERNAL_SERVICE_PREFIX):
            self.service_internal(service, msg)
        else:
            self.dispatch(self.require_service(service), msg)


    def process_worker(self, sender, msg):
        """Process message sent to us by a worker."""
        assert len(msg) >= 1 # At least, command

        command = msg.pop(0)

        worker_ready = hexlify(sender) in self.workers

        worker = self.require_worker(sender)

        if (MDP.W_READY == command):
            assert len(msg) >= 1 # At least, a service name
            service = msg.pop(0)
            # Not first command in session or Reserved service name
            if (worker_ready or service.startswith(self.INTERNAL_SERVICE_PREFIX)):
                self.delete_worker(worker, True)
            else:
                # Attach worker to service and mark as idle
                worker.service = self.require_service(service)
                self.worker_waiting(worker)

        elif (MDP.W_REPLY == command):
            if (worker_ready):
                # Remove & save client return envelope and insert the
                # protocol header and service name, then rewrap envelope.
                client = msg.pop(0)
                empty = msg.pop(0) # ?
                msg = [client, b'', MDP.C_CLIENT, worker.service.name] + msg
                self.socket.send_multipart(msg)
                self.worker_waiting(worker)
            else:
                self.delete_worker(worker, True)

        elif (MDP.W_HEARTBEAT == command):
            if (worker_ready):
                worker.expiry = time.time() + 1e-3*self.HEARTBEAT_EXPIRY
            else:
                self.delete_worker(worker, True)

        elif (MDP.W_DISCONNECT == command):
            self.delete_worker(worker, False)
        else:
            logging.error("E: invalid message:")
            dump(msg)

    def delete_worker(self, worker, disconnect):
        """Deletes worker from all data structures, and deletes worker."""
        assert worker is not None
        if disconnect:
            self.send_to_worker(worker, MDP.W_DISCONNECT, None, None)

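        # A busy worker is not on the waiting list, so check before removing
        if worker.service is not None and worker in worker.service.waiting:
            worker.service.waiting.remove(worker)
        self.workers.pop(worker.identity)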

    def require_worker(self, address):
        """Finds the worker (creates if necessary)."""
        assert (address is not None)
        identity = hexlify(address)
        worker = self.workers.get(identity)
        if (worker is None):
            worker = Worker(identity, address, self.HEARTBEAT_EXPIRY)
            self.workers[identity] = worker
            if self.verbose:
                logging.info("I: registering new worker: %s", identity)

        return worker

    def require_service(self, name):
        """Locates the service (creates if necessary)."""
        assert (name is not None)
        service = self.services.get(name)
        if (service is None):
            service = Service(name)
            self.services[name] = service

        return service

    def bind(self, endpoint):
        """Bind broker to endpoint, can call this multiple times.

        We use a single socket for both clients and workers.
        """
        self.socket.bind(endpoint)
        logging.info("I: MDP broker/0.1.1 is active at %s", endpoint)

    def service_internal(self, service, msg):
        """Handle internal service according to 8/MMI specification"""
        returncode = b"501"
        if b"mmi.service" == service:
            name = msg[-1]
            returncode = b"200" if name in self.services else b"404"
        msg[-1] = returncode

        # insert the protocol header and service name after the routing envelope ([client, ''])
        msg = msg[:2] + [MDP.C_CLIENT, service] + msg[2:]
        self.socket.send_multipart(msg)

    def send_heartbeats(self):
        """Send heartbeats to idle workers if it's time"""
        if (time.time() > self.heartbeat_at):
            for worker in self.waiting:
                self.send_to_worker(worker, MDP.W_HEARTBEAT, None, None)

            self.heartbeat_at = time.time() + 1e-3*self.HEARTBEAT_INTERVAL

    def purge_workers(self):
        """Look for & kill expired workers.

        Workers are oldest to most recent, so we stop at the first alive worker.
        """
        while self.waiting:
            w = self.waiting[0]
            if w.expiry < time.time():
                logging.info("I: deleting expired worker: %s", w.identity)
                self.delete_worker(w,False)
                self.waiting.pop(0)
            else:
                break

    def worker_waiting(self, worker):
        """This worker is now waiting for work."""
        # Queue to broker and service waiting lists
        self.waiting.append(worker)
        worker.service.waiting.append(worker)
        worker.expiry = time.time() + 1e-3*self.HEARTBEAT_EXPIRY
        self.dispatch(worker.service, None)

    def dispatch(self, service, msg):
        """Dispatch requests to waiting workers as possible"""
        assert (service is not None)
        if msg is not None:  # Queue message if any
            service.requests.append(msg)
        self.purge_workers()
        while service.waiting and service.requests:
            msg = service.requests.pop(0)
            worker = service.waiting.pop(0)
            self.waiting.remove(worker)
            self.send_to_worker(worker, MDP.W_REQUEST, None, msg)

    def send_to_worker(self, worker, command, option, msg=None):
        """Send message to worker.

        If message is provided, sends that message.
        """

        if msg is None:
            msg = []
        elif not isinstance(msg, list):
            msg = [msg]

        # Prepend the optional frame, then stack the routing envelope
        # and protocol header at the start of the message
        if option is not None:
            msg = [option] + msg
        msg = [worker.address, b'', MDP.W_WORKER, command] + msg

        if self.verbose:
            logging.info("I: sending %r to worker", command)
            dump(msg)

        self.socket.send_multipart(msg)


def main():
    """create and start new broker"""
    verbose = '-v' in sys.argv
    broker = MajorDomoBroker(verbose)
    broker.bind("tcp://*:5555")
    broker.mediate()

if __name__ == '__main__':
    main()
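
The purge step in the broker above only works because the waiting list is kept oldest-first, so it can stop at the first live worker. A minimal socket-free sketch of that idea follows. The `Worker` class, the `purge` helper, the explicit `now` argument and the heartbeat values are assumptions made for illustration, not the broker's own code.

```python
from collections import deque

HEARTBEAT_LIVENESS = 3      # assumed value for illustration
HEARTBEAT_INTERVAL = 2500   # msecs, assumed value for illustration
HEARTBEAT_EXPIRY = HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS

class Worker:
    """Hypothetical stand-in for the broker's worker record."""
    def __init__(self, identity, now):
        self.identity = identity
        self.expiry = now + 1e-3 * HEARTBEAT_EXPIRY   # seconds

def purge(waiting, now):
    """Drop expired workers from the front of an oldest-first queue."""
    expired = []
    while waiting and waiting[0].expiry < now:
        expired.append(waiting.popleft().identity)
    return expired

waiting = deque()
waiting.append(Worker(b"w1", now=0.0))   # expires at 7.5 seconds
waiting.append(Worker(b"w2", now=5.0))   # expires at 12.5 seconds
print(purge(waiting, now=10.0))          # only w1 has expired by now
```

Passing the clock in as an argument keeps the sketch deterministic; the real broker reads `time.time()` directly.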

2. Asynchronous Majordomo Pattern
Example 1: Round-trip demonstrator

"""Round-trip demonstrator

While this example runs in a single process, that is just to make
it easier to start and stop the example. Client thread signals to
main when it's ready.
"""

import sys
import threading
import time

import zmq

from zhelpers import zpipe

def client_task (ctx, pipe):
    client = ctx.socket(zmq.DEALER)
    client.identity = b'C'
    client.connect("tcp://localhost:5555")

    print ("Setting up test...")
    time.sleep(0.1)

    print ("Synchronous round-trip test...")
    start = time.time()
    requests = 10000
    for r in range(requests):
        client.send(b"hello")
        client.recv()
    print (" %d calls/second" % (requests / (time.time()-start)))

    print ("Asynchronous round-trip test...")
    start = time.time()
    for r in range(requests):
        client.send(b"hello")
    for r in range(requests):
        client.recv()
    print (" %d calls/second" % (requests / (time.time()-start)))

    # signal done:
    pipe.send(b"done")

def worker_task():
    ctx = zmq.Context()
    worker = ctx.socket(zmq.DEALER)
    worker.identity = b'W'
    worker.connect("tcp://localhost:5556")

    while True:
        msg = worker.recv_multipart()
        worker.send_multipart(msg)
    ctx.destroy(0)

def broker_task():
    # Prepare our context and sockets
    ctx = zmq.Context()
    frontend = ctx.socket(zmq.ROUTER)
    backend = ctx.socket(zmq.ROUTER)
    frontend.bind("tcp://*:5555")
    backend.bind("tcp://*:5556")

    # Initialize poll set
    poller = zmq.Poller()
    poller.register(backend, zmq.POLLIN)
    poller.register(frontend, zmq.POLLIN)

    while True:
        try:
            items = dict(poller.poll())
        except:
            break # Interrupted

        if frontend in items:
            msg = frontend.recv_multipart()
            msg[0] = b'W'
            backend.send_multipart(msg)
        if backend in items:
            msg = backend.recv_multipart()
            msg[0] = b'C'
            frontend.send_multipart(msg)

def main():
    # Create threads
    ctx = zmq.Context()
    client,pipe = zpipe(ctx)

    client_thread = threading.Thread(target=client_task, args=(ctx, pipe))
    worker_thread = threading.Thread(target=worker_task)
    worker_thread.daemon=True
    broker_thread = threading.Thread(target=broker_task)
    broker_thread.daemon=True

    worker_thread.start()
    broker_thread.start()
    client_thread.start()

    # Wait for signal on client pipe
    client.recv()

if __name__ == '__main__':
    main()

Example 2: Majordomo Protocol Client API

"""Majordomo Protocol Client API, Python version.

Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7.

Author: Min RK <benjaminrk@gmail.com>
Based on Java example by Arkadiusz Orzechowski
"""

import logging

import zmq

import MDP
from zhelpers import dump

class MajorDomoClient(object):
    """Majordomo Protocol Client API, Python version.

      Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7.
    """
    broker = None
    ctx = None
    client = None
    poller = None
    timeout = 2500
    verbose = False

    def __init__(self, broker, verbose=False):
        self.broker = broker
        self.verbose = verbose
        self.ctx = zmq.Context()
        self.poller = zmq.Poller()
        logging.basicConfig(format="%(asctime)s %(message)s",
                            datefmt="%Y-%m-%d %H:%M:%S",
                            level=logging.INFO)
        self.reconnect_to_broker()


    def reconnect_to_broker(self):
        """Connect or reconnect to broker"""
        if self.client:
            self.poller.unregister(self.client)
            self.client.close()
        self.client = self.ctx.socket(zmq.DEALER)
        self.client.linger = 0
        self.client.connect(self.broker)
        self.poller.register(self.client, zmq.POLLIN)
        if self.verbose:
            logging.info("I: connecting to broker at %s...", self.broker)

    def send(self, service, request):
        """Send request to broker
        """
        if not isinstance(request, list):
            request = [request]

        # Prefix request with protocol frames
        # Frame 0: empty (REQ emulation)
        # Frame 1: "MDPCxy" (six bytes, MDP/Client x.y)
        # Frame 2: Service name (printable string)

        request = [b'', MDP.C_CLIENT, service] + request
        if self.verbose:
            logging.info("I: send request to '%s' service: ", service)
            dump(request)
        self.client.send_multipart(request)

    def recv(self):
        """Returns the reply message or None if there was no reply."""
        try:
            items = self.poller.poll(self.timeout)
        except KeyboardInterrupt:
            return # interrupted

        if items:
            # if we got a reply, process it
            msg = self.client.recv_multipart()
            if self.verbose:
                logging.info("I: received reply:")
                dump(msg)

            # Don't try to handle errors, just assert noisily
            assert len(msg) >= 4

            empty = msg.pop(0)
            header = msg.pop(0)
            assert MDP.C_CLIENT == header

            service = msg.pop(0)
            return msg
        else:
            logging.warning("W: permanent error, abandoning request")

Example 3: Majordomo Protocol client example

"""
Majordomo Protocol client example. Uses the mdcli API to hide all MDP aspects

Author : Min RK <benjaminrk@gmail.com>

"""

import sys
from mdcliapi2 import MajorDomoClient

def main():
    verbose = '-v' in sys.argv
    client = MajorDomoClient("tcp://localhost:5555", verbose)
    requests = 100000
    for i in range(requests):
        request = b"Hello world"
        try:
            client.send(b"echo", request)
        except KeyboardInterrupt:
            print ("send interrupted, aborting")
            return

    count = 0
    while count < requests:
        try:
            reply = client.recv()
        except KeyboardInterrupt:
            break
        else:
            # also break on failure to reply:
            if reply is None:
                break
        count += 1
    print ("%i requests/replies processed" % count)

if __name__ == '__main__':
    main()

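The test programs above measure the real cost of round trips. In the test we send a batch of messages twice: first waiting for the reply to each message before sending the next, then sending them all and reading the replies back as one batch. Both approaches do the same work, yet they give very different results.
Example 1: in the simplest case, the synchronous round trip is about 20 times slower than the asynchronous approach.
Example 3: over 100,000 request-reply cycles, the asynchronous client with a single worker is about twice as fast as the synchronous client.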
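A back-of-envelope model helps explain the gap. The sketch below assumes a fixed network latency `rtt` per round trip and a fixed processing cost `per_msg` per message; both numbers are invented for illustration and are not measurements from the test programs.

```python
def sync_time(n, rtt, per_msg):
    """Each request waits for its reply before the next one is sent."""
    return n * (rtt + per_msg)

def async_time(n, rtt, per_msg):
    """All requests are pipelined, so the latency is paid roughly once."""
    return rtt + n * per_msg

n = 10000
rtt = 200e-6       # assumed 200 usec network round trip
per_msg = 10e-6    # assumed 10 usec of work per message
speedup = sync_time(n, rtt, per_msg) / async_time(n, rtt, per_msg)
print("speedup: %.1fx" % speedup)
```

The ratio grows with `rtt / per_msg`, which is why the difference is largest when the work per message is trivial.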
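3. Service Discovery
• When a client requests a service whose name starts with mmi., the broker does not route the request to a worker but handles it internally.
• The broker handles only one such service: mmi.service, the service discovery service.
• The payload of the request is the name of an external service (a real one, provided by a worker).
• The broker returns "200" (OK) or "404" (Not found), depending on whether any worker has registered for that service.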
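The rules above can be sketched as two small functions with no sockets involved. The `registered` set and the `route` and `mmi_reply` names are hypothetical; the "501" code for unknown mmi.* services mirrors the default return code in the broker listing earlier.

```python
def mmi_reply(registered, service, payload):
    """Return the status code a broker would send for an mmi.* request.

    `registered` is a hypothetical set of service names that have workers.
    """
    if service != b"mmi.service":
        return b"501"                      # other mmi.* services: not implemented
    return b"200" if payload in registered else b"404"

def route(registered, service, payload):
    """Decide whether a request is handled internally or sent to a worker."""
    if service.startswith(b"mmi."):
        return ("internal", mmi_reply(registered, service, payload))
    return ("worker", service)

registered = {b"echo"}
print(route(registered, b"mmi.service", b"echo"))     # known service
print(route(registered, b"mmi.service", b"coffee"))   # unknown service
print(route(registered, b"echo", b"Hello"))           # normal request
```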
Example: MMI echo query example

"""
MMI echo query example

Author : Min RK <benjaminrk@gmail.com>

"""

import sys
from mdcliapi import MajorDomoClient

def main():
    verbose = '-v' in sys.argv
    client = MajorDomoClient("tcp://localhost:5555", verbose)
    request = b"echo"
    reply = client.send(b"mmi.service", request)

    if reply:
        replycode = reply[0]
        print ("Lookup echo service:", replycode)
    else:
        print ("E: no response from broker, make sure it's running")

if __name__ == '__main__':
    main()

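4. Idempotent Services
Idempotent means that repeating an operation is safe. Checking the clock is idempotent; lending your credit card to your children is not.
When the server application is not idempotent, we have to think more carefully about when exactly it might crash. To handle non-idempotent operations, we use a solution that detects and rejects duplicate requests:

  1. The client must stamp every request with a unique client identifier and a unique message number.
  2. Before sending back a reply, the server stores it, using the combination of client ID and message number as the key.
  3. When the server gets a request from a given client, it first checks whether it already holds a reply for that client ID and message number. If so, it does not process the request but just resends the stored reply.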
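
The three steps above can be sketched as a thin wrapper around a non-idempotent handler. `DedupServer`, `deposit` and the in-memory dictionary are assumptions made for illustration; a real service would also need to persist and eventually expire the stored replies.

```python
class DedupServer:
    """Hypothetical server wrapper that replays stored replies for duplicates."""
    def __init__(self, handler):
        self.handler = handler      # the non-idempotent operation
        self.replies = {}           # (client_id, msg_no) -> stored reply

    def handle(self, client_id, msg_no, request):
        key = (client_id, msg_no)
        if key in self.replies:
            return self.replies[key]    # duplicate: resend, do not redo the work
        reply = self.handler(request)
        self.replies[key] = reply       # store the reply before sending it back
        return reply

balance = {"total": 0}

def deposit(amount):
    """Non-idempotent: every call changes the balance."""
    balance["total"] += amount
    return balance["total"]

server = DedupServer(deposit)
server.handle(b"client-1", 1, 10)
server.handle(b"client-1", 1, 10)   # retry of the same request, not applied twice
server.handle(b"client-1", 2, 5)
print(balance["total"])             # 15, not 25
```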

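5. Disconnected Reliability (Titanic Pattern)
In this pattern, messages are written to disk so that they are never lost, no matter how sporadically clients and workers are connected. Titanic is layered on top of MDP rather than extending MDP.
This has several advantages:

  1. The broker handles message routing and the workers handle reliability: divide and conquer.
  2. It lets us mix a broker written in one language with workers written in another.
  3. It lets us evolve the fire-and-forget technology independently.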

Titanic is both a worker and a client.
Example: Titanic client example

"""
Titanic client example
Implements client side of http://rfc.zeromq.org/spec:9

Author : Min RK <benjaminrk@gmail.com>
"""

import sys
import time

from mdcliapi import MajorDomoClient

def service_call (session, service, request):
    """Calls a TSP service

    Returns response if successful (status code 200 OK), else None
    """
    reply = session.send(service, request)
    if reply:
        status = reply.pop(0)
        if status == b"200":
            return reply
        elif status == b"400":
            print ("E: client fatal error 400, aborting")
            sys.exit (1)
        elif status == b"500":
            print ("E: server fatal error 500, aborting")
            sys.exit (1)
    else:
        sys.exit (0)    #  Interrupted or failed

def main():
    verbose = '-v' in sys.argv
    session = MajorDomoClient("tcp://localhost:5555", verbose)

    #  1. Send 'echo' request to Titanic
    request = [b"echo", b"Hello world"]
    reply = service_call(session, b"titanic.request", request)

    uuid = None

    if reply:
        uuid = reply.pop(0)
        print ("I: request UUID ", uuid)

    #  2. Wait until we get a reply
    while True:
        time.sleep (.1)
        request = [uuid]
        reply = service_call (session, b"titanic.reply", request)

        if reply:
            reply_string = reply[-1]
            print ("I: reply:", reply_string)

            #  3. Close request
            request = [uuid]
            reply = service_call (session, b"titanic.close", request)
            break
        else:
            print ("I: no reply yet, trying again...")
            time.sleep(5)     #  Try again in 5 seconds
    return 0

if __name__ == '__main__':
    main()

Example: Titanic service
"""
Titanic service

Implements server side of http://rfc.zeromq.org/spec:9

Author: Min RK <benjaminrk@gmail.com>
"""

import pickle
import os
import sys
import threading
import time
from uuid import uuid4

import zmq

from mdwrkapi import MajorDomoWorker
from mdcliapi import MajorDomoClient

from zhelpers import zpipe

TITANIC_DIR = ".titanic"

def request_filename (suuid):
    """Returns freshly allocated request filename for given UUID str"""
    return os.path.join(TITANIC_DIR, "%s.req" % suuid)

#

def reply_filename (suuid):
    """Returns freshly allocated reply filename for given UUID str"""
    return os.path.join(TITANIC_DIR, "%s.rep" % suuid)

# ---------------------------------------------------------------------
# Titanic request service

def titanic_request (pipe):
    worker = MajorDomoWorker("tcp://localhost:5555", b"titanic.request")

    reply = None

    while True:
        # Send reply if it's not null
        # And then get next request from broker
        request = worker.recv(reply)
        if not request:
            break      # Interrupted, exit

        # Ensure message directory exists
        if not os.path.exists(TITANIC_DIR):
            os.mkdir(TITANIC_DIR)

        # Generate UUID and save message to disk
        suuid = uuid4().hex
        filename = request_filename (suuid)
        with open(filename, 'wb') as f:
            pickle.dump(request, f)

        # Send UUID through to message queue
        pipe.send_string(suuid)

        # Now send UUID back to client
        # Done by the worker.recv() at the top of the loop
        reply = [b"200", suuid.encode('utf-8')]


# ---------------------------------------------------------------------
# Titanic reply service

def titanic_reply ():
    worker = MajorDomoWorker("tcp://localhost:5555", b"titanic.reply")
    reply = None

    while True:
        request = worker.recv(reply)
        if not request:
            break      # Interrupted, exit

        suuid = request.pop(0).decode('utf-8')
        req_filename = request_filename(suuid)
        rep_filename = reply_filename(suuid)
        if os.path.exists(rep_filename):
            with open(rep_filename, 'rb') as f:
                reply = pickle.load(f)
            reply = [b"200"] + reply
        else:
            if os.path.exists(req_filename):
                reply = [b"300"] # pending
            else:
                reply = [b"400"] # unknown


# ---------------------------------------------------------------------
# Titanic close service

def titanic_close():
    worker = MajorDomoWorker("tcp://localhost:5555", b"titanic.close")
    reply = None

    while True:
        request = worker.recv(reply)
        if not request:
            break      # Interrupted, exit

        suuid = request.pop(0).decode('utf-8')
        req_filename = request_filename(suuid)
        rep_filename = reply_filename(suuid)
        # should these be protected?  Does zfile_delete ignore files
        # that have already been removed?  That's what we are doing here.
        if os.path.exists(req_filename):
            os.remove(req_filename)
        if os.path.exists(rep_filename):
            os.remove(rep_filename)
        reply = [b"200"]


def service_success(client, suuid):
    """Attempt to process a single request, return True if successful"""
    # Load request message, service will be first frame
    filename = request_filename (suuid)

    # If the client already closed request, treat as successful
    if not os.path.exists(filename):
        return True

    with open(filename, 'rb') as f:
        request = pickle.load(f)
    service = request.pop(0)
    # Use MMI protocol to check if service is available
    mmi_request = [service]
    mmi_reply = client.send(b"mmi.service", mmi_request)
    service_ok = mmi_reply and mmi_reply[0] == b"200"

    if service_ok:
        reply = client.send(service, request)
        if reply:
            filename = reply_filename (suuid)
            with open(filename, "wb") as f:
                pickle.dump(reply, f)
            return True

    return False


def main():
    verbose = '-v' in sys.argv
    ctx = zmq.Context()

    # Create MDP client session with short timeout
    client = MajorDomoClient("tcp://localhost:5555", verbose)
    client.timeout = 1000 # 1 sec
    client.retries = 1 # only 1 retry

    request_pipe, peer = zpipe(ctx)
    request_thread = threading.Thread(target=titanic_request, args=(peer,))
    request_thread.daemon = True
    request_thread.start()
    reply_thread = threading.Thread(target=titanic_reply)
    reply_thread.daemon = True
    reply_thread.start()
    close_thread = threading.Thread(target=titanic_close)
    close_thread.daemon = True
    close_thread.start()

    poller = zmq.Poller()
    poller.register(request_pipe, zmq.POLLIN)

    queue_filename = os.path.join(TITANIC_DIR, 'queue')

    # Main dispatcher loop
    while True:
        # Ensure message directory exists
        if not os.path.exists(TITANIC_DIR):
            os.mkdir(TITANIC_DIR)
            f = open(queue_filename,'wb')
            f.close()
        # We'll dispatch once per second, if there's no activity
        try:
            items = poller.poll(1000)
        except KeyboardInterrupt:
            break               # Interrupted

        if items:
            # Append UUID to queue, prefixed with '-' for pending
            suuid = request_pipe.recv().decode('utf-8')
            with open(queue_filename, 'ab') as f:
                line = "-%s\n" % suuid
                f.write(line.encode('utf-8'))

        # Brute-force dispatcher
        with open(queue_filename, 'rb+') as f:
            offset = 0  # byte offset of the current entry within the queue file
            for raw_entry in f.readlines():
                entry = raw_entry.decode('utf-8')
                # UUID is prefixed with '-' if still waiting
                if entry[0] == '-':
                    suuid = entry[1:].rstrip() # rstrip '\n' etc.
                    print ("I: processing request %s" % suuid)
                    if service_success(client, suuid):
                        # mark queue entry as processed by overwriting its
                        # leading '-' in place; readlines() has already read
                        # the whole file, so seek to the entry explicitly
                        f.seek(offset, os.SEEK_SET)
                        f.write(b'+')
                offset += len(raw_entry)


if __name__ == '__main__':
    main()
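
The queue file used by the Titanic service is a plain list of UUIDs, one per line, where a leading '-' means pending and '+' means processed. The sketch below shows that bookkeeping on its own, tracking byte offsets so each entry can be flipped in place. The helper names and the temporary directory are assumptions for illustration and do not appear in the service.

```python
import os
import tempfile

def append_pending(path, suuid):
    """Append a UUID to the queue file, prefixed with '-' for pending."""
    with open(path, "ab") as f:
        f.write(("-%s\n" % suuid).encode("utf-8"))

def mark_processed(path, done):
    """Flip '-' to '+' in place for every pending entry accepted by done()."""
    with open(path, "rb+") as f:
        offset = 0
        for raw in f.readlines():
            entry = raw.decode("utf-8")
            if entry.startswith("-") and done(entry[1:].rstrip()):
                f.seek(offset)          # start of this entry
                f.write(b"+")
            offset += len(raw)

def read_queue(path):
    """Return the queue entries as a list of strings."""
    with open(path, "rb") as f:
        return f.read().decode("utf-8").splitlines()

queue = os.path.join(tempfile.mkdtemp(), "queue")
for suuid in ("aaa", "bbb", "ccc"):
    append_pending(queue, suuid)
mark_processed(queue, lambda s: s == "bbb")
print(read_queue(queue))   # ['-aaa', '+bbb', '-ccc']
```

Overwriting a single byte keeps every entry at the same offset, so the file never has to be rewritten while requests are pending.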

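6. High-Availability Pair (Binary Star Pattern)

The Binary Star pattern configures two servers as a primary/backup high-availability pair. At any given time, one of them (the active server) accepts connections from client applications. The other (the passive server) does nothing, but the two servers monitor each other. If the active server disappears from the network, then after a certain time the passive server takes over as the active server.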
Figure: High-availability pair in normal operation

Figure: High-availability pair during failover

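Recovery from failover works as follows:
• The operators fix whatever problem caused the primary server to disappear from the network and restart it.
• The operators stop the backup server at a moment when doing so causes the least disruption to applications.
• When applications have reconnected to the primary server, the operators restart the backup server.
Example: Binary Star Server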

# Binary Star Server
#
# Author: Dan Colish <dcolish@gmail.com>

from argparse import ArgumentParser
import time

from zhelpers import zmq

STATE_PRIMARY = 1
STATE_BACKUP = 2
STATE_ACTIVE = 3
STATE_PASSIVE = 4

PEER_PRIMARY = 1
PEER_BACKUP = 2
PEER_ACTIVE = 3
PEER_PASSIVE = 4
CLIENT_REQUEST = 5

HEARTBEAT = 1000


class BStarState(object):
    def __init__(self, state, event, peer_expiry):
        self.state = state
        self.event = event
        self.peer_expiry = peer_expiry


class BStarException(Exception):
    pass

fsm_states = {
    STATE_PRIMARY: {
        PEER_BACKUP: ("I: connected to backup (slave), ready as master",
                      STATE_ACTIVE),
        PEER_ACTIVE: ("I: connected to backup (master), ready as slave",
                      STATE_PASSIVE)
        },
    STATE_BACKUP: {
        PEER_ACTIVE: ("I: connected to primary (master), ready as slave",
                      STATE_PASSIVE),
        CLIENT_REQUEST: ("", False)
        },
    STATE_ACTIVE: {
        PEER_ACTIVE: ("E: fatal error - dual masters, aborting", False)
        },
    STATE_PASSIVE: {
        PEER_PRIMARY: ("I: primary (slave) is restarting, ready as master",
                       STATE_ACTIVE),
        PEER_BACKUP: ("I: backup (slave) is restarting, ready as master",
                      STATE_ACTIVE),
        PEER_PASSIVE: ("E: fatal error - dual slaves, aborting", False),
        CLIENT_REQUEST: (CLIENT_REQUEST, True)  # Say true, check peer later
        }
    }


def run_fsm(fsm):
    # There are some transitional states we do not want to handle
    state_dict = fsm_states.get(fsm.state, {})
    res = state_dict.get(fsm.event)
    if res:
        msg, state = res
    else:
        return
    if state is False:
        raise BStarException(msg)
    elif msg == CLIENT_REQUEST:
        assert fsm.peer_expiry > 0
        if int(time.time() * 1000) > fsm.peer_expiry:
            fsm.state = STATE_ACTIVE
        else:
            raise BStarException()
    else:
        print(msg)
        fsm.state = state


def main():
    parser = ArgumentParser()
    group = parser.add_mutually_exclusive_group()
    group.add_argument("-p", "--primary", action="store_true", default=False)
    group.add_argument("-b", "--backup", action="store_true", default=False)
    args = parser.parse_args()

    ctx = zmq.Context()
    statepub = ctx.socket(zmq.PUB)
    statesub = ctx.socket(zmq.SUB)
    statesub.setsockopt_string(zmq.SUBSCRIBE, u"")
    frontend = ctx.socket(zmq.ROUTER)

    fsm = BStarState(0, 0, 0)

    if args.primary:
        print("I: Primary master, waiting for backup (slave)")
        frontend.bind("tcp://*:5001")
        statepub.bind("tcp://*:5003")
        statesub.connect("tcp://localhost:5004")
        fsm.state = STATE_PRIMARY
    elif args.backup:
        print("I: Backup slave, waiting for primary (master)")
        frontend.bind("tcp://*:5002")
        statepub.bind("tcp://*:5004")
        statesub.connect("tcp://localhost:5003")
        statesub.setsockopt_string(zmq.SUBSCRIBE, u"")
        fsm.state = STATE_BACKUP

    send_state_at = int(time.time() * 1000 + HEARTBEAT)
    poller = zmq.Poller()
    poller.register(frontend, zmq.POLLIN)
    poller.register(statesub, zmq.POLLIN)

    while True:
        time_left = send_state_at - int(time.time() * 1000)
        if time_left < 0:
            time_left = 0
        socks = dict(poller.poll(time_left))
        if socks.get(frontend) == zmq.POLLIN:
            msg = frontend.recv_multipart()
            fsm.event = CLIENT_REQUEST
            try:
                run_fsm(fsm)
                frontend.send_multipart(msg)
            except BStarException:
                del msg

        if socks.get(statesub) == zmq.POLLIN:
            msg = statesub.recv()
            fsm.event = int(msg)
            del msg
            try:
                run_fsm(fsm)
                fsm.peer_expiry = int(time.time() * 1000) + (2 * HEARTBEAT)
            except BStarException:
                break
        if int(time.time() * 1000) >= send_state_at:
            statepub.send_string("%d" % fsm.state)
            send_state_at = int(time.time() * 1000) + HEARTBEAT

if __name__ == '__main__':
    main()

Example: Binary Star Client
from time import sleep
import zmq


REQUEST_TIMEOUT = 1000  # msecs
SETTLE_DELAY = 2000  # before failing over


def main():
    server = ['tcp://localhost:5001', 'tcp://localhost:5002']
    server_nbr = 0
    ctx = zmq.Context()
    client = ctx.socket(zmq.REQ)
    client.connect(server[server_nbr])
    poller = zmq.Poller()
    poller.register(client, zmq.POLLIN)

    sequence = 0
    while True:
        client.send_string("%s" % sequence)

        expect_reply = True
        while expect_reply:
            socks = dict(poller.poll(REQUEST_TIMEOUT))
            if socks.get(client) == zmq.POLLIN:
                reply = client.recv_string()
                if int(reply) == sequence:
                    print("I: server replied OK (%s)" % reply)
                    expect_reply = False
                    sequence += 1
                    sleep(1)
                else:
                    print("E: malformed reply from server: %s" % reply)
            else:
                print("W: no response from server, failing over")
                sleep(SETTLE_DELAY / 1000)
                poller.unregister(client)
                client.close()
                server_nbr = (server_nbr + 1) % 2
                print("I: connecting to server at %s.." % server[server_nbr])
                client = ctx.socket(zmq.REQ)
                poller.register(client, zmq.POLLIN)
                # reconnect and resend request
                client.connect(server[server_nbr])
                client.send_string("%s" % sequence)

if __name__ == '__main__':
    main()

Figure: Binary Star finite state machine
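
The state machine can also be written as a pure function, which makes the transitions easy to try out without sockets. This is a simplified sketch that follows the transition table in the server listing above as I read it; the string state names, the `step` function and its `now` and `peer_expiry` arguments (both in milliseconds) are assumptions for illustration.

```python
PRIMARY, BACKUP, ACTIVE, PASSIVE = "primary", "backup", "active", "passive"
CLIENT_REQUEST = "client_request"

# (own state, peer state) -> next state
TRANSITIONS = {
    (PRIMARY, BACKUP): ACTIVE,
    (PRIMARY, ACTIVE): PASSIVE,
    (BACKUP, ACTIVE): PASSIVE,
    (PASSIVE, PRIMARY): ACTIVE,
    (PASSIVE, BACKUP): ACTIVE,
}
FATAL = {(ACTIVE, ACTIVE), (PASSIVE, PASSIVE)}   # both active or both passive

def step(state, event, now=0, peer_expiry=0):
    """Return (next_state, accept); accept only matters for client requests."""
    if (state, event) in FATAL:
        raise RuntimeError("fatal: both servers are %s" % state)
    if event == CLIENT_REQUEST:
        if state == BACKUP:
            return state, False          # backup has not yet seen its peer
        if state == PASSIVE:
            if now > peer_expiry:
                return ACTIVE, True      # peer silent for too long: take over
            return state, False          # peer still alive: reject the request
        return state, True               # primary or active: serve the request
    return TRANSITIONS.get((state, event), state), False

print(step(PRIMARY, BACKUP))                                        # primary becomes active
print(step(BACKUP, ACTIVE))                                         # backup becomes passive
print(step(PASSIVE, CLIENT_REQUEST, now=3000, peer_expiry=2000))    # failover
```

A passive server only takes over when a client actually asks it to and the peer's heartbeat has expired, which is what keeps a mere network hiccup between the two servers from producing two active servers.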

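V. Brokerless Reliability (Freelance Pattern)
ZMQ does not impose a broker-centric architecture on you, although it gives you the tools to build brokers. Here we take apart the broker-based reliability built so far and turn it into a distributed peer-to-peer architecture called the Freelance pattern.
Model One: Simple Retry and Failover
Example: Freelance server - Model 1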

#
# Freelance server - Model 1
# Trivial echo service
#
# Author: Daniel Lundin <dln(at)eintr(dot)org>
#

import sys
import zmq

if len(sys.argv) < 2:
    print("I: Syntax: %s <endpoint>" % sys.argv[0])
    sys.exit(0)

endpoint = sys.argv[1]
context = zmq.Context()
server = context.socket(zmq.REP)
server.bind(endpoint)

print("I: Echo service is ready at %s" % endpoint)
while True:
    msg = server.recv_multipart()
    if not msg:
        break  # Interrupted
    server.send_multipart(msg)

server.setsockopt(zmq.LINGER, 0) # Terminate immediately

Example: Freelance Client - Model 1
#
# Freelance Client - Model 1
# Uses REQ socket to query one or more services
#
# Author: Daniel Lundin <dln(at)eintr(dot)org>
#

import sys
import time

import zmq

REQUEST_TIMEOUT = 1000  # ms
MAX_RETRIES = 3   # Before we abandon

def try_request(ctx, endpoint, request):
    print("I: Trying echo service at %s..." % endpoint)
    client = ctx.socket(zmq.REQ)
    client.setsockopt(zmq.LINGER, 0)  # Terminate early
    client.connect(endpoint)
    client.send(request)
    poll = zmq.Poller()
    poll.register(client, zmq.POLLIN)
    socks = dict(poll.poll(REQUEST_TIMEOUT))
    if socks.get(client) == zmq.POLLIN:
        reply = client.recv_multipart()
    else:
        reply = ''
    poll.unregister(client)
    client.close()
    return reply

context = zmq.Context()
request = b"Hello world"
reply = None

endpoints = len(sys.argv) - 1
if endpoints == 0:
    print("I: syntax: %s <endpoint> ..." % sys.argv[0])
elif endpoints == 1:
    # For one endpoint, we retry N times
    endpoint = sys.argv[1]
    for retries in range(MAX_RETRIES):
        reply = try_request(context, endpoint, request)
        if reply:
            break  # Success
        print("W: No response from %s, retrying" % endpoint)
else:
    # For multiple endpoints, try each at most once
    for endpoint in sys.argv[1:]:
        reply = try_request(context, endpoint, request)
        if reply:
            break  # Success
        print("W: No response from %s" % endpoint)

if reply:
    print("Service is running OK")
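
The policy in the Model 1 client (retry a single server several times, or try each of several servers once) can be separated from the socket code. In the sketch below the transport is passed in as a function, so the policy can be exercised with a fake one; `query` and `fake_transport` are hypothetical names introduced for illustration.

```python
MAX_RETRIES = 3   # same value as in the client above

def query(endpoints, request, try_request):
    """Apply the Model 1 policy using a caller-supplied transport function.

    try_request(endpoint, request) returns a reply, or None on timeout.
    """
    if len(endpoints) == 1:
        attempts = list(endpoints) * MAX_RETRIES   # one server: retry it N times
    else:
        attempts = list(endpoints)                 # several servers: each at most once
    for endpoint in attempts:
        reply = try_request(endpoint, request)
        if reply:
            return endpoint, reply
    return None, None

def fake_transport(endpoint, request):
    """Hypothetical transport: only the server on port 5556 is up."""
    return request if endpoint.endswith(":5556") else None

servers = ["tcp://localhost:5555", "tcp://localhost:5556"]
print(query(servers, b"Hello world", fake_transport))
```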

Model Two: Brutal Shotgun Massacre
Example: Freelance server - Model 2

#
# Freelance server - Model 2
# Does some work, replies OK, with message sequencing
#
# Author: Daniel Lundin <dln(at)eintr(dot)org>
#

import sys
import zmq

if len(sys.argv) < 2:
    print("I: Syntax: %s <endpoint>" % sys.argv[0])
    sys.exit(0)

endpoint = sys.argv[1]
context = zmq.Context()
server = context.socket(zmq.REP)
server.bind(endpoint)

print("I: Service is ready at %s" % endpoint)
while True:
    request = server.recv_multipart()
    if not request:
        break  # Interrupted
    # Fail nastily if run against wrong client
    assert len(request) == 2

    address = request[0]
    reply = [address, b"OK"]
    server.send_multipart(reply)

server.setsockopt(zmq.LINGER, 0)  # Terminate early

Example: Freelance Client - Model 2

#
# Freelance Client - Model 2
# Uses DEALER socket to blast one or more services
#
# Author: Daniel Lundin <dln(at)eintr(dot)org>
#

import sys
import time

import zmq

GLOBAL_TIMEOUT = 2500  # ms

class FLClient(object):
    def __init__(self):
        self.servers = 0
        self.sequence = 0
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.DEALER)   # DEALER

    def destroy(self):
        self.socket.setsockopt(zmq.LINGER, 0)  # Terminate early
        self.socket.close()
        self.context.term()

    def connect(self, endpoint):
        self.socket.connect(endpoint)
        self.servers += 1
        print("I: Connected to %s" % endpoint)

    def request(self, *request):
        # Prefix request with sequence number and empty envelope
        self.sequence += 1
        msg = [b'', str(self.sequence).encode()] + list(request)

        # Blast the request to all connected servers
        for server in range(self.servers):
            self.socket.send_multipart(msg)

        # Wait for a matching reply to arrive from anywhere
        # Since we can poll several times, calculate each one
        poll = zmq.Poller()
        poll.register(self.socket, zmq.POLLIN)

        reply = None
        endtime = time.time() + GLOBAL_TIMEOUT / 1000.0
        while time.time() < endtime:
            # Clamp at zero: a negative timeout would make poll() block forever
            socks = dict(poll.poll(max(0, (endtime - time.time()) * 1000)))
            if socks.get(self.socket) == zmq.POLLIN:
                reply = self.socket.recv_multipart()
                assert len(reply) == 3
                sequence = int(reply[1])
                if sequence == self.sequence:
                    break
                reply = None  # Stale reply to an earlier request, keep waiting
        return reply

if len(sys.argv) == 1:
    print("I: Usage: %s <endpoint> ..." % sys.argv[0])
    sys.exit(0)

# Create new freelance client object
client = FLClient()

for endpoint in sys.argv[1:]:
    client.connect(endpoint)

start = time.time()
for requests in range(10000):
    request = b"random name"
    reply = client.request(request)
    if not reply:
        print("E: Name service not available, aborting")
        break
# Elapsed seconds * 1e6 / 10000 requests gives microseconds per request
print("Average round trip cost: %i usec" % (1e6 * (time.time() - start) / 10000))
client.destroy()
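
Because the shotgun client sends every request to all servers, it receives duplicate and late replies, and the sequence number is what lets it pick the right one. A minimal sketch of that filtering, using plain lists of frames instead of a socket; `first_matching_reply` is a hypothetical helper.

```python
def first_matching_reply(replies, expected_sequence):
    """Return the first reply whose sequence frame matches, else None.

    Each reply is a list of three frames: [empty, sequence, body].
    """
    for reply in replies:
        assert len(reply) == 3
        if int(reply[1]) == expected_sequence:
            return reply
    return None

incoming = [
    [b"", b"6", b"OK"],   # late reply to an earlier request, ignored
    [b"", b"7", b"OK"],   # reply to the current request
    [b"", b"7", b"OK"],   # duplicate from a second server
]
print(first_matching_reply(incoming, 7))
```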

Model Three: Complex and Nasty

Example: Freelance server - Model 3
"""Freelance server - Model 3

Uses an ROUTER/ROUTER socket but just one thread

Author: Min RK <benjaminrk@gmail.com>
"""

import sys

import zmq

from zhelpers import dump

def main():
    verbose = '-v' in sys.argv

    ctx = zmq.Context()
    # Prepare server socket with predictable identity
    bind_endpoint = "tcp://*:5555"
    connect_endpoint = "tcp://localhost:5555"
    server = ctx.socket(zmq.ROUTER)
    server.identity = connect_endpoint.encode()
    server.bind(bind_endpoint)
    print("I: service is ready at %s" % bind_endpoint)

    while True:
        try:
            request = server.recv_multipart()
        except (KeyboardInterrupt, zmq.ZMQError):
            break # Interrupted
        # Frame 0: identity of client
        # Frame 1: PING, or client control frame
        # Frame 2: request body
        address, control = request[:2]
        reply = [address, control]
        if control == b"PING":
            reply[1] = b"PONG"
        else:
            reply.append(b"OK")
        if verbose:
            dump(reply)
        server.send_multipart(reply)
    print("W: interrupted")

if __name__ == '__main__':
    main()

Example: Freelance client - Model 3
"""
Freelance client - Model 3

Uses flcliapi class to encapsulate Freelance pattern

Author : Min RK <benjaminrk@gmail.com>
"""

import time

from flcliapi import FreelanceClient

def main():
    # Create new freelance client object
    client = FreelanceClient()

    # Connect to several endpoints
    client.connect ("tcp://localhost:5555")
    client.connect ("tcp://localhost:5556")
    client.connect ("tcp://localhost:5557")

    # Send a bunch of name resolution 'requests', measure time
    requests = 10000
    start = time.time()
    for i in range(requests):
        request = [b"random name"]
        reply = client.request(request)
        if not reply:
            print("E: name service not available, aborting")
            return

    print("Average round trip cost: %d usec" % (1e6*(time.time() - start) / requests))

if __name__ == '__main__':
    main()
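
The agent class in the next listing keeps two timers per server: when to send the next PING, and when to give up on a silent server. The sketch below shows that bookkeeping with an explicit clock. `ServerTimer` and its methods are hypothetical, and refreshing both timers whenever anything arrives from the server is an assumption about how the agent uses them.

```python
PING_INTERVAL = 2000    # msecs, as in the agent class
SERVER_TTL = 6000       # msecs, as in the agent class

class ServerTimer:
    """Hypothetical bookkeeping for one server, driven by an explicit clock."""
    def __init__(self, now):
        self.refresh(now)

    def refresh(self, now):
        """Call whenever anything arrives from the server (assumed policy)."""
        self.ping_at = now + 1e-3 * PING_INTERVAL
        self.expires = now + 1e-3 * SERVER_TTL

    def needs_ping(self, now):
        return now >= self.ping_at

    def is_alive(self, now):
        return now < self.expires

timer = ServerTimer(now=0.0)
print(timer.needs_ping(1.0), timer.is_alive(1.0))   # too early to ping, alive
print(timer.needs_ping(2.5), timer.is_alive(2.5))   # ping is due, still alive
print(timer.is_alive(6.5))                          # silent past the TTL
timer.refresh(now=6.5)                              # a PONG arrives
print(timer.is_alive(7.0))
```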

Example: Freelance Pattern agent class

"""
flcliapi - Freelance Pattern agent class
Model 3: uses ROUTER socket to address specific services

Author: Min RK <benjaminrk@gmail.com>
"""

import threading
import time

import zmq

from zhelpers import zpipe

# If no server replies within this time, abandon request
GLOBAL_TIMEOUT = 3000    # msecs
# PING interval for servers we think are alive
PING_INTERVAL  = 2000    # msecs
# Server considered dead if silent for this long
SERVER_TTL     = 6000    # msecs


def flciapi_agent(peer):
    """This is the thread that handles our real flcliapi class
    """
    pass

# =====================================================================
# Synchronous part, works in our application thread

class FreelanceClient(object):
    ctx = None      # Our Context
    pipe = None     # Pipe through to flciapi agent
    agent = None    # agent in a thread

    def __init__(self):
        self.ctx = zmq.Context()
        self.pipe, peer = zpipe(self.ctx)
        self.agent = threading.Thread(target=agent_task, args=(self.ctx,peer))
        self.agent.daemon = True
        self.agent.start()


    def connect(self, endpoint):
        """Connect to new server endpoint
        Sends [CONNECT][endpoint] to the agent
        """
        if isinstance(endpoint, str):
            endpoint = endpoint.encode()  # frames must be bytes on Python 3
        self.pipe.send_multipart([b"CONNECT", endpoint])
        time.sleep(0.1) # Allow connection to come up

    def request(self, msg):
        "Send request, get reply"
        request = ["REQUEST"] + msg
        self.pipe.send_multipart(request)
        reply = self.pipe.recv_multipart()
        status = reply.pop(0)
        if status != "FAILED":
            return reply


# =====================================================================
# Asynchronous part, works in the background

# ---------------------------------------------------------------------
# Simple class for one server we talk to

class FreelanceServer(object):
    endpoint = None         # Server identity/endpoint
    alive = True            # True if known to be alive
    ping_at = 0             # Next ping at this time
    expires = 0             # Expires at this time

    def __init__(self, endpoint):
        self.endpoint = endpoint
        self.alive = True
        self.ping_at = time.time() + 1e-3*PING_INTERVAL
        self.expires = time.time() + 1e-3*SERVER_TTL

    def ping(self, socket):
        if time.time() > self.ping_at:
            socket.send_multipart([self.endpoint, 'PING'])
            self.ping_at = time.time() + 1e-3*PING_INTERVAL

    def tickless(self, tickless):
        if tickless > self.ping_at:
            tickless = self.ping_at
        return tickless

# ---------------------------------------------------------------------
# Simple class for one background agent

class FreelanceAgent(object):
    ctx = None              # Own context
    pipe = None             # Socket to talk back to application
    router = None           # Socket to talk to servers
    servers = None          # Servers we've connected to
    actives = None          # Servers we know are alive
    sequence = 0            # Number of requests ever sent
    request = None          # Current request if any
    reply = None            # Current reply if any
    expires = 0             # Timeout for request/reply

    def __init__(self, ctx, pipe):
        self.ctx = ctx
        self.pipe = pipe
        self.router = ctx.socket(zmq.ROUTER)
        self.servers = {}
        self.actives = []

    def control_message(self):
        msg = self.pipe.recv_multipart()
        command = msg.pop(0)

        if command == "CONNECT":
            endpoint = msg.pop(0)
            print("I: connecting to %s..." % endpoint)
            self.router.connect(endpoint)
            server = FreelanceServer(endpoint)
            self.servers[endpoint] = server
            self.actives.append(server)
            # these are in the C case, but seem redundant:
            server.ping_at = time.time() + 1e-3*PING_INTERVAL
            server.expires = time.time() + 1e-3*SERVER_TTL
        elif command == "REQUEST":
            assert not self.request    # Strict request-reply cycle
            # Prefix request with sequence number and empty envelope
            self.request = [str(self.sequence), ''] + msg

            # Request expires after global timeout
            self.expires = time.time() + 1e-3*GLOBAL_TIMEOUT

    def router_message(self):
        reply = self.router.recv_multipart()
        # Frame 0 is server that replied
        endpoint = reply.pop(0)
        server = self.servers[endpoint]
        if not server.alive:
            self.actives.append(server)
            server.alive = True

        server.ping_at = time.time() + 1e-3*PING_INTERVAL
        server.expires = time.time() + 1e-3*SERVER_TTL

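        # Frame 1 may be sequence number for reply; skip frames that are
        # not numeric, since int() would raise ValueError on them
        sequence = reply.pop(0)
        if sequence.isdigit() and int(sequence) == self.sequence: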
            self.sequence += 1
            reply = ["OK"] + reply
            self.pipe.send_multipart(reply)
            self.request = None


# ---------------------------------------------------------------------
# Asynchronous agent manages server pool and handles request/reply
# dialog when the application asks for it.

def agent_task(ctx, pipe):
    agent = FreelanceAgent(ctx, pipe)
    poller = zmq.Poller()
    poller.register(agent.pipe, zmq.POLLIN)
    poller.register(agent.router, zmq.POLLIN)

    while True:
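        # Calculate tickless timer, up to 1 hour
        tickless = time.time() + 3600
        if (agent.request and tickless > agent.expires):
            tickless = agent.expires
        # Wake up in time for the next heartbeat even when no request is pending
        for server in agent.servers.values():
            tickless = server.tickless(tickless)
        try:
            # Clamp at zero: pyzmq treats a negative timeout as "wait forever"
            timeout = max(0, 1000 * (tickless - time.time()))
            items = dict(poller.poll(timeout))
        except zmq.ZMQError:
            break              # Context has been shut down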

        if agent.pipe in items:
            agent.control_message()

        if agent.router in items:
            agent.router_message()

        # If we're processing a request, dispatch to next server
        if (agent.request):
            if (time.time() >= agent.expires):
                # Request expired, kill it
                agent.pipe.send("FAILED")
                agent.request = None
            else:
                # Find server to talk to, remove any expired ones
                while agent.actives:
                    server = agent.actives[0]
                    if time.time() >= server.expires:
                        server.alive = False
                        agent.actives.pop(0)
                    else:
                        request = [server.endpoint] + agent.request
                        agent.router.send_multipart(request)
                        break

        # Send heartbeats to servers that are due for one
        # (expired servers are only dropped from actives above, not disconnected)
        for server in agent.servers.values():
            server.ping(agent.router)
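
The two decisions the agent loop makes on every pass, which server to send to and how long to sleep, can be tried out without any sockets. The following is a minimal sketch under stated assumptions: `Server`, `pick_server` and `poll_timeout_ms` are hypothetical names invented for this illustration, time is passed in as a plain number of seconds rather than read from `time.time()`, and the sketch clamps the timeout at zero so that a deadline already in the past means "poll immediately".

```python
# Sketch only: no sockets; "now" is passed in so the logic can be tested.
# Server, pick_server and poll_timeout_ms are hypothetical names.

class Server(object):
    def __init__(self, endpoint, now, ping_interval=2.0, ttl=6.0):
        self.endpoint = endpoint
        self.alive = True
        self.ping_at = now + ping_interval   # next heartbeat is due then
        self.expires = now + ttl             # considered dead after this

def pick_server(actives, now):
    """Return the first unexpired server, dropping expired ones on the way."""
    while actives:
        server = actives[0]
        if now >= server.expires:
            server.alive = False
            actives.pop(0)                   # fail over to the next in line
        else:
            return server
    return None                              # nobody left to talk to

def poll_timeout_ms(now, request_expires, servers, max_wait=3600.0):
    """Milliseconds to sleep until the earliest deadline, never negative."""
    deadline = now + max_wait
    if request_expires is not None:
        deadline = min(deadline, request_expires)
    for server in servers:
        deadline = min(deadline, server.ping_at)
    return max(0, int(1000 * (deadline - now)))

a = Server("tcp://localhost:5555", now=0.0)  # silent since t=0
b = Server("tcp://localhost:5556", now=4.0)  # heard from at t=4
actives = [a, b]
chosen = pick_server(actives, now=7.0)       # a expired at t=6, b lives to t=10
print(chosen.endpoint, a.alive, len(actives))
print(poll_timeout_ms(now=5.0, request_expires=9.0, servers=[b]))
```

Keeping the live servers in an ordered list means failover costs nothing extra: the request simply goes to whoever is at the front once the expired entries have been popped.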
