概述
场景
1. 所有 STL 类实例的并发读都是线程安全的; 除了 shared_ptr, iostream 允许并发写以外, 其他类实例的并发写都不是线程安全的. 在设计对象结构时, 往往会用到 std::vector, std::map 作为容器存储, 并在多线程程序里并发读写. 当然也可以只在一个特定线程里读/写, 但这样会牺牲性能, 而且会带来异步执行的烦恼.
2.当然如果多线程可以读写,那么对这个共享对象是需要加锁的, 而多线程的坏处是你并不知道其他线程是否也是读, 如果都是读, 那么可以利用STL类实例的并发读线程安全特性同时共享读, 而在写的时候是独占写, 这样性能就提高很多.
3. Windows 的临界区没有读写锁版本, 而 pthread 提供了 pthread_rwlock_t, 它正是 pthread 自带的读写锁接口. 这里我们参考《Programming With POSIX Threads》一书 7.1.2 节的例子, 实现了一个自己的 Read/Write 锁: 并发读, 独占写.
说明
1. pthread 通过 pthread_cond_t 实现线程之间的通讯和等待, 以此达到加锁/解锁的目的. rwlock 实现了自己的 rwl_readlock 等接口: 内部的 mutex 只在更新锁状态时短暂持有, 不需要长时间锁住对象, 这样就能提高 mutex 的利用率, 简直达到了产品级别水平.
2. 我这里修改了 rwlock.c, 修复了一些读优先问题, 并引入了一个 priority 变量, 从而让 rwlock_t 可以方便地在读优先和写优先之间切换. 后来才发现 pthread 自带读写锁 pthread_rwlock_t, 但它的优先策略我还没搞懂.
例子
rwlock.h
#ifndef RWLOCK_H
#define RWLOCK_H
/*
* rwlock.h
*
* This header file describes the "reader/writer lock" synchronization
* construct. The type rwlock_t describes the full state of the lock
* including the POSIX 1003.1c synchronization objects necessary.
*
* A reader/writer lock allows a thread to lock shared data either for shared
* read access or exclusive write access.
*
* The rwl_init() and rwl_destroy() functions, respectively, allow you to
* initialize/create and destroy/free the reader/writer lock.
*/
#include <pthread.h>
/*
 * Policy used by a read-write lock when readers and writers compete:
 * either waiting readers or waiting writers are served first.
 */
typedef enum LockPriority1
{
    kLockPriorityRead  = 0,   /* writers hold back while readers are waiting */
    kLockPriorityWrite = 1    /* new readers hold back while writers are waiting */
} LockPriority;
/*
* Structure describing a read-write lock.
*
* The lock functions take `mutex` before touching the counters and
* block on the two condition variables. The field order is relied on
* by the positional RWL_INITIALIZER list, so do not reorder members.
*/
typedef struct rwlock_tag {
pthread_mutex_t mutex; /* held by the rwl_* functions while they update the counters */
pthread_cond_t read; /* readers block here until they may enter */
pthread_cond_t write; /* writers block here until they may enter */
int valid; /* RWLOCK_VALID after rwl_init(); reset to 0 by rwl_destroy() */
int r_active; /* number of readers currently holding the lock */
int w_active; /* nonzero while a writer holds the lock */
int r_wait; /* readers blocked in rwl_readlock() */
int w_wait; /* writers blocked in rwl_writelock() */
LockPriority priority; /* read- or write-preferring policy; rwl_init() selects kLockPriorityRead */
} rwlock_t;
/* Magic value stored in rwlock_t::valid while the lock is initialized. */
#define RWLOCK_VALID 0xfacade
/*
 * Support static initialization of read-write locks, e.g.
 *
 *     rwlock_t lock = RWL_INITIALIZER;
 *
 * The values follow the member order of rwlock_t: mutex, read CV,
 * write CV, valid, r_active, w_active, r_wait, w_wait, priority.
 * The lock starts idle with read priority, the same state rwl_init()
 * produces. Every line but the last needs the trailing backslash so
 * the whole initializer list belongs to the macro.
 */
#define RWL_INITIALIZER \
    {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, \
    PTHREAD_COND_INITIALIZER, RWLOCK_VALID, 0, 0, 0, 0, \
    kLockPriorityRead}
