我是靠谱客的博主 彪壮冥王星,最近开发中收集的这篇文章主要介绍服务端ZMQ(一)——源码解析服务端ZMQ(一)——源码解析,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

服务端ZMQ(一)——源码解析

文章目录

  • 服务端ZMQ(一)——源码解析
    • 1、什么是ZeroMQ?
      • 1.1、为什么用消息队列和多线程?
      • 1.2、ZeroMQ长是什么样子 ?
    • 2、简单示例
      • 2.1、主要的四个操作
      • 2.2、HelloWorld
      • 2.3、与传统TCP的区别
    • 3、工作流程
      • **3.1、zmq_ctx_new**
      • 3.2、zmq_socket
      • 3.3、zmq_bind
      • 4、zmq_connect
      • 5、zmq_recv
    • 4、工作流程时序图
      • 4.1、接收端(server)
      • 4.2、发送端(client)
      • 4.3、zmq类层次

1、什么是ZeroMQ?

  1. 基于消息队列的多线程网络库:对套接字类型、连接处理、帧、路由底层细节进行抽象、提供跨越多种传输协议的套接字(socket library)
  2. 一个嵌入式库:封装了网络通信、消息队列、线程调度等功能,向上层应用提供API,应用程序通过加载库文件,调用API函数来实现高性能网络通信(简单好用的传输层)

总结:ZeroMQ在 Socket API 之上做了一层封装,将网络通讯、进程通讯和线程通讯抽象为统一的 API 接口,提供一个底层的网络通讯库

注意:普通的 socket 是端到端的(1:1的关系)ZeroMQ 却是可以N:M 的关系

/**
socket
	1.建立连接		销毁连接
	2.制定协议		选择协议	protobuffer messagepack shrift
	3.处理错误		error close
	4.断线重连的问题心跳 检测
	5. IO epoll
	
有哪些模型?
1.请求回应模型,	req/rep			redis 协议请求的次序跟回应的次序

2、管道模型 		 push/ull模型   	 ginx

3、监听发布模型 	sub/pub 		观察者模式

**/

在这里插入图片描述

1.1、为什么用消息队列和多线程?

​ 看到消息队列,首先想到的是异步、消峰、解耦。例子:某个功能中很多步骤需要在一个流程里面完成,例如下图支付流程被拉长了,更加耗时。

在这里插入图片描述

引入消息队列,就变成了下图。对于一些可以同时处理的流程用线程,线程池去处理,减少响应时间。

在这里插入图片描述

1.2、ZeroMQ长是什么样子 ?

在这里插入图片描述

ZeroMQ几乎所有IO操作都是异步的,每个ZeroMQ IO线程都有与之绑定的Poller,Poller采用经典的Reactor模式实现,Poller根据不同的操作系统平台使用不同的网络IO模型(select、poll、epoll、devpoll、kequeue等)。主线程I/O线程通过Mail Box传递消息来进行通信

Server,在主线程创建zmq_listener,通过Mail Box发消息的形式将其绑定到I/O线程,I/O线程把zmq_listener添加到Poller中用以侦听读事件。

Client,在主线程中创建zmq_connecter,通过Mail Box发消息的形式将其绑定到I/O线程,I/O线程把zmq_connecter添加到Poller中用以侦听写事件。

Client与Server第一次通信时,会创建zmq_init来发送identity,用以进行认证。认证结束后,双方会为此次连接创建Session,以后双方就通过Session进行通信。

每个Session都会关联到相应的读/写管道, 主线程收发消息只是分别从管道中读/写数据。Session并不实际跟kernel交换I/O数据,而是通过plugin到Session中的Engine来与kernel交换I/O数据。

ZMQ性能优化的过程中发现有3个因素会对性能产生严重的影响:

  • 内存分配的次数
  • 系统调用的次数
  • 并发模型

**内存分配的次数:**一条ØMQ消息由一个不透明的句柄来表示。对于非常短小的消息,其内容被直接编码到句柄中。因此,对句柄的拷贝实际上就是对消息数据的拷贝。当遇到较大的消息时,它被分配到一个单独的缓冲区内,而句柄只包含一个指向缓冲区的指针。对句柄的拷贝并不会造成对消息数据的拷贝,当消息有数兆字节长时,这么处理是很有道理的如下图。需要提醒的是,后一种情况里缓冲区是按引用计数的,因此可以做到被多个句柄引用而不必拷贝数据。

