概述
之前的例程提到:当输入数据的数量超出工作单元的数量时,整个进程会进入死锁状态。这次做了改进:工作流已满时,让用户选择是先取出一个结果再加入新数据,还是放弃加入新数据。
还有一点就是,我认为pipe_t中的mutex锁是多余的,因为其他线程只会修改工作单元内部的数据,而并不会影响工作流pipe链表的结构,而更改pipe_t的数据的只有pipe_result和pipe_start这两个函数,但是这两个函数只会在主线程中被调用,所以他们的运行是串行的,并不会发生线程上的冲突。
我试着将所有锁定、解锁pipe的mutex的代码都注释掉,测试了很多遍,目前还没有发现问题。
#include <errno.h>
#include <pthread.h>
#include "errors.h"
/*
 * One stage of the pipeline.
 *
 * Each stage holds a single value slot (data / data_ready) protected by its
 * own mutex, two condition variables used to hand the slot back and forth
 * between producer and consumer, and a link to the following stage.
 */
typedef struct stage_tag {
    pthread_mutex_t   mutex;       /* guards data_ready and data */
    pthread_cond_t    avail;       /* signalled after a value is stored in this stage */
    pthread_cond_t    ready;       /* signalled after this stage's value is consumed */
    int               data_ready;  /* non-zero while data holds an unconsumed value */
    long              data;        /* the value currently held by this stage */
    pthread_t         thread;      /* thread created for this stage by pipe_create */
    struct stage_tag *next;        /* following stage; NULL on the last stage */
} stage_t;
/*
 * The pipeline as a whole: a singly linked list of stages plus bookkeeping.
 *
 * There is deliberately no pipe-level mutex here. According to the author's
 * notes accompanying this code, only pipe_start and pipe_result modify this
 * structure and both are called from the main thread only, so the former
 * mutex member was removed.
 */
typedef struct pipe_tag {
    stage_t *head;    /* first stage of the list */
    stage_t *tail;    /* last stage; pipe_result reads finished values from it */
    int      stages;  /* stage count as passed to pipe_create */
    int      active;  /* values started but not yet collected */
} pipe_t;
/*
 * Hand one value to a pipeline stage.
 *
 * Waits until the stage's slot is empty, stores `data` in it, marks the slot
 * full and signals the stage's `avail` condition.
 *
 * Returns 0 on success, otherwise the error code of the first pthread call
 * that failed. The stage mutex is released before returning in every case
 * where it was acquired.
 */
int pipe_send(stage_t *stage, long data)
{
    int rc = pthread_mutex_lock(&stage->mutex);

    if (rc != 0) {
        return rc;
    }

    /* Block for as long as the previous value has not been consumed. */
    while (stage->data_ready) {
        rc = pthread_cond_wait(&stage->ready, &stage->mutex);
        if (rc != 0) {
            goto fail_unlock;
        }
    }

    /* Slot is free: publish the new value and announce it. */
    stage->data = data;
    stage->data_ready = 1;

    rc = pthread_cond_signal(&stage->avail);
    if (rc != 0) {
        goto fail_unlock;
    }

    return pthread_mutex_unlock(&stage->mutex);

fail_unlock:
    /* Report the original failure; the unlock result is intentionally dropped. */
    pthread_mutex_unlock(&stage->mutex);
    return rc;
}
/*
 * Thread start routine for one pipeline stage.
 *
 * arg is the stage_t this thread serves. The thread loops forever: it waits
 * until its stage holds a value, forwards that value plus one to the next
 * stage with pipe_send, then marks its own slot empty and signals `ready`
 * so a new value can be stored.
 *
 * The stage mutex is locked once on entry and never explicitly unlocked in
 * this function; it is only released while the thread is blocked inside
 * pthread_cond_wait. Any failure is fatal (err_abort). The function does not
 * return.
 */
void *pipe_stage(void *arg)
{
    stage_t *stage = (stage_t *)arg;
    stage_t *next_stage = stage->next;
    int status;

    status = pthread_mutex_lock(&stage->mutex);
    if (status != 0) {
        err_abort(status, "Lock pipe stage");
    }

    /*
     * for (;;) instead of while (true): `true` needs <stdbool.h> in C before
     * C23, and this file does not include that header itself.
     */
    for (;;) {
        /* Sleep until a value has been stored in this stage. */
        while (stage->data_ready != 1) {
            status = pthread_cond_wait(&stage->avail, &stage->mutex);
            if (status != 0) {
                err_abort(status, "Wait for previous stage");
            }
        }

        /*
         * Forward the processed value. A failed send would silently lose
         * the value, so treat it like every other failure in this routine.
         */
        status = pipe_send(next_stage, stage->data + 1);
        if (status != 0) {
            err_abort(status, "Send to next stage");
        }

        /* Slot is free again; let the sender waiting on `ready` proceed. */
        stage->data_ready = 0;
        status = pthread_cond_signal(&stage->ready);
        if (status != 0) {
            err_abort(status, "Wake next stage");
        }
    }
    /* not reached */
}
/*
 * Build a pipeline and start its stage threads.
 *
 * Allocates stages + 1 stage records and links them into a list. The last
 * record becomes pipe->tail: it gets no thread of its own and serves as the
 * slot from which finished values are collected. Every other record gets a
 * thread running pipe_stage.
 *
 * pipe    pipeline descriptor to initialise.
 * stages  number of processing stages; must be >= 0.
 *
 * Returns 0 on success and EINVAL when stages is negative. Allocation or
 * pthread failures terminate the program through errno_abort / err_abort.
 */
