?
9.3??實(shí)驗(yàn)內(nèi)容——“生產(chǎn)者消費(fèi)者”實(shí)驗(yàn)
1.實(shí)驗(yàn)?zāi)康?/h4>
“生產(chǎn)者消費(fèi)者”問(wèn)題是一個(gè)著名的同時(shí)性編程問(wèn)題的集合。通過(guò)學(xué)習(xí)經(jīng)典的“生產(chǎn)者消費(fèi)者”問(wèn)題的實(shí)驗(yàn),讀者可以進(jìn)一步熟悉Linux中的多線程編程,并且掌握用信號(hào)量處理線程間的同步和互斥問(wèn)題。
2.實(shí)驗(yàn)內(nèi)容
“生產(chǎn)者—消費(fèi)者”問(wèn)題描述如下。
有一個(gè)有限緩沖區(qū)和兩個(gè)線程:生產(chǎn)者和消費(fèi)者。他們分別不停地把產(chǎn)品放入緩沖區(qū)和從緩沖區(qū)中拿走產(chǎn)品。一個(gè)生產(chǎn)者在緩沖區(qū)滿的時(shí)候必須等待,一個(gè)消費(fèi)者在緩沖區(qū)空的時(shí)候也必須等待。另外,因?yàn)榫彌_區(qū)是臨界資源,所以生產(chǎn)者和消費(fèi)者之間必須互斥執(zhí)行。它們之間的關(guān)系如圖9.4所示。
圖9.4??生產(chǎn)者消費(fèi)者問(wèn)題描述
這里要求使用有名管道來(lái)模擬有限緩沖區(qū),并且使用信號(hào)量來(lái)解決“生產(chǎn)者—消費(fèi)者”問(wèn)題中的同步和互斥問(wèn)題。
3.實(shí)驗(yàn)步驟
(1)信號(hào)量的考慮。
這里使用3個(gè)信號(hào)量,其中兩個(gè)信號(hào)量avail和full分別用于解決生產(chǎn)者和消費(fèi)者線程之間的同步問(wèn)題,mutex是用于這兩個(gè)線程之間的互斥問(wèn)題。其中avail表示有界緩沖區(qū)中的空單元數(shù),初始值為N;full表示有界緩沖區(qū)中非空單元數(shù),初始值為0;mutex是互斥信號(hào)量,初始值為1。
(2)畫出流程圖。
本實(shí)驗(yàn)流程圖如圖9.5所示。
圖9.5??“生產(chǎn)者—消費(fèi)者”實(shí)驗(yàn)流程圖
?
(3)編寫代碼
本實(shí)驗(yàn)的代碼中采用的有界緩沖區(qū)擁有3個(gè)單元,每個(gè)單元為5個(gè)字節(jié)。為了盡量體現(xiàn)每個(gè)信號(hào)量的意義,在程序中生產(chǎn)過(guò)程和消費(fèi)過(guò)程是隨機(jī)(采取0~5s的隨機(jī)時(shí)間間隔)進(jìn)行的,而且生產(chǎn)者的速度比消費(fèi)者的速度平均快兩倍左右(這種關(guān)系可以相反)。生產(chǎn)者一次生產(chǎn)一個(gè)單元的產(chǎn)品(放入“hello”字符串),消費(fèi)者一次消費(fèi)一個(gè)單元的產(chǎn)品。
/*producer-customer.c*/
#include?<stdio.h>
#include?<stdlib.h>
#include?<unistd.h>
#include?<fcntl.h>
#include?<pthread.h>
#include?<errno.h>
#include?<semaphore.h>
#include?<sys/ipc.h>
#define?MYFIFO????????????"myfifo"?????/*?緩沖區(qū)有名管道的名字?*/
#define?BUFFER_SIZE???????3????????????/*?緩沖區(qū)的單元數(shù)?*/
#define?UNIT_SIZE?????????5?????????????/*?每個(gè)單元的大小?*/
#define?RUN_TIME??????????30????????????/*?運(yùn)行時(shí)間?*/
#define?DELAY_TIME_LEVELS?????5.0?????/*?周期的最大值?*/
int?fd;
time_t?end_time;
sem_t?mutex,?full,?avail;??????????????/*?3個(gè)信號(hào)量?*/
/*生產(chǎn)者線程*/
void?*producer(void?*arg)
{
?????int?real_write;
?????int?delay_time?=?0;
?????
?????while(time(NULL)?<?end_time)
?????{
??????????delay_time?=?(int)(rand()?*?DELAY_TIME_LEVELS/(RAND_MAX)?/?2.0)?+?1;
??????????sleep(delay_time);
??????????/*P操作信號(hào)量avail和mutex*/
??????????sem_wait(&avail);
??????????sem_wait(&mutex);
??????????printf("nProducer:?delay?=?%dn",?delay_time);
??????????/*生產(chǎn)者寫入數(shù)據(jù)*/
??????????if?((real_write?=?write(fd,?"hello",?UNIT_SIZE))?==?-1)
??????????{
???????????????if(errno?==?EAGAIN)
???????????????{
????????????????????printf("The?FIFO?has?not?been?read?yet.Please?try?latern");
???????????????}
??????????}
??????????else
??????????{
???????????????printf("Write?%d?to?the?FIFOn",?real_write);
??????????}
??????????
??????????/*V操作信號(hào)量full和mutex*/
??????????sem_post(&full);
??????????sem_post(&mutex);
?????}?????
?????pthread_exit(NULL);
}
/*?消費(fèi)者線程*/
void?*customer(void?*arg)
{?
?????unsigned?char?read_buffer[UNIT_SIZE];
?????int?real_read;
?????int?delay_time;
?????
?????while(time(NULL)?<?end_time)
?????{
??????????delay_time?=?(int)(rand()?*?DELAY_TIME_LEVELS/(RAND_MAX))?+?1;
??????????sleep(delay_time);
??????????/*P操作信號(hào)量full和mutex*/
??????????sem_wait(&full);
??????????sem_wait(&mutex);
??????????memset(read_buffer,?0,?UNIT_SIZE);
????????????????????printf("nCustomer:?delay?=?%dn",?delay_time);
??????????if?((real_read?=?read(fd,?read_buffer,?UNIT_SIZE))?==?-1)
??????????{
???????????????if?(errno?==?EAGAIN)
???????????????{
???????????????????printf("No?data?yetn");
???????????????}
??????????}
??????????printf("Read?%s?from?FIFOn",?read_buffer);
??????????/*V操作信號(hào)量avail和mutex*/
??????????sem_post(&avail);
??????????sem_post(&mutex);
?????}
?????pthread_exit(NULL);
}
int?main()
{
?????pthread_t?thrd_prd_id,thrd_cst_id;
?????pthread_t?mon_th_id;
?????int?ret;
?????
?????srand(time(NULL));
?????end_time?=?time(NULL)?+?RUN_TIME;
?????/*創(chuàng)建有名管道*/
?????if((mkfifo(MYFIFO,?O_CREAT|O_EXCL)?<?0)?&&?(errno?!=?EEXIST))
?????{
??????????printf("Cannot?create?fifon");
??????????return?errno;
?????}???????????????
?????/*打開(kāi)管道*/
?????fd?=?open(MYFIFO,?O_RDWR);
?????if?(fd?==?-1)
?????{
??????????printf("Open?fifo?errorn");
??????????return?fd;
?????}?????
?????/*初始化互斥信號(hào)量為1*/
?????ret?=?sem_init(&mutex,?0,?1);
?????/*初始化avail信號(hào)量為N*/
?????ret?+=?sem_init(&avail,?0,?BUFFER_SIZE);
?????/*初始化full信號(hào)量為0*/
?????ret?+=?sem_init(&full,?0,?0);
?????if?(ret?!=?0)
?????{
??????????printf("Any?semaphore?initialization?failedn");
??????????return?ret;
?????}
?????/*創(chuàng)建兩個(gè)線程*/
?????ret?=?pthread_create(&thrd_prd_id,?NULL,?producer,?NULL);
?????if?(ret?!=?0)
?????{
??????????printf("Create?producer?thread?errorn");
??????????return?ret;
?????}
?????ret?=?pthread_create(&thrd_cst_id,?NULL,?customer,?NULL);
?????if(ret?!=?0)
?????{
??????????printf("Create?customer?thread?errorn");
??????????return?ret;
?????}
?????pthread_join(thrd_prd_id,?NULL);
?????pthread_join(thrd_cst_id,?NULL);
?????close(fd);
?????unlink(MYFIFO);
?????return?0;
}
?
4.實(shí)驗(yàn)結(jié)果
運(yùn)行該程序,得到如下結(jié)果:
$?./producer_customer
……
Producer:?delay?=?3
Write?5?to?the?FIFO
Customer:?delay?=?3
Read?hello?from?FIFO
Producer:?delay?=?1
Write?5?to?the?FIFO
Producer:?delay?=?2
Write?5?to?the?FIFO
Customer:?delay?=?4
Read?hello?from?FIFO
Customer:?delay?=?1
Read?hello?from?FIFO
Producer:?delay?=?2
Write?5?to?the?FIFO
……