/*
 * Define read-write lock functions.
 *
 * Every function returns 0 on success or an errno-style code; all but
 * rwl_init() return EINVAL when the lock's `valid` field is not
 * RWLOCK_VALID.
 */
/* Set up the mutex, both condition variables and the counters; selects read priority. */
extern int rwl_init (rwlock_t *rwlock);
/* Tear the lock down; returns EBUSY while any thread holds or waits for it. */
extern int rwl_destroy (rwlock_t *rwlock);
/* Acquire shared (read) access, blocking while the active policy forbids entry. */
extern int rwl_readlock (rwlock_t *rwlock);
/* Like rwl_readlock() but returns EBUSY instead of blocking. */
extern int rwl_readtrylock (rwlock_t *rwlock);
/* Release shared access; may wake one waiting writer. */
extern int rwl_readunlock (rwlock_t *rwlock);
/* Acquire exclusive (write) access, blocking while readers or a writer are in the way. */
extern int rwl_writelock (rwlock_t *rwlock);
/* Like rwl_writelock() but returns EBUSY instead of blocking. */
extern int rwl_writetrylock (rwlock_t *rwlock);
/* Release exclusive access and wake waiting readers or a writer according to the policy. */
extern int rwl_writeunlock (rwlock_t *rwlock);
#endif
rwlock.cpp
/*
* rwlock.c
*
* This file implements the "read-write lock" synchronization
* construct.
*
* A read-write lock allows a thread to lock shared data either
* for shared read access or exclusive write access.
*
* The rwl_init() and rwl_destroy() functions, respectively,
* allow you to initialize/create and destroy/free the
* read-write lock.
*
* The rwl_readlock() function locks a read-write lock for
* shared read access, and rwl_readunlock() releases the
* lock. rwl_readtrylock() attempts to lock a read-write lock
* for read access, and returns EBUSY instead of blocking.
*
* The rwl_writelock() function locks a read-write lock for
* exclusive write access, and rwl_writeunlock() releases the
* lock. rwl_writetrylock() attempts to lock a read-write lock
* for write access, and returns EBUSY instead of blocking.
*/
#include <Windows.h>
#include <pthread.h>
#include "errors.h"
#include "rwlock.h"
#include <iostream>
/*
 * Initialize a read-write lock.
 *
 * Resets all counters, selects read priority and creates the mutex and
 * the two condition variables. If a later step fails, the objects
 * created so far are destroyed again and the failing status is
 * returned; `valid` is only set once everything succeeded.
 *
 * Returns 0 on success, otherwise the pthread error code.
 */
int rwl_init (rwlock_t *rwl)
{
    int rc;

    rwl->w_active = 0;
    rwl->r_active = 0;
    rwl->w_wait = 0;
    rwl->r_wait = 0;
    rwl->priority = kLockPriorityRead;

    rc = pthread_mutex_init (&rwl->mutex, NULL);
    if (rc == 0) {
        rc = pthread_cond_init (&rwl->read, NULL);
        if (rc == 0) {
            rc = pthread_cond_init (&rwl->write, NULL);
            if (rc == 0) {
                rwl->valid = RWLOCK_VALID;
                return 0;
            }
            /* write CV failed: undo the read CV, then fall through to the mutex */
            pthread_cond_destroy (&rwl->read);
        }
        /* a CV failed: undo the mutex */
        pthread_mutex_destroy (&rwl->mutex);
    }
    return rc;
}
/*
 * Destroy a read-write lock.
 *
 * Returns EINVAL if the lock is not initialized and EBUSY if any
 * thread currently holds the lock or is waiting for it. Otherwise the
 * lock is marked invalid and the mutex and both condition variables
 * are destroyed.
 *
 * Returns 0 on success, otherwise the first non-zero status reported
 * by the destroy calls (mutex, then read CV, then write CV).
 */
