我是靠谱客的博主 端庄彩虹,这篇文章主要介绍ZMQ之异步管家模式,现在分享给大家,希望可以做个参考。

        上文那种实现管家模式的方法比较简单,client还是简单海盗模式中的,仅仅是用API重写了一下。我在测试机上运行了程序,处理10万条请求大约需要14秒的时间,这和代码也有一些关系,因为复制消息帧的时间浪费了CPU处理时间。但真正的问题在于,我们总是逐个循环进行处理(round-trip),即发送-接收-发送-接收……ZMQ内部禁用了TCP发包优化算法(Nagle's algorithm),但逐个处理循环还是比较浪费。

        理论归理论,还是需要由实践来检验。我们用一个简单的测试程序来看看逐个处理循环是否真的耗时。这个测试程序会发送一组消息,第一次它发一条收一条,第二次则一起发送再一起接收。两次结果应该是一样的,但速度截然不同。

        tripping: Round-trip demonstrator in C

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
// // Round-trip 模拟 // // 本示例程序使用多线程的方式启动client、worker、以及代理, // 当client处理完毕时会发送信号给主程序。 // #include "czmq.h" static void client_task (void *args, zctx_t *ctx, void *pipe) { void *client = zsocket_new (ctx, ZMQ_DEALER); zmq_setsockopt (client, ZMQ_IDENTITY, "C", 1); zsocket_connect (client, "tcp://localhost:5555"); printf ("开始测试...n"); zclock_sleep (100); int requests; int64_t start; printf ("同步 round-trip 测试...n"); start = zclock_time (); for (requests = 0; requests < 10000; requests++) { zstr_send (client, "hello"); char *reply = zstr_recv (client); free (reply); } printf (" %d 次/秒n", (1000 * 10000) / (int) (zclock_time () - start)); printf ("异步 round-trip 测试...n"); start = zclock_time (); for (requests = 0; requests < 100000; requests++) zstr_send (client, "hello"); for (requests = 0; requests < 100000; requests++) { char *reply = zstr_recv (client); free (reply); } printf (" %d 次/秒n", (1000 * 100000) / (int) (zclock_time () - start)); zstr_send (pipe, "完成"); } static void * worker_task (void *args) { zctx_t *ctx = zctx_new (); void *worker = zsocket_new (ctx, ZMQ_DEALER); zmq_setsockopt (worker, ZMQ_IDENTITY, "W", 1); zsocket_connect (worker, "tcp://localhost:5556"); while (1) { zmsg_t *msg = zmsg_recv (worker); zmsg_send (&msg, worker); } zctx_destroy (&ctx); return NULL; } static void * broker_task (void *args) { // 准备上下文和套接字 zctx_t *ctx = zctx_new (); void *frontend = zsocket_new (ctx, ZMQ_ROUTER); void *backend = zsocket_new (ctx, ZMQ_ROUTER); zsocket_bind (frontend, "tcp://*:5555"); zsocket_bind (backend, "tcp://*:5556"); // 初始化轮询对象 zmq_pollitem_t items [] = { { frontend, 0, ZMQ_POLLIN, 0 }, { backend, 0, ZMQ_POLLIN, 0 } }; while (1) { int rc = zmq_poll (items, 2, -1); if (rc == -1) break; // 中断 if (items [0].revents & ZMQ_POLLIN) { zmsg_t *msg = zmsg_recv (frontend); zframe_t *address = zmsg_pop (msg); zframe_destroy (&address); zmsg_pushstr (msg, "W"); zmsg_send (&msg, backend); } if (items [1].revents & ZMQ_POLLIN) { zmsg_t *msg = zmsg_recv (backend); zframe_t *address = zmsg_pop (msg); zframe_destroy (&address); zmsg_pushstr (msg, "C"); zmsg_send (&msg, frontend); } } zctx_destroy (&ctx); return NULL; } int main (void) { // 创建线程 zctx_t *ctx = zctx_new (); void *client = zthread_fork (ctx, client_task, NULL); zthread_new (ctx, worker_task, NULL); zthread_new (ctx, broker_task, NULL); // 等待client端管道的信号 char *signal = zstr_recv (client); free (signal); zctx_destroy (&ctx); return 0; }

        在我的开发环境中运行结果如下:

复制代码
1
2
3
4
5
Setting up test... Synchronous round-trip test... 9057 calls/second Asynchronous round-trip test... 173010 calls/second

        需要注意的是client在运行开始会暂停一段时间,这是因为在向ROUTER套接字发送消息时,若指定标识的套接字没有连接,那么ROUTER会直接丢弃该消息。这个示例中我们没有使用LRU算法,所以当worker连接速度稍慢时就有可能丢失数据,影响测试结果。

        我们可以看到,逐个处理循环比异步处理要慢将近20倍,让我们把它应用到管家模式中去。

        首先,让我们修改client的API,添加独立的发送和接收方法:

复制代码
1
2
3
4
mdcli_t *mdcli_new (char *broker); void mdcli_destroy (mdcli_t **self_p); int mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p); zmsg_t *mdcli_recv (mdcli_t *self);

        然后花很短的时间就能将同步的client API改造成异步的API:

        mdcliapi2: Majordomo asynchronous client API in C

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
/* ===================================================================== mdcliapi2.c Majordomo Protocol Client API (async version) Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7. --------------------------------------------------------------------- Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com> Copyright other contributors as noted in the AUTHORS file. This file is part of the ZeroMQ Guide: http://zguide.zeromq.org This is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version. This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this program. If not, see <http://www.gnu.org/licenses/>. ===================================================================== */ #include "mdcliapi2.h" // 类结构 // 使用成员函数访问属性 struct _mdcli_t { zctx_t *ctx; // 上下文 char *broker; void *client; // 连接至代理的套接字 int verbose; // 在标准输出打印运行状态 int timeout; // 请求超时时间 }; // --------------------------------------------------------------------- // 连接或重连代理 void s_mdcli_connect_to_broker (mdcli_t *self) { if (self->client) zsocket_destroy (self->ctx, self->client); self->client = zsocket_new (self->ctx, ZMQ_DEALER); zmq_connect (self->client, self->broker); if (self->verbose) zclock_log ("I: 正在连接代理 %s...", self->broker); } // --------------------------------------------------------------------- // 构造函数 mdcli_t * mdcli_new (char *broker, int verbose) { assert (broker); mdcli_t *self = (mdcli_t *) zmalloc (sizeof (mdcli_t)); self->ctx = zctx_new (); self->broker = strdup (broker); self->verbose = verbose; self->timeout = 2500; // 毫秒 s_mdcli_connect_to_broker (self); return self; } // --------------------------------------------------------------------- // 析构函数 void mdcli_destroy (mdcli_t **self_p) { assert (self_p); if (*self_p) { mdcli_t *self = *self_p; zctx_destroy (&self->ctx); free (self->broker); free (self); *self_p = NULL; } } // --------------------------------------------------------------------- // 设置请求超时时间 void mdcli_set_timeout (mdcli_t *self, int timeout) { assert (self); self->timeout = timeout; } // --------------------------------------------------------------------- // 发送请求给代理 // 取得请求消息的所有权,发送后销毁 int mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p) { assert (self); assert (request_p); zmsg_t *request = *request_p; // 在消息顶部加入协议规定的帧 // Frame 0: empty (模拟REQ套接字的行为) // Frame 1: "MDPCxy" (6个字节, MDP/Client x.y) // Frame 2: Service name (看打印字符串) zmsg_pushstr (request, service); zmsg_pushstr (request, MDPC_CLIENT); zmsg_pushstr (request, ""); if (self->verbose) { zclock_log ("I: 发送请求给 '%s' 服务:", service); zmsg_dump (request); } zmsg_send (&request, self->client); return 0; } // --------------------------------------------------------------------- // 获取应答消息,若无则返回NULL; // 该函数不会尝试从代理的崩溃中恢复, // 因为我们没有记录那些未收到应答的请求,所以也无法重发。 zmsg_t * mdcli_recv (mdcli_t *self) { assert (self); // 轮询套接字以获取应答 zmq_pollitem_t items [] = { { self->client, 0, ZMQ_POLLIN, 0 } }; int rc = zmq_poll (items, 1, self->timeout * ZMQ_POLL_MSEC); if (rc == -1) return NULL; // 中断 // 收到应答后进行处理 if (items [0].revents & ZMQ_POLLIN) { zmsg_t *msg = zmsg_recv (self->client); if (self->verbose) { zclock_log ("I: received reply:"); zmsg_dump (msg); } // 不要处理错误,直接报出 assert (zmsg_size (msg) >= 4); zframe_t *empty = zmsg_pop (msg); assert (zframe_streq (empty, "")); zframe_destroy (&empty); zframe_t *header = zmsg_pop (msg); assert (zframe_streq (header, MDPC_CLIENT)); zframe_destroy (&header); zframe_t *service = zmsg_pop (msg); zframe_destroy (&service); return msg; // Success } if (zctx_interrupted) printf ("W: 收到中断消息,正在中止client...n"); else if (self->verbose) zclock_log ("W: 严重错误,放弃请求"); return NULL; }

        下面是对应的测试代码:

        mdclient2: Majordomo client application in C

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// // 异步管家模式 - client示例程序 // 使用mdcli API隐藏MDP协议的具体实现 // // 直接编译源码,而不创建类库 #include "mdcliapi2.c" int main (int argc, char *argv []) { int verbose = (argc > 1 && streq (argv [1], "-v")); mdcli_t *session = mdcli_new ("tcp://localhost:5555", verbose); int count; for (count = 0; count < 100000; count++) { zmsg_t *request = zmsg_new (); zmsg_pushstr (request, "Hello world"); mdcli_send (session, "echo", &request); } for (count = 0; count < 100000; count++) { zmsg_t *reply = mdcli_recv (session); if (reply) zmsg_destroy (&reply); else break; // 使用Ctrl-C中断 } printf ("收到 %d 个应答n", count); mdcli_destroy (&session); return 0; }

        代理和worker的代码没有变,因为我们并没有改变MDP协议。经过对client的改造,我们可以明显看到速度的提升。如以下是同步状况下处理10万条请求的时间:

复制代码
1
2
3
4
5
6
$ time mdclient 100000 requests/replies processed real 0m14.088s user 0m1.310s sys 0m2.670s

        以下是异步请求的情况:

复制代码
1
2
3
4
5
6
$ time mdclient2 100000 replies received real 0m8.730s user 0m0.920s sys 0m1.550s

        让我们建立10个worker,看看效果如何:

复制代码
1
2
3
4
5
6
$ time mdclient2 100000 replies received real 0m3.863s user 0m0.730s sys 0m0.470s

        由于worker获得消息需要通过LRU队列机制,所以并不能做到完全的异步。但是,worker越多其效果也会越好。在我的测试机上,当worker的数量达到8个时,速度就不再提升了——四核处理器只能做这么多。但是,我们仍然获得了近四倍的速度提升,而改造过程只有几分钟而已。此外,代理其实还没有进行优化,它仍会复制消息,而没有实现零拷贝。不过,我们已经做到每秒处理2.5万次请求-应答,已经很不错了。

        当然,异步的管家模式也并不完美,有一个显著的缺点:它无法从代理的崩溃中恢复。可以看到mdcliapi2的代码中并没有恢复连接的代码,重新连接需要有以下几点作为前提:

                1、每个请求都做了编号,每次应答也含有相应的编号,这就需要修改协议,明确定义。

                2、client的API需要保留并跟踪所有已发送、但仍未收到应答的请求。

                3、如果代理发生崩溃,client会重发所有消息。

        可以看到,高可靠性往往和复杂度成正比,值得在管家模式中应用这一机制吗?这就要看应用场景了。如果是一个名称查询服务,每次会话会调用一次,那不需要应用这一机制;如果是一个位于前端的网页服务,有数千个客户端相连,那可能就需要了。

最后

以上就是端庄彩虹最近收集整理的关于ZMQ之异步管家模式的全部内容,更多相关ZMQ之异步管家模式内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(68)

评论列表共有 0 条评论

立即
投稿
返回
顶部