在这里插入图片描述

**批量处理:**在消息通信系统中,系统调用的数量太多的话会导致出现性能瓶颈。当创建高性能的应用时应该尽可能多的去避免遍历调用栈。

在这里插入图片描述

**并发模型:**采用一种不同的模型,目标是完全避免锁机制,并让每个线程能够全速运行。线程间的通信是通过在线程间传递异步消息(事件)来实现的。(actor模式)

无锁算法:ØMQ在pipe对象中采用无锁队列来在用户线程和ØMQ的工作者线程之间传递消息。

2、简单示例

2.1、主要的四个操作

  1. 创建和销毁套接字:zmq_socket(), zmq_close()
  2. 配置和读取套接字选项:zmq_setsockopt(), zmq_getsockopt()
  3. 为套接字建立连接:zmq_bind(), zmq_connect()
  4. 发送和接收消息:zmq_send(), zmq_recv()

2.2、HelloWorld

//client
#include <zmq.h>                                                                                       
#include <string.h>                                                                                    
#include <stdio.h>                                                                                     
#include <unistd.h>                                                                                    
                                                                                                       
int main (void)                                                                                        
{                                                                                                      
    printf ("Connecting to hello world server…n");                                                   
    void *context = zmq_ctx_new ();                         //创建套接字   上下文环境                                                       
    void *requester = zmq_socket (context, ZMQ_REQ);        //设置套接字   ZMQ_REQ     请求回应模型                                                                                                   
    zmq_connect (requester, "tcp://localhost:5555");        //建立连接                                        
                                                                                                       
    int request_nbr;                                                                                   
    for (request_nbr = 0; request_nbr != 5; request_nbr++) { //发送消息                                    
        char buffer [5];                                                                              
        printf ("Sending Hello %d…n", request_nbr);                                                  
        zmq_send (requester, "Hello", 5, 0);                                                           
        zmq_recv (requester, buffer, 5, 0);                                                           
        printf ("Received World %dn", request_nbr);                                                   
    }                                                                                                  
                                                                                                       
    zmq_close (requester);                                                                             
    zmq_ctx_destroy (context);                                                                         
                                                                                                       
    return 0;                                                                                          
}
//server
#include <stdio.h>                                                                                                                       #include <unistd.h>                                                                                    
#include <string.h>                                                                                    
#include <assert.h>                                                                                    
#include <zmq.h>                                                                                     
                                                                                                       
int main (void)                                                                                        
{                                                                                                                                                                           
    void *context = zmq_ctx_new ();                     //创建套接字   上下文环境                                                       
    void *responder = zmq_socket (context, ZMQ_REP);    //设置套接字   ZMQ_REP     请求回应模型                                             
    int rc = zmq_bind (responder, "tcp://*:5555");      //建立连接                                                    
    assert (rc == 0);                                                                                  
                                                                                                       
    while (1) {                                 //接收消息                                                       
        char buffer [5];                                                                              
        zmq_recv (responder, buffer, 5, 0);                                                           
        printf ("Received Hellon");                                                                   
        sleep (1);          //  Do some 'work'                                                         
        zmq_send (responder, "World", 5, 0);                                                           
    }                                                                                                  
    return 0;                                                                                          
}

实现效果:

在这里插入图片描述

2.3、与传统TCP的区别

  1. 使用多种协议:inproc(进程内)、ipc(进程间)、tcp、pgm(广播)、epgm;
  2. 连接是异步的,并由一组消息队列做缓冲;
  3. 连接会表现出某种消息模式,这是由创建连接的套接字类型决定的;
  4. 一个套接字可以有多个输入和输出连接;N:M;
  5. ZMQ没有提供类似accept()的函数,因为当套接字绑定至端点时它就自动开始接受连接了;
  6. 应用程序无法直接和这些连接打交道,因为它们是被封装在ZMQ底层的。、
  7. 当客户端使用zmq_connect()时连接就已经建立了,并不要求该端点已有某个服务使用zmq_bind()进行了绑定;

3、工作流程

3.1、zmq_ctx_new