int rwl_destroy (rwlock_t *rwl)
{
    int status, status1, status2;

    if (rwl->valid != RWLOCK_VALID)
        return EINVAL;
    status = pthread_mutex_lock (&rwl->mutex);
    if (status != 0)
        return status;

    /*
     * Refuse to destroy the lock while any thread owns it or is
     * known to be waiting for it.
     */
    if (rwl->r_active > 0 || rwl->w_active
        || rwl->r_wait != 0 || rwl->w_wait != 0) {
        pthread_mutex_unlock (&rwl->mutex);
        return EBUSY;
    }

    /* Mark the lock invalid first so late callers get EINVAL. */
    rwl->valid = 0;
    status = pthread_mutex_unlock (&rwl->mutex);
    if (status != 0)
        return status;

    /* Destroy all three objects even if one of the calls fails. */
    status = pthread_mutex_destroy (&rwl->mutex);
    status1 = pthread_cond_destroy (&rwl->read);
    status2 = pthread_cond_destroy (&rwl->write);

    /*
     * Report the first failure. The previous expression tested
     * `== 0`, which returned 0 whenever the mutex was destroyed
     * successfully, even if a condition variable was not.
     */
    return (status != 0 ? status : (status1 != 0 ? status1 : status2));
}
/*
 * Handle cleanup when the read lock condition variable
 * wait is cancelled.
 *
 * Runs with the lock's mutex held (pthread_cond_wait re-acquires it
 * before cleanup handlers run). Records that the thread is no longer
 * waiting and unlocks the mutex.
 *
 * In read-priority mode writers also wait for `r_wait` to drop to
 * zero. If this cancelled reader was the last thing a writer was
 * waiting for, nobody else would wake that writer, so pass the
 * wakeup on here. A superfluous signal is harmless because writers
 * re-check their predicate after waking.
 */
static void rwl_readcleanup (void *arg)
{
    rwlock_t *rwl = (rwlock_t *)arg;

    rwl->r_wait--;
    if (rwl->r_wait == 0 && rwl->r_active == 0
        && !rwl->w_active && rwl->w_wait > 0)
        pthread_cond_signal (&rwl->write);
    pthread_mutex_unlock (&rwl->mutex);
}
/*
* Lock a read-write lock for read access.
*/
int rwl_readlock (rwlock_t *rwl)
{
int status;
if (rwl->valid != RWLOCK_VALID)
return EINVAL;
status = pthread_mutex_lock (&rwl->mutex);
if (status != 0)
return status;
switch(rwl->priority)
{
case kLockPriorityRead:
{
if (rwl->w_active) {
rwl->r_wait++;
pthread_cleanup_push (rwl_readcleanup, (void*)rwl);
while (rwl->w_active) {
status = pthread_cond_wait (&rwl->read, &rwl->mutex);
if (status != 0)
break;
}
pthread_cleanup_pop (0);
rwl->r_wait--;
}
break;
}
case kLockPriorityWrite:
{
if (rwl->w_active || rwl->w_wait) {
rwl->r_wait++;
pthread_cleanup_push (rwl_readcleanup, (void*)rwl);
while (rwl->w_active || rwl->w_wait) {
status = pthread_cond_wait (&rwl->read, &rwl->mutex);
if (status != 0)
break;
}
pthread_cleanup_pop (0);
rwl->r_wait--;
}
break;
}
}
if (status == 0)
rwl->r_active++;
pthread_mutex_unlock (&rwl->mutex);
return status;
}
/*
 * Attempt to lock a read-write lock for read access without blocking.
 *
 * Under read priority the attempt fails only while a writer is
 * active; under write priority it also fails while writers are
 * waiting.
 *
 * Returns 0 when the read lock was taken, EBUSY when it is
 * unavailable, EINVAL for an uninitialized lock, or a pthread error
 * (an unlock failure takes precedence over EBUSY).
 */
int rwl_readtrylock (rwlock_t *rwl)
{
    int rc, unlock_rc;
    LockPriority mode;

    if (rwl->valid != RWLOCK_VALID)
        return EINVAL;
    rc = pthread_mutex_lock (&rwl->mutex);
    if (rc != 0)
        return rc;

    mode = rwl->priority;
    if (mode == kLockPriorityRead || mode == kLockPriorityWrite) {
        int blocked = (rwl->w_active != 0);

        if (!blocked && mode == kLockPriorityWrite)
            blocked = (rwl->w_wait != 0);

        if (blocked)
            rc = EBUSY;
        else
            rwl->r_active++;
    }

    unlock_rc = pthread_mutex_unlock (&rwl->mutex);
    return (unlock_rc != 0 ? unlock_rc : rc);
}
/*
 * Unlock a read-write lock from read access.
 *
 * Drops one active reader. When that was the last active reader and
 * writers are waiting, one writer is signalled; under read priority
 * this only happens if no readers are waiting either.
 *
 * Returns 0 on success, EINVAL for an uninitialized lock, or a
 * pthread error (an unlock failure takes precedence).
 */
