概述
服务端ZMQ(一)——源码解析
文章目录
- 服务端ZMQ(一)——源码解析
- 1、什么是ZeroMQ?
- 1.1、为什么用消息队列和多线程?
- 1.2、ZeroMQ长是什么样子 ?
- 2、简单示例
- 2.1、主要的四个操作
- 2.2、HelloWorld
- 2.3、与传统TCP的区别
- 3、工作流程
- **3.1、zmq_ctx_new**
- 3.2、zmq_socket
- 3.3、zmq_bind
- 4、zmq_connect
- 5、zmq_recv
- 4、工作流程时序图
- 4.1、接收端(server)
- 4.2、发送端(client)
- 4.3、zmq类层次
1、什么是ZeroMQ?
- 基于消息队列的多线程网络库:对套接字类型、连接处理、帧、路由底层细节进行抽象、提供跨越多种传输协议的套接字(socket library)
- 一个嵌入式库:封装了网络通信、消息队列、线程调度等功能,向上层应用提供API,应用程序通过加载库文件,调用API函数来实现高性能网络通信(简单好用的传输层)
总结:ZeroMQ在 Socket API 之上做了一层封装,将网络通讯、进程通讯和线程通讯抽象为统一的 API 接口,提供一个底层的网络通讯库
注意:普通的 socket 是端到端的(1:1的关系)ZeroMQ 却是可以N:M 的关系
/**
socket
1.建立连接 销毁连接
2.制定协议 选择协议 protobuffer messagepack shrift
3.处理错误 error close
4.断线重连的问题心跳 检测
5. IO epoll
有哪些模型?
1.请求回应模型, req/rep redis 协议请求的次序跟回应的次序
2、管道模型 push/ull模型 ginx
3、监听发布模型 sub/pub 观察者模式
**/
1.1、为什么用消息队列和多线程?
看到消息队列,首先想到的是异步、消峰、解耦。例子:某个功能中很多步骤需要在一个流程里面完成,例如下图支付流程被拉长了,更加耗时。
引入消息队列,就变成了下图。对于一些可以同时处理的流程用线程,线程池去处理,减少响应时间。
1.2、ZeroMQ长是什么样子 ?
ZeroMQ几乎所有IO操作都是异步的,每个ZeroMQ IO线程
都有与之绑定的Poller,Poller采用经典的Reactor模式实现,Poller根据不同的操作系统平台使用不同的网络IO模型(select、poll、epoll、devpoll、kequeue等)。主线程与I/O线程通过Mail Box传递消息来进行通信
Server,在主线程创建zmq_listener,通过Mail Box发消息的形式将其绑定到I/O线程,I/O线程把zmq_listener添加到Poller中用以侦听读事件。
Client,在主线程中创建zmq_connecter,通过Mail Box发消息的形式将其绑定到I/O线程,I/O线程把zmq_connecter添加到Poller中用以侦听写事件。
Client与Server第一次通信时,会创建zmq_init来发送identity,用以进行认证。认证结束后,双方会为此次连接创建Session,以后双方就通过Session进行通信。
每个Session都会关联到相应的读/写管道, 主线程收发消息只是分别从管道中读/写数据。Session并不实际跟kernel交换I/O数据,而是通过plugin到Session中的Engine来与kernel交换I/O数据。
ZMQ性能优化的过程中发现有3个因素会对性能产生严重的影响:
- 内存分配的次数
- 系统调用的次数
- 并发模型
**内存分配的次数:**一条ØMQ消息由一个不透明的句柄来表示。对于非常短小的消息,其内容被直接编码到句柄中。因此,对句柄的拷贝实际上就是对消息数据的拷贝。当遇到较大的消息时,它被分配到一个单独的缓冲区内,而句柄只包含一个指向缓冲区的指针。对句柄的拷贝并不会造成对消息数据的拷贝,当消息有数兆字节长时,这么处理是很有道理的如下图。需要提醒的是,后一种情况里缓冲区是按引用计数的,因此可以做到被多个句柄引用而不必拷贝数据。
**批量处理:**在消息通信系统中,系统调用的数量太多的话会导致出现性能瓶颈。当创建高性能的应用时应该尽可能多的去避免遍历调用栈。
**并发模型:**采用一种不同的模型,目标是完全避免锁机制,并让每个线程能够全速运行。线程间的通信是通过在线程间传递异步消息(事件)来实现的。(actor模式)
无锁算法:ØMQ在pipe对象中采用无锁队列来在用户线程和ØMQ的工作者线程之间传递消息。
2、简单示例
2.1、主要的四个操作
- 创建和销毁套接字:zmq_socket(), zmq_close()
- 配置和读取套接字选项:zmq_setsockopt(), zmq_getsockopt()
- 为套接字建立连接:zmq_bind(), zmq_connect()
- 发送和接收消息:zmq_send(), zmq_recv()
2.2、HelloWorld
//client
#include <zmq.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>
int main (void)
{
printf ("Connecting to hello world server…n");
void *context = zmq_ctx_new (); //创建套接字 上下文环境
void *requester = zmq_socket (context, ZMQ_REQ); //设置套接字 ZMQ_REQ 请求回应模型
zmq_connect (requester, "tcp://localhost:5555"); //建立连接
int request_nbr;
for (request_nbr = 0; request_nbr != 5; request_nbr++) { //发送消息
char buffer [5];
printf ("Sending Hello %d…n", request_nbr);
zmq_send (requester, "Hello", 5, 0);
zmq_recv (requester, buffer, 5, 0);
printf ("Received World %dn", request_nbr);
}
zmq_close (requester);
zmq_ctx_destroy (context);
return 0;
}
//server
#include <stdio.h> #include <unistd.h>
#include <string.h>
#include <assert.h>
#include <zmq.h>
int main (void)
{
void *context = zmq_ctx_new (); //创建套接字 上下文环境
void *responder = zmq_socket (context, ZMQ_REP); //设置套接字 ZMQ_REP 请求回应模型
int rc = zmq_bind (responder, "tcp://*:5555"); //建立连接
assert (rc == 0);
while (1) { //接收消息
char buffer [5];
zmq_recv (responder, buffer, 5, 0);
printf ("Received Hellon");
sleep (1); // Do some 'work'
zmq_send (responder, "World", 5, 0);
}
return 0;
}
实现效果:
2.3、与传统TCP的区别
- 使用多种协议:inproc(进程内)、ipc(进程间)、tcp、pgm(广播)、epgm;
- 连接是异步的,并由一组消息队列做缓冲;
- 连接会表现出某种消息模式,这是由创建连接的套接字类型决定的;
- 一个套接字可以有多个输入和输出连接;N:M;
- ZMQ没有提供类似accept()的函数,因为当套接字绑定至端点时它就自动开始接受连接了;
- 应用程序无法直接和这些连接打交道,因为它们是被封装在ZMQ底层的。、
- 当客户端使用zmq_connect()时连接就已经建立了,并不要求该端点已有某个服务使用zmq_bind()进行了绑定;
3、工作流程
3.1、zmq_ctx_new
/*
zmq.cpp 原文件
*/
void *zmq_ctx_new (void)
{
//首先是初始化网络环境 ZMQ_HAVE_OPENPGM 或者 ZMQ_HAVE_WINDOWS
if (!zmq::initialize_network ()) {
return NULL;
}
// Create 0MQ context.
/*
ctx_t ctx.hpp源文件
zmq::ctx_t::ctx_t () :
//套接字是否使用的标志 后续close 最后一个释放
_tag (ZMQ_CTX_TAG_VALUE_GOOD),
//初始化信箱、zmq_ctx_term thread、reaper thread、I/O thread...
_starting (true),
//zmq_ctx_term 是否启用
_terminating (false),
//回收线程
_reaper (NULL),
//同时socket最大打开数
_max_sockets (clipped_maxsocket (ZMQ_MAX_SOCKETS_DFLT)),
//同时消息的最大数
_max_msgsz (INT_MAX),
//IO 线程数量
_io_thread_count (ZMQ_IO_THREADS_DFLT),
// 该上下文是否永远不会终止
_blocky (true),
//是否支持ipv6
_ipv6 (false),
//是否使用零拷贝消息解析功能
_zero_copy (true)
{
#ifdef HAVE_FORK
_pid = getpid ();
#endif
#ifdef ZMQ_HAVE_VMCI
_vmci_fd = -1;
_vmci_family = -1;
#endif
// Initialise crypto library, if needed.
zmq::random_open ();
#ifdef ZMQ_USE_NSS
NSS_NoDB_Init (NULL);
#endif
#ifdef ZMQ_USE_GNUTLS
gnutls_global_init ();
#endif
}
1、创建ctx指针
2、check_tag() --> 当前 ctx_t 的状态 _tag == ZMQ_CTX_TAG_VALUE_GOOD
3、terminate() --> _terminatin --> start () --> create socket(并注册信箱)
4、shutdown () --> _reaper(NULL) 回收线程
5、set() --> thread_ctx_t () --> 启用新线程(参数)
...
*/
zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t;
if (ctx) {
if (!ctx->valid ()) {
delete ctx;
return NULL;
}
}
return ctx;
}
/*
所以zmq_ctx_new 只是做了初始化工作 ctx_t提供一个start_thread后续的函数调用中进行的启动(poller_base -> start -> _ctx.start_thread | poller_set -> start -> _ctx.start_thread)
*/
3.2、zmq_socket
//1、zmq_socket
void *zmq_socket (void *ctx_, int type_)
{
//ctx_t检查
if (!ctx_ || !(static_cast<zmq::ctx_t *> (ctx_))->check_tag ()) {
errno = EFAULT;
return NULL;
}
zmq::ctx_t *ctx = static_cast<zmq::ctx_t *> (ctx_);
// 返回对象指针 是基类socket_base_t(看一下基类构造形式)
zmq::socket_base_t *s = ctx->create_socket (type_);
return static_cast<void *> (s);
}
//2、ctx_t::create_socket
zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
{
scoped_lock_t locker (_slot_sync);
// 一旦调用了zmq_ctx_term,将不能创建新套接字
if (_terminating) {
errno = ETERM;
return NULL;
}
if (unlikely (_starting)) {
// start_thread 启动
if (!start ())
return NULL;
}
// 如果当前已达到套接字上限,返回错误
if (_empty_slots.empty ()) {
errno = EMFILE;
return NULL;
}
// 选择索引
const uint32_t slot = _empty_slots.back ();
_empty_slots.pop_back ();
// 生成唯一id
const int sid = (static_cast<int> (max_socket_id.add (1))) + 1;
// 创建套接字,并注册在其身上的mailbox
socket_base_t *s = socket_base_t::create (type_, this, slot, sid);
if (!s) {
_empty_slots.push_back (slot);
return NULL;
}
_sockets.push_back (s);
//该 ctx_t 上的 i_mailbox 数组
_slots[slot] = s->get_mailbox ();
return s;
}
//3、ctx_t::start
bool zmq::ctx_t::start ()
{
//数组中的 mailboxes 进行初始化,增加回收线程
_opt_sync.lock ();
const int term_and_reaper_threads_count = 2;
const int mazmq = _max_sockets;
const int ios = _io_thread_count;
_opt_sync.unlock ();
const int slot_count = mazmq + ios + term_and_reaper_threads_count;
try {
//vector 重设 capacity 上限
_slots.reserve (slot_count);
_empty_slots.reserve (slot_count - term_and_reaper_threads_count);
}
catch (const std::bad_alloc &) {
errno = ENOMEM;
return false;
}
_slots.resize (term_and_reaper_threads_count);
// 将关闭线程的 mailbox 绑定到 ctx 上
_slots[term_tid] = &_term_mailbox;
// 创建回收线程并启动
_reaper = new (std::nothrow) reaper_t (this, reaper_tid);
if (!_reaper) {
errno = ENOMEM;
goto fail_cleanup_slots;
}
if (!_reaper->get_mailbox ()->valid ())
goto fail_cleanup_reaper;
_slots[reaper_tid] = _reaper->get_mailbox ();
_reaper->start ();
//创建指定数量的io线程启动且注册,当然包括其mailbox
_slots.resize (slot_count, NULL);
for (int i = term_and_reaper_threads_count;
i != ios + term_and_reaper_threads_count; i++) {
io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
if (!io_thread) {
errno = ENOMEM;
goto fail_cleanup_reaper;
}
if (!io_thread->get_mailbox ()->valid ()) {
delete io_thread;
goto fail_cleanup_reaper;
}
_io_threads.push_back (io_thread);
_slots[i] = io_thread->get_mailbox ();
//io_thread 会使用 ctx_t 上的start_thread来启动成员函数 worker_routine ,进而启动当前平台下的io接口的
//loop(), 再接下来就是经典的 reactor 模式, 从响应的fd中,找到对应的 poll_entry_t
//通过判断响应的事件来调用挂接在io_thread上的对象的 in_event 或者 out_event 函数
io_thread->start ();
}
// In the unused part of the slot array, create a list of empty slots.
for (int32_t i = static_cast<int32_t> (_slots.size ()) - 1;
i >= static_cast<int32_t> (ios) + term_and_reaper_threads_count; i--) {
_empty_slots.push_back (i);
}
//启动完成
_starting = false;
return true;
fail_cleanup_reaper:
_reaper->stop ();
delete _reaper;
_reaper = NULL;
fail_cleanup_slots:
_slots.clear ();
return false;
}
//4、zmq::socket_base_t
zmq::socket_base_t::socket_base_t (ctx_t *parent_,
uint32_t tid_,
int sid_,
bool thread_safe_) :
//调用 own_t 的构造函数,用于维护对象的生命周期
own_t (parent_, tid_),
_sync (),
_tag (0xbaddecaf),
_ctx_terminated (false),
_destroyed (false),
_poller (NULL),
_handle (static_cast<poller_t::handle_t> (NULL)),
_last_tsc (0),
_ticks (0),
_rcvmore (false),
_monitor_socket (NULL),
_monitor_events (0),
_thread_safe (thread_safe_),
_reaper_signaler (NULL),
_monitor_sync ()
{
options.socket_id = sid_;
options.ipv6 = (parent_->get (ZMQ_IPV6) != 0);
options.linger.store (parent_->get (ZMQ_BLOCKY) ? -1 : 0);
options.zero_copy = parent_->get (ZMQ_ZERO_COPY_RECV) != 0;
//根据线程安全选项来决定是否生成线程安全的 mailbox 对象
if (_thread_safe) {
_mailbox = new (std::nothrow) mailbox_safe_t (&_sync);
zmq_assert (_mailbox);
} else {
mailbox_t *m = new (std::nothrow) mailbox_t ();
zmq_assert (m);
if (m->get_fd () != retired_fd)
_mailbox = m;
else {
LIBZMQ_DELETE (m);
_mailbox = NULL;
}
}
}
/*
zmq_socket 在ctx上插入一个socket_base_t对象并将其指针抛出 由own_t来维护生命周期
*/
3.3、zmq_bind
int zmq_bind (void *s_, const char *addr_)
{
//转化成socket_base_t 指针 (socket_base_t定义了很多操作)
zmq::socket_base_t *s = as_socket_base_t (s_);
if (!s)
return -1;
//进行地址的解析和绑定地址
return s->bind (addr_);
}
//1、zmq::socket_base_t::bind
int zmq::socket_base_t::bind (const char *endpoint_uri_)
{
//根据线程安全拍段进行上锁准备
scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
if (unlikely (_ctx_terminated)) {
errno = ETERM;
return -1;
}
// 执行可能存在的被挂起的命令
int rc = process_commands (0, false);
if (unlikely (rc != 0)) {
return -1;
}
//为分割对传入的协议和地址端口进行分片
//并对传入协议进行检查
std::string protocol;
std::string address;
if (parse_uri (endpoint_uri_, protocol, address)
|| check_protocol (protocol)) {
return -1;
}
// ......
//以下传输方式需要在io线程中进行,所以我们选择一个io线程
io_thread_t *io_thread = choose_io_thread (options.affinity);
if (!io_thread) {
errno = EMTHREAD;
return -1;
}
if (protocol == protocol_name::tcp) {
//创建tcp 监听对象
tcp_listener_t *listener =
new (std::nothrow) tcp_listener_t (io_thread, this, options);
alloc_assert (listener);
//设置地址
rc = listener->set_local_address (address.c_str ());
if (rc != 0) {
LIBZMQ_DELETE (listener);
event_bind_failed (make_unconnected_bind_endpoint_pair (address),
zmq_errno ());
return -1;
}
// Save last endpoint URI
listener->get_local_address (_last_endpoint);
//将节点插入子树中
add_endpoint (make_unconnected_bind_endpoint_pair (_last_endpoint),
static_cast<own_t *> (listener), NULL);
options.connected = true;
return 0;
}
// ......
zmq_assert (false);
return -1;
}
//2、zmq::ctx_t::choose_io_thread 传入绑定的CPU下标
zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_)
{
if (_io_threads.empty ())
return NULL;
//根据cpu偏好以及当前的io压力来选择压力最小的io线程并返回
int min_load = -1;
io_thread_t *selected_io_thread = NULL;
for (io_threads_t::size_type i = 0, size = _io_threads.size (); i != size;
i++) {
if (!affinity_ || (affinity_ & (uint64_t (1) << i))) {
const int load = _io_threads[i]->get_load ();
if (selected_io_thread == NULL || load < min_load) {
min_load = load;
selected_io_thread = _io_threads[i];
}
}
}
return selected_io_thread;
}
//3、zmq::socket_base_t::add_endpoint
void zmq::socket_base_t::add_endpoint (
const endpoint_uri_pair_t &endpoint_pair_, own_t *endpoint_, pipe_t *pipe_)
{
// 将新节点插入endpoint_
launch_child (endpoint_);
//插入ctx
_endpoints.ZMQ_MAP_INSERT_OR_EMPLACE (endpoint_pair_.identifier (),
endpoint_pipe_t (endpoint_, pipe_));
if (pipe_ != NULL)
pipe_->set_endpoint_pair (endpoint_pair_);
}
//4、zmq::own_t::launch_child
void zmq::own_t::launch_child (own_t *object_)
{
// 插入
object_->set_owner (this);
// 向object_所属的io线程发送plug消息,在执行process_plug
send_plug (object_);
// 设置object_归属权
send_own (this, object_);
}
/*
zmq 在进行bind操作后,并不是马上绑定上的,虽然时间很短但其实是一个异步的流程
tcp_listener_t 在plug初始化过程中,首先加入到当前的socket_base_t, 然后向tcp_listener_t发起一个plug消息,在io_thread的mailbox中进行缓存,并再下次loop循环中进行 tcp_listener_t process_plug
为什么process_plug用消息启动:zmq开启多线程模式时,直接执行 process_plug 操作可能会不在 tcp_listener_t 所属线程中执行,也就是破坏了 actor 设计初衷(猜测)
*/
4、zmq_connect
int zmq_connect (void *s_, const char *addr_)
{
zmq::socket_base_t *s = as_socket_base_t (s_);
if (!s)
return -1;
return s->connect (addr_);
}
int zmq::socket_base_t::connect (const char *endpoint_uri_)
{
scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
return connect_internal (endpoint_uri_);
}
int zmq::socket_base_t::connect_internal (const char *endpoint_uri_)
{
if (unlikely (_ctx_terminated)) {
errno = ETERM;
return -1;
}
// 执行任何可能被挂起的命令
int rc = process_commands (0, false);
if (unlikely (rc != 0)) {
return -1;
}
// 解析 endpoint_uri_字符串 以 :
// 为边界进行分别
std::string protocol;
std::string address;
if (parse_uri (endpoint_uri_, protocol, address)
|| check_protocol (protocol)) {
return -1;
}
//......
//DEALER SUB PUB REQ 不支持对一个端点同时开启多个会话,进行判断当前会话是否存在
const bool is_single_connect =
(options.type == ZMQ_DEALER || options.type == ZMQ_SUB
|| options.type == ZMQ_PUB || options.type == ZMQ_REQ);
if (unlikely (is_single_connect)) {
if (0 != _endpoints.count (endpoint_uri_)) {
// There is no valid use for multiple connects for SUB-PUB nor
// DEALER-ROUTER nor REQ-REP. Multiple connects produces
// nonsensical results.
return 0;
}
}
//选择io线程去运行我们的会话对象
io_thread_t *io_thread = choose_io_thread (options.affinity);
if (!io_thread) {
errno = EMTHREAD;
return -1;
}
address_t *paddr =
new (std::nothrow) address_t (protocol, address, this->get_ctx ());
alloc_assert (paddr);
// Resolve address (if needed by the protocol)
if (protocol == protocol_name::tcp) {
// Do some basic sanity checks on tcp:// address syntax
// - hostname starts with digit or letter, with embedded '-' or '.'
// - IPv6 address may contain hex chars and colons.
// - IPv6 link local address may contain % followed by interface name / zone_id
// (Reference: https://tools.ietf.org/html/rfc4007)
// - IPv4 address may contain decimal digits and dots.
// - Address must end in ":port" where port is *, or numeric
// - Address may contain two parts separated by ':'
// Following code is quick and dirty check to catch obvious errors,
// without trying to be fully accurate.
//对提供的字符串进行解析
const char *check = address.c_str ();
if (isalnum (*check) || isxdigit (*check) || *check == '['
|| *check == ':') {
check++;
while (isalnum (*check) || isxdigit (*check) || *check == '.'
|| *check == '-' || *check == ':' || *check == '%'
|| *check == ';' || *check == '[' || *check == ']'
|| *check == '_' || *check == '*') {
check++;
}
}
// Assume the worst, now look for success
rc = -1;
//检查地址是否是安全有效的
if (*check == 0) {
// Do we have a valid port string? (cannot be '*' in connect
check = strrchr (address.c_str (), ':');
if (check) {
check++;
if (*check && (isdigit (*check)))
rc = 0; // Valid
}
}
if (rc == -1) {
errno = EINVAL;
LIBZMQ_DELETE (paddr);
return -1;
}
/推迟解决方案配置
paddr->resolved.tcp_addr = NULL;
}
#ifdef ZMQ_HAVE_WS
#ifdef ZMQ_HAVE_WSS
else if (protocol == protocol_name::ws || protocol == protocol_name::wss) {
if (protocol == protocol_name::wss) {
paddr->resolved.wss_addr = new (std::nothrow) wss_address_t ();
alloc_assert (paddr->resolved.wss_addr);
rc = paddr->resolved.wss_addr->resolve (address.c_str (), false,
options.ipv6);
} else
#else
else if (protocol == protocol_name::ws) {
#endif
{
paddr->resolved.ws_addr = new (std::nothrow) ws_address_t ();
alloc_assert (paddr->resolved.ws_addr);
rc = paddr->resolved.ws_addr->resolve (address.c_str (), false,
options.ipv6);
}
if (rc != 0) {
LIBZMQ_DELETE (paddr);
return -1;
}
}
#endif
#if defined ZMQ_HAVE_IPC
else if (protocol == protocol_name::ipc) {
paddr->resolved.ipc_addr = new (std::nothrow) ipc_address_t ();
alloc_assert (paddr->resolved.ipc_addr);
int rc = paddr->resolved.ipc_addr->resolve (address.c_str ());
if (rc != 0) {
LIBZMQ_DELETE (paddr);
return -1;
}
}
#endif
if (protocol == protocol_name::udp) {
if (options.type != ZMQ_RADIO) {
errno = ENOCOMPATPROTO;
LIBZMQ_DELETE (paddr);
return -1;
}
paddr->resolved.udp_addr = new (std::nothrow) udp_address_t ();
alloc_assert (paddr->resolved.udp_addr);
rc = paddr->resolved.udp_addr->resolve (address.c_str (), false,
options.ipv6);
if (rc != 0) {
LIBZMQ_DELETE (paddr);
return -1;
}
}
// TBD - Should we check address for ZMQ_HAVE_NORM???
#ifdef ZMQ_HAVE_OPENPGM
if (protocol == protocol_name::pgm || protocol == protocol_name::epgm) {
struct pgm_addrinfo_t *res = NULL;
uint16_t port_number = 0;
int rc =
pgm_socket_t::init_address (address.c_str (), &res, &port_number);
if (res != NULL)
pgm_freeaddrinfo (res);
if (rc != 0 || port_number == 0) {
return -1;
}
}
#endif
#if defined ZMQ_HAVE_TIPC
else if (protocol == protocol_name::tipc) {
paddr->resolved.tipc_addr = new (std::nothrow) tipc_address_t ();
alloc_assert (paddr->resolved.tipc_addr);
int rc = paddr->resolved.tipc_addr->resolve (address.c_str ());
if (rc != 0) {
LIBZMQ_DELETE (paddr);
return -1;
}
const sockaddr_tipc *const saddr =
reinterpret_cast<const sockaddr_tipc *> (
paddr->resolved.tipc_addr->addr ());
// Cannot connect to random Port Identity
if (saddr->addrtype == TIPC_ADDR_ID
&& paddr->resolved.tipc_addr->is_random ()) {
LIBZMQ_DELETE (paddr);
errno = EINVAL;
return -1;
}
}
#endif
#if defined ZMQ_HAVE_VMCI
else if (protocol == protocol_name::vmci) {
paddr->resolved.vmci_addr =
new (std::nothrow) vmci_address_t (this->get_ctx ());
alloc_assert (paddr->resolved.vmci_addr);
int rc = paddr->resolved.vmci_addr->resolve (address.c_str ());
if (rc != 0) {
LIBZMQ_DELETE (paddr);
return -1;
}
}
#endif
// Create session.
session_base_t *session =
session_base_t::create (io_thread, true, this, options, paddr);
errno_assert (session);
// PGM does not support subscription forwarding; ask for all data to be
// sent to this pipe. (same for NORM, currently?)
#if defined ZMQ_HAVE_OPENPGM && defined ZMQ_HAVE_NORM
const bool subscribe_to_all =
protocol == protocol_name::pgm || protocol == protocol_name::epgm
|| protocol == protocol_name::norm || protocol == protocol_name::udp;
#elif defined ZMQ_HAVE_OPENPGM
const bool subscribe_to_all = protocol == protocol_name::pgm
|| protocol == protocol_name::epgm
|| protocol == protocol_name::udp;
#elif defined ZMQ_HAVE_NORM
const bool subscribe_to_all =
protocol == protocol_name::norm || protocol == protocol_name::udp;
#else
const bool subscribe_to_all = protocol == protocol_name::udp;
#endif
pipe_t *newpipe = NULL;
if (options.immediate != 1 || subscribe_to_all) {
// Create a bi-directional pipe.
object_t *parents[2] = {this, session};
pipe_t *new_pipes[2] = {NULL, NULL};
const bool conflate = get_effective_conflate_option (options);
int hwms[2] = {conflate ? -1 : options.sndhwm,
conflate ? -1 : options.rcvhwm};
bool conflates[2] = {conflate, conflate};
rc = pipepair (parents, new_pipes, hwms, conflates);
errno_assert (rc == 0);
// Attach local end of the pipe to the socket object.
attach_pipe (new_pipes[0], subscribe_to_all, true);
newpipe = new_pipes[0];
// Attach remote end of the pipe to the session object later on.
session->attach_pipe (new_pipes[1]);
}
// .........
// Save last endpoint URI
paddr->to_string (_last_endpoint);
add_endpoint (make_unconnected_connect_endpoint_pair (endpoint_uri_),
static_cast<own_t *> (session), newpipe);
return 0;
}
5、zmq_recv
int zmq_recv (void *s_, void *buf_, size_t len_, int flags_)
{
zmq::socket_base_t *s = as_socket_base_t (s_);
if (!s)
return -1;
//初始化 zmq_msg_t
zmq_msg_t msg;
int rc = zmq_msg_init (&msg);
errno_assert (rc == 0);
const int nbytes = s_recvmsg (s, &msg, flags_);
if (unlikely (nbytes < 0)) {
const int err = errno;
rc = zmq_msg_close (&msg);
errno_assert (rc == 0);
errno = err;
return -1;
}
// 判断是否超过给定的大小
const size_t to_copy = size_t (nbytes) < len_ ? size_t (nbytes) : len_;
// We exp如果比给定的大小 大,则进行拷贝
if (to_copy) {
assert (buf_);
memcpy (buf_, zmq_msg_data (&msg), to_copy);
}
//释放消息 (当一个消息不再被使用的时候应该立刻调用zmq_msg_close()进行资源释放,否则可能引起内存泄露)
rc = zmq_msg_close (&msg);
errno_assert (rc == 0);
return nbytes;
}
static int s_recvmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
{
//调用 socket_base_t 的recv函数
//以我们目前来说实际上就是调用的 rep_t 的 xrecv,在 socket_base_t 中,recv 调用的虚函数 xrecv 而 xrecv 将会被子类重写
const int rc = s_->recv (reinterpret_cast<zmq::msg_t *> (msg_), flags_);
if (unlikely (rc < 0))
return -1;
// Truncate returned size to INT_MAX to avoid overflow to negative values
const size_t sz = zmq_msg_size (msg_);
return static_cast<int> (sz < INT_MAX ? sz : INT_MAX);
}
4、工作流程时序图
4.1、接收端(server)
4.2、发送端(client)
4.3、zmq类层次
①、object_t,主要用于发送命令和处理命令,所有继承object_t的子类都具备该类的功能
②、io_thread_t,内含一个poller,可监听句柄的读、写、异常状态,继承自object_t,具有接收命令、处理命令、发送命令的功能
③、io_object_t,可以获取一个io_thread_t的poller,从而具备poller功能,所有继承自该类的子类都具有pollere功能,可监听句柄的读、写、异常状态
④、reaper_t,zmq的回收线程
⑤、own_t,zmq的对象树结点,或者说多叉树的结点,其主要用于对象的销毁,可以想到,对象的销毁就是这棵树的销毁过程,必须要使用深度优先的算法来销毁。关于zmq对象树在Internal Architecture of libzmq有详细讲解
⑥、tcp_connector_t,zmq_socket的连接器,使用她来建立tcp连接
⑦、tcp_listener_t,zmq_socket的监听器
⑧、stream_engine,负责处理io事件中的一种----网络事件,把网络字节流转换成zeromq的msg_t消息传递给session_base_t。另外一些和版本兼容相关的杂务也stream_engine处理的。stream_engine_t处理完杂务,到session_base_t就只看见msg_t了。
⑨、session_base_t,管理zmq_socket的连接和通信,主要与engine进行交换
⑩、socket_base_t,zeromq的socket,在zmq中,被当成一种特殊的”线程“,具有收发命令的功能
//参考资料
https://blog.csdn.net/tbyzs/article/category/1710475 zeromq源码分析
https://www.cnblogs.com/zengzy/category/777608.html zeromq源码学习笔记
最后
以上就是彪壮冥王星为你收集整理的服务端ZMQ(一)——源码解析服务端ZMQ(一)——源码解析的全部内容,希望文章能够帮你解决服务端ZMQ(一)——源码解析服务端ZMQ(一)——源码解析所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复