/*
	zmq.cpp  原文件
*/
void *zmq_ctx_new (void)
{
	//首先是初始化网络环境  ZMQ_HAVE_OPENPGM 或者 ZMQ_HAVE_WINDOWS
    if (!zmq::initialize_network ()) {  
        return NULL;
    }

    //  Create 0MQ context.
    /*
        ctx_t	ctx.hpp源文件
        
        zmq::ctx_t::ctx_t () :
        //套接字是否使用的标志  后续close 最后一个释放
        _tag (ZMQ_CTX_TAG_VALUE_GOOD),	 
        //初始化信箱、zmq_ctx_term thread、reaper thread、I/O thread...
        _starting (true),
        //zmq_ctx_term 是否启用
        _terminating (false),
        //回收线程
        _reaper (NULL),
        //同时socket最大打开数
        _max_sockets (clipped_maxsocket (ZMQ_MAX_SOCKETS_DFLT)),
        //同时消息的最大数
        _max_msgsz (INT_MAX),
        //IO 线程数量
        _io_thread_count (ZMQ_IO_THREADS_DFLT),
        // 该上下文是否永远不会终止
        _blocky (true),
        //是否支持ipv6
        _ipv6 (false),
        //是否使用零拷贝消息解析功能
        _zero_copy (true)
    {
    #ifdef HAVE_FORK
        _pid = getpid ();
    #endif
    #ifdef ZMQ_HAVE_VMCI
        _vmci_fd = -1;
        _vmci_family = -1;
    #endif

        //  Initialise crypto library, if needed.
        zmq::random_open ();

    #ifdef ZMQ_USE_NSS
        NSS_NoDB_Init (NULL);
    #endif

    #ifdef ZMQ_USE_GNUTLS
        gnutls_global_init ();
    #endif
    }
        
        
        1、创建ctx指针
        2、check_tag()  --> 当前 ctx_t 的状态   _tag == ZMQ_CTX_TAG_VALUE_GOOD 
        3、terminate()  --> _terminatin 	--> start () --> create socket(并注册信箱)
        4、shutdown ()  --> _reaper(NULL)   回收线程
        5、set() --> thread_ctx_t () -->    启用新线程(参数)
        ...
    
    */
    zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t;
    if (ctx) {
        if (!ctx->valid ()) {
            delete ctx;
            return NULL;
        }
    }
    return ctx;
}
/*
	所以zmq_ctx_new 只是做了初始化工作		ctx_t提供一个start_thread后续的函数调用中进行的启动(poller_base -> start -> _ctx.start_thread | poller_set -> start -> _ctx.start_thread)
*/

3.2、zmq_socket

//1、zmq_socket
void *zmq_socket (void *ctx_, int type_)
{
    //ctx_t检查
    if (!ctx_ || !(static_cast<zmq::ctx_t *> (ctx_))->check_tag ()) {
        errno = EFAULT;
        return NULL;
    }
    zmq::ctx_t *ctx = static_cast<zmq::ctx_t *> (ctx_);
    // 返回对象指针 是基类socket_base_t(看一下基类构造形式)
    zmq::socket_base_t *s = ctx->create_socket (type_);
    return static_cast<void *> (s);
}

//2、ctx_t::create_socket
zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
{
    scoped_lock_t locker (_slot_sync);

    //  一旦调用了zmq_ctx_term,将不能创建新套接字
    if (_terminating) {
        errno = ETERM;
        return NULL;
    }

    if (unlikely (_starting)) {
        // start_thread 启动
        if (!start ())
            return NULL;
    }

    //  如果当前已达到套接字上限,返回错误
    if (_empty_slots.empty ()) {
        errno = EMFILE;
        return NULL;
    }

    //  选择索引
    const uint32_t slot = _empty_slots.back ();
    _empty_slots.pop_back ();

    //  生成唯一id
    const int sid = (static_cast<int> (max_socket_id.add (1))) + 1;

    //  创建套接字,并注册在其身上的mailbox
    socket_base_t *s = socket_base_t::create (type_, this, slot, sid);
    if (!s) {
        _empty_slots.push_back (slot);
        return NULL;
    }
    _sockets.push_back (s);
    //该 ctx_t 上的 i_mailbox 数组
    _slots[slot] = s->get_mailbox ();

    return s;
}