int rwl_readunlock (rwlock_t *rwl)
{
    int rc, unlock_rc;

    if (rwl->valid != RWLOCK_VALID)
        return EINVAL;
    rc = pthread_mutex_lock (&rwl->mutex);
    if (rc != 0)
        return rc;

    rwl->r_active--;
    if (rwl->r_active == 0 && rwl->w_wait > 0) {
        const LockPriority mode = rwl->priority;

        if (mode == kLockPriorityWrite
            || (mode == kLockPriorityRead && rwl->r_wait == 0))
            rc = pthread_cond_signal (&rwl->write);
    }

    unlock_rc = pthread_mutex_unlock (&rwl->mutex);
    return (unlock_rc == 0 ? rc : unlock_rc);
}
/*
 * Handle cleanup when the write lock condition variable
 * wait is cancelled.
 *
 * Runs with the lock's mutex held (pthread_cond_wait re-acquires it
 * before cleanup handlers run). Records that the thread is no longer
 * waiting and unlocks the mutex.
 *
 * In write-priority mode readers block while `w_wait` is non-zero.
 * If this cancelled writer was the last waiting writer and no writer
 * is active, no later unlock is guaranteed to wake those readers, so
 * wake them here. A superfluous broadcast is harmless because readers
 * re-check their predicate after waking.
 */
static void rwl_writecleanup (void *arg)
{
    rwlock_t *rwl = (rwlock_t *)arg;

    rwl->w_wait--;
    if (rwl->w_wait == 0 && !rwl->w_active && rwl->r_wait > 0)
        pthread_cond_broadcast (&rwl->read);
    pthread_mutex_unlock (&rwl->mutex);
}
/*
* Lock a read-write lock for write access.
*/
int rwl_writelock (rwlock_t *rwl)
{
int status;
if (rwl->valid != RWLOCK_VALID)
return EINVAL;
status = pthread_mutex_lock (&rwl->mutex);
if (status != 0)
return status;
switch(rwl->priority)
{
case kLockPriorityRead:
{
if (rwl->w_active || rwl->r_active > 0 || rwl->r_wait > 0) {
rwl->w_wait++;
pthread_cleanup_push (rwl_writecleanup, (void*)rwl);
while (rwl->w_active || rwl->r_active > 0 || rwl->r_wait > 0) {
status = pthread_cond_wait (&rwl->write, &rwl->mutex);
if (status != 0)
break;
}
pthread_cleanup_pop (0);
rwl->w_wait--;
}
break;
}
case kLockPriorityWrite:
{
if (rwl->w_active || rwl->r_active > 0) {
rwl->w_wait++;
pthread_cleanup_push (rwl_writecleanup, (void*)rwl);
while (rwl->w_active || rwl->r_active > 0) {
status = pthread_cond_wait (&rwl->write, &rwl->mutex);
if (status != 0)
break;
}
pthread_cleanup_pop (0);
rwl->w_wait--;
}
break;
}
}
if (status == 0)
rwl->w_active = 1;
pthread_mutex_unlock (&rwl->mutex);
return status;
}
/*
 * Attempt to lock a read-write lock for write access without
 * blocking.
 *
 * The attempt fails while a writer or any reader is active; under
 * read priority it also fails while readers are waiting.
 *
 * Returns 0 when the write lock was taken, EBUSY when it is
 * unavailable, EINVAL for an uninitialized lock, or a pthread error
 * (EBUSY takes precedence over an unlock failure).
 */
