概述
简介
给出一个不安全的情况,假设我们有一个socket用于和用户通信,用户会发送多个请求,我们处理请求需要一定的时间,同时使用异步并发的模型来处理对应的请求,即一个io_context::run
会有多个线程执行。那么boost::asio的基本代码框架如下,给出伪代码示例:
var scoket; // 一个socket
async_read(socket, buf, [](){
// 处理一些耗时任务
write(socket, data); // 把data写入socket发给客户端
});
假设客户第一次发送请求到async_read
函数后,t1
线程执行的lambda回调;在t1执行耗时任务的时候,客户端又发来第二个请求,t2
线程也执行一个耗时任务。巧合的是,t1
和t2
在同一时间段完成了任务处理,并把数据通过socket
发给客户端,那么此时并发写出现了竞争,因为scoket
并发写是不安全的,如果写的数据再多一点,那么一定会竞争了。
那么问题的本质就是,socket
不是线程安全的,一个时间段内,只能有一个线程对socket
执行写操作。
针对这个情况,我们有两种解决方案:
- 在
write
写scoket
的时候加mutex
互斥量,同时只能有一个线程写。 - 让回调函数串行化执行,即一个socket的回调函数不能同时在多个线程中执行。
两种方式各有利弊:第一个方式我们需要在异步的调用中使用互斥量,代码复杂度非常高,而且生命周期管理复杂,可能要借助全局变量或者类的互斥量来处理,但是能并发处理耗时任务,看起来效率高。
第二种方式,使用strand
机制串行化,即让所有的关于socket
的操作都在一个线程执行,代码复杂度低,让操作系统和库来管理调度,但是缺陷是耗时的工作也只能在一个线程中处理,可能无法充分利用并行化操作。不过,如果请求量大,而且耗时请求分布均匀的话,两者的差距会减小,而且strand
机制的优势也逐渐变大。
当然,上面两种方式只是推测的,实际情况需要多次测试才能看出效果。
在工程中,基本都是采用strand
的方式处理。
strand
控制并发
资料来自于:http://www.crazygaze.com/blog/2016/03/17/how-strands-work-and-why-you-should-use-them/
下图给出strand的核心思想:
蓝色的WT是工作线程,工作线程从io_context
的事件循环中获取Executor
,并执行。如果工作队列中的Executor
是在Strand
中,那么strand
可以保证在strand
中的任务,不会被多个线程并发的执行,即这些任务在时间上是串行化的。
代码示例
CMakeLists.txt
# cmake_minimum_required(VERSION <specify CMake version here>)
project(boost_asio)
set(CMAKE_CXX_STANDARD 14)
set(CMAKE_CXX_FLAGS -pthread)
add_executable(boost_asio main.cpp TcpServer.hpp Session.hpp)
Session.hpp
//
// Created by Erick on 2020/2/19.
//
#ifndef BOOST_ASIO_SESSION_HPP
#define BOOST_ASIO_SESSION_HPP
#include <boost/asio/ip/tcp.hpp>
#include <iostream>
class Session : public std::enable_shared_from_this<Session> {
using SessionSocketPtr = std::shared_ptr<boost::asio::ip::tcp::socket>;
public:
explicit Session(SessionSocketPtr sk, boost::asio::io_context &ioc) : m_socket(std::move(sk)), m_strand(ioc) {}
void start() {
boost::asio::async_read_until(*m_socket, m_streamBuf, "rn",
// Executor绑定到strand上,这样对应的回调函数就不会出现上面说的并发情况了
boost::asio::bind_executor(this->m_strand, [self = shared_from_this()](
const boost::system::error_code &ec,
std::size_t bytes_transferred) {
if (ec.failed()) {
std::cout << "session error: " << ec.message()
<< ", thread_id: "
<< std::this_thread::get_id()
<< std::endl;
return;
}
std::cout << "Thread: "
<< std::this_thread::get_id()
<< ", Get User data: "
<< std::istream(
&self->m_streamBuf).rdbuf()
<< std::endl;
self->start(); // 异步继续读
}));
}
private:
SessionSocketPtr m_socket;
boost::asio::streambuf m_streamBuf;
boost::asio::io_context::strand m_strand; // 每个session会有一个strand,需要串行化的都在内部处理
};
#endif //BOOST_ASIO_SESSION_HPP
TcpServer.hpp
//
// Created by Erick on 2020/2/19.
//
#ifndef BOOST_ASIO_TCPSERVER_HPP
#define BOOST_ASIO_TCPSERVER_HPP
#include <boost/asio.hpp>
#include <iostream>
#include <memory>
#include "Session.hpp"
// 异步的TCP服务器
class TcpServer : public std::enable_shared_from_this<TcpServer> {
public:
explicit TcpServer(boost::asio::io_context &ioc, int port) :
io_context(ioc),
m_acceptor(ioc, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port)) {
std::cout << "start TcpServer...n";
}
TcpServer(TcpServer &) = delete;
TcpServer(TcpServer &&) = delete;
void start() {
auto socket = std::make_shared<boost::asio::ip::tcp::socket>(io_context);
m_acceptor.async_accept(*socket,
[socket, self = shared_from_this()](boost::system::error_code ec) {
if (ec.failed()) {
std::cout << "async_accept error: " << ec.message() << std::endl;
}
auto session = std::make_shared<Session>(socket, self->io_context);
session->start(); // 这里启动一个会话
std::cout << "thread_id: " << std::this_thread::get_id() << ", create a sessionn";
self->start(); // 继续重新启动
});
}
private:
boost::asio::io_context &io_context;
boost::asio::ip::tcp::acceptor m_acceptor;
};
#endif //BOOST_ASIO_TCPSERVER_HPP
main.cpp
#include <iostream>
#include <thread>
#include "TcpServer.hpp"
const auto N = std::thread::hardware_concurrency();
using work_guard_type = boost::asio::executor_work_guard<boost::asio::io_context::executor_type>;
int main() {
std::cout << "begin asio model" << std::endl;
boost::asio::io_context io_context;
work_guard_type work_guard(io_context.get_executor());
std::vector<std::thread> threads;
for (auto i = 0; i < N; ++i) {
threads.emplace_back(std::thread([&]() {
io_context.run();
}));
}
int port;
std::cout << "input port: ";
std::cin >> port;
auto tcpServer = std::make_shared<TcpServer>(io_context, port);
tcpServer->start(); // 异步启动服务器,不阻塞
for (auto &t: threads) {
if (t.joinable()) {
t.join();
}
}
std::cout << "end asio model" << std::endl;
return 0;
}
参考
- http://www.cppblog.com/Khan/archive/2017/10/14/215295.html
- http://www.crazygaze.com/blog/2016/03/17/how-strands-work-and-why-you-should-use-them/
- https://zhuanlan.zhihu.com/p/87388918
最后
以上就是腼腆毛衣为你收集整理的boost.asio无锁异步并发简介strand控制并发代码示例参考的全部内容,希望文章能够帮你解决boost.asio无锁异步并发简介strand控制并发代码示例参考所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复