//3、ctx_t::start
bool zmq::ctx_t::start ()
{
   //数组中的 mailboxes 进行初始化,增加回收线程
    _opt_sync.lock ();
    const int term_and_reaper_threads_count = 2;
    const int mazmq = _max_sockets;
    const int ios = _io_thread_count;
    _opt_sync.unlock ();
    const int slot_count = mazmq + ios + term_and_reaper_threads_count;
    try {
        //vector 重设 capacity 上限
        _slots.reserve (slot_count);
        _empty_slots.reserve (slot_count - term_and_reaper_threads_count);
    }
    catch (const std::bad_alloc &) {
        errno = ENOMEM;
        return false;
    }
    _slots.resize (term_and_reaper_threads_count);

    // 将关闭线程的 mailbox 绑定到 ctx 上
    _slots[term_tid] = &_term_mailbox;

    //  创建回收线程并启动
    _reaper = new (std::nothrow) reaper_t (this, reaper_tid);
    if (!_reaper) {
        errno = ENOMEM;
        goto fail_cleanup_slots;
    }
    if (!_reaper->get_mailbox ()->valid ())
        goto fail_cleanup_reaper;
    _slots[reaper_tid] = _reaper->get_mailbox ();
    _reaper->start ();

    //创建指定数量的io线程启动且注册,当然包括其mailbox
    _slots.resize (slot_count, NULL);

    for (int i = term_and_reaper_threads_count;
         i != ios + term_and_reaper_threads_count; i++) {
        io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
        if (!io_thread) {
            errno = ENOMEM;
            goto fail_cleanup_reaper;
        }
        if (!io_thread->get_mailbox ()->valid ()) {
            delete io_thread;
            goto fail_cleanup_reaper;
        }
        _io_threads.push_back (io_thread);
        _slots[i] = io_thread->get_mailbox ();
        //io_thread 会使用 ctx_t 上的start_thread来启动成员函数 worker_routine ,进而启动当前平台下的io接口的
        //loop(), 再接下来就是经典的 reactor 模式, 从响应的fd中,找到对应的 poll_entry_t 
        //通过判断响应的事件来调用挂接在io_thread上的对象的 in_event 或者 out_event 函数
        io_thread->start ();
    }

    //  In the unused part of the slot array, create a list of empty slots.
    for (int32_t i = static_cast<int32_t> (_slots.size ()) - 1;
         i >= static_cast<int32_t> (ios) + term_and_reaper_threads_count; i--) {
        _empty_slots.push_back (i);
    }
	//启动完成
    _starting = false;
    return true;

fail_cleanup_reaper:
    _reaper->stop ();
    delete _reaper;
    _reaper = NULL;

fail_cleanup_slots:
    _slots.clear ();
    return false;
}
//4、zmq::socket_base_t
zmq::socket_base_t::socket_base_t (ctx_t *parent_,
                                   uint32_t tid_,
                                   int sid_,
                                   bool thread_safe_) :
	//调用 own_t 的构造函数,用于维护对象的生命周期
    own_t (parent_, tid_),
    _sync (),
    _tag (0xbaddecaf),
    _ctx_terminated (false),
    _destroyed (false),
    _poller (NULL),
    _handle (static_cast<poller_t::handle_t> (NULL)),
    _last_tsc (0),
    _ticks (0),
    _rcvmore (false),
    _monitor_socket (NULL),
    _monitor_events (0),
    _thread_safe (thread_safe_),
    _reaper_signaler (NULL),
    _monitor_sync ()
{
    options.socket_id = sid_;
    options.ipv6 = (parent_->get (ZMQ_IPV6) != 0);
    options.linger.store (parent_->get (ZMQ_BLOCKY) ? -1 : 0);
    options.zero_copy = parent_->get (ZMQ_ZERO_COPY_RECV) != 0;
	//根据线程安全选项来决定是否生成线程安全的 mailbox 对象
    if (_thread_safe) {
        _mailbox = new (std::nothrow) mailbox_safe_t (&_sync);
        zmq_assert (_mailbox);
    } else {
        mailbox_t *m = new (std::nothrow) mailbox_t ();
        zmq_assert (m);

        if (m->get_fd () != retired_fd)
            _mailbox = m;
        else {
            LIBZMQ_DELETE (m);
            _mailbox = NULL;
        }
    }
}

/*
	zmq_socket 在ctx上插入一个socket_base_t对象并将其指针抛出 由own_t来维护生命周期
*/

3.3、zmq_bind

int zmq_bind (void *s_, const char *addr_)
{
    //转化成socket_base_t 指针 (socket_base_t定义了很多操作)
    zmq::socket_base_t *s = as_socket_base_t (s_);
    if (!s)
        return -1;
    //进行地址的解析和绑定地址
    return s->bind (addr_);
}