int rwl_writetrylock (rwlock_t *rwl)
{
    int rc, unlock_rc;
    LockPriority mode;

    if (rwl->valid != RWLOCK_VALID)
        return EINVAL;
    rc = pthread_mutex_lock (&rwl->mutex);
    if (rc != 0)
        return rc;

    mode = rwl->priority;
    if (mode == kLockPriorityRead || mode == kLockPriorityWrite) {
        int busy = (rwl->w_active || rwl->r_active > 0);

        if (!busy && mode == kLockPriorityRead)
            busy = (rwl->r_wait != 0);

        if (busy)
            rc = EBUSY;
        else
            rwl->w_active = 1;
    }

    unlock_rc = pthread_mutex_unlock (&rwl->mutex);
    return (rc != 0 ? rc : unlock_rc);
}
/*
 * Unlock a read-write lock from write access.
 *
 * Clears the active-writer flag and hands the lock on according to
 * the policy: read priority wakes all waiting readers first and
 * falls back to one waiting writer; write priority does the opposite.
 *
 * Returns 0 on success, EINVAL for an uninitialized lock, or the
 * error reported by the pthread calls (a failed wakeup is reported
 * after the mutex has been released).
 */
int rwl_writeunlock (rwlock_t *rwl)
{
    int rc;
    int wake_rc = 0;
    LockPriority mode;

    if (rwl->valid != RWLOCK_VALID)
        return EINVAL;
    rc = pthread_mutex_lock (&rwl->mutex);
    if (rc != 0)
        return rc;

    rwl->w_active = 0;
    mode = rwl->priority;
    if (mode == kLockPriorityRead) {
        if (rwl->r_wait > 0)
            wake_rc = pthread_cond_broadcast (&rwl->read);
        else if (rwl->w_wait > 0)
            wake_rc = pthread_cond_signal (&rwl->write);
    } else if (mode == kLockPriorityWrite) {
        if (rwl->w_wait > 0)
            wake_rc = pthread_cond_signal (&rwl->write);
        else if (rwl->r_wait > 0)
            wake_rc = pthread_cond_broadcast (&rwl->read);
    }

    if (wake_rc != 0) {
        pthread_mutex_unlock (&rwl->mutex);
        return wake_rc;
    }
    return pthread_mutex_unlock (&rwl->mutex);
}
test-ReadWriteLock.cpp
// test-ReadWriteLock.cpp : Defines the entry point for the console application.
//
#include "stdafx.h"
#include <Windows.h>
#include "rwlock.h"
#include "errors.h"
#include <stdlib.h>
#include <vector>
#include <iostream>
#include "pthread.h"
#define err_abort(a,b) perror(b),abort()
#define DEVICES 5
#define THREADS 5
#define ITERATORS 10
rwlock_t gDeviceLock;
pthread_rwlock_t gPthreadRwlock;
std::vector<std::pair<std::string,std::string>> devices;
/*
 * Reentrant pseudo-random generator in the style of POSIX rand_r().
 *
 * All state lives in *seed: each call advances it with a linear
 * congruential step and derives the result from the new state, so
 * successive calls yield a sequence and no process-wide generator
 * state is touched. (The previous srand()/rand() version never
 * updated *seed, so every call with the same seed returned the same
 * number, and it reseeded the global rand() state as a side effect.)
 *
 * seed: in/out generator state owned by the caller; must not be null.
 * Returns a value in the range 0..32767.
 */