int pipe_create(pipe_t *pipe, int stages)
{
    int pipe_index;
    stage_t **link = &pipe->head;
    stage_t *new_stage = NULL;
    stage_t *stage;
    int status;

    /*
     * With a negative count the allocation loop would never run, leaving
     * the list empty and the tail pointer meaningless.
     */
    if (stages < 0) {
        return EINVAL;
    }

    /*
     * No pipe-level mutex is initialised: the pipe structure has no mutex
     * member any more. (A stray err_abort left over from the commented-out
     * initialisation used to run here unconditionally on an uninitialised
     * status; it has been removed.)
     */
    pipe->stages = stages;
    pipe->active = 0;

    /* "<=" is intentional: one extra record is created to act as the tail. */
    for (pipe_index = 0; pipe_index <= stages; pipe_index++) {
        new_stage = (stage_t *)malloc(sizeof *new_stage);
        if (new_stage == NULL) {
            errno_abort("Allocate stage");
        }
        status = pthread_mutex_init(&new_stage->mutex, NULL);
        if (status != 0) {
            err_abort(status, "Init stage mutex");
        }
        status = pthread_cond_init(&new_stage->avail, NULL);
        if (status != 0) {
            err_abort(status, "Init avail condition");
        }
        status = pthread_cond_init(&new_stage->ready, NULL);
        if (status != 0) {
            err_abort(status, "Init ready condition");
        }
        new_stage->data_ready = 0;

        /* Append to the list and advance the link to the new record's next. */
        *link = new_stage;
        link = &new_stage->next;
    }
    *link = (stage_t *)NULL;   /* terminate the list */
    pipe->tail = new_stage;    /* last record allocated */

    /*
     * Start one thread per stage, stopping before the tail (the record whose
     * next pointer is NULL). Threads are created only after the whole list
     * is linked, so each thread sees a valid next pointer.
     */
    for (stage = pipe->head; stage->next != NULL; stage = stage->next) {
        status = pthread_create(&stage->thread, NULL, pipe_stage, (void *)stage);
        if (status != 0) {
            err_abort(status, "Create pipe stage");
        }
    }
    return 0;
}
/*
 * Collect one finished value from the end of the pipeline.
 *
 * pipe    the pipeline to read from.
 * result  receives the value taken from the tail stage.
 *
 * Returns 0 without touching *result when no values are outstanding
 * (pipe->active <= 0). Otherwise decrements pipe->active, blocks until the
 * tail stage holds a value, copies it to *result, marks the tail empty,
 * signals the tail's `ready` condition and returns 1. pthread failures are
 * fatal (err_abort).
 *
 * The pipe structure itself is accessed without a lock; according to the
 * author's notes this function is only called from the main thread.
 */
int pipe_result(pipe_t *pipe, long *result)
{
    stage_t *tail = pipe->tail;
    int status;

    /*
     * The stray err_abort calls left over from the commented-out pipe mutex
     * lock/unlock used to run here unconditionally on an uninitialised
     * status; they have been removed.
     */
    if (pipe->active <= 0) {
        return 0;   /* nothing in flight */
    }
    pipe->active--;

    status = pthread_mutex_lock(&tail->mutex);
    if (status != 0) {
        err_abort(status, "Lock tail stage");
    }

    /* Wait for the last processing stage to deliver a value to the tail. */
    while (!tail->data_ready) {
        status = pthread_cond_wait(&tail->avail, &tail->mutex);
        if (status != 0) {
            err_abort(status, "Wait for result");
        }
    }

    *result = tail->data;
    tail->data_ready = 0;

    /* Tail slot is free again; wake a sender blocked on it. */
    status = pthread_cond_signal(&tail->ready);
    if (status != 0) {
        err_abort(status, "Signal tail ready");
    }
    status = pthread_mutex_unlock(&tail->mutex);
    if (status != 0) {
        err_abort(status, "Unlock tail stage");
    }
    return 1;
}
int pipe_start(pipe_t *pipe, long value){
int status;
// status = pthread_mutex_lock(&pipe->mutex);
// if(status != 0)
//
err_abort(status,"Lock pipe mutex");
pipe->active++;
if(pipe->active > pipe->stages + 1){//如果活跃中的工作单元比工作单元总数+1(tail所指向的工作单元)还多,则说明工作流已满
char c = '