//1、zmq::socket_base_t::bind
int zmq::socket_base_t::bind (const char *endpoint_uri_)
{
    //根据线程安全拍段进行上锁准备
    scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);

    if (unlikely (_ctx_terminated)) {
        errno = ETERM;
        return -1;
    }

    // 执行可能存在的被挂起的命令
    int rc = process_commands (0, false);
    if (unlikely (rc != 0)) {
        return -1;
    }

   //为分割对传入的协议和地址端口进行分片
   //并对传入协议进行检查
    std::string protocol;
    std::string address;
    if (parse_uri (endpoint_uri_, protocol, address)
        || check_protocol (protocol)) {
        return -1;
    }

    // ......

	//以下传输方式需要在io线程中进行,所以我们选择一个io线程
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    if (!io_thread) {
        errno = EMTHREAD;
        return -1;
    }

    if (protocol == protocol_name::tcp) {
        //创建tcp 监听对象
        tcp_listener_t *listener =
          new (std::nothrow) tcp_listener_t (io_thread, this, options);
        alloc_assert (listener);
        //设置地址
        rc = listener->set_local_address (address.c_str ());
        if (rc != 0) {
            LIBZMQ_DELETE (listener);
            event_bind_failed (make_unconnected_bind_endpoint_pair (address),
                               zmq_errno ());
            return -1;
        }

        // Save last endpoint URI
        listener->get_local_address (_last_endpoint);
		//将节点插入子树中
        add_endpoint (make_unconnected_bind_endpoint_pair (_last_endpoint),
                      static_cast<own_t *> (listener), NULL);
        options.connected = true;
        return 0;
    }
 	
    // ......
	
    zmq_assert (false);
    return -1;
}

//2、zmq::ctx_t::choose_io_thread   传入绑定的CPU下标
zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_)
{
    if (_io_threads.empty ())
        return NULL;

     //根据cpu偏好以及当前的io压力来选择压力最小的io线程并返回
    int min_load = -1;
    io_thread_t *selected_io_thread = NULL;
    for (io_threads_t::size_type i = 0, size = _io_threads.size (); i != size;
         i++) {
        if (!affinity_ || (affinity_ & (uint64_t (1) << i))) {
            const int load = _io_threads[i]->get_load ();
            if (selected_io_thread == NULL || load < min_load) {
                min_load = load;
                selected_io_thread = _io_threads[i];
            }
        }
    }
    return selected_io_thread;
}

//3、zmq::socket_base_t::add_endpoint
void zmq::socket_base_t::add_endpoint (
  const endpoint_uri_pair_t &endpoint_pair_, own_t *endpoint_, pipe_t *pipe_)
{
    //  将新节点插入endpoint_
    launch_child (endpoint_);
    //插入ctx
    _endpoints.ZMQ_MAP_INSERT_OR_EMPLACE (endpoint_pair_.identifier (),
                                          endpoint_pipe_t (endpoint_, pipe_));

    if (pipe_ != NULL)
        pipe_->set_endpoint_pair (endpoint_pair_);
}

//4、zmq::own_t::launch_child
void zmq::own_t::launch_child (own_t *object_)
{
    //  插入
    object_->set_owner (this);

    //  向object_所属的io线程发送plug消息,在执行process_plug
    send_plug (object_);

    //  设置object_归属权
    send_own (this, object_);
}


/*
	zmq 在进行bind操作后,并不是马上绑定上的,虽然时间很短但其实是一个异步的流程
	
	tcp_listener_t 在plug初始化过程中,首先加入到当前的socket_base_t, 然后向tcp_listener_t发起一个plug消息,在io_thread的mailbox中进行缓存,并再下次loop循环中进行 tcp_listener_t process_plug
	
	为什么process_plug用消息启动:zmq开启多线程模式时,直接执行 process_plug 操作可能会不在 tcp_listener_t 所属线程中执行,也就是破坏了 actor 设计初衷(猜测)
*/

4、zmq_connect

int zmq_connect (void *s_, const char *addr_)
{
    zmq::socket_base_t *s = as_socket_base_t (s_);
    if (!s)
        return -1;
    return s->connect (addr_);
}

int zmq::socket_base_t::connect (const char *endpoint_uri_)
{
    scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
    return connect_internal (endpoint_uri_);
}