int rand_r(unsigned int* seed){
    *seed = *seed * 214013u + 2531011u;
    return static_cast<int>((*seed >> 16) & 0x7fffu);
}
/* Per-thread record handed to the worker routines as their argument. */
typedef struct thread_tag {
bool read; /* true: worker takes the read lock; false: it takes the write lock */
int thread_index; /* position of this record in the thread array */
pthread_t thread_id; /* handle filled in by pthread_create() */
} thread_t;
void *thread_routine (void *arg)
{
auto thread1 = (thread_t*)(arg);
for(int i = 0; i< ITERATORS;++i){
if( thread1->read){
rwl_readlock(&gDeviceLock);
std::cout << "rwl_readlock r_active: " << gDeviceLock.r_active << " w_active: " << gDeviceLock.w_active << std::endl;
for(int j = 0; j< DEVICES;++j){
auto& one = devices[j];
}
rwl_readunlock(&gDeviceLock);
}else{
rwl_writelock(&gDeviceLock);
std::cout << "rwl_writelock r_active: " << gDeviceLock.r_active << " w_active: " << gDeviceLock.w_active << std::endl;
for(int j = 0; j< DEVICES;++j){
devices[j] = devices[DEVICES-j-1];
}
rwl_writeunlock(&gDeviceLock);
}
}
return NULL;
}
void TestRwlLock(){
thread_t thread1[THREADS];
char name[32];
char value[32];
for(int i = 0; i< DEVICES;++i){
sprintf(name,"name-%d",i);
sprintf(value,"value-%d",i);
devices.push_back(std::make_pair<std::string,std::string>(name,value));
}
for(int i = 0; i< THREADS;++i){
thread1[i].thread_index = i;
thread1[i].read = i % 2;
pthread_create(&thread1[i].thread_id,NULL,thread_routine,&thread1[i]);
}
for(int i =0; i<THREADS;++i){
pthread_join(thread1[i].thread_id,NULL);
}
}
/*
 * Placeholder consumer called by the reader loop; it receives a
 * device name by reference and deliberately does nothing with it.
 */
void Func(std::string& one){
    (void)one;
}
void *pthread_routine (void *arg)
{
auto thread1 = (thread_t*)(arg);
for(int i = 0; i< 100;++i){
if( thread1->read){
pthread_rwlock_rdlock(&gPthreadRwlock);
std::cout << "pthread_rwlock_rdlock: " << std::endl;
for(int j = 0; j< devices.size();++j){
auto& one = devices[j];
Func(one.first);
}
pthread_rwlock_unlock(&gPthreadRwlock);
}else{
pthread_rwlock_wrlock(&gPthreadRwlock);
std::cout << "pthread_rwlock_wrlock: " << std::endl;
for(int j = 0; j< DEVICES;++j){
devices[j] = devices[DEVICES-j-1];
devices.push_back(std::make_pair<std::string,std::string>("name","value"));
}
pthread_rwlock_unlock(&gPthreadRwlock);
}
}
return NULL;
}
void TestPthreadRwlLock(){
thread_t thread1[THREADS];
char name[32];
char value[32];
for(int i = 0; i< DEVICES;++i){
sprintf(name,"name-%d",i);
sprintf(value,"value-%d",i);
devices.push_back(std::make_pair<std::string,std::string>(name,value));
}
for(int i = 0; i< THREADS;++i){
thread1[i].thread_index = i;
thread1[i].read = i % 2;
pthread_create(&thread1[i].thread_id,NULL,pthread_routine,&thread1[i]);
}
for(int i =0; i<THREADS;++i){
pthread_join(thread1[i].thread_id,NULL);
}
}
/*
 * Console entry point: runs the custom rwlock test once with read
 * priority and once with write priority, then runs the
 * pthread_rwlock_t test twice. Return values of the lock and test
 * calls are not checked. Always returns 0.
 */
int _tmain(int argc, _TCHAR* argv[])
{
// Custom read-write lock
// Disable stdout buffering.
setbuf(stdout,NULL);
rwl_init(&gDeviceLock);
std::cout << "Test Read Priorrity" << std::endl;
TestRwlLock();
std::cout << "Test Write Priorrity" << std::endl;
// The policy is switched directly on the struct, after the first test has joined all of its workers.
gDeviceLock.priority = kLockPriorityWrite;
TestRwlLock();
rwl_destroy(&gDeviceLock);
// pthread's built-in read-write lock
std::cout << "Test Phtread RWLock" << std::endl;
pthread_rwlock_init(&gPthreadRwlock,NULL);
TestPthreadRwlLock();
TestPthreadRwlLock();
pthread_rwlock_destroy(&gPthreadRwlock);
// Keep the console window open until a key is pressed.
system("pause");
return 0;
}
参考
C++标准库里的线程安全问题
Programming With POSIX Threads 7.1.2
最后
以上就是彪壮枫叶为你收集整理的[并发并行]_[中级]_[实现Pthread线程并发读写锁rwlock]场景说明例子参考的全部内容,希望文章能够帮你解决[并发并行]_[中级]_[实现Pthread线程并发读写锁rwlock]场景说明例子参考所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复