我是靠谱客的博主 迅速蛋挞,最近开发中收集的这篇文章主要介绍POSIX多线程程序设计 工作流例程2,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

之前的例程有说到超出工作单元数量的输入数据会使得整个进程进入死锁状态,然后改进了一下,让用户可以选择是获取数据再加入新的数据还是放弃加入新数据。


还有一点就是,我认为pipe_t中的mutex锁是多余的,因为其他线程只会修改工作单元内部的数据,而并不会影响工作流pipe链表的结构,而更改pipe_t的数据的只有pipe_result和pipe_start这两个函数,但是这两个函数只会在主线程中被调用,所以他们的运行是串行的,并不会发生线程上的冲突。


我试着将所有锁定,解锁pipe的mutex的代码都注释掉,测试来很多遍目前还没有发现问题。


#include<pthread.h>
#include "errors.h"
typedef struct stage_tag{
pthread_mutex_t
mutex;
pthread_cond_t
avail;
pthread_cond_t
ready;
int
data_ready;
long
data;
pthread_t
thread;
struct stage_tag
*next;
}stage_t;
typedef struct pipe_tag{
//pthread_mutex_t	mutex;
stage_t
*head;
stage_t
*tail;
int
stages;
int
active;
}pipe_t;
int pipe_send(stage_t *stage,long data){
int status;
status = pthread_mutex_lock(&stage->mutex);
if(status != 0)
return status;
while(stage->data_ready){
status = pthread_cond_wait(&stage->ready,&stage->mutex);
if(status != 0){
pthread_mutex_unlock(&stage->mutex);
return status;
}
}
stage->data = data;
stage->data_ready = 1;
status = pthread_cond_signal(&stage->avail);
if(status != 0){
pthread_mutex_unlock(&stage->mutex);
return status;
}
status = pthread_mutex_unlock(&stage->mutex);
return status;
}
void *pipe_stage(void *arg){
stage_t *stage = (stage_t*)arg;
stage_t *next_stage = stage->next;
int status;
status = pthread_mutex_lock(&stage->mutex);
if(status != 0)
err_abort(status,"Lock pipe stage");
while(true){
while(stage->data_ready != 1){
status = pthread_cond_wait(&stage->avail,&stage->mutex);
if(status != 0)
err_abort(status,"Wait for previous stage");
}
pipe_send(next_stage, stage->data + 1);
stage->data_ready = 0;
status = pthread_cond_signal(&stage->ready);
if(status != 0)
err_abort(status,"Wake next stage");
}
}
int pipe_create(pipe_t * pipe,int stages){
int pipe_index;
stage_t **link = &pipe->head, *new_stage, *stage;
int status;
//	status = pthread_mutex_init(&pipe->mutex,NULL);
//	if(status != 0)
//
err_abort(status , "Init pipe mutex");
pipe->stages = stages;
pipe->active = 0;
for(pipe_index = 0; pipe_index <= stages; pipe_index++){
new_stage = (stage_t*)malloc(sizeof(stage_t));
if(new_stage == NULL)
errno_abort("Allocate stage");
status = pthread_mutex_init(&new_stage->mutex,NULL);
if(status != 0)
err_abort(status,"Init stage mutex");
status = pthread_cond_init(&new_stage->avail,NULL);
if(status != 0)
err_abort(status,"Init avail condition");
status = pthread_cond_init(&new_stage->ready,NULL);
if(status != 0)
err_abort(status, "Init ready condition");
new_stage->data_ready = 0;
*link = new_stage;
link = &new_stage->next;
}
*link = (stage_t*)NULL;
pipe->tail = new_stage;
for(stage = pipe->head;stage->next != NULL;stage = stage->next){
status = pthread_create(&stage->thread,NULL,pipe_stage,(void*)stage);
if(status != 0)
err_abort(status,"Create pipe stage");
}
return 0;
}
int pipe_result(pipe_t *pipe,long *result){
stage_t *tail = pipe->tail;
long value;
int empty = 0;
int status;
//	status = pthread_mutex_lock(&pipe->mutex);
//	if(status != 0)
//
err_abort(status, "Lock pipe mutex");
if(pipe->active <= 0)
empty = 1;
else
pipe->active--;
//	status = pthread_mutex_unlock(&pipe->mutex);
//	if(status != 0)
//
err_abort(status, "Unlock pipe mutex");
if(empty)
return 0;
pthread_mutex_lock(&tail->mutex);
while(!tail->data_ready)
pthread_cond_wait(&tail->avail,&tail->mutex);
*result = tail->data;
tail->data_ready = 0;
pthread_cond_signal(&tail->ready);
pthread_mutex_unlock(&tail->mutex);
return 1;
}
int pipe_start(pipe_t *pipe, long value){
int status;
//	status = pthread_mutex_lock(&pipe->mutex);
//	if(status != 0)
//
err_abort(status,"Lock pipe mutex");
pipe->active++;
if(pipe->active > pipe->stages + 1){//如果活跃中的工作单元比工作单元总数+1(tail所指向的工作单元)还多,则说明工作流已满
char c = '';
while(true){//下面是简单的处理过程
printf("**The pipe is full**n");
printf("Get result and add new data?(y/n):");
scanf("%c",&c);
if(c == 'y'){
//
status = pthread_mutex_unlock(&pipe->mutex);
//
if(status != 0)
//
err_abort(status, "Unlock pipe mutex");
long res;
pipe_result(pipe,&res);
printf("Result is %ldn",res);
//
status = pthread_mutex_lock(&pipe->mutex);
//
if(status != 0)
//
err_abort(status,"Lock pipe mutex");
break;
}else if(c == 'n'){
pipe->active--;
//
status = pthread_mutex_unlock(&pipe->mutex);
//
if(status != 0)
//
err_abort(status, "Unlock pipe mutex");
return 0;
}
}
}//至此处理过程结束
//	status = pthread_mutex_unlock(&pipe->mutex);
//	if(status != 0)
//
err_abort(status, "Unlock pipe mutex");
pipe_send(pipe->head,value);
return 0;
}
int main(int argc, char *argv[]){
pipe_t my_pipe;
long value,result;
int status;
char line[128];
pipe_create(&my_pipe, 10);
printf("Enter integer values, or "=" for next resultn");
while(true){
printf("Data>");
if(fgets(line,sizeof(line),stdin) == NULL)exit(0);
printf("%s",line);
if(strlen(line) <= 1) continue;
if(strlen(line) <= 2 && line[0] == '='){
if(pipe_result(&my_pipe,&result))
printf("Result is %ldn",result);
else
printf("Pipe is emptyn");
}else{
if(sscanf(line,"%ld",&value) < 1)
fprintf(stderr, "Enter an integer valuen");
else pipe_start(&my_pipe,value);
}
}
}


最后

以上就是迅速蛋挞为你收集整理的POSIX多线程程序设计 工作流例程2的全部内容,希望文章能够帮你解决POSIX多线程程序设计 工作流例程2所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(61)

评论列表共有 0 条评论

立即
投稿
返回
顶部