服务端ZMQ(一)——源码解析
文章目录
- 服务端ZMQ(一)——源码解析
- 1、什么是ZeroMQ?
- 1.1、为什么用消息队列和多线程?
- 1.2、ZeroMQ长是什么样子 ?
- 2、简单示例
- 2.1、主要的四个操作
- 2.2、HelloWorld
- 2.3、与传统TCP的区别
- 3、工作流程
- **3.1、zmq_ctx_new**
- 3.2、zmq_socket
- 3.3、zmq_bind
- 4、zmq_connect
- 5、zmq_recv
- 4、工作流程时序图
- 4.1、接收端(server)
- 4.2、发送端(client)
- 4.3、zmq类层次
1、什么是ZeroMQ?
- 基于消息队列的多线程网络库:对套接字类型、连接处理、帧、路由底层细节进行抽象、提供跨越多种传输协议的套接字(socket library)
- 一个嵌入式库:封装了网络通信、消息队列、线程调度等功能,向上层应用提供API,应用程序通过加载库文件,调用API函数来实现高性能网络通信(简单好用的传输层)
总结:ZeroMQ在 Socket API 之上做了一层封装,将网络通讯、进程通讯和线程通讯抽象为统一的 API 接口,提供一个底层的网络通讯库
注意:普通的 socket 是端到端的(1:1的关系)ZeroMQ 却是可以N:M 的关系
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17/** socket 1.建立连接 销毁连接 2.制定协议 选择协议 protobuffer messagepack shrift 3.处理错误 error close 4.断线重连的问题心跳 检测 5. IO epoll 有哪些模型? 1.请求回应模型, req/rep redis 协议请求的次序跟回应的次序 2、管道模型 push/ull模型 ginx 3、监听发布模型 sub/pub 观察者模式 **/
1.1、为什么用消息队列和多线程?
看到消息队列,首先想到的是异步、消峰、解耦。例子:某个功能中很多步骤需要在一个流程里面完成,例如下图支付流程被拉长了,更加耗时。
引入消息队列,就变成了下图。对于一些可以同时处理的流程用线程,线程池去处理,减少响应时间。
1.2、ZeroMQ长是什么样子 ?
ZeroMQ几乎所有IO操作都是异步的,每个ZeroMQ IO线程
都有与之绑定的Poller,Poller采用经典的Reactor模式实现,Poller根据不同的操作系统平台使用不同的网络IO模型(select、poll、epoll、devpoll、kequeue等)。主线程与I/O线程通过Mail Box传递消息来进行通信
Server,在主线程创建zmq_listener,通过Mail Box发消息的形式将其绑定到I/O线程,I/O线程把zmq_listener添加到Poller中用以侦听读事件。
Client,在主线程中创建zmq_connecter,通过Mail Box发消息的形式将其绑定到I/O线程,I/O线程把zmq_connecter添加到Poller中用以侦听写事件。
Client与Server第一次通信时,会创建zmq_init来发送identity,用以进行认证。认证结束后,双方会为此次连接创建Session,以后双方就通过Session进行通信。
每个Session都会关联到相应的读/写管道, 主线程收发消息只是分别从管道中读/写数据。Session并不实际跟kernel交换I/O数据,而是通过plugin到Session中的Engine来与kernel交换I/O数据。
ZMQ性能优化的过程中发现有3个因素会对性能产生严重的影响:
- 内存分配的次数
- 系统调用的次数
- 并发模型
**内存分配的次数:**一条ØMQ消息由一个不透明的句柄来表示。对于非常短小的消息,其内容被直接编码到句柄中。因此,对句柄的拷贝实际上就是对消息数据的拷贝。当遇到较大的消息时,它被分配到一个单独的缓冲区内,而句柄只包含一个指向缓冲区的指针。对句柄的拷贝并不会造成对消息数据的拷贝,当消息有数兆字节长时,这么处理是很有道理的如下图。需要提醒的是,后一种情况里缓冲区是按引用计数的,因此可以做到被多个句柄引用而不必拷贝数据。
**批量处理:**在消息通信系统中,系统调用的数量太多的话会导致出现性能瓶颈。当创建高性能的应用时应该尽可能多的去避免遍历调用栈。
**并发模型:**采用一种不同的模型,目标是完全避免锁机制,并让每个线程能够全速运行。线程间的通信是通过在线程间传递异步消息(事件)来实现的。(actor模式)
无锁算法:ØMQ在pipe对象中采用无锁队列来在用户线程和ØMQ的工作者线程之间传递消息。
2、简单示例
2.1、主要的四个操作
- 创建和销毁套接字:zmq_socket(), zmq_close()
- 配置和读取套接字选项:zmq_setsockopt(), zmq_getsockopt()
- 为套接字建立连接:zmq_bind(), zmq_connect()
- 发送和接收消息:zmq_send(), zmq_recv()
2.2、HelloWorld
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28//client #include <zmq.h> #include <string.h> #include <stdio.h> #include <unistd.h> int main (void) { printf ("Connecting to hello world server…n"); void *context = zmq_ctx_new (); //创建套接字 上下文环境 void *requester = zmq_socket (context, ZMQ_REQ); //设置套接字 ZMQ_REQ 请求回应模型 zmq_connect (requester, "tcp://localhost:5555"); //建立连接 int request_nbr; for (request_nbr = 0; request_nbr != 5; request_nbr++) { //发送消息 char buffer [5]; printf ("Sending Hello %d…n", request_nbr); zmq_send (requester, "Hello", 5, 0); zmq_recv (requester, buffer, 5, 0); printf ("Received World %dn", request_nbr); } zmq_close (requester); zmq_ctx_destroy (context); return 0; }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23//server #include <stdio.h> #include <unistd.h> #include <string.h> #include <assert.h> #include <zmq.h> int main (void) { void *context = zmq_ctx_new (); //创建套接字 上下文环境 void *responder = zmq_socket (context, ZMQ_REP); //设置套接字 ZMQ_REP 请求回应模型 int rc = zmq_bind (responder, "tcp://*:5555"); //建立连接 assert (rc == 0); while (1) { //接收消息 char buffer [5]; zmq_recv (responder, buffer, 5, 0); printf ("Received Hellon"); sleep (1); // Do some 'work' zmq_send (responder, "World", 5, 0); } return 0; }
实现效果:
2.3、与传统TCP的区别
- 使用多种协议:inproc(进程内)、ipc(进程间)、tcp、pgm(广播)、epgm;
- 连接是异步的,并由一组消息队列做缓冲;
- 连接会表现出某种消息模式,这是由创建连接的套接字类型决定的;
- 一个套接字可以有多个输入和输出连接;N:M;
- ZMQ没有提供类似accept()的函数,因为当套接字绑定至端点时它就自动开始接受连接了;
- 应用程序无法直接和这些连接打交道,因为它们是被封装在ZMQ底层的。、
- 当客户端使用zmq_connect()时连接就已经建立了,并不要求该端点已有某个服务使用zmq_bind()进行了绑定;
3、工作流程
3.1、zmq_ctx_new
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78/* zmq.cpp 原文件 */ void *zmq_ctx_new (void) { //首先是初始化网络环境 ZMQ_HAVE_OPENPGM 或者 ZMQ_HAVE_WINDOWS if (!zmq::initialize_network ()) { return NULL; } // Create 0MQ context. /* ctx_t ctx.hpp源文件 zmq::ctx_t::ctx_t () : //套接字是否使用的标志 后续close 最后一个释放 _tag (ZMQ_CTX_TAG_VALUE_GOOD), //初始化信箱、zmq_ctx_term thread、reaper thread、I/O thread... _starting (true), //zmq_ctx_term 是否启用 _terminating (false), //回收线程 _reaper (NULL), //同时socket最大打开数 _max_sockets (clipped_maxsocket (ZMQ_MAX_SOCKETS_DFLT)), //同时消息的最大数 _max_msgsz (INT_MAX), //IO 线程数量 _io_thread_count (ZMQ_IO_THREADS_DFLT), // 该上下文是否永远不会终止 _blocky (true), //是否支持ipv6 _ipv6 (false), //是否使用零拷贝消息解析功能 _zero_copy (true) { #ifdef HAVE_FORK _pid = getpid (); #endif #ifdef ZMQ_HAVE_VMCI _vmci_fd = -1; _vmci_family = -1; #endif // Initialise crypto library, if needed. zmq::random_open (); #ifdef ZMQ_USE_NSS NSS_NoDB_Init (NULL); #endif #ifdef ZMQ_USE_GNUTLS gnutls_global_init (); #endif } 1、创建ctx指针 2、check_tag() --> 当前 ctx_t 的状态 _tag == ZMQ_CTX_TAG_VALUE_GOOD 3、terminate() --> _terminatin --> start () --> create socket(并注册信箱) 4、shutdown () --> _reaper(NULL) 回收线程 5、set() --> thread_ctx_t () --> 启用新线程(参数) ... */ zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t; if (ctx) { if (!ctx->valid ()) { delete ctx; return NULL; } } return ctx; } /* 所以zmq_ctx_new 只是做了初始化工作 ctx_t提供一个start_thread后续的函数调用中进行的启动(poller_base -> start -> _ctx.start_thread | poller_set -> start -> _ctx.start_thread) */
3.2、zmq_socket
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179//1、zmq_socket void *zmq_socket (void *ctx_, int type_) { //ctx_t检查 if (!ctx_ || !(static_cast<zmq::ctx_t *> (ctx_))->check_tag ()) { errno = EFAULT; return NULL; } zmq::ctx_t *ctx = static_cast<zmq::ctx_t *> (ctx_); // 返回对象指针 是基类socket_base_t(看一下基类构造形式) zmq::socket_base_t *s = ctx->create_socket (type_); return static_cast<void *> (s); } //2、ctx_t::create_socket zmq::socket_base_t *zmq::ctx_t::create_socket (int type_) { scoped_lock_t locker (_slot_sync); // 一旦调用了zmq_ctx_term,将不能创建新套接字 if (_terminating) { errno = ETERM; return NULL; } if (unlikely (_starting)) { // start_thread 启动 if (!start ()) return NULL; } // 如果当前已达到套接字上限,返回错误 if (_empty_slots.empty ()) { errno = EMFILE; return NULL; } // 选择索引 const uint32_t slot = _empty_slots.back (); _empty_slots.pop_back (); // 生成唯一id const int sid = (static_cast<int> (max_socket_id.add (1))) + 1; // 创建套接字,并注册在其身上的mailbox socket_base_t *s = socket_base_t::create (type_, this, slot, sid); if (!s) { _empty_slots.push_back (slot); return NULL; } _sockets.push_back (s); //该 ctx_t 上的 i_mailbox 数组 _slots[slot] = s->get_mailbox (); return s; } //3、ctx_t::start bool zmq::ctx_t::start () { //数组中的 mailboxes 进行初始化,增加回收线程 _opt_sync.lock (); const int term_and_reaper_threads_count = 2; const int mazmq = _max_sockets; const int ios = _io_thread_count; _opt_sync.unlock (); const int slot_count = mazmq + ios + term_and_reaper_threads_count; try { //vector 重设 capacity 上限 _slots.reserve (slot_count); _empty_slots.reserve (slot_count - term_and_reaper_threads_count); } catch (const std::bad_alloc &) { errno = ENOMEM; return false; } _slots.resize (term_and_reaper_threads_count); // 将关闭线程的 mailbox 绑定到 ctx 上 _slots[term_tid] = &_term_mailbox; // 创建回收线程并启动 _reaper = new (std::nothrow) reaper_t (this, reaper_tid); if (!_reaper) { errno = ENOMEM; goto fail_cleanup_slots; } if (!_reaper->get_mailbox ()->valid ()) goto fail_cleanup_reaper; _slots[reaper_tid] = _reaper->get_mailbox (); _reaper->start (); //创建指定数量的io线程启动且注册,当然包括其mailbox _slots.resize (slot_count, NULL); for (int i = term_and_reaper_threads_count; i != ios + term_and_reaper_threads_count; i++) { io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i); if (!io_thread) { errno = ENOMEM; goto fail_cleanup_reaper; } if (!io_thread->get_mailbox ()->valid ()) { delete io_thread; goto fail_cleanup_reaper; } _io_threads.push_back (io_thread); _slots[i] = io_thread->get_mailbox (); //io_thread 会使用 ctx_t 上的start_thread来启动成员函数 worker_routine ,进而启动当前平台下的io接口的 //loop(), 再接下来就是经典的 reactor 模式, 从响应的fd中,找到对应的 poll_entry_t //通过判断响应的事件来调用挂接在io_thread上的对象的 in_event 或者 out_event 函数 io_thread->start (); } // In the unused part of the slot array, create a list of empty slots. for (int32_t i = static_cast<int32_t> (_slots.size ()) - 1; i >= static_cast<int32_t> (ios) + term_and_reaper_threads_count; i--) { _empty_slots.push_back (i); } //启动完成 _starting = false; return true; fail_cleanup_reaper: _reaper->stop (); delete _reaper; _reaper = NULL; fail_cleanup_slots: _slots.clear (); return false; } //4、zmq::socket_base_t zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_, bool thread_safe_) : //调用 own_t 的构造函数,用于维护对象的生命周期 own_t (parent_, tid_), _sync (), _tag (0xbaddecaf), _ctx_terminated (false), _destroyed (false), _poller (NULL), _handle (static_cast<poller_t::handle_t> (NULL)), _last_tsc (0), _ticks (0), _rcvmore (false), _monitor_socket (NULL), _monitor_events (0), _thread_safe (thread_safe_), _reaper_signaler (NULL), _monitor_sync () { options.socket_id = sid_; options.ipv6 = (parent_->get (ZMQ_IPV6) != 0); options.linger.store (parent_->get (ZMQ_BLOCKY) ? -1 : 0); options.zero_copy = parent_->get (ZMQ_ZERO_COPY_RECV) != 0; //根据线程安全选项来决定是否生成线程安全的 mailbox 对象 if (_thread_safe) { _mailbox = new (std::nothrow) mailbox_safe_t (&_sync); zmq_assert (_mailbox); } else { mailbox_t *m = new (std::nothrow) mailbox_t (); zmq_assert (m); if (m->get_fd () != retired_fd) _mailbox = m; else { LIBZMQ_DELETE (m); _mailbox = NULL; } } } /* zmq_socket 在ctx上插入一个socket_base_t对象并将其指针抛出 由own_t来维护生命周期 */
3.3、zmq_bind
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133int zmq_bind (void *s_, const char *addr_) { //转化成socket_base_t 指针 (socket_base_t定义了很多操作) zmq::socket_base_t *s = as_socket_base_t (s_); if (!s) return -1; //进行地址的解析和绑定地址 return s->bind (addr_); } //1、zmq::socket_base_t::bind int zmq::socket_base_t::bind (const char *endpoint_uri_) { //根据线程安全拍段进行上锁准备 scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL); if (unlikely (_ctx_terminated)) { errno = ETERM; return -1; } // 执行可能存在的被挂起的命令 int rc = process_commands (0, false); if (unlikely (rc != 0)) { return -1; } //为分割对传入的协议和地址端口进行分片 //并对传入协议进行检查 std::string protocol; std::string address; if (parse_uri (endpoint_uri_, protocol, address) || check_protocol (protocol)) { return -1; } // ...... //以下传输方式需要在io线程中进行,所以我们选择一个io线程 io_thread_t *io_thread = choose_io_thread (options.affinity); if (!io_thread) { errno = EMTHREAD; return -1; } if (protocol == protocol_name::tcp) { //创建tcp 监听对象 tcp_listener_t *listener = new (std::nothrow) tcp_listener_t (io_thread, this, options); alloc_assert (listener); //设置地址 rc = listener->set_local_address (address.c_str ()); if (rc != 0) { LIBZMQ_DELETE (listener); event_bind_failed (make_unconnected_bind_endpoint_pair (address), zmq_errno ()); return -1; } // Save last endpoint URI listener->get_local_address (_last_endpoint); //将节点插入子树中 add_endpoint (make_unconnected_bind_endpoint_pair (_last_endpoint), static_cast<own_t *> (listener), NULL); options.connected = true; return 0; } // ...... zmq_assert (false); return -1; } //2、zmq::ctx_t::choose_io_thread 传入绑定的CPU下标 zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_) { if (_io_threads.empty ()) return NULL; //根据cpu偏好以及当前的io压力来选择压力最小的io线程并返回 int min_load = -1; io_thread_t *selected_io_thread = NULL; for (io_threads_t::size_type i = 0, size = _io_threads.size (); i != size; i++) { if (!affinity_ || (affinity_ & (uint64_t (1) << i))) { const int load = _io_threads[i]->get_load (); if (selected_io_thread == NULL || load < min_load) { min_load = load; selected_io_thread = _io_threads[i]; } } } return selected_io_thread; } //3、zmq::socket_base_t::add_endpoint void zmq::socket_base_t::add_endpoint ( const endpoint_uri_pair_t &endpoint_pair_, own_t *endpoint_, pipe_t *pipe_) { // 将新节点插入endpoint_ launch_child (endpoint_); //插入ctx _endpoints.ZMQ_MAP_INSERT_OR_EMPLACE (endpoint_pair_.identifier (), endpoint_pipe_t (endpoint_, pipe_)); if (pipe_ != NULL) pipe_->set_endpoint_pair (endpoint_pair_); } //4、zmq::own_t::launch_child void zmq::own_t::launch_child (own_t *object_) { // 插入 object_->set_owner (this); // 向object_所属的io线程发送plug消息,在执行process_plug send_plug (object_); // 设置object_归属权 send_own (this, object_); } /* zmq 在进行bind操作后,并不是马上绑定上的,虽然时间很短但其实是一个异步的流程 tcp_listener_t 在plug初始化过程中,首先加入到当前的socket_base_t, 然后向tcp_listener_t发起一个plug消息,在io_thread的mailbox中进行缓存,并再下次loop循环中进行 tcp_listener_t process_plug 为什么process_plug用消息启动:zmq开启多线程模式时,直接执行 process_plug 操作可能会不在 tcp_listener_t 所属线程中执行,也就是破坏了 actor 设计初衷(猜测) */
4、zmq_connect
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266int zmq_connect (void *s_, const char *addr_) { zmq::socket_base_t *s = as_socket_base_t (s_); if (!s) return -1; return s->connect (addr_); } int zmq::socket_base_t::connect (const char *endpoint_uri_) { scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL); return connect_internal (endpoint_uri_); } int zmq::socket_base_t::connect_internal (const char *endpoint_uri_) { if (unlikely (_ctx_terminated)) { errno = ETERM; return -1; } // 执行任何可能被挂起的命令 int rc = process_commands (0, false); if (unlikely (rc != 0)) { return -1; } // 解析 endpoint_uri_字符串 以 : // 为边界进行分别 std::string protocol; std::string address; if (parse_uri (endpoint_uri_, protocol, address) || check_protocol (protocol)) { return -1; } //...... //DEALER SUB PUB REQ 不支持对一个端点同时开启多个会话,进行判断当前会话是否存在 const bool is_single_connect = (options.type == ZMQ_DEALER || options.type == ZMQ_SUB || options.type == ZMQ_PUB || options.type == ZMQ_REQ); if (unlikely (is_single_connect)) { if (0 != _endpoints.count (endpoint_uri_)) { // There is no valid use for multiple connects for SUB-PUB nor // DEALER-ROUTER nor REQ-REP. Multiple connects produces // nonsensical results. return 0; } } //选择io线程去运行我们的会话对象 io_thread_t *io_thread = choose_io_thread (options.affinity); if (!io_thread) { errno = EMTHREAD; return -1; } address_t *paddr = new (std::nothrow) address_t (protocol, address, this->get_ctx ()); alloc_assert (paddr); // Resolve address (if needed by the protocol) if (protocol == protocol_name::tcp) { // Do some basic sanity checks on tcp:// address syntax // - hostname starts with digit or letter, with embedded '-' or '.' // - IPv6 address may contain hex chars and colons. // - IPv6 link local address may contain % followed by interface name / zone_id // (Reference: https://tools.ietf.org/html/rfc4007) // - IPv4 address may contain decimal digits and dots. // - Address must end in ":port" where port is *, or numeric // - Address may contain two parts separated by ':' // Following code is quick and dirty check to catch obvious errors, // without trying to be fully accurate. //对提供的字符串进行解析 const char *check = address.c_str (); if (isalnum (*check) || isxdigit (*check) || *check == '[' || *check == ':') { check++; while (isalnum (*check) || isxdigit (*check) || *check == '.' || *check == '-' || *check == ':' || *check == '%' || *check == ';' || *check == '[' || *check == ']' || *check == '_' || *check == '*') { check++; } } // Assume the worst, now look for success rc = -1; //检查地址是否是安全有效的 if (*check == 0) { // Do we have a valid port string? (cannot be '*' in connect check = strrchr (address.c_str (), ':'); if (check) { check++; if (*check && (isdigit (*check))) rc = 0; // Valid } } if (rc == -1) { errno = EINVAL; LIBZMQ_DELETE (paddr); return -1; } /推迟解决方案配置 paddr->resolved.tcp_addr = NULL; } #ifdef ZMQ_HAVE_WS #ifdef ZMQ_HAVE_WSS else if (protocol == protocol_name::ws || protocol == protocol_name::wss) { if (protocol == protocol_name::wss) { paddr->resolved.wss_addr = new (std::nothrow) wss_address_t (); alloc_assert (paddr->resolved.wss_addr); rc = paddr->resolved.wss_addr->resolve (address.c_str (), false, options.ipv6); } else #else else if (protocol == protocol_name::ws) { #endif { paddr->resolved.ws_addr = new (std::nothrow) ws_address_t (); alloc_assert (paddr->resolved.ws_addr); rc = paddr->resolved.ws_addr->resolve (address.c_str (), false, options.ipv6); } if (rc != 0) { LIBZMQ_DELETE (paddr); return -1; } } #endif #if defined ZMQ_HAVE_IPC else if (protocol == protocol_name::ipc) { paddr->resolved.ipc_addr = new (std::nothrow) ipc_address_t (); alloc_assert (paddr->resolved.ipc_addr); int rc = paddr->resolved.ipc_addr->resolve (address.c_str ()); if (rc != 0) { LIBZMQ_DELETE (paddr); return -1; } } #endif if (protocol == protocol_name::udp) { if (options.type != ZMQ_RADIO) { errno = ENOCOMPATPROTO; LIBZMQ_DELETE (paddr); return -1; } paddr->resolved.udp_addr = new (std::nothrow) udp_address_t (); alloc_assert (paddr->resolved.udp_addr); rc = paddr->resolved.udp_addr->resolve (address.c_str (), false, options.ipv6); if (rc != 0) { LIBZMQ_DELETE (paddr); return -1; } } // TBD - Should we check address for ZMQ_HAVE_NORM??? #ifdef ZMQ_HAVE_OPENPGM if (protocol == protocol_name::pgm || protocol == protocol_name::epgm) { struct pgm_addrinfo_t *res = NULL; uint16_t port_number = 0; int rc = pgm_socket_t::init_address (address.c_str (), &res, &port_number); if (res != NULL) pgm_freeaddrinfo (res); if (rc != 0 || port_number == 0) { return -1; } } #endif #if defined ZMQ_HAVE_TIPC else if (protocol == protocol_name::tipc) { paddr->resolved.tipc_addr = new (std::nothrow) tipc_address_t (); alloc_assert (paddr->resolved.tipc_addr); int rc = paddr->resolved.tipc_addr->resolve (address.c_str ()); if (rc != 0) { LIBZMQ_DELETE (paddr); return -1; } const sockaddr_tipc *const saddr = reinterpret_cast<const sockaddr_tipc *> ( paddr->resolved.tipc_addr->addr ()); // Cannot connect to random Port Identity if (saddr->addrtype == TIPC_ADDR_ID && paddr->resolved.tipc_addr->is_random ()) { LIBZMQ_DELETE (paddr); errno = EINVAL; return -1; } } #endif #if defined ZMQ_HAVE_VMCI else if (protocol == protocol_name::vmci) { paddr->resolved.vmci_addr = new (std::nothrow) vmci_address_t (this->get_ctx ()); alloc_assert (paddr->resolved.vmci_addr); int rc = paddr->resolved.vmci_addr->resolve (address.c_str ()); if (rc != 0) { LIBZMQ_DELETE (paddr); return -1; } } #endif // Create session. session_base_t *session = session_base_t::create (io_thread, true, this, options, paddr); errno_assert (session); // PGM does not support subscription forwarding; ask for all data to be // sent to this pipe. (same for NORM, currently?) #if defined ZMQ_HAVE_OPENPGM && defined ZMQ_HAVE_NORM const bool subscribe_to_all = protocol == protocol_name::pgm || protocol == protocol_name::epgm || protocol == protocol_name::norm || protocol == protocol_name::udp; #elif defined ZMQ_HAVE_OPENPGM const bool subscribe_to_all = protocol == protocol_name::pgm || protocol == protocol_name::epgm || protocol == protocol_name::udp; #elif defined ZMQ_HAVE_NORM const bool subscribe_to_all = protocol == protocol_name::norm || protocol == protocol_name::udp; #else const bool subscribe_to_all = protocol == protocol_name::udp; #endif pipe_t *newpipe = NULL; if (options.immediate != 1 || subscribe_to_all) { // Create a bi-directional pipe. object_t *parents[2] = {this, session}; pipe_t *new_pipes[2] = {NULL, NULL}; const bool conflate = get_effective_conflate_option (options); int hwms[2] = {conflate ? -1 : options.sndhwm, conflate ? -1 : options.rcvhwm}; bool conflates[2] = {conflate, conflate}; rc = pipepair (parents, new_pipes, hwms, conflates); errno_assert (rc == 0); // Attach local end of the pipe to the socket object. attach_pipe (new_pipes[0], subscribe_to_all, true); newpipe = new_pipes[0]; // Attach remote end of the pipe to the session object later on. session->attach_pipe (new_pipes[1]); } // ......... // Save last endpoint URI paddr->to_string (_last_endpoint); add_endpoint (make_unconnected_connect_endpoint_pair (endpoint_uri_), static_cast<own_t *> (session), newpipe); return 0; }
5、zmq_recv
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50int zmq_recv (void *s_, void *buf_, size_t len_, int flags_) { zmq::socket_base_t *s = as_socket_base_t (s_); if (!s) return -1; //初始化 zmq_msg_t zmq_msg_t msg; int rc = zmq_msg_init (&msg); errno_assert (rc == 0); const int nbytes = s_recvmsg (s, &msg, flags_); if (unlikely (nbytes < 0)) { const int err = errno; rc = zmq_msg_close (&msg); errno_assert (rc == 0); errno = err; return -1; } // 判断是否超过给定的大小 const size_t to_copy = size_t (nbytes) < len_ ? size_t (nbytes) : len_; // We exp如果比给定的大小 大,则进行拷贝 if (to_copy) { assert (buf_); memcpy (buf_, zmq_msg_data (&msg), to_copy); } //释放消息 (当一个消息不再被使用的时候应该立刻调用zmq_msg_close()进行资源释放,否则可能引起内存泄露) rc = zmq_msg_close (&msg); errno_assert (rc == 0); return nbytes; } static int s_recvmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_) { //调用 socket_base_t 的recv函数 //以我们目前来说实际上就是调用的 rep_t 的 xrecv,在 socket_base_t 中,recv 调用的虚函数 xrecv 而 xrecv 将会被子类重写 const int rc = s_->recv (reinterpret_cast<zmq::msg_t *> (msg_), flags_); if (unlikely (rc < 0)) return -1; // Truncate returned size to INT_MAX to avoid overflow to negative values const size_t sz = zmq_msg_size (msg_); return static_cast<int> (sz < INT_MAX ? sz : INT_MAX); }
4、工作流程时序图
4.1、接收端(server)
4.2、发送端(client)
4.3、zmq类层次
①、object_t,主要用于发送命令和处理命令,所有继承object_t的子类都具备该类的功能
②、io_thread_t,内含一个poller,可监听句柄的读、写、异常状态,继承自object_t,具有接收命令、处理命令、发送命令的功能
③、io_object_t,可以获取一个io_thread_t的poller,从而具备poller功能,所有继承自该类的子类都具有pollere功能,可监听句柄的读、写、异常状态
④、reaper_t,zmq的回收线程
⑤、own_t,zmq的对象树结点,或者说多叉树的结点,其主要用于对象的销毁,可以想到,对象的销毁就是这棵树的销毁过程,必须要使用深度优先的算法来销毁。关于zmq对象树在Internal Architecture of libzmq有详细讲解
⑥、tcp_connector_t,zmq_socket的连接器,使用她来建立tcp连接
⑦、tcp_listener_t,zmq_socket的监听器
⑧、stream_engine,负责处理io事件中的一种----网络事件,把网络字节流转换成zeromq的msg_t消息传递给session_base_t。另外一些和版本兼容相关的杂务也stream_engine处理的。stream_engine_t处理完杂务,到session_base_t就只看见msg_t了。
⑨、session_base_t,管理zmq_socket的连接和通信,主要与engine进行交换
⑩、socket_base_t,zeromq的socket,在zmq中,被当成一种特殊的”线程“,具有收发命令的功能
1
2
3
4//参考资料 https://blog.csdn.net/tbyzs/article/category/1710475 zeromq源码分析 https://www.cnblogs.com/zengzy/category/777608.html zeromq源码学习笔记
最后
以上就是彪壮冥王星最近收集整理的关于服务端ZMQ(一)——源码解析服务端ZMQ(一)——源码解析的全部内容,更多相关服务端ZMQ(一)——源码解析服务端ZMQ(一)——源码解析内容请搜索靠谱客的其他文章。
发表评论 取消回复