我是靠谱客的博主 畅快服饰,这篇文章主要介绍C++ 消息循环,现在分享给大家,希望可以做个参考。

消息循环是非常有用的工具,各平台都能见到它的身影,比如 CFRunLoop、Android Looper、Windows 消息循环等。有时候出于一些原因,我们并不想使用平台相关 API 或者三方库,就想自制一个简易的消息循环将就用,该怎么实现呢?以下就演示一种简易方案,请看代码:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
using clock_type = std::chrono::system_clock; struct message { clock_type::time_point when; std::function<void()> callback; }; class message_loop { public: message_loop(): _stop(false) { // } message_loop(const message_loop&) = delete; message_loop& operator=(const message_loop&) = delete; void run() { while (!_stop) { auto msg = wait_one(); msg.callback(); } } void quit() { post({clock_type::now(), [this](){ _stop = true; } }); } void post(std::function<void()> callable) { post({clock_type::now(), std::move(callable)}); } void post(std::function<void()> callable, std::chrono::milliseconds delay) { post({clock_type::now() + delay, std::move(callable)}); } private: struct msg_prio_comp { inline bool operator() (const message& a, const message& b) { return a.when > b.when; } }; using queue_type = std::priority_queue<message, std::vector<message>, msg_prio_comp>; std::mutex _mtx; std::condition_variable _cv; queue_type _msgs; bool _stop; void post(message msg) { auto lck = acquire_lock(); _msgs.emplace(std::move(msg)); _cv.notify_one(); } std::unique_lock<std::mutex> acquire_lock() { return std::unique_lock<std::mutex>(_mtx); } bool idle() const { return _msgs.empty(); } const message& top() const { return _msgs.top(); } message pop() { auto msg = top(); _msgs.pop(); return msg; } message wait_one() { while (true) { auto lck = acquire_lock(); if (idle()) _cv.wait(lck); else if (top().when <= clock_type::now()) return pop(); else { _cv.wait_until(lck, top().when); // 可能是新消息到达,再循环一次看看 } } } };

接下来,演示一下使用方式:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
// std::cout 的一层皮,为的是多线程使用 std::cout 时输出不错乱 class ConsoleLogger { public: ConsoleLogger() { using namespace std; using namespace std::chrono; auto now = system_clock::now(); time_t t = system_clock::to_time_t(now); // 大多数系统上 time_t 的单位是 秒 auto ms = (int)(duration_cast<milliseconds>(now - system_clock::from_time_t(t)).count()); char buf[32] = {0}; char *ptr = buf + strftime(buf, "[%T", localtime(&t)); sprintf(ptr, ".%03d] ", ms); _strm << buf; } ConsoleLogger(ConsoleLogger&& rv): _strm(std::move(rv._strm)) {} ConsoleLogger& operator=(ConsoleLogger&& rv) { _strm = std::move(rv._strm); return *this; } ~ConsoleLogger() { if (_strm.rdbuf()) { _strm << std::endl; std::cout << _strm.rdbuf(); } } template<class _Ty> ConsoleLogger& operator<<(_Ty&& val) { _strm << std::forward<_Ty>(val); return *this; } ConsoleLogger& operator<< (std::ostream& (*pf)(std::ostream&)) { _strm << pf; return *this; } ConsoleLogger& operator<< (std::ios& (*pf)(std::ios&)) { _strm << pf; return *this; } ConsoleLogger& operator<< (std::ios_base& (*pf)(std::ios_base&)) { _strm << pf; return *this; } private: std::stringstream _strm; }; ConsoleLogger log() { return ConsoleLogger(); } int main() { using namespace std; using namespace std::chrono; message_loop *pLoop = nullptr; thread th([&loop](){ message loop; pLoop = &loop; loop.run(); pLoop = nullptr; }); log() << "投递消息#1"; pLoop->post([](){ log() << "消息#1 处理了"; }); log() << "投递消息#2,延迟 500 毫秒"; pLoop->post([](){ log() << "消息#2 处理了"; }, milliseconds(500)); log() << "投递消息#3"; pLoop->post([](){ log() << "消息#3 处理了"; }); log() << "投递消息#4,延迟 1000 毫秒"; pLoop->post([](){ log() << "消息#4 处理了"; }, milliseconds(1000)); this_thread::sleep_for(milliseconds(1500)); pLoop->quit(); log() << "退出"; th.join(); return 0; }

运行上面的示例可能看到如下输出:

复制代码
1
2
3
4
5
6
7
8
9
10
[11:22:33.000] 投递消息#1 [11:22:33.000] 投递消息#2,延迟 500 毫秒 [11:22:33.000] 消息#1 处理了 [11:22:33.000] 投递消息#3 [11:22:33.000] 消息#3 处理了 [11:22:33.000] 投递消息#4,延迟 1000 毫秒 [11:22:33.501] 消息#2 处理了 [11:22:34.000] 消息#4 处理了 [11:22:34.502] 退出

可见,相比单纯的先进先出队列,这个消息循环支持延迟消息,可以用来做简单定时器,覆盖更多使用场景。

当然,这么简单的消息循环,效率上有不少提升空间。在我的 i5 10500 上,针对 1048576 个消息的压测结果为每毫秒能处理约 2400 个消息。效率瓶颈主要在以下几的地方:

  1. 锁粒度太高
  2. priority_queue 在消息多了之后,插入、移除消息的耗时变得可观

针对上述原因,可以采取以下优化措施:

  1. 减小锁粒度或者采用无锁数据结构
  2. 消息循环里面一般按顺序处理的消息居多,因此可以把消息队列分为至少两个:一个先入先出队列;一个带排序的队列(堆)
  3. 采用两个缓冲区。一个用于写,一个用于读
  4. 采用对象池优化内存分配

优化过程要注意以下问题:

  1. 消息的回调函数内可能会再调用 post 发送消息,容易发生死锁。

即使不采用无锁数据结构,只把锁粒度减小,就能把效率翻倍。

其实,循环的方式多种多样,像我遇到的场景就采用了下面的循环:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
while (!quit) { bool onceMore = myLogic(); if (!onceMore) { while (!quit && !otherCondition()) { message msg = getNext(); msg.callback(); } } else if (hasNext()) { message msg = getNext(); msg.callback(); } }

这种循环的特点是,myLogic() 会尽可能多的执行,同时消息来了也能及时处理,适合一些实时性高的场合。正是因为循环的方式多样,封装好的 message_loop 往往需要提供各种 hook 点,比如空闲处理、进入等待前、唤醒后等等。不过,灵活性增加后,效率就会牺牲一点,这时可以考虑把消息队列和消息循环分开。

最后

以上就是畅快服饰最近收集整理的关于C++ 消息循环的全部内容,更多相关C++内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(87)

评论列表共有 0 条评论

立即
投稿
返回
顶部