int zmq::socket_base_t::connect_internal (const char *endpoint_uri_)
{
    if (unlikely (_ctx_terminated)) {
        errno = ETERM;
        return -1;
    }

    //  执行任何可能被挂起的命令
    int rc = process_commands (0, false);
    if (unlikely (rc != 0)) {
        return -1;
    }

    //  解析 endpoint_uri_字符串 以 :
    //  为边界进行分别
    std::string protocol;
    std::string address;
    if (parse_uri (endpoint_uri_, protocol, address)
        || check_protocol (protocol)) {
        return -1;
    }
   
    //......
    //DEALER SUB PUB REQ 不支持对一个端点同时开启多个会话,进行判断当前会话是否存在
    const bool is_single_connect =
      (options.type == ZMQ_DEALER || options.type == ZMQ_SUB
       || options.type == ZMQ_PUB || options.type == ZMQ_REQ);
    if (unlikely (is_single_connect)) {
        if (0 != _endpoints.count (endpoint_uri_)) {
            // There is no valid use for multiple connects for SUB-PUB nor
            // DEALER-ROUTER nor REQ-REP. Multiple connects produces
            // nonsensical results.
            return 0;
        }
    }

    //选择io线程去运行我们的会话对象
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    if (!io_thread) {
        errno = EMTHREAD;
        return -1;
    }

    address_t *paddr =
      new (std::nothrow) address_t (protocol, address, this->get_ctx ());
    alloc_assert (paddr);

    //  Resolve address (if needed by the protocol)
    if (protocol == protocol_name::tcp) {
        //  Do some basic sanity checks on tcp:// address syntax
        //  - hostname starts with digit or letter, with embedded '-' or '.'
        //  - IPv6 address may contain hex chars and colons.
        //  - IPv6 link local address may contain % followed by interface name / zone_id
        //    (Reference: https://tools.ietf.org/html/rfc4007)
        //  - IPv4 address may contain decimal digits and dots.
        //  - Address must end in ":port" where port is *, or numeric
        //  - Address may contain two parts separated by ':'
        //  Following code is quick and dirty check to catch obvious errors,
        //  without trying to be fully accurate.
        //对提供的字符串进行解析
        const char *check = address.c_str ();
        if (isalnum (*check) || isxdigit (*check) || *check == '['
            || *check == ':') {
            check++;
            while (isalnum (*check) || isxdigit (*check) || *check == '.'
                   || *check == '-' || *check == ':' || *check == '%'
                   || *check == ';' || *check == '[' || *check == ']'
                   || *check == '_' || *check == '*') {
                check++;
            }
        }
        //  Assume the worst, now look for success
        rc = -1;
         //检查地址是否是安全有效的
        if (*check == 0) {
            //  Do we have a valid port string? (cannot be '*' in connect
            check = strrchr (address.c_str (), ':');
            if (check) {
                check++;
                if (*check && (isdigit (*check)))
                    rc = 0; //  Valid
            }
        }
        if (rc == -1) {
            errno = EINVAL;
            LIBZMQ_DELETE (paddr);
            return -1;
        }
        /推迟解决方案配置
        paddr->resolved.tcp_addr = NULL;
    }
#ifdef ZMQ_HAVE_WS
#ifdef ZMQ_HAVE_WSS
    else if (protocol == protocol_name::ws || protocol == protocol_name::wss) {
        if (protocol == protocol_name::wss) {
            paddr->resolved.wss_addr = new (std::nothrow) wss_address_t ();
            alloc_assert (paddr->resolved.wss_addr);
            rc = paddr->resolved.wss_addr->resolve (address.c_str (), false,
                                                    options.ipv6);
        } else
#else
    else if (protocol == protocol_name::ws) {
#endif
        {
            paddr->resolved.ws_addr = new (std::nothrow) ws_address_t ();
            alloc_assert (paddr->resolved.ws_addr);
            rc = paddr->resolved.ws_addr->resolve (address.c_str (), false,
                                                   options.ipv6);
        }

        if (rc != 0) {
            LIBZMQ_DELETE (paddr);
            return -1;
        }
    }
#endif

#if defined ZMQ_HAVE_IPC
    else if (protocol == protocol_name::ipc) {
        paddr->resolved.ipc_addr = new (std::nothrow) ipc_address_t ();
        alloc_assert (paddr->resolved.ipc_addr);
        int rc = paddr->resolved.ipc_addr->resolve (address.c_str ());
        if (rc != 0) {
            LIBZMQ_DELETE (paddr);
            return -1;
        }
    }
#endif

    if (protocol == protocol_name::udp) {
        if (options.type != ZMQ_RADIO) {
            errno = ENOCOMPATPROTO;
            LIBZMQ_DELETE (paddr);
            return -1;
        }

        paddr->resolved.udp_addr = new (std::nothrow) udp_address_t ();
        alloc_assert (paddr->resolved.udp_addr);
        rc = paddr->resolved.udp_addr->resolve (address.c_str (), false,
                                                options.ipv6);
        if (rc != 0) {
            LIBZMQ_DELETE (paddr);
            return -1;
        }
    }

    // TBD - Should we check address for ZMQ_HAVE_NORM???

#ifdef ZMQ_HAVE_OPENPGM
    if (protocol == protocol_name::pgm || protocol == protocol_name::epgm) {
        struct pgm_addrinfo_t *res = NULL;
        uint16_t port_number = 0;
        int rc =
          pgm_socket_t::init_address (address.c_str (), &res, &port_number);
        if (res != NULL)
            pgm_freeaddrinfo (res);
        if (rc != 0 || port_number == 0) {
            return -1;
        }
    }
#endif
#if defined ZMQ_HAVE_TIPC
    else if (protocol == protocol_name::tipc) {
        paddr->resolved.tipc_addr = new (std::nothrow) tipc_address_t ();
        alloc_assert (paddr->resolved.tipc_addr);
        int rc = paddr->resolved.tipc_addr->resolve (address.c_str ());
        if (rc != 0) {
            LIBZMQ_DELETE (paddr);
            return -1;
        }
        const sockaddr_tipc *const saddr =
          reinterpret_cast<const sockaddr_tipc *> (
            paddr->resolved.tipc_addr->addr ());
        // Cannot connect to random Port Identity
        if (saddr->addrtype == TIPC_ADDR_ID
            && paddr->resolved.tipc_addr->is_random ()) {
            LIBZMQ_DELETE (paddr);
            errno = EINVAL;
            return -1;
        }
    }
#endif
#if defined ZMQ_HAVE_VMCI
    else if (protocol == protocol_name::vmci) {
        paddr->resolved.vmci_addr =
          new (std::nothrow) vmci_address_t (this->get_ctx ());
        alloc_assert (paddr->resolved.vmci_addr);
        int rc = paddr->resolved.vmci_addr->resolve (address.c_str ());
        if (rc != 0) {
            LIBZMQ_DELETE (paddr);
            return -1;
        }
    }
#endif

    //  Create session.
    session_base_t *session =
      session_base_t::create (io_thread, true, this, options, paddr);
    errno_assert (session);

    //  PGM does not support subscription forwarding; ask for all data to be
    //  sent to this pipe. (same for NORM, currently?)
#if defined ZMQ_HAVE_OPENPGM && defined ZMQ_HAVE_NORM
    const bool subscribe_to_all =
      protocol == protocol_name::pgm || protocol == protocol_name::epgm
      || protocol == protocol_name::norm || protocol == protocol_name::udp;
#elif defined ZMQ_HAVE_OPENPGM
    const bool subscribe_to_all = protocol == protocol_name::pgm
                                  || protocol == protocol_name::epgm
                                  || protocol == protocol_name::udp;
#elif defined ZMQ_HAVE_NORM
    const bool subscribe_to_all =
      protocol == protocol_name::norm || protocol == protocol_name::udp;
#else
    const bool subscribe_to_all = protocol == protocol_name::udp;
#endif
    pipe_t *newpipe = NULL;

    if (options.immediate != 1 || subscribe_to_all) {
        //  Create a bi-directional pipe.
        object_t *parents[2] = {this, session};
        pipe_t *new_pipes[2] = {NULL, NULL};

        const bool conflate = get_effective_conflate_option (options);

        int hwms[2] = {conflate ? -1 : options.sndhwm,
                       conflate ? -1 : options.rcvhwm};
        bool conflates[2] = {conflate, conflate};
        rc = pipepair (parents, new_pipes, hwms, conflates);
        errno_assert (rc == 0);

        //  Attach local end of the pipe to the socket object.
        attach_pipe (new_pipes[0], subscribe_to_all, true);
        newpipe = new_pipes[0];

        //  Attach remote end of the pipe to the session object later on.
        session->attach_pipe (new_pipes[1]);
    }
	
    // .........
        
    //  Save last endpoint URI
    paddr->to_string (_last_endpoint);

    add_endpoint (make_unconnected_connect_endpoint_pair (endpoint_uri_),
                  static_cast<own_t *> (session), newpipe);
    return 0;
}


5、zmq_recv

int zmq_recv (void *s_, void *buf_, size_t len_, int flags_)
{
    zmq::socket_base_t *s = as_socket_base_t (s_);
    if (!s)
        return -1;
    //初始化 zmq_msg_t 
    zmq_msg_t msg;
    int rc = zmq_msg_init (&msg);
    errno_assert (rc == 0);

    const int nbytes = s_recvmsg (s, &msg, flags_);
    if (unlikely (nbytes < 0)) {
        const int err = errno;
        rc = zmq_msg_close (&msg);
        errno_assert (rc == 0);
        errno = err;
        return -1;
    }

    // 判断是否超过给定的大小
    const size_t to_copy = size_t (nbytes) < len_ ? size_t (nbytes) : len_;

    //  We exp如果比给定的大小 大,则进行拷贝
    if (to_copy) {
        assert (buf_);
        memcpy (buf_, zmq_msg_data (&msg), to_copy);
    }
    //释放消息 (当一个消息不再被使用的时候应该立刻调用zmq_msg_close()进行资源释放,否则可能引起内存泄露)
    rc = zmq_msg_close (&msg);
    errno_assert (rc == 0);

    return nbytes;
}


static int s_recvmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
{
    //调用 socket_base_t 的recv函数
	//以我们目前来说实际上就是调用的 rep_t 的 xrecv,在 socket_base_t 中,recv 调用的虚函数 xrecv 而 xrecv 将会被子类重写
    const int rc = s_->recv (reinterpret_cast<zmq::msg_t *> (msg_), flags_);
    if (unlikely (rc < 0))
        return -1;

    //  Truncate returned size to INT_MAX to avoid overflow to negative values
    const size_t sz = zmq_msg_size (msg_);
    return static_cast<int> (sz < INT_MAX ? sz : INT_MAX);
}


4、工作流程时序图

4.1、接收端(server)

在这里插入图片描述

4.2、发送端(client)

在这里插入图片描述

4.3、zmq类层次

在这里插入图片描述

①、object_t,主要用于发送命令和处理命令,所有继承object_t的子类都具备该类的功能

②、io_thread_t,内含一个poller,可监听句柄的读、写、异常状态,继承自object_t,具有接收命令、处理命令、发送命令的功能

③、io_object_t,可以获取一个io_thread_t的poller,从而具备poller功能,所有继承自该类的子类都具有pollere功能,可监听句柄的读、写、异常状态

④、reaper_t,zmq的回收线程

⑤、own_t,zmq的对象树结点,或者说多叉树的结点,其主要用于对象的销毁,可以想到,对象的销毁就是这棵树的销毁过程,必须要使用深度优先的算法来销毁。关于zmq对象树在Internal Architecture of libzmq有详细讲解

⑥、tcp_connector_t,zmq_socket的连接器,使用她来建立tcp连接

⑦、tcp_listener_t,zmq_socket的监听器

⑧、stream_engine,负责处理io事件中的一种----网络事件,把网络字节流转换成zeromq的msg_t消息传递给session_base_t。另外一些和版本兼容相关的杂务也stream_engine处理的。stream_engine_t处理完杂务,到session_base_t就只看见msg_t了。

⑨、session_base_t,管理zmq_socket的连接和通信,主要与engine进行交换

⑩、socket_base_t,zeromq的socket,在zmq中,被当成一种特殊的”线程“,具有收发命令的功能

//参考资料
https://blog.csdn.net/tbyzs/article/category/1710475   		zeromq源码分析
https://www.cnblogs.com/zengzy/category/777608.html			zeromq源码学习笔记		

最后

以上就是彪壮冥王星为你收集整理的服务端ZMQ(一)——源码解析服务端ZMQ(一)——源码解析的全部内容,希望文章能够帮你解决服务端ZMQ(一)——源码解析服务端ZMQ(一)——源码解析所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(37)

评论列表共有 0 条评论

立即
投稿
返回
顶部