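Posted by 彪壮枫叶 on 靠谱客. This article covers [Concurrency and Parallelism]_[Intermediate]_[Implementing a Pthread concurrent read-write lock (rwlock)]: scenario, notes, example, and references.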

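Scenario

1. Instances of the STL classes are thread-safe for concurrent reads but not for concurrent writes; the exceptions are shared_ptr and iostream, which tolerate some concurrent writes. When designing an object structure we often use std::vector or std::map as the storage container and then read and write it from several threads. You could instead confine all reads and writes to one dedicated thread, but that costs performance and brings the headaches of asynchronous execution.

2. If several threads may read and write, the shared object has to be locked. The trouble with a plain mutex is that a thread cannot tell whether the other threads only want to read. If they are all reading, they can share the object at the same time by relying on the concurrent-read safety of STL instances, while a write holds the lock exclusively. For read-heavy workloads this improves performance considerably.

3. A Windows critical section (CRITICAL_SECTION) has no reader/writer mode, but pthread provides pthread_rwlock_t, which is pthread's own read-write lock interface. Here we follow example 7.1.2 from the book Programming With POSIX Threads and implement our own read/write lock: concurrent reads, exclusive writes.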
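As a point of comparison for the three items above, here is a minimal sketch of "concurrent reads, exclusive writes" using the standard pthread_rwlock_t instead of the custom lock. The names g_table, g_table_lock, table_lookup, and table_store are made up for this illustration and do not appear in the original code.

```cpp
#include <pthread.h>
#include <map>
#include <string>

// Hypothetical shared table guarded by a POSIX read-write lock.
static pthread_rwlock_t g_table_lock = PTHREAD_RWLOCK_INITIALIZER;
static std::map<std::string, std::string> g_table;

// Readers take the shared lock, so several lookups may run at the same time.
bool table_lookup(const std::string& key, std::string* value_out) {
    if (pthread_rwlock_rdlock(&g_table_lock) != 0) return false;
    std::map<std::string, std::string>::const_iterator it = g_table.find(key);
    bool found = (it != g_table.end());
    if (found) *value_out = it->second;
    pthread_rwlock_unlock(&g_table_lock);
    return found;
}

// Writers take the exclusive lock, so no reader or other writer is inside
// the map while it is being modified.
bool table_store(const std::string& key, const std::string& value) {
    if (pthread_rwlock_wrlock(&g_table_lock) != 0) return false;
    g_table[key] = value;
    pthread_rwlock_unlock(&g_table_lock);
    return true;
}
```

The lookup copies the value out while the read lock is still held, so the caller never keeps a reference into the map after the lock is released.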

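Notes

1. The implementation uses pthread_cond_t for communication between threads: a thread that cannot take the lock waits on a condition variable, and the thread that unlocks signals it. The internal mutex is held only while the counters are updated (for example inside rwl_readlock), not for the duration of the read or write itself, so contention on the mutex stays low. The book's code is close to production quality.

2. I modified rwlock.c to fix some problems with read priority and introduced a priority variable, so that rwlock_t can easily be switched between read priority and write priority. I only found out later that pthread ships its own read-write lock, pthread_rwlock_t, and I have not figured out its priority policy.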
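The wait-and-signal mechanism from note 1 can be seen in isolation in the sketch below. It is not part of rwlock.c: the Gate type and the gate_init, gate_wait, and gate_open functions are hypothetical names for a one-shot gate, chosen only to show the pattern that the rwl_* functions repeat (lock the mutex briefly, test a predicate in a loop, sleep on the condition variable).

```cpp
#include <pthread.h>

// Hypothetical one-shot gate: waiters block until another thread opens it.
struct Gate {
    pthread_mutex_t mutex;    // protects is_open
    pthread_cond_t  opened;   // signalled when is_open becomes true
    bool            is_open;
};

int gate_init(Gate* g) {
    g->is_open = false;
    int status = pthread_mutex_init(&g->mutex, nullptr);
    if (status != 0) return status;
    status = pthread_cond_init(&g->opened, nullptr);
    if (status != 0) pthread_mutex_destroy(&g->mutex);
    return status;
}

// Blocks until the gate is open. The loop re-checks the predicate because
// pthread_cond_wait may return spuriously; the mutex is released while the
// thread sleeps and re-acquired before pthread_cond_wait returns.
int gate_wait(Gate* g) {
    int status = pthread_mutex_lock(&g->mutex);
    if (status != 0) return status;
    while (!g->is_open && status == 0) {
        status = pthread_cond_wait(&g->opened, &g->mutex);
    }
    pthread_mutex_unlock(&g->mutex);
    return status;
}

// Opens the gate and wakes every waiter.
int gate_open(Gate* g) {
    int status = pthread_mutex_lock(&g->mutex);
    if (status != 0) return status;
    g->is_open = true;
    status = pthread_cond_broadcast(&g->opened);
    int status2 = pthread_mutex_unlock(&g->mutex);
    return status != 0 ? status : status2;
}
```

In the read-write lock the predicate is a combination of w_active, r_active, and the wait counters instead of a single flag, and the choice of which counters a reader or writer tests is what decides read priority versus write priority.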

Example

rwlock.h

    #ifndef RWLOCK_H
    #define RWLOCK_H

    /*
     * rwlock.h
     *
     * This header file describes the "reader/writer lock" synchronization
     * construct. The type rwlock_t describes the full state of the lock
     * including the POSIX 1003.1c synchronization objects necessary.
     *
     * A reader/writer lock allows a thread to lock shared data either for shared
     * read access or exclusive write access.
     *
     * The rwl_init() and rwl_destroy() functions, respectively, allow you to
     * initialize/create and destroy/free the reader/writer lock.
     */
    #include <pthread.h>

    typedef enum LockPriority1 {
        kLockPriorityRead = 0,
        kLockPriorityWrite
    } LockPriority;

    /*
     * Structure describing a read-write lock.
     */
    typedef struct rwlock_tag {
        pthread_mutex_t mutex;
        pthread_cond_t  read;       /* wait for read */
        pthread_cond_t  write;      /* wait for write */
        int             valid;      /* set when valid */
        int             r_active;   /* readers active */
        int             w_active;   /* writer active */
        int             r_wait;     /* readers waiting */
        int             w_wait;     /* writers waiting */
        LockPriority    priority;   /* read priority or write priority */
    } rwlock_t;

    #define RWLOCK_VALID 0xfacade

    /*
     * Support static initialization of read-write locks
     * (the priority defaults to read priority).
     */
    #define RWL_INITIALIZER \
        {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, \
         PTHREAD_COND_INITIALIZER, RWLOCK_VALID, 0, 0, 0, 0, kLockPriorityRead}

    /*
     * Define read-write lock functions
     */
    extern int rwl_init (rwlock_t *rwlock);
    extern int rwl_destroy (rwlock_t *rwlock);
    extern int rwl_readlock (rwlock_t *rwlock);
    extern int rwl_readtrylock (rwlock_t *rwlock);
    extern int rwl_readunlock (rwlock_t *rwlock);
    extern int rwl_writelock (rwlock_t *rwlock);
    extern int rwl_writetrylock (rwlock_t *rwlock);
    extern int rwl_writeunlock (rwlock_t *rwlock);

    #endif

rwlock.cpp

    /*
     * rwlock.c
     *
     * This file implements the "read-write lock" synchronization
     * construct.
     *
     * A read-write lock allows a thread to lock shared data either
     * for shared read access or exclusive write access.
     *
     * The rwl_init() and rwl_destroy() functions, respectively,
     * allow you to initialize/create and destroy/free the
     * read-write lock.
     *
     * The rwl_readlock() function locks a read-write lock for
     * shared read access, and rwl_readunlock() releases the
     * lock. rwl_readtrylock() attempts to lock a read-write lock
     * for read access, and returns EBUSY instead of blocking.
     *
     * The rwl_writelock() function locks a read-write lock for
     * exclusive write access, and rwl_writeunlock() releases the
     * lock. rwl_writetrylock() attempts to lock a read-write lock
     * for write access, and returns EBUSY instead of blocking.
     */
    #include <errno.h>      /* EINVAL, EBUSY */
    #include <pthread.h>
    #include "rwlock.h"

    /*
     * Initialize a read-write lock
     */
    int rwl_init (rwlock_t *rwl)
    {
        int status;

        rwl->priority = kLockPriorityRead;
        rwl->r_active = 0;
        rwl->r_wait = rwl->w_wait = 0;
        rwl->w_active = 0;
        status = pthread_mutex_init (&rwl->mutex, NULL);
        if (status != 0)
            return status;
        status = pthread_cond_init (&rwl->read, NULL);
        if (status != 0) {
            /* if unable to create read CV, destroy mutex */
            pthread_mutex_destroy (&rwl->mutex);
            return status;
        }
        status = pthread_cond_init (&rwl->write, NULL);
        if (status != 0) {
            /* if unable to create write CV, destroy read CV and mutex */
            pthread_cond_destroy (&rwl->read);
            pthread_mutex_destroy (&rwl->mutex);
            return status;
        }
        rwl->valid = RWLOCK_VALID;
        return 0;
    }

    /*
     * Destroy a read-write lock
     */
    int rwl_destroy (rwlock_t *rwl)
    {
        int status, status1, status2;

        if (rwl->valid != RWLOCK_VALID)
            return EINVAL;
        status = pthread_mutex_lock (&rwl->mutex);
        if (status != 0)
            return status;

        /*
         * Check whether any threads own the lock; report "BUSY" if
         * so.
         */
        if (rwl->r_active > 0 || rwl->w_active) {
            pthread_mutex_unlock (&rwl->mutex);
            return EBUSY;
        }

        /*
         * Check whether any threads are known to be waiting; report
         * EBUSY if so.
         */
        if (rwl->r_wait != 0 || rwl->w_wait != 0) {
            pthread_mutex_unlock (&rwl->mutex);
            return EBUSY;
        }

        rwl->valid = 0;
        status = pthread_mutex_unlock (&rwl->mutex);
        if (status != 0)
            return status;
        status = pthread_mutex_destroy (&rwl->mutex);
        status1 = pthread_cond_destroy (&rwl->read);
        status2 = pthread_cond_destroy (&rwl->write);
        /* Report the first failure, if any. */
        return (status != 0 ? status : (status1 != 0 ? status1 : status2));
    }

    /*
     * Handle cleanup when the read lock condition variable
     * wait is cancelled.
     *
     * Simply record that the thread is no longer waiting,
     * and unlock the mutex.
     */
    static void rwl_readcleanup (void *arg)
    {
        rwlock_t *rwl = (rwlock_t *)arg;

        rwl->r_wait--;
        pthread_mutex_unlock (&rwl->mutex);
    }

    /*
     * Lock a read-write lock for read access.
     */
    int rwl_readlock (rwlock_t *rwl)
    {
        int status;

        if (rwl->valid != RWLOCK_VALID)
            return EINVAL;
        status = pthread_mutex_lock (&rwl->mutex);
        if (status != 0)
            return status;
        switch (rwl->priority) {
        case kLockPriorityRead: {
            /* Read priority: wait only for an active writer. */
            if (rwl->w_active) {
                rwl->r_wait++;
                pthread_cleanup_push (rwl_readcleanup, (void*)rwl);
                while (rwl->w_active) {
                    status = pthread_cond_wait (&rwl->read, &rwl->mutex);
                    if (status != 0)
                        break;
                }
                pthread_cleanup_pop (0);
                rwl->r_wait--;
            }
            break;
        }
        case kLockPriorityWrite: {
            /* Write priority: also yield to writers that are waiting. */
            if (rwl->w_active || rwl->w_wait) {
                rwl->r_wait++;
                pthread_cleanup_push (rwl_readcleanup, (void*)rwl);
                while (rwl->w_active || rwl->w_wait) {
                    status = pthread_cond_wait (&rwl->read, &rwl->mutex);
                    if (status != 0)
                        break;
                }
                pthread_cleanup_pop (0);
                rwl->r_wait--;
            }
            break;
        }
        }
        if (status == 0)
            rwl->r_active++;
        pthread_mutex_unlock (&rwl->mutex);
        return status;
    }

    /*
     * Attempt to lock a read-write lock for read access (don't
     * block if unavailable).
     */
    int rwl_readtrylock (rwlock_t *rwl)
    {
        int status, status2;

        if (rwl->valid != RWLOCK_VALID)
            return EINVAL;
        status = pthread_mutex_lock (&rwl->mutex);
        if (status != 0)
            return status;
        switch (rwl->priority) {
        case kLockPriorityRead: {
            if (rwl->w_active)
                status = EBUSY;
            else
                rwl->r_active++;
            break;
        }
        case kLockPriorityWrite: {
            if (rwl->w_active || rwl->w_wait)
                status = EBUSY;
            else
                rwl->r_active++;
            break;
        }
        }
        status2 = pthread_mutex_unlock (&rwl->mutex);
        return (status2 != 0 ? status2 : status);
    }

    /*
     * Unlock a read-write lock from read access.
     */
    int rwl_readunlock (rwlock_t *rwl)
    {
        int status, status2;

        if (rwl->valid != RWLOCK_VALID)
            return EINVAL;
        status = pthread_mutex_lock (&rwl->mutex);
        if (status != 0)
            return status;
        rwl->r_active--;
        switch (rwl->priority) {
        case kLockPriorityRead: {
            /* Wake a writer only when no reader is active or waiting. */
            if (rwl->r_active == 0 && rwl->r_wait == 0 && rwl->w_wait > 0)
                status = pthread_cond_signal (&rwl->write);
            break;
        }
        case kLockPriorityWrite: {
            /* Wake a writer as soon as the last active reader leaves. */
            if (rwl->r_active == 0 && rwl->w_wait > 0)
                status = pthread_cond_signal (&rwl->write);
            break;
        }
        }
        status2 = pthread_mutex_unlock (&rwl->mutex);
        return (status2 == 0 ? status : status2);
    }

    /*
     * Handle cleanup when the write lock condition variable
     * wait is cancelled.
     *
     * Simply record that the thread is no longer waiting,
     * and unlock the mutex.
     */
    static void rwl_writecleanup (void *arg)
    {
        rwlock_t *rwl = (rwlock_t *)arg;

        rwl->w_wait--;
        pthread_mutex_unlock (&rwl->mutex);
    }

    /*
     * Lock a read-write lock for write access.
     */
    int rwl_writelock (rwlock_t *rwl)
    {
        int status;

        if (rwl->valid != RWLOCK_VALID)
            return EINVAL;
        status = pthread_mutex_lock (&rwl->mutex);
        if (status != 0)
            return status;
        switch (rwl->priority) {
        case kLockPriorityRead: {
            /* Read priority: also yield to readers that are waiting. */
            if (rwl->w_active || rwl->r_active > 0 || rwl->r_wait > 0) {
                rwl->w_wait++;
                pthread_cleanup_push (rwl_writecleanup, (void*)rwl);
                while (rwl->w_active || rwl->r_active > 0 || rwl->r_wait > 0) {
                    status = pthread_cond_wait (&rwl->write, &rwl->mutex);
                    if (status != 0)
                        break;
                }
                pthread_cleanup_pop (0);
                rwl->w_wait--;
            }
            break;
        }
        case kLockPriorityWrite: {
            /* Write priority: wait only for active readers or writer. */
            if (rwl->w_active || rwl->r_active > 0) {
                rwl->w_wait++;
                pthread_cleanup_push (rwl_writecleanup, (void*)rwl);
                while (rwl->w_active || rwl->r_active > 0) {
                    status = pthread_cond_wait (&rwl->write, &rwl->mutex);
                    if (status != 0)
                        break;
                }
                pthread_cleanup_pop (0);
                rwl->w_wait--;
            }
            break;
        }
        }
        if (status == 0)
            rwl->w_active = 1;
        pthread_mutex_unlock (&rwl->mutex);
        return status;
    }

    /*
     * Attempt to lock a read-write lock for write access. Don't
     * block if unavailable.
     */
    int rwl_writetrylock (rwlock_t *rwl)
    {
        int status, status2;

        if (rwl->valid != RWLOCK_VALID)
            return EINVAL;
        status = pthread_mutex_lock (&rwl->mutex);
        if (status != 0)
            return status;
        switch (rwl->priority) {
        case kLockPriorityRead: {
            if (rwl->w_active || rwl->r_active > 0 || rwl->r_wait)
                status = EBUSY;
            else
                rwl->w_active = 1;
            break;
        }
        case kLockPriorityWrite: {
            if (rwl->w_active || rwl->r_active > 0)
                status = EBUSY;
            else
                rwl->w_active = 1;
            break;
        }
        }
        status2 = pthread_mutex_unlock (&rwl->mutex);
        return (status != 0 ? status : status2);
    }

    /*
     * Unlock a read-write lock from write access.
     */
    int rwl_writeunlock (rwlock_t *rwl)
    {
        int status;

        if (rwl->valid != RWLOCK_VALID)
            return EINVAL;
        status = pthread_mutex_lock (&rwl->mutex);
        if (status != 0)
            return status;
        rwl->w_active = 0;
        switch (rwl->priority) {
        case kLockPriorityRead: {
            /* Waiting readers go first, then a waiting writer. */
            if (rwl->r_wait > 0) {
                status = pthread_cond_broadcast (&rwl->read);
                if (status != 0) {
                    pthread_mutex_unlock (&rwl->mutex);
                    return status;
                }
            } else if (rwl->w_wait > 0) {
                status = pthread_cond_signal (&rwl->write);
                if (status != 0) {
                    pthread_mutex_unlock (&rwl->mutex);
                    return status;
                }
            }
            break;
        }
        case kLockPriorityWrite: {
            /* A waiting writer goes first, then the waiting readers. */
            if (rwl->w_wait > 0) {
                status = pthread_cond_signal (&rwl->write);
                if (status != 0) {
                    pthread_mutex_unlock (&rwl->mutex);
                    return status;
                }
            } else if (rwl->r_wait > 0) {
                status = pthread_cond_broadcast (&rwl->read);
                if (status != 0) {
                    pthread_mutex_unlock (&rwl->mutex);
                    return status;
                }
            }
            break;
        }
        }
        status = pthread_mutex_unlock (&rwl->mutex);
        return status;
    }

test-ReadWriteLock.cpp

    // test-ReadWriteLock.cpp : Defines the entry point for the console application.
    //

    #include "stdafx.h"
    #include <Windows.h>
    #include "rwlock.h"
    #include <stdio.h>
    #include <stdlib.h>
    #include <string>
    #include <utility>
    #include <vector>
    #include <iostream>
    #include "pthread.h"

    #define DEVICES 5
    #define THREADS 5
    #define ITERATORS 10

    rwlock_t gDeviceLock;
    pthread_rwlock_t gPthreadRwlock;
    std::vector<std::pair<std::string,std::string>> devices;

    typedef struct thread_tag {
        bool read;          // true: reader thread, false: writer thread
        int thread_index;
        pthread_t thread_id;
    } thread_t;

    void *thread_routine (void *arg)
    {
        auto thread1 = (thread_t*)(arg);
        for (int i = 0; i < ITERATORS; ++i) {
            if (thread1->read) {
                rwl_readlock(&gDeviceLock);
                // The counters are printed for tracing only; they are read
                // without holding gDeviceLock.mutex, so the values may be stale.
                std::cout << "rwl_readlock r_active: " << gDeviceLock.r_active
                          << " w_active: " << gDeviceLock.w_active << std::endl;
                for (int j = 0; j < DEVICES; ++j) {
                    auto& one = devices[j];
                    (void)one;  // read-only access
                }
                rwl_readunlock(&gDeviceLock);
            } else {
                rwl_writelock(&gDeviceLock);
                std::cout << "rwl_writelock r_active: " << gDeviceLock.r_active
                          << " w_active: " << gDeviceLock.w_active << std::endl;
                for (int j = 0; j < DEVICES; ++j) {
                    devices[j] = devices[DEVICES-j-1];
                }
                rwl_writeunlock(&gDeviceLock);
            }
        }
        return NULL;
    }

    void TestRwlLock()
    {
        thread_t thread1[THREADS];
        char name[32];
        char value[32];
        for (int i = 0; i < DEVICES; ++i) {
            sprintf(name, "name-%d", i);
            sprintf(value, "value-%d", i);
            devices.push_back(std::make_pair(std::string(name), std::string(value)));
        }

        // Odd indexes become readers, even indexes become writers.
        for (int i = 0; i < THREADS; ++i) {
            thread1[i].thread_index = i;
            thread1[i].read = i % 2;
            pthread_create(&thread1[i].thread_id, NULL, thread_routine, &thread1[i]);
        }

        for (int i = 0; i < THREADS; ++i) {
            pthread_join(thread1[i].thread_id, NULL);
        }
    }

    void Func(std::string& one)
    {
    }

    void *pthread_routine (void *arg)
    {
        auto thread1 = (thread_t*)(arg);
        for (int i = 0; i < 100; ++i) {
            if (thread1->read) {
                pthread_rwlock_rdlock(&gPthreadRwlock);
                std::cout << "pthread_rwlock_rdlock: " << std::endl;
                for (size_t j = 0; j < devices.size(); ++j) {
                    auto& one = devices[j];
                    Func(one.first);
                }
                pthread_rwlock_unlock(&gPthreadRwlock);
            } else {
                pthread_rwlock_wrlock(&gPthreadRwlock);
                std::cout << "pthread_rwlock_wrlock: " << std::endl;
                for (int j = 0; j < DEVICES; ++j) {
                    devices[j] = devices[DEVICES-j-1];
                    // Growing the vector may reallocate it, which is only
                    // safe because the write lock is exclusive.
                    devices.push_back(std::make_pair(std::string("name"), std::string("value")));
                }
                pthread_rwlock_unlock(&gPthreadRwlock);
            }
        }
        return NULL;
    }

    void TestPthreadRwlLock()
    {
        thread_t thread1[THREADS];
        char name[32];
        char value[32];
        for (int i = 0; i < DEVICES; ++i) {
            sprintf(name, "name-%d", i);
            sprintf(value, "value-%d", i);
            devices.push_back(std::make_pair(std::string(name), std::string(value)));
        }

        for (int i = 0; i < THREADS; ++i) {
            thread1[i].thread_index = i;
            thread1[i].read = i % 2;
            pthread_create(&thread1[i].thread_id, NULL, pthread_routine, &thread1[i]);
        }

        for (int i = 0; i < THREADS; ++i) {
            pthread_join(thread1[i].thread_id, NULL);
        }
    }

    int _tmain(int argc, _TCHAR* argv[])
    {
        // Custom read-write lock
        setbuf(stdout, NULL);
        rwl_init(&gDeviceLock);
        std::cout << "Test Read Priority" << std::endl;
        TestRwlLock();

        std::cout << "Test Write Priority" << std::endl;
        gDeviceLock.priority = kLockPriorityWrite;
        TestRwlLock();
        rwl_destroy(&gDeviceLock);

        // pthread's built-in read-write lock
        std::cout << "Test Pthread RWLock" << std::endl;
        pthread_rwlock_init(&gPthreadRwlock, NULL);
        TestPthreadRwlLock();
        TestPthreadRwlLock();
        pthread_rwlock_destroy(&gPthreadRwlock);

        system("pause");
        return 0;
    }
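The test above initializes pthread_rwlock_t with default attributes, so whether readers or writers are favored is left to the implementation. As a hedged sketch only: on glibc (Linux) there is a non-portable attribute, pthread_rwlockattr_setkind_np, that selects the preference. It is an assumption that your platform provides it; the pthread port used on Windows in this article may not, so the call is guarded by a __GLIBC__ check. The helper name init_rwlock_prefer_writer is made up for this illustration.

```cpp
#include <pthread.h>

// Initializes *lock and asks for writer preference where a known switch
// exists. Returns 0 on success, otherwise a pthread error code.
int init_rwlock_prefer_writer(pthread_rwlock_t* lock) {
    pthread_rwlockattr_t attr;
    int status = pthread_rwlockattr_init(&attr);
    if (status != 0) return status;
#if defined(__GLIBC__)
    // glibc extension (non-portable, hence the _np suffix). On other
    // platforms this step is skipped and the default policy applies.
    status = pthread_rwlockattr_setkind_np(
        &attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
#endif
    if (status == 0) status = pthread_rwlock_init(lock, &attr);
    pthread_rwlockattr_destroy(&attr);
    return status;
}
```

With this kind set, glibc stops admitting new readers while a writer is waiting, which is roughly what kLockPriorityWrite does in the custom lock. A thread must not take the read lock recursively under this setting, because a writer queued in between can cause a deadlock.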

References

Thread safety issues in the C++ standard library
Programming With POSIX Threads, section 7.1.2
