复制代码
1
2
3
4
5正如文档所说,zmq套接字不是线程安全的。应用程序端上的锁是不够的。每个套接字必须从一个线程创建、使用和关闭。那么如果实现每个套接字必须从一个线程创建、使用和关闭呢? 答案是使用inproc传输在线程之间通信。例如,请参见actor模式:https://github.com/zeromq/czmq#czmq-actors 和guide: http://zguide.zeromq.org/ 下面列出一个代码示例:
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73//接收发送线程:只负责接收和发送消息 void run() { //从客户端接收消息和发送消息到客户端的套接字 void* rspSock = zmq_socket(context, ZMQ_DEALER); int r = zmq_connect(rspSock, cp->zmqAddr.c_str()); if (r == -1) { LOG(ERROR) << "zmq_connect failure "; zmq_close(rspSock); return; } //ipc接收套接字,从其他线程接收消息,然后在本线程通过rspSock发送给客户端,保证同一个套接字接收和发送 void* rspSockRecv = zmq_socket(context, ZMQ_DEALER); int rt = zmq_bind (rspSockRecv, "inproc://#1"); if (rt == -1) { zmq_close(rspSockRecv); return; } //把context传入给ipc发送线程,确保多线程的ipc使用同一个context zmqSender = std::make_shared<ZmqSender>(context, "inproc://#1"); //同时轮训两个套接字,rspSock接收客户端的请求,rspSockRecv接收ipc的请求,然后通过rspSock转发ipc的请求给客户端 zmq_pollitem_t items[] = { {rspSock, 0, ZMQ_POLLIN, 0} , {rspSockRecv, 0, ZMQ_POLLIN, 0}}; while (true) { int rc = zmq_poll(items, 2, 10 * ZMQ_POLL_MSEC ); if (rc == -1) { LOG(ERROR) << "Error zmq_poll return -1: " << zmq_strerror(errno); break; } //接收ipc的请求,然后通过rspSock转发ipc的请求给客户端 if (items [1].revents & ZMQ_POLLIN) { zmsg_t *msg = zmsg_recv(rspSockRecv); zframe_t *addr = zmsg_pop(msg); zframe_t *cont = zmsg_pop(msg); zframe_send(&addr, rspSock, ZFRAME_REUSE + ZFRAME_MORE); zframe_send(&cont, rspSock, ZFRAME_REUSE); if (addr) zframe_destroy(&addr); if (cont) zframe_destroy(&cont); zmsg_destroy(&msg); } //接收客户端的请求,放到消息队列,其他线程处理消息 if (items [0].revents & ZMQ_POLLIN) { zmsg_t *msg = zmsg_recv(rspSock); zframe_t *addr = zmsg_pop(msg); zframe_t *cont = zmsg_pop(msg); assert (cont); assert (zframe_is (cont)); char* contstr = zframe_strdup(cont); if (strlen(contstr) > 0) { //处理逻辑,放到消息队列 if (cont) zframe_destroy(&cont); zmsg_destroy(&msg); if (contstr) { free(contstr); contstr = nullptr; } } } zmq_close(rspSock); zmq_close(rspSockRecv); }
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29///ipc发送消息线程 void ZmqSender::run() { //ipc发送消息套接字,与ipc接收套接字使用同一个context void* rspSock = zmq_socket(context, ZMQ_DEALER); //地址是 int ret = zmq_connect (rspSock, "inproc://#1"); if (ret == -1) { zmq_close(rspSock); return; } MQ::ZmqMessage msg; while(status){ if(get(msg)) { //发送消息给ipc接收套接字 zframe_send(&msg.addr, rspSock, ZFRAME_REUSE + ZFRAME_MORE); zframe_t *frame = zframe_new(msg.response.c_str(), msg.response.size()+1); zframe_send(&frame, rspSock, ZFRAME_REUSE); if (msg.addr) zframe_destroy(&msg.addr); if (frame) zframe_destroy(&frame); } } zmq_close(rspSock); }
主要的思路就是多线程利用ipc通讯。
最后
以上就是自由乌龟最近收集整理的关于关于多线程使用zeromq出现各种断言或段错误的问题的全部内容,更多相关关于多线程使用zeromq出现各种断言或段错误内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复