Research and Program Producer-Consumer Problem
In this assignment you will design a programming solution to the bounded-buffer problem using the producer and consumer processes shown in the lecture notes. The solution presented in the notes uses three semaphores: empty and full, which count the number of empty and full slots in the buffer, and mutex, which is a binary semaphore that protects the actual insertion or removal of items in the buffer. For this assignment, standard counting semaphores will be used for empty and full. Use a mutex lock, rather than a binary semaphore, to represent the mutex. The producer and consumer, running as separate threads, will move items to and from a buffer that is synchronized with these empty, full, and mutex structures.
The buffer
Internally, the buffer will consist of a fixed-size array of type buffer_item (which will be defined using typedef). The array of buffer_item objects will be manipulated as a circular queue.
The buffer will be manipulated with two functions, insert_item() and remove_item(), which are called by the producer and consumer threads, respectively.
The insert_item() and remove_item() functions will synchronize the producer and consumer using the algorithms outlined in the class lecture. The buffer will also require an initialization function that initializes the mutual exclusion object mutex along with the empty and full semaphores.
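One way to read the paragraph above is that the waiting and locking live inside insert_item() and remove_item() themselves. The following is only a sketch under that assumption, not the lecture's exact algorithm; buffer_init() is a hypothetical name for the initialization function, and unnamed POSIX semaphores (sem_init) are assumed to be available, as they are on Linux.

```c
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>

typedef int buffer_item;
#define BUFFER_SIZE 5

static buffer_item buffer[BUFFER_SIZE];
static int in = 0;             /* next free slot */
static int out = 0;            /* next filled slot */
static pthread_mutex_t mutex;  /* protects buffer, in and out */
static sem_t empty;            /* counts free slots */
static sem_t full;             /* counts filled slots */

/* Hypothetical initialization function: returns 0 on success, -1 on error. */
int buffer_init(void) {
    in = out = 0;
    if (pthread_mutex_init(&mutex, NULL) != 0) return -1;
    if (sem_init(&empty, 0, BUFFER_SIZE) != 0) return -1;
    if (sem_init(&full, 0, 0) != 0) return -1;
    return 0;
}

/* Blocks while the buffer is full, then stores the item. */
int insert_item(buffer_item item) {
    if (sem_wait(&empty) != 0) return -1;   /* wait for a free slot */
    pthread_mutex_lock(&mutex);
    buffer[in] = item;
    in = (in + 1) % BUFFER_SIZE;
    pthread_mutex_unlock(&mutex);
    sem_post(&full);                        /* one more filled slot */
    return 0;
}

/* Blocks while the buffer is empty, then fetches the oldest item. */
int remove_item(buffer_item *item) {
    assert(item != NULL);
    if (sem_wait(&full) != 0) return -1;    /* wait for a filled slot */
    pthread_mutex_lock(&mutex);
    *item = buffer[out];
    out = (out + 1) % BUFFER_SIZE;
    pthread_mutex_unlock(&mutex);
    sem_post(&empty);                       /* one more free slot */
    return 0;
}
```

With this layout the producer and consumer threads only call insert_item() and remove_item(); they never touch the semaphores or the mutex directly.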
The main() function will initialize the buffer and create the separate producer and consumer threads. Once it has created the producer and consumer threads, the main() function will sleep for a period of time and upon awakening, will terminate the application. The main() function will be passed three parameters on the command line:
a. How long to sleep before terminating
b. The number of producer threads
c. The number of consumer threads
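The three command-line parameters can be validated before any thread is started. This is a minimal sketch; the names parse_count(), parse_options() and struct options are made up for illustration and are not part of the assignment. It uses strtol() instead of atoi() so that input such as "abc" or "5x" is rejected rather than silently read as a number.

```c
#include <errno.h>
#include <limits.h>
#include <stdlib.h>

/* Hypothetical helper: parse a non-negative int, return -1 on bad input. */
int parse_count(const char *text) {
    char *end;
    errno = 0;
    long value = strtol(text, &end, 10);
    if (errno != 0 || end == text || *end != '\0') return -1;
    if (value < 0 || value > INT_MAX) return -1;
    return (int)value;
}

/* Hypothetical container for the three parameters. */
struct options {
    int sleep_seconds;  /* a. how long to sleep before terminating */
    int producers;      /* b. number of producer threads */
    int consumers;      /* c. number of consumer threads */
};

/* Returns 0 when all three arguments are valid, -1 otherwise. */
int parse_options(int argc, char *argv[], struct options *opts) {
    if (argc != 4) return -1;
    opts->sleep_seconds = parse_count(argv[1]);
    opts->producers = parse_count(argv[2]);
    opts->consumers = parse_count(argv[3]);
    if (opts->sleep_seconds < 0 || opts->producers < 0 || opts->consumers < 0)
        return -1;
    return 0;
}
```

main() would call parse_options(argc, argv, &opts) first and print a usage message and exit if it returns -1.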
Producer and Consumer Threads
The producer thread will alternate between sleeping for a random period of time and inserting a random integer into the buffer. Random numbers will be produced using the rand() function, which produces random integers between 0 and RAND_MAX. The consumer will also sleep for a random period of time and, upon awakening, will attempt to remove an item from the buffer.
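Because RAND_MAX differs between systems, it can help to bound the random sleep explicitly. The sketch below shows one way to do that; random_sleep_seconds() is a hypothetical helper name. Keep in mind that the C standard does not require rand() to be safe to call from several threads at once, so treat this as a starting point and not as a thread-safe generator.

```c
#include <stdlib.h>

/* Hypothetical helper: pick a sleep length in the range [0, max_seconds]. */
int random_sleep_seconds(int max_seconds) {
    if (max_seconds <= 0) return 0;
    /* rand() never exceeds RAND_MAX, so no reduction is needed here;
       this also avoids overflow in max_seconds + 1 below. */
    if (max_seconds >= RAND_MAX) return rand();
    /* The modulo keeps the result in range; the small bias is acceptable
       for choosing a sleep time. */
    return rand() % (max_seconds + 1);
}
```

A producer or consumer loop could then call sleep(random_sleep_seconds(3)) before each insert or remove.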
Mutex Locks
The code should have commands to create, acquire, and release mutex locks.
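With Pthreads, create, acquire and release correspond to pthread_mutex_init(), pthread_mutex_lock() and pthread_mutex_unlock(). Here is a small sketch that shows all three around a shared total; run_locked_increments() and add_many() are hypothetical names, and the limit of 16 threads is an arbitrary choice for the example.

```c
#include <pthread.h>

static pthread_mutex_t lock;     /* the mutex lock */
static long shared_total = 0;    /* data protected by the lock */

/* Each thread adds 1 to shared_total n times, one locked step at a time. */
static void *add_many(void *arg) {
    long n = *(long *)arg;
    for (long i = 0; i < n; i++) {
        pthread_mutex_lock(&lock);     /* acquire */
        shared_total++;
        pthread_mutex_unlock(&lock);   /* release */
    }
    return NULL;
}

/* Starts the threads, waits for them, and returns the final total
   (or -1 if the arguments are out of range or the mutex cannot be created). */
long run_locked_increments(int threads, long per_thread) {
    pthread_t tids[16];
    int created = 0;
    if (threads < 1 || threads > 16) return -1;
    shared_total = 0;
    if (pthread_mutex_init(&lock, NULL) != 0) return -1;   /* create */
    for (int i = 0; i < threads; i++) {
        if (pthread_create(&tids[created], NULL, add_many, &per_thread) != 0)
            break;
        created++;
    }
    for (int i = 0; i < created; i++)
        pthread_join(tids[i], NULL);
    pthread_mutex_destroy(&lock);
    return shared_total;
}
```

Because every increment happens while the lock is held, four threads doing 10000 increments each always end at 40000.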
Assumption: You will use the book and lecture notes for base info to complete the program. You will conduct your own research to fill in any knowledge holes.
Use POSIX Pthreads.
As an example, consider a circular buffer with two pointers, in and out, which indicate the next available position for depositing data and the position that contains the next data item to be retrieved. See the diagram below. There are two groups of threads, producers and consumers. Each producer deposits a data item into position in and advances the pointer in, and each consumer retrieves the data item in position out and advances the pointer out.
The producer will produce an item and place it in a bounded buffer for the consumer. The consumer will then remove the item from the buffer and print it to the screen.
A bounded buffer is a container with a fixed capacity. We have to be very careful not to overfill the buffer or remove something that isn't there; in C, reading or writing outside the array is undefined behavior and can cause a segmentation fault.
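The in and out bookkeeping described above can be sketched without any threads at all. The names struct ring, ring_init(), ring_put() and ring_get() are invented for this illustration, and the extra count field is one common way (an assumption here, not something the assignment prescribes) to tell a full buffer from an empty one.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef int buffer_item;
#define BUFFER_SIZE 5

/* Hypothetical circular queue: not synchronized, single-threaded use only. */
struct ring {
    buffer_item slots[BUFFER_SIZE];
    int in;     /* next position to deposit into */
    int out;    /* position of the next item to retrieve */
    int count;  /* number of items currently stored */
};

void ring_init(struct ring *r) {
    assert(r != NULL);
    r->in = r->out = r->count = 0;
}

/* Deposits an item; returns false instead of overfilling the buffer. */
bool ring_put(struct ring *r, buffer_item item) {
    if (r->count == BUFFER_SIZE) return false;
    r->slots[r->in] = item;
    r->in = (r->in + 1) % BUFFER_SIZE;   /* wrap around at the end */
    r->count++;
    return true;
}

/* Retrieves the oldest item; returns false if there is nothing to remove. */
bool ring_get(struct ring *r, buffer_item *item) {
    if (r->count == 0) return false;
    *item = r->slots[r->out];
    r->out = (r->out + 1) % BUFFER_SIZE; /* wrap around at the end */
    r->count--;
    return true;
}
```

Items come out in the order they went in. An array indexed by a single counter, like the C program later in this post, hands back the most recently inserted item first instead.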
Here is how incrementing a counter breaks down into register-level steps:
register1 = counter;
register1 = register1 + 1;
counter = register1;
Now imagine two threads working on the same counter, one incrementing it and the other decrementing it:
(T1) register1 = counter; [register1 = 5]
(T1) register1 = register1 + 1; [register1 = 6]
(T2) register2 = counter; [register2 = 5]
(T2) register2 = register2 - 1; [register2 = 4]
(T1) counter = register1; [counter = 6]
(T2) counter = register2; [counter = 4]
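One increment and one decrement should leave the counter at 5, yet the schedule above ends at 4 because T2's store overwrites T1's. The sketch below simply replays the two orderings with ordinary local variables; it does not start any threads, and the function names are made up for illustration.

```c
/* Replays the interleaved schedule from the trace above. */
int replay_lost_update(int counter) {
    int register1, register2;
    register1 = counter;          /* (T1) load */
    register1 = register1 + 1;    /* (T1) increment */
    register2 = counter;          /* (T2) load of the stale value */
    register2 = register2 - 1;    /* (T2) decrement */
    counter = register1;          /* (T1) store */
    counter = register2;          /* (T2) store overwrites T1's result */
    return counter;
}

/* Replays the same work with T1 finishing before T2 starts,
   which is the ordering a mutex lock would enforce. */
int replay_serialized(int counter) {
    int register1, register2;
    register1 = counter;          /* (T1) load */
    register1 = register1 + 1;    /* (T1) increment */
    counter = register1;          /* (T1) store */
    register2 = counter;          /* (T2) load sees T1's update */
    register2 = register2 - 1;    /* (T2) decrement */
    counter = register2;          /* (T2) store */
    return counter;
}
```

Starting from 5, the first function returns 4 and the second returns 5, which is why the buffer updates need to happen under the mutex.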
Create the producer threads.
Create the consumer threads.
Put main() to sleep().
Exit the program.
If you are kind of rusty on how pthreads work, I have a previous tutorial that may be some help in understanding them:
Prime numbers using POSIX threads on Linux in [C]
That will give you a clearer picture on how to create a pthread.
Time for the code…
CODE:
/* buffer.h */
typedef int buffer_item;
#define BUFFER_SIZE 5
/* main.c */
#include <stdlib.h>
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h> /* sleep() */
#include "buffer.h"
#define RAND_DIVISOR 100000000
#define TRUE 1
/* The mutex lock */
pthread_mutex_t mutex;
/* the semaphores */
sem_t full, empty;
/* the buffer */
buffer_item buffer[BUFFER_SIZE];
/* buffer counter */
int counter;
pthread_t tid; //Thread ID
pthread_attr_t attr; //Set of thread attributes
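void *producer(void *param); /* the producer thread */
void *consumer(void *param); /* the consumer thread */
int insert_item(buffer_item item); /* add an item to the buffer */
int remove_item(buffer_item *item); /* remove an item from the buffer */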
void initializeData() {
/* Create the mutex lock */
pthread_mutex_init(&mutex, NULL);
/* Create the full semaphore and initialize to 0 */
sem_init(&full, 0, 0);
/* Create the empty semaphore and initialize to BUFFER_SIZE */
sem_init(&empty, 0, BUFFER_SIZE);
/* Get the default attributes */
pthread_attr_init(&attr);
/* init buffer */
counter = 0;
}
/* Producer Thread */
void *producer(void *param) {
buffer_item item;
while(TRUE) {
/* sleep for a random period of time */
int rNum = rand() / RAND_DIVISOR;
sleep(rNum);
/* generate a random number */
item = rand();
/* acquire the empty lock */
sem_wait(&empty);
/* acquire the mutex lock */
pthread_mutex_lock(&mutex);
if(insert_item(item)) {
fprintf(stderr, " Producer report error condition\n");
}
else {
printf("producer produced %d\n", item);
}
/* release the mutex lock */
pthread_mutex_unlock(&mutex);
/* signal full */
sem_post(&full);
}
}
/* Consumer Thread */
void *consumer(void *param) {
buffer_item item;
while(TRUE) {
/* sleep for a random period of time */
int rNum = rand() / RAND_DIVISOR;
sleep(rNum);
/* wait on the full semaphore */
sem_wait(&full);
/* acquire the mutex lock */
pthread_mutex_lock(&mutex);
if(remove_item(&item)) {
fprintf(stderr, "Consumer report error condition\n");
}
else {
printf("consumer consumed %d\n", item);
}
/* release the mutex lock */
pthread_mutex_unlock(&mutex);
/* signal empty */
sem_post(&empty);
}
}
/* Add an item to the buffer */
int insert_item(buffer_item item) {
/* When the buffer is not full add the item
and increment the counter*/
if(counter < BUFFER_SIZE) {
buffer[counter] = item;
counter++;
return 0;
}
else { /* Error the buffer is full */
return -1;
}
}
/* Remove an item from the buffer */
int remove_item(buffer_item *item) {
/* When the buffer is not empty remove the item
and decrement the counter */
if(counter > 0) {
*item = buffer[(counter-1)];
counter--;
return 0;
}
else { /* Error buffer empty */
return -1;
}
}
int main(int argc, char *argv[]) {
/* Loop counter */
int i;
/* Verify the correct number of arguments were passed in */
if(argc != 4) {
fprintf(stderr, "USAGE:./main.out <INT> <INT> <INT>\n");
return -1; /* stop here: argv[1..3] are not all present */
}
int mainSleepTime = atoi(argv[1]); /* Time in seconds for main to sleep */
int numProd = atoi(argv[2]); /* Number of producer threads */
int numCons = atoi(argv[3]); /* Number of consumer threads */
/* Initialize the app */
initializeData();
/* Create the producer threads */
for(i = 0; i < numProd; i++) {
/* Create the thread */
pthread_create(&tid,&attr,producer,NULL);
}
/* Create the consumer threads */
for(i = 0; i < numCons; i++) {
/* Create the thread */
pthread_create(&tid,&attr,consumer,NULL);
}
/* Sleep for the specified amount of time in seconds */
sleep(mainSleepTime);
/* Exit the program */
printf("Exit the program\n");
exit(0);
}
Here is the output of the program. As you can see, we told main to quit after 10 seconds and created 10 producer and 10 consumer threads.
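You can also try this C++ version, which keeps separate In and Out indices so that the buffer behaves as a circular queue: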
#include <iostream>
#include "ProducerConsumer.h"
// static data variable
static int Buffer[BUFFER_SIZE]; // the buffer
static int In = 0; // next empty slot in the buffer
static int Out = 0; // next available data slot
static Semaphore NotFull("NotFull", BUFFER_SIZE);
static Semaphore NotEmpty("NotEmpty", 0);
static Semaphore BufferLock("BufferLock", 1); // lock protecting the buffer
strstream *Filler(int n)
{
int i;
strstream *Space;
Space = new strstream;
for (i = 0; i < n; i++)
(*Space) << ' ';
(*Space) << '\0';
return Space;
}
ProducerThread::ProducerThread(int No, int numberofdata)
: Number(No), NumberOfData(numberofdata)
{
ThreadName.seekp(0, ios::beg);
ThreadName << "Producer" << No << '\0';
};
ConsumerThread::ConsumerThread(int No)
: Number(No)
{
ThreadName.seekp(0, ios::beg);
ThreadName << "Consumer" << No << '\0';
}
void ProducerThread::ThreadFunc()
{
Thread::ThreadFunc();
int data;
strstream *Space;
Space=Filler(4);
for (int i = 1; i <= NumberOfData; i++) {
Delay();
NotFull.Wait(); // wait until the buffer has space
BufferLock.Wait(); // lock the buffer
data = rand() % 100 + 1000 * Number; // generate data
Buffer[In] = data; // add data to the buffer
cout << Space->str() << ThreadName.str() << " deposited "
<< data << " to the buffer" << endl;
In = (In + 1) % BUFFER_SIZE; // advance input pointer
BufferLock.Signal(); // release the buffer
NotEmpty.Signal(); // buffer is not empty now
}
Delay(); // about to add END
NotFull.Wait(); // wait until the buffer has space
BufferLock.Wait(); // lock the buffer
Buffer[In] = END; // put the END message in
cout << Space->str() << ThreadName.str()
<< " deposited END and Exit" << endl;
In = (In + 1) % BUFFER_SIZE; // advance in pointer
BufferLock.Signal(); // release the buffer
NotEmpty.Signal(); // buffer is not empty
Exit();
}
void ConsumerThread::ThreadFunc()
{
Thread::ThreadFunc();
int data = 0 ;
strstream *Space;
Space=Filler(2);
while (true) {
Delay();
NotEmpty.Wait(); // wait until the buffer has data
BufferLock.Wait(); // lock the buffer
data = Buffer[Out]; // get a data item from the buffer
if (data != END) { // If it is not "END"
cout << Space->str() << ThreadName.str()<< " received "
<< data << " from the buffer" << endl;
Out = (Out + 1) % BUFFER_SIZE; // advance buffer pointer
BufferLock.Signal(); // unlock the buffer
NotFull.Signal(); // buffer is not full
}
else {
cout << Space->str() << ThreadName.str()
<< " received END and exits" << endl;
Out = (Out + 1) % BUFFER_SIZE;
BufferLock.Signal();
NotFull.Signal();
break;
}
}
Exit();
}