正如文档所说,zmq套接字不是线程安全的。应用程序端上的锁是不够的。每个套接字必须从一个线程创建、使用和关闭。那么如果实现每个套接字必须从一个线程创建、使用和关闭呢?
答案是使用inproc传输在线程之间通信。例如,请参见actor模式:https://github.com/zeromq/czmq#czmq-actors 和guide: http://zguide.zeromq.org/
下面列出一个代码示例:
//接收发送线程:只负责接收和发送消息
void run()
{
//从客户端接收消息和发送消息到客户端的套接字
void* rspSock = zmq_socket(context, ZMQ_DEALER);
int r = zmq_connect(rspSock, cp->zmqAddr.c_str());
if (r == -1) {
LOG(ERROR) << "zmq_connect failure ";
zmq_close(rspSock);
return;
}
//ipc接收套接字,从其他线程接收消息,然后在本线程通过rspSock发送给客户端,保证同一个套接字接收和发送
void* rspSockRecv = zmq_socket(context, ZMQ_DEALER);
int rt = zmq_bind (rspSockRecv, "inproc://#1");
if (rt == -1) {
zmq_close(rspSockRecv);
return;
}
//把context传入给ipc发送线程,确保多线程的ipc使用同一个context
zmqSender = std::make_shared<ZmqSender>(context, "inproc://#1");
//同时轮训两个套接字,rspSock接收客户端的请求,rspSockRecv接收ipc的请求,然后通过rspSock转发ipc的请求给客户端
zmq_pollitem_t items[] = { {rspSock, 0, ZMQ_POLLIN, 0} ,
{rspSockRecv, 0, ZMQ_POLLIN, 0}};
while (true) {
int rc = zmq_poll(items, 2, 10 * ZMQ_POLL_MSEC );
if (rc == -1) {
LOG(ERROR) << "Error zmq_poll return -1: " << zmq_strerror(errno);
break;
}
//接收ipc的请求,然后通过rspSock转发ipc的请求给客户端
if (items [1].revents & ZMQ_POLLIN) {
zmsg_t *msg = zmsg_recv(rspSockRecv);
zframe_t *addr = zmsg_pop(msg);
zframe_t *cont = zmsg_pop(msg);
zframe_send(&addr, rspSock, ZFRAME_REUSE + ZFRAME_MORE);
zframe_send(&cont, rspSock, ZFRAME_REUSE);
if (addr)
zframe_destroy(&addr);
if (cont)
zframe_destroy(&cont);
zmsg_destroy(&msg);
}
//接收客户端的请求,放到消息队列,其他线程处理消息
if (items [0].revents & ZMQ_POLLIN) {
zmsg_t *msg = zmsg_recv(rspSock);
zframe_t *addr = zmsg_pop(msg);
zframe_t *cont = zmsg_pop(msg);
assert (cont);
assert (zframe_is (cont));
char* contstr = zframe_strdup(cont);
if (strlen(contstr) > 0)
{
//处理逻辑,放到消息队列
if (cont)
zframe_destroy(&cont);
zmsg_destroy(&msg);
if (contstr) {
free(contstr);
contstr = nullptr;
}
}
}
zmq_close(rspSock);
zmq_close(rspSockRecv);
}
///ipc发送消息线程
void ZmqSender::run()
{
//ipc发送消息套接字,与ipc接收套接字使用同一个context
void* rspSock = zmq_socket(context, ZMQ_DEALER);
//地址是
int ret = zmq_connect (rspSock, "inproc://#1");
if (ret == -1) {
zmq_close(rspSock);
return;
}
MQ::ZmqMessage msg;
while(status){
if(get(msg))
{
//发送消息给ipc接收套接字
zframe_send(&msg.addr, rspSock, ZFRAME_REUSE + ZFRAME_MORE);
zframe_t *frame = zframe_new(msg.response.c_str(), msg.response.size()+1);
zframe_send(&frame, rspSock, ZFRAME_REUSE);
if (msg.addr)
zframe_destroy(&msg.addr);
if (frame)
zframe_destroy(&frame);
}
}
zmq_close(rspSock);
}
主要的思路就是多线程利用ipc通讯。
最后
以上就是自由乌龟最近收集整理的关于关于多线程使用zeromq出现各种断言或段错误的问题的全部内容,更多相关关于多线程使用zeromq出现各种断言或段错误内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复