add threads CV code and Makefile and README
This commit is contained in:
parent
bb9c1e9ab5
commit
aa9deb93d0
|
@ -0,0 +1,22 @@
|
|||
|
||||
BINARIES = main-two-cvs-while main-two-cvs-if main-one-cv-while main-two-cvs-while-extra-unlock
|
||||
HEADERS = common.h common_threads.h main-header.h main-common.c pc-header.h
|
||||
|
||||
all: $(BINARIES)
|
||||
|
||||
clean:
|
||||
rm -f $(BINARIES)
|
||||
|
||||
main-one-cv-while: main-one-cv-while.c $(HEADERS)
|
||||
gcc -o main-one-cv-while main-one-cv-while.c -Wall -pthread
|
||||
|
||||
main-two-cvs-if: main-two-cvs-if.c $(HEADERS)
|
||||
gcc -o main-two-cvs-if main-two-cvs-if.c -Wall -pthread
|
||||
|
||||
main-two-cvs-while: main-two-cvs-while.c $(HEADERS)
|
||||
gcc -o main-two-cvs-while main-two-cvs-while.c -Wall -pthread
|
||||
|
||||
main-two-cvs-while-extra-unlock: main-two-cvs-while-extra-unlock.c $(HEADERS)
|
||||
gcc -o main-two-cvs-while-extra-unlock main-two-cvs-while-extra-unlock.c -Wall -pthread
|
||||
|
||||
|
|
@ -0,0 +1,228 @@
|
|||
|
||||
# Overview
|
||||
|
||||
This homework lets you explore some real code that uses locks and condition
|
||||
variables to implement various forms of the producer/consumer queue discussed
|
||||
in the chapter. You'll look at the real code, run it in various
|
||||
configurations, and use it to learn about what works and what doesn't, as well
|
||||
as other intricacies.
|
||||
|
||||
The different versions of the code correspond to different ways to "solve"
|
||||
the producer/consumer problem. Most are incorrect; one is correct. Read the
|
||||
chapter to learn more about what the producer/consumer problem is, and what
|
||||
the code generally does.
|
||||
|
||||
The first step is to download the code and type make to build all the
|
||||
variants. You should see four:
|
||||
- `main-one-cv-while.c`: The producer/consumer problem solved with a
|
||||
single condition variable.
|
||||
- `main-two-cvs-if.c`: Same but with two condition variables and using
|
||||
an if to check whether to sleep.
|
||||
- `main-two-cvs-while.c`: Same but with two condition variables and while to
|
||||
check whether to sleep. This is the correct version.
|
||||
- `main-two-cvs-while-extra-unlock.c`: Same but releasing the lock and
|
||||
then reacquiring it around the fill and get routines.
|
||||
|
||||
It's also useful to look at `pc-header.h` which contains common code for
|
||||
all of these different main programs, and the Makefile so as to build the
|
||||
code properly.
|
||||
|
||||
Each program takes the following flags:
|
||||
- `-l <number of items each producer produces>`
|
||||
- `-m <size of the shared producer/consumer buffer>`
|
||||
- `-p <number of producers>`
|
||||
- `-c <number of consumers>`
|
||||
- `-P <sleep string: how producer should sleep at various points>`
|
||||
- `-C <sleep string: how consumer should sleep at various points>`
|
||||
- `-v [verbose flag: trace what is happening and print it]`
|
||||
- `-t [timing flag: time entire execution and print total time]`
|
||||
|
||||
The first four arguments are relatively self-explanatory: `-l` specifies how
|
||||
many times each producer should loop (and thus how many data items each
|
||||
producer produces), `-m` controls the size of the shared buffer (greater than or
|
||||
equal to one), and `-p` and `-c` set how many producers and consumers there are,
|
||||
respectively.
|
||||
|
||||
What is more interesting are the two sleep strings, one for producers, and one
|
||||
for consumers. These flags allow you to make each thread sleep at certain
|
||||
points in an execution and thus switch to other threads; doing so allows you
|
||||
to play with each solution and perhaps pinpoint specific problems or study
|
||||
other facets of the producer/consumer problem.
|
||||
|
||||
The string is specified as follows. If there are three producers, for example,
|
||||
the string should specify sleep times for each producer separately, with a
|
||||
colon separator. The sleep string for these three producers would look
|
||||
something like this:
|
||||
|
||||
```sh
|
||||
sleep_string_for_p0:sleep_string_for_p1:sleep_string_for_p2
|
||||
```
|
||||
|
||||
Each sleep string, in turn, is a comma-separated list, which is used to
|
||||
decide how much to sleep at each sleep point in the code. To understand this
|
||||
per-producer or per-consumer sleep string better, let's look at the code in
|
||||
main-two-cvs-while.c, specifically at the producer code. In this code
|
||||
snippet, a producer thread loops for a while, putting elements into the shared
|
||||
buffer via calls to `do_fill()`. Around this filling routine are some
|
||||
waiting and signaling code to ensure the buffer is not full when the producer
|
||||
tries to fill it. See the chapter for more details.
|
||||
|
||||
```c
|
||||
void *producer(void *arg) {
|
||||
int id = (int) arg;
|
||||
int i;
|
||||
for (i = 0; i < loops; i++) { p0;
|
||||
Mutex_lock(&m); p1;
|
||||
while (num_full == max) { p2;
|
||||
Cond_wait(&empty, &m); p3;
|
||||
}
|
||||
do_fill(i); p4;
|
||||
Cond_signal(&fill); p5;
|
||||
Mutex_unlock(&m); p6;
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
```
|
||||
|
||||
As you can see from the code, there are a number of points labeled p0, p1,
|
||||
etc. These points are where the code can be put to sleep. The consumer code
|
||||
(not shown here) has similar wait points (c0, etc.).
|
||||
|
||||
Specifying a sleep string for a producer is easy: just identify how long the
|
||||
producer should sleep at each point p0, p1, ..., p6. For example, the string
|
||||
`1,0,0,0,0,0,0` specifies that the producer should sleep for 1 second at marker
|
||||
p0 (before grabbing the lock), and then not at all each time through the loop.
|
||||
|
||||
Now let's show the output of running one of these programs. To begin, let's
|
||||
assume that the user runs with just one producer and one consumer. Let's not
|
||||
add any sleeping at all (this is the default behavior). The buffer
|
||||
size, in this example, is set to 2 (-m 2).
|
||||
|
||||
First, let's build the code:
|
||||
|
||||
```sh
|
||||
prompt> make
|
||||
gcc -o main-two-cvs-while main-two-cvs-while.c -Wall -pthread
|
||||
gcc -o main-two-cvs-if main-two-cvs-if.c -Wall -pthread
|
||||
gcc -o main-one-cv-while main-one-cv-while.c -Wall -pthread
|
||||
gcc -o main-two-cvs-while-extra-unlock main-two-cvs-while-extra-unlock.c -Wall -pthread
|
||||
```
|
||||
|
||||
Now we can run something:
|
||||
|
||||
```sh
|
||||
prompt> ./main-two-cvs-while -l 3 -m 2 -p 1 -c 1 -v
|
||||
```
|
||||
|
||||
In this case, if you trace the code (with the verbose flag, -v), you will get
|
||||
the following output (or something like it) on the screen:
|
||||
|
||||
```sh
|
||||
NF P0 C0
|
||||
0 [*--- --- ] p0
|
||||
0 [*--- --- ] c0
|
||||
0 [*--- --- ] p1
|
||||
1 [u 0 f--- ] p4
|
||||
1 [u 0 f--- ] p5
|
||||
1 [u 0 f--- ] p6
|
||||
1 [u 0 f--- ] p0
|
||||
1 [u 0 f--- ] c1
|
||||
0 [ --- *--- ] c4
|
||||
0 [ --- *--- ] c5
|
||||
0 [ --- *--- ] c6
|
||||
0 [ --- *--- ] c0
|
||||
0 [ --- *--- ] p1
|
||||
1 [f--- u 1 ] p4
|
||||
1 [f--- u 1 ] p5
|
||||
1 [f--- u 1 ] p6
|
||||
1 [f--- u 1 ] p0
|
||||
1 [f--- u 1 ] c1
|
||||
0 [*--- --- ] c4
|
||||
0 [*--- --- ] c5
|
||||
0 [*--- --- ] c6
|
||||
0 [*--- --- ] c0
|
||||
0 [*--- --- ] p1
|
||||
1 [u 2 f--- ] p4
|
||||
1 [u 2 f--- ] p5
|
||||
1 [u 2 f--- ] p6
|
||||
1 [u 2 f--- ] c1
|
||||
0 [ --- *--- ] c4
|
||||
0 [ --- *--- ] c5
|
||||
0 [ --- *--- ] c6
|
||||
1 [f--- uEOS ] [main: added end-of-stream marker]
|
||||
1 [f--- uEOS ] c0
|
||||
1 [f--- uEOS ] c1
|
||||
0 [*--- --- ] c4
|
||||
0 [*--- --- ] c5
|
||||
0 [*--- --- ] c6
|
||||
|
||||
Consumer consumption:
|
||||
C0 -> 3
|
||||
```
|
||||
|
||||
Before describing what is happening in this simple example, let's
|
||||
understand the depiction of the shared buffer in the output, as is
|
||||
shown on the left. At first it is empty (an empty slot indicated by
|
||||
`---`, and the two empty slots shown as `[*--- --- ]`); the output
|
||||
also shows the number of entries in the buffer (`num_full`), which
|
||||
starts at 0. Then, after P0 puts a value in it, its state changes to
|
||||
`[u 0 f--- ]` (and `num_full` changes to 1). You might also notice a
|
||||
few additional markers here; the u marker shows where the `use_ptr` is
|
||||
(this is where the next consumer to consume a value will get something
|
||||
from); similarly, the f marker shows where the fill_ptr is (this is
|
||||
where the next producer will produce a value). When you see the `*`
|
||||
marker, it just means the `use_ptr` and `fill_ptr` are pointing to the
|
||||
same slot.
|
||||
|
||||
Along the right you can see the trace of which step each producer and
|
||||
consumer is about to execute. For example, the producer grabs the lock
|
||||
(`p1`), and then, because the buffer has an empty slot, produces a
|
||||
value into it (`p4`). It then continues until the point where it
|
||||
releases the lock (`p6`) and then tries to reacquire it. In this
|
||||
example, the consumer acquires the lock instead and consumes the value
|
||||
(`c1, c4`). Study the trace further to understand how the
|
||||
producer/consumer solution works with a single producer and consumer.
|
||||
|
||||
Now let's add some pauses to change the behavior of the trace. In this case,
|
||||
let's say we want to make the producer sleep so that the consumer can run
|
||||
first. We can accomplish this as follows:
|
||||
|
||||
```sh
|
||||
prompt> ./main-two-cvs-while -l 1 -m 2 -p 1 -c 1 -P 1,0,0,0,0,0,0 -C 0 -v
|
||||
|
||||
The results:
|
||||
NF P0 C0
|
||||
0 [*--- --- ] p0
|
||||
0 [*--- --- ] c0
|
||||
0 [*--- --- ] c1
|
||||
0 [*--- --- ] c2
|
||||
0 [*--- --- ] p1
|
||||
1 [u 0 f--- ] p4
|
||||
1 [u 0 f--- ] p5
|
||||
1 [u 0 f--- ] p6
|
||||
1 [u 0 f--- ] p0
|
||||
1 [u 0 f--- ] c3
|
||||
0 [ --- *--- ] c4
|
||||
0 [ --- *--- ] c5
|
||||
0 [ --- *--- ] c6
|
||||
0 [ --- *--- ] c0
|
||||
0 [ --- *--- ] c1
|
||||
0 [ --- *--- ] c2
|
||||
...
|
||||
```
|
||||
|
||||
As you can see, the producer hits the `p0` marker in the code and then grabs
|
||||
the first value from its sleep specification, which in this case is 1, and
|
||||
thus each sleeps for 1 second before even trying to grab the lock. Thus, the
|
||||
consumer gets to run, grabs the lock, but finds the queue empty, and thus
|
||||
sleeps (releasing the lock). The producer then runs (eventually), and all
|
||||
proceeds as you might expect.
|
||||
|
||||
Do note: a sleep specification must be given for each producer and
|
||||
consumer. Thus, if you create two producers and three consumers (with
|
||||
`-p 2 -c 3`), you must specify sleep strings for each (e.g., `-P 0:1`
|
||||
or `-C 0,1,2:0:3,3,3,1,1,1`). Sleep strings can be shorter than the
|
||||
number of sleep points in the code; the remaining sleep slots are
|
||||
initialized to be zero.
|
||||
|
||||
|
|
@ -0,0 +1,10 @@
|
|||
#ifndef __common_h__
|
||||
#define __common_h__
|
||||
|
||||
#include <assert.h>
|
||||
#include <stdlib.h>
|
||||
|
||||
#define Malloc(s) ({ void *p = malloc(s); assert(p != NULL); p; })
|
||||
#define Time_GetSeconds() ({ struct timeval t; int rc = gettimeofday(&t, NULL); assert(rc == 0); (double) t.tv_sec + (double) t.tv_usec/1e6; })
|
||||
|
||||
#endif // __common_h__
|
|
@ -0,0 +1,37 @@
|
|||
#ifndef __common_threads_h__
|
||||
#define __common_threads_h__
|
||||
|
||||
#include <pthread.h>
|
||||
#include <assert.h>
|
||||
#include <sched.h>
|
||||
|
||||
#ifdef __linux__
|
||||
#include <semaphore.h>
|
||||
#endif
|
||||
|
||||
#define Pthread_create(thread, attr, start_routine, arg) assert(pthread_create(thread, attr, start_routine, arg) == 0);
|
||||
#define Pthread_join(thread, value_ptr) assert(pthread_join(thread, value_ptr) == 0);
|
||||
|
||||
#define Pthread_mutex_init(m, v) assert(pthread_mutex_init(m, v) == 0);
|
||||
#define Pthread_mutex_lock(m) assert(pthread_mutex_lock(m) == 0);
|
||||
#define Pthread_mutex_unlock(m) assert(pthread_mutex_unlock(m) == 0);
|
||||
|
||||
#define Pthread_cond_init(cond, v) assert(pthread_cond_init(cond, v) == 0);
|
||||
#define Pthread_cond_signal(cond) assert(pthread_cond_signal(cond) == 0);
|
||||
#define Pthread_cond_wait(cond, mutex) assert(pthread_cond_wait(cond, mutex) == 0);
|
||||
|
||||
#define Mutex_init(m) assert(pthread_mutex_init(m, NULL) == 0);
|
||||
#define Mutex_lock(m) assert(pthread_mutex_lock(m) == 0);
|
||||
#define Mutex_unlock(m) assert(pthread_mutex_unlock(m) == 0);
|
||||
|
||||
#define Cond_init(cond) assert(pthread_cond_init(cond, NULL) == 0);
|
||||
#define Cond_signal(cond) assert(pthread_cond_signal(cond) == 0);
|
||||
#define Cond_wait(cond, mutex) assert(pthread_cond_wait(cond, mutex) == 0);
|
||||
|
||||
#ifdef __linux__
|
||||
#define Sem_init(sem, value) assert(sem_init(sem, 0, value) == 0);
|
||||
#define Sem_wait(sem) assert(sem_wait(sem) == 0);
|
||||
#define Sem_post(sem) assert(sem_post(sem) == 0);
|
||||
#endif // __linux__
|
||||
|
||||
#endif // __common_threads_h__
|
|
@ -0,0 +1,136 @@
|
|||
// Common usage() prints out stuff for command-line help
|
||||
void usage() {
|
||||
fprintf(stderr, "usage: \n");
|
||||
fprintf(stderr, " -l <number of items each producer produces>\n");
|
||||
fprintf(stderr, " -m <size of the shared producer/consumer buffer>\n");
|
||||
fprintf(stderr, " -p <number of producers>\n");
|
||||
fprintf(stderr, " -c <number of consumers>\n");
|
||||
fprintf(stderr, " -P <sleep string: how each producer should sleep at various points in execution>\n");
|
||||
fprintf(stderr, " -C <sleep string: how each consumer should sleep at various points in execution>\n");
|
||||
fprintf(stderr, " -v [ verbose flag: trace what is happening and print it ]\n");
|
||||
fprintf(stderr, " -t [ timing flag: time entire execution and print total time ]\n");
|
||||
exit(1);
|
||||
}
|
||||
|
||||
// Common main() for all four programs
|
||||
// - Does arg parsing
|
||||
// - Starts producers and consumers
|
||||
// - Once producers are finished, puts END_OF_STREAM
|
||||
// marker into shared queue to signal end to consumers
|
||||
// - Then waits for consumers and prints some final info
|
||||
int main(int argc, char *argv[]) {
|
||||
loops = 1;
|
||||
max = 1;
|
||||
consumers = 1;
|
||||
producers = 1;
|
||||
|
||||
char *producer_pause_string = NULL;
|
||||
char *consumer_pause_string = NULL;
|
||||
|
||||
opterr = 0;
|
||||
int c;
|
||||
while ((c = getopt (argc, argv, "l:m:p:c:P:C:vt")) != -1) {
|
||||
switch (c) {
|
||||
case 'l':
|
||||
loops = atoi(optarg);
|
||||
break;
|
||||
case 'm':
|
||||
max = atoi(optarg);
|
||||
break;
|
||||
case 'p':
|
||||
producers = atoi(optarg);
|
||||
break;
|
||||
case 'c':
|
||||
consumers = atoi(optarg);
|
||||
break;
|
||||
case 'P':
|
||||
producer_pause_string = optarg;
|
||||
break;
|
||||
case 'C':
|
||||
consumer_pause_string = optarg;
|
||||
break;
|
||||
case 'v':
|
||||
do_trace = 1;
|
||||
break;
|
||||
case 't':
|
||||
do_timing = 1;
|
||||
break;
|
||||
default:
|
||||
usage();
|
||||
}
|
||||
}
|
||||
|
||||
assert(loops > 0);
|
||||
assert(max > 0);
|
||||
assert(producers <= MAX_THREADS);
|
||||
assert(consumers <= MAX_THREADS);
|
||||
|
||||
if (producer_pause_string != NULL)
|
||||
parse_pause_string(producer_pause_string, "producers", producers, producer_pause_times);
|
||||
if (consumer_pause_string != NULL)
|
||||
parse_pause_string(consumer_pause_string, "consumers", consumers, consumer_pause_times);
|
||||
|
||||
// make space for shared buffer, and init it ...
|
||||
buffer = (int *) Malloc(max * sizeof(int));
|
||||
int i;
|
||||
for (i = 0; i < max; i++) {
|
||||
buffer[i] = EMPTY;
|
||||
}
|
||||
|
||||
do_print_headers();
|
||||
|
||||
double t1 = Time_GetSeconds();
|
||||
|
||||
// start up all threads; order doesn't matter here
|
||||
pthread_t pid[MAX_THREADS], cid[MAX_THREADS];
|
||||
int thread_id = 0;
|
||||
for (i = 0; i < producers; i++) {
|
||||
Pthread_create(&pid[i], NULL, producer, (void *) (long long) thread_id);
|
||||
thread_id++;
|
||||
}
|
||||
for (i = 0; i < consumers; i++) {
|
||||
Pthread_create(&cid[i], NULL, consumer, (void *) (long long) thread_id);
|
||||
thread_id++;
|
||||
}
|
||||
|
||||
// now wait for all PRODUCERS to finish
|
||||
for (i = 0; i < producers; i++) {
|
||||
Pthread_join(pid[i], NULL);
|
||||
}
|
||||
|
||||
// end case: when producers are all done
|
||||
// - put "consumers" number of END_OF_STREAM's in queue
|
||||
// - when consumer sees -1, it exits
|
||||
for (i = 0; i < consumers; i++) {
|
||||
Mutex_lock(&m);
|
||||
while (num_full == max)
|
||||
Cond_wait(empty_cv, &m);
|
||||
do_fill(END_OF_STREAM);
|
||||
do_eos();
|
||||
Cond_signal(fill_cv);
|
||||
Mutex_unlock(&m);
|
||||
}
|
||||
|
||||
// now OK to wait for all consumers
|
||||
int counts[consumers];
|
||||
for (i = 0; i < consumers; i++) {
|
||||
Pthread_join(cid[i], (void *) &counts[i]);
|
||||
}
|
||||
|
||||
double t2 = Time_GetSeconds();
|
||||
|
||||
if (do_trace) {
|
||||
printf("\nConsumer consumption:\n");
|
||||
for (i = 0; i < consumers; i++) {
|
||||
printf(" C%d -> %d\n", i, counts[i]);
|
||||
}
|
||||
printf("\n");
|
||||
}
|
||||
|
||||
if (do_timing) {
|
||||
printf("Total time: %.2f seconds\n", t2-t1);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
@ -0,0 +1,171 @@
|
|||
#ifndef __main_header_h__
|
||||
#define __main_header_h__
|
||||
|
||||
// all this is for the homework side of things
|
||||
|
||||
int do_trace = 0;
|
||||
int do_timing = 0;
|
||||
|
||||
#define p0 do_pause(id, 1, 0, "p0");
|
||||
#define p1 do_pause(id, 1, 1, "p1");
|
||||
#define p2 do_pause(id, 1, 2, "p2");
|
||||
#define p3 do_pause(id, 1, 3, "p3");
|
||||
#define p4 do_pause(id, 1, 4, "p4");
|
||||
#define p5 do_pause(id, 1, 5, "p5");
|
||||
#define p6 do_pause(id, 1, 6, "p6");
|
||||
|
||||
#define c0 do_pause(id, 0, 0, "c0");
|
||||
#define c1 do_pause(id, 0, 1, "c1");
|
||||
#define c2 do_pause(id, 0, 2, "c2");
|
||||
#define c3 do_pause(id, 0, 3, "c3");
|
||||
#define c4 do_pause(id, 0, 4, "c4");
|
||||
#define c5 do_pause(id, 0, 5, "c5");
|
||||
#define c6 do_pause(id, 0, 6, "c6");
|
||||
|
||||
int producer_pause_times[MAX_THREADS][7];
|
||||
int consumer_pause_times[MAX_THREADS][7];
|
||||
|
||||
// needed to avoid interleaving of print out from threads
|
||||
pthread_mutex_t print_lock = PTHREAD_MUTEX_INITIALIZER;
|
||||
|
||||
void do_print_headers() {
|
||||
if (do_trace == 0)
|
||||
return;
|
||||
int i;
|
||||
printf("%3s ", "NF");
|
||||
for (i = 0; i < max; i++) {
|
||||
printf(" %3s ", " ");
|
||||
}
|
||||
printf(" ");
|
||||
|
||||
for (i = 0; i < producers; i++)
|
||||
printf("P%d ", i);
|
||||
for (i = 0; i < consumers; i++)
|
||||
printf("C%d ", i);
|
||||
printf("\n");
|
||||
}
|
||||
|
||||
void do_print_pointers(int index) {
|
||||
if (use_ptr == index && fill_ptr == index) {
|
||||
printf("*");
|
||||
} else if (use_ptr == index) {
|
||||
printf("u");
|
||||
} else if (fill_ptr == index) {
|
||||
printf("f");
|
||||
} else {
|
||||
printf(" ");
|
||||
}
|
||||
}
|
||||
|
||||
void do_print_buffer() {
|
||||
int i;
|
||||
printf("%3d [", num_full);
|
||||
for (i = 0; i < max; i++) {
|
||||
do_print_pointers(i);
|
||||
if (buffer[i] == EMPTY) {
|
||||
printf("%3s ", "---");
|
||||
} else if (buffer[i] == END_OF_STREAM) {
|
||||
printf("%3s ", "EOS");
|
||||
} else {
|
||||
printf("%3d ", buffer[i]);
|
||||
}
|
||||
}
|
||||
printf("] ");
|
||||
}
|
||||
|
||||
void do_eos() {
|
||||
if (do_trace) {
|
||||
Mutex_lock(&print_lock);
|
||||
do_print_buffer();
|
||||
//printf("%3d [added end-of-stream marker]\n", num_full);
|
||||
printf("[main: added end-of-stream marker]\n");
|
||||
Mutex_unlock(&print_lock);
|
||||
}
|
||||
}
|
||||
|
||||
void do_pause(int thread_id, int is_producer, int pause_slot, char *str) {
|
||||
int i;
|
||||
if (do_trace) {
|
||||
Mutex_lock(&print_lock);
|
||||
do_print_buffer();
|
||||
|
||||
// skip over other thread's spots
|
||||
for (i = 0; i < thread_id; i++) {
|
||||
printf(" ");
|
||||
}
|
||||
printf("%s\n", str);
|
||||
Mutex_unlock(&print_lock);
|
||||
}
|
||||
|
||||
int local_id = thread_id;
|
||||
int pause_time;
|
||||
if (is_producer) {
|
||||
pause_time = producer_pause_times[local_id][pause_slot];
|
||||
} else {
|
||||
local_id = thread_id - producers;
|
||||
pause_time = consumer_pause_times[local_id][pause_slot];
|
||||
}
|
||||
// printf(" PAUSE %d\n", pause_time);
|
||||
sleep(pause_time);
|
||||
}
|
||||
|
||||
void ensure(int expression, char *msg) {
|
||||
if (expression == 0) {
|
||||
fprintf(stderr, "%s\n", msg);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
void parse_pause_string(char *str, char *name, int expected_pieces,
|
||||
int pause_array[MAX_THREADS][7]) {
|
||||
|
||||
// string looks like this (or should):
|
||||
// 1,2,0:2,3,4,5
|
||||
// n-1 colons if there are n producers/consumers
|
||||
// comma-separated for sleep amounts per producer or consumer
|
||||
int index = 0;
|
||||
|
||||
char *copy_entire = strdup(str);
|
||||
char *outer_marker;
|
||||
int colon_count = 0;
|
||||
char *p = strtok_r(copy_entire, ":", &outer_marker);
|
||||
while (p) {
|
||||
// init array: default sleep is 0
|
||||
int i;
|
||||
for (i = 0; i < 7; i++)
|
||||
pause_array[index][i] = 0;
|
||||
|
||||
// for each piece, comma separated
|
||||
char *inner_marker;
|
||||
char *copy_piece = strdup(p);
|
||||
char *c = strtok_r(copy_piece, ",", &inner_marker);
|
||||
int comma_count = 0;
|
||||
|
||||
int inner_index = 0;
|
||||
while (c) {
|
||||
int pause_amount = atoi(c);
|
||||
ensure(inner_index < 7, "you specified a sleep string incorrectly... (too many comma-separated args)");
|
||||
// printf("setting %s pause %d to %d\n", name, inner_index, pause_amount);
|
||||
pause_array[index][inner_index] = pause_amount;
|
||||
inner_index++;
|
||||
|
||||
c = strtok_r(NULL, ",", &inner_marker);
|
||||
comma_count++;
|
||||
}
|
||||
free(copy_piece);
|
||||
index++;
|
||||
|
||||
// continue with colon separated list
|
||||
p = strtok_r(NULL, ":", &outer_marker);
|
||||
colon_count++;
|
||||
}
|
||||
|
||||
free(copy_entire);
|
||||
if (expected_pieces != colon_count) {
|
||||
fprintf(stderr, "Error: expected %d %s in sleep specification, got %d\n", expected_pieces, name, colon_count);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#endif // __main_header_h__
|
|
@ -0,0 +1,80 @@
|
|||
#include <ctype.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <unistd.h>
|
||||
#include <assert.h>
|
||||
#include <pthread.h>
|
||||
#include <sys/time.h>
|
||||
#include <string.h>
|
||||
|
||||
#include "common.h"
|
||||
#include "common_threads.h"
|
||||
|
||||
#include "pc-header.h"
|
||||
|
||||
pthread_cond_t cv = PTHREAD_COND_INITIALIZER;
|
||||
pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
|
||||
|
||||
#include "main-header.h"
|
||||
|
||||
void do_fill(int value) {
|
||||
// ensure empty before usage
|
||||
ensure(buffer[fill_ptr] == EMPTY, "error: tried to fill a non-empty buffer");
|
||||
buffer[fill_ptr] = value;
|
||||
fill_ptr = (fill_ptr + 1) % max;
|
||||
num_full++;
|
||||
}
|
||||
|
||||
int do_get() {
|
||||
int tmp = buffer[use_ptr];
|
||||
ensure(tmp != EMPTY, "error: tried to get an empty buffer");
|
||||
buffer[use_ptr] = EMPTY;
|
||||
use_ptr = (use_ptr + 1) % max;
|
||||
num_full--;
|
||||
return tmp;
|
||||
}
|
||||
|
||||
void *producer(void *arg) {
|
||||
int id = (int) arg;
|
||||
// make sure each producer produces unique values
|
||||
int base = id * loops;
|
||||
int i;
|
||||
for (i = 0; i < loops; i++) { p0;
|
||||
Mutex_lock(&m); p1;
|
||||
while (num_full == max) { p2;
|
||||
Cond_wait(&cv, &m); p3;
|
||||
}
|
||||
do_fill(base + i); p4;
|
||||
Cond_signal(&cv); p5;
|
||||
Mutex_unlock(&m); p6;
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void *consumer(void *arg) {
|
||||
int id = (int) arg;
|
||||
int tmp = 0;
|
||||
int consumed_count = 0;
|
||||
while (tmp != END_OF_STREAM) { c0;
|
||||
Mutex_lock(&m); c1;
|
||||
while (num_full == 0) { c2;
|
||||
Cond_wait(&cv, &m); c3;
|
||||
}
|
||||
tmp = do_get(); c4;
|
||||
Cond_signal(&cv); c5;
|
||||
Mutex_unlock(&m); c6;
|
||||
consumed_count++;
|
||||
}
|
||||
|
||||
// return consumer_count-1 because END_OF_STREAM does not count
|
||||
return (void *) (long long) (consumed_count - 1);
|
||||
}
|
||||
|
||||
// must set these appropriately to use "main-common.c"
|
||||
pthread_cond_t *fill_cv = &cv;
|
||||
pthread_cond_t *empty_cv = &cv;
|
||||
|
||||
// all codes use this common base to start producers/consumers
|
||||
// and all the other related stuff
|
||||
#include "main-common.c"
|
||||
|
|
@ -0,0 +1,81 @@
|
|||
#include <ctype.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <unistd.h>
|
||||
#include <assert.h>
|
||||
#include <pthread.h>
|
||||
#include <sys/time.h>
|
||||
#include <string.h>
|
||||
|
||||
#include "common.h"
|
||||
#include "common_threads.h"
|
||||
#include "pc-header.h"
|
||||
|
||||
pthread_cond_t empty = PTHREAD_COND_INITIALIZER;
|
||||
pthread_cond_t fill = PTHREAD_COND_INITIALIZER;
|
||||
pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
|
||||
|
||||
#include "main-header.h"
|
||||
|
||||
void do_fill(int value) {
|
||||
// ensure empty before usage
|
||||
ensure(buffer[fill_ptr] == EMPTY, "error: tried to fill a non-empty buffer");
|
||||
buffer[fill_ptr] = value;
|
||||
fill_ptr = (fill_ptr + 1) % max;
|
||||
num_full++;
|
||||
}
|
||||
|
||||
int do_get() {
|
||||
int tmp = buffer[use_ptr];
|
||||
ensure(tmp != EMPTY, "error: tried to get an empty buffer");
|
||||
buffer[use_ptr] = EMPTY;
|
||||
use_ptr = (use_ptr + 1) % max;
|
||||
num_full--;
|
||||
return tmp;
|
||||
}
|
||||
|
||||
void *producer(void *arg) {
|
||||
int id = (int) arg;
|
||||
// make sure each producer produces unique values
|
||||
int base = id * loops;
|
||||
int i;
|
||||
for (i = 0; i < loops; i++) { p0;
|
||||
Mutex_lock(&m); p1;
|
||||
if (num_full == max) { p2;
|
||||
Cond_wait(&empty, &m); p3;
|
||||
}
|
||||
do_fill(base + i); p4;
|
||||
Cond_signal(&fill); p5;
|
||||
Mutex_unlock(&m); p6;
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void *consumer(void *arg) {
|
||||
int id = (int) arg;
|
||||
int tmp = 0;
|
||||
int consumed_count = 0;
|
||||
while (tmp != END_OF_STREAM) { c0;
|
||||
Mutex_lock(&m); c1;
|
||||
if (num_full == 0) { c2;
|
||||
Cond_wait(&fill, &m); c3;
|
||||
}
|
||||
tmp = do_get(); c4;
|
||||
Cond_signal(&empty); c5;
|
||||
Mutex_unlock(&m); c6;
|
||||
consumed_count++;
|
||||
}
|
||||
|
||||
// return consumer_count-1 because END_OF_STREAM does not count
|
||||
return (void *) (long long) (consumed_count - 1);
|
||||
}
|
||||
|
||||
// must set these appropriately to use "main-common.c"
|
||||
pthread_cond_t *fill_cv = &fill;
|
||||
pthread_cond_t *empty_cv = ∅
|
||||
|
||||
// all codes use this common base to start producers/consumers
|
||||
// and all the other related stuff
|
||||
#include "main-common.c"
|
||||
|
||||
|
|
@ -0,0 +1,85 @@
|
|||
#include <ctype.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <unistd.h>
|
||||
#include <assert.h>
|
||||
#include <pthread.h>
|
||||
#include <sys/time.h>
|
||||
#include <string.h>
|
||||
|
||||
#include "common.h"
|
||||
#include "common_threads.h"
|
||||
|
||||
#include "pc-header.h"
|
||||
|
||||
pthread_cond_t empty = PTHREAD_COND_INITIALIZER;
|
||||
pthread_cond_t fill = PTHREAD_COND_INITIALIZER;
|
||||
pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
|
||||
|
||||
#include "main-header.h"
|
||||
|
||||
void do_fill(int value) {
|
||||
// ensure empty before usage
|
||||
ensure(buffer[fill_ptr] == EMPTY, "error: tried to fill a non-empty buffer");
|
||||
buffer[fill_ptr] = value;
|
||||
fill_ptr = (fill_ptr + 1) % max;
|
||||
num_full++;
|
||||
}
|
||||
|
||||
int do_get() {
|
||||
int tmp = buffer[use_ptr];
|
||||
ensure(tmp != EMPTY, "error: tried to get an empty buffer");
|
||||
buffer[use_ptr] = EMPTY;
|
||||
use_ptr = (use_ptr + 1) % max;
|
||||
num_full--;
|
||||
return tmp;
|
||||
}
|
||||
|
||||
void *producer(void *arg) {
|
||||
int id = (int) arg;
|
||||
// make sure each producer produces unique values
|
||||
int base = id * loops;
|
||||
int i;
|
||||
for (i = 0; i < loops; i++) { p0;
|
||||
Mutex_lock(&m); p1;
|
||||
while (num_full == max) { p2;
|
||||
Cond_wait(&empty, &m); p3;
|
||||
}
|
||||
Mutex_unlock(&m);
|
||||
do_fill(base + i); p4;
|
||||
Mutex_lock(&m);
|
||||
Cond_signal(&fill); p5;
|
||||
Mutex_unlock(&m); p6;
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void *consumer(void *arg) {
|
||||
int id = (int) arg;
|
||||
int tmp = 0;
|
||||
int consumed_count = 0;
|
||||
while (tmp != END_OF_STREAM) { c0;
|
||||
Mutex_lock(&m); c1;
|
||||
while (num_full == 0) { c2;
|
||||
Cond_wait(&fill, &m); c3;
|
||||
}
|
||||
Mutex_unlock(&m);
|
||||
tmp = do_get(); c4;
|
||||
Mutex_lock(&m);
|
||||
Cond_signal(&empty); c5;
|
||||
Mutex_unlock(&m); c6;
|
||||
consumed_count++;
|
||||
}
|
||||
|
||||
// return consumer_count-1 because END_OF_STREAM does not count
|
||||
return (void *) (long long) (consumed_count - 1);
|
||||
}
|
||||
|
||||
// must set these appropriately to use "main-common.c"
|
||||
pthread_cond_t *fill_cv = &fill;
|
||||
pthread_cond_t *empty_cv = ∅
|
||||
|
||||
// all codes use this common base to start producers/consumers
|
||||
// and all the other related stuff
|
||||
#include "main-common.c"
|
||||
|
|
@ -0,0 +1,84 @@
|
|||
#include <ctype.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <unistd.h>
|
||||
#include <assert.h>
|
||||
#include <pthread.h>
|
||||
#include <sys/time.h>
|
||||
#include <string.h>
|
||||
|
||||
#include "common.h"
|
||||
#include "common_threads.h"
|
||||
|
||||
#include "pc-header.h"
|
||||
|
||||
// used in producer/consumer signaling protocol
|
||||
pthread_cond_t empty = PTHREAD_COND_INITIALIZER;
|
||||
pthread_cond_t fill = PTHREAD_COND_INITIALIZER;
|
||||
pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
|
||||
|
||||
#include "main-header.h"
|
||||
|
||||
void do_fill(int value) {
|
||||
// ensure empty before usage
|
||||
ensure(buffer[fill_ptr] == EMPTY, "error: tried to fill a non-empty buffer");
|
||||
buffer[fill_ptr] = value;
|
||||
fill_ptr = (fill_ptr + 1) % max;
|
||||
num_full++;
|
||||
}
|
||||
|
||||
int do_get() {
|
||||
int tmp = buffer[use_ptr];
|
||||
ensure(tmp != EMPTY, "error: tried to get an empty buffer");
|
||||
buffer[use_ptr] = EMPTY;
|
||||
use_ptr = (use_ptr + 1) % max;
|
||||
num_full--;
|
||||
return tmp;
|
||||
}
|
||||
|
||||
void *producer(void *arg) {
|
||||
int id = (int) arg;
|
||||
// make sure each producer produces unique values
|
||||
int base = id * loops;
|
||||
int i;
|
||||
for (i = 0; i < loops; i++) { p0;
|
||||
Mutex_lock(&m); p1;
|
||||
while (num_full == max) { p2;
|
||||
Cond_wait(&empty, &m); p3;
|
||||
}
|
||||
do_fill(base + i); p4;
|
||||
Cond_signal(&fill); p5;
|
||||
Mutex_unlock(&m); p6;
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void *consumer(void *arg) {
|
||||
int id = (int) arg;
|
||||
int tmp = 0;
|
||||
int consumed_count = 0;
|
||||
while (tmp != END_OF_STREAM) { c0;
|
||||
Mutex_lock(&m); c1;
|
||||
while (num_full == 0) { c2;
|
||||
Cond_wait(&fill, &m); c3;
|
||||
}
|
||||
tmp = do_get(); c4;
|
||||
Cond_signal(&empty); c5;
|
||||
Mutex_unlock(&m); c6;
|
||||
consumed_count++;
|
||||
}
|
||||
|
||||
// return consumer_count-1 because END_OF_STREAM does not count
|
||||
return (void *) (long long) (consumed_count - 1);
|
||||
}
|
||||
|
||||
// must set these appropriately to use "main-common.c"
|
||||
pthread_cond_t *fill_cv = &fill;
|
||||
pthread_cond_t *empty_cv = ∅
|
||||
|
||||
// all codes use this common base to start producers/consumers
|
||||
// and all the other related stuff
|
||||
#include "main-common.c"
|
||||
|
||||
|
||||
|
|
@ -0,0 +1,22 @@
|
|||
#ifndef __pc_header_h__
|
||||
#define __pc_header_h__
|
||||
|
||||
#define MAX_THREADS (100) // maximum number of producers/consumers
|
||||
|
||||
int producers = 1; // number of producers
|
||||
int consumers = 1; // number of consumers
|
||||
|
||||
int *buffer; // the buffer itself: malloc in main()
|
||||
int max; // size of the producer/consumer buffer
|
||||
|
||||
int use_ptr = 0; // tracks where next consume should come from
|
||||
int fill_ptr = 0; // tracks where next produce should go to
|
||||
int num_full = 0; // counts how many entries are full
|
||||
|
||||
int loops; // number of items that each producer produces
|
||||
|
||||
#define EMPTY (-2) // buffer slot has nothing in it
|
||||
#define END_OF_STREAM (-1) // consumer who grabs this should exit
|
||||
|
||||
#endif // __pc_header_h__
|
||||
|
Loading…
Reference in New Issue