Even though Condition Variables are a constituent part of a Monitor, they can be used independently. Actually, the POSIX threads library that we are using in this series of articles does not include the concept of Monitor, but it does provide condition variables.
Producer/Consumer
To illustrate the use of condition variables we are going to use what is probably the most classical example possible: the Producer/Consumer problem using a bounded buffer.
In this problem, one or more Producers generate some data out of some processing (which doesn't really matter, it could be anything) and store that data into a buffer. Simultaneously, one or more Consumers read these data items from the buffer to perform some further action with them.
The buffer used to interchange this data is a shared resource that needs to be protected and it also has a limited size. This means:
- When the buffer is FULL, Producers have to wait, as there is no room to store any further data in the buffer.
- When the buffer is EMPTY, Consumers have to wait until new data is added to the buffer.
In practice, the buffer is implemented either as a stack or as a circular buffer, so storing and retrieving data is straightforward. We will use the stack approach in the coming examples.
I have used this pattern many times in network applications. A general server program just accepts connections (potentially on multiple ports) and then pushes all the messages received into one or more queues (sometimes you need some priority management, as some messages such as ABORT or RESET have to be processed faster than others). Then the actual server processing can be done by multiple Worker Threads (the consumers), which just get messages from the shared buffer and do their stuff. Similarly, another buffer used to send replies back to the network client may reproduce the same schema in the reverse direction.
Producer/Consumer main program
Before introducing condition variables, and in order to better understand how they differ from other synchronisation mechanisms, let's start by implementing the producer/consumer problem with mutexes. Then we will modify it to use condition variables and analyse the differences.
The general part of the program is shown below:
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>
#include <pthread.h>

#define RUN_TIME      20
#define MAX_SIZE      64
#define MAX_THREADS   8
#define MAX_PRODUCERS 6

/* Shared buffer */
int shared_buf[MAX_SIZE];
int n = 0;               /* Number of items currently in the buffer */
int n_transactions = 0;  /* Number of items consumed so far */

void *producer (void *arg) {
  while (1) { (...) }
}

void *consumer (void *arg) {
  while (1) {
    (...)
    n_transactions++;
  }
}

int main () {
  pthread_t tid[MAX_THREADS];
  int i;

  srand (time(NULL));
  /* Launch threads: the first MAX_PRODUCERS are producers, the rest consumers */
  for (i = 0; i < MAX_THREADS; i++)
    pthread_create (&tid[i], NULL, (i > MAX_PRODUCERS - 1 ? consumer : producer), (void *)(long)i);
  sleep (RUN_TIME);
  printf ("Buffer size: %d || %d Producers || %d Consumers\n",
          MAX_SIZE, MAX_PRODUCERS, MAX_THREADS-MAX_PRODUCERS);
  printf ("%d transactions processed\n", n_transactions);
  return 0;
}
The code above is common to both implementations. It just declares the buffer and a counter to keep track of the number of items in the buffer, then it launches a predefined number of Producer Threads (MAX_PRODUCERS) and of Consumer Threads (the remaining MAX_THREADS - MAX_PRODUCERS).
Each consumer will increase the number of transactions performed so we can have some throughput measurement.
Finally, the main thread will just let the consumers and producers run for a predetermined time (RUN_TIME seconds) and show the results. Nothing really special.
Mutex Implementation
For the mutex implementation we will just protect the buffer with a mutex. Each thread, either Producer or Consumer, will have to lock the access to the buffer before performing any action on it.
Then, the Producer's first step is to check whether there is room in the buffer to store its data. If not, it has to release the mutex, hoping that some Consumer consumes some data and a slot becomes available in the buffer.
The same happens to the Consumer, but in this case the condition to wait on is that the buffer is empty and therefore there is no data to consume. With these details, a potential implementation looks like this:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

void *producer (void *arg) {
  int id = (int)(long)arg;

  while (1) {
    pthread_mutex_lock (&mutex);     // Lock the resource
    // The buffer is treated as full at MAX_SIZE - 1 items (one slot is left unused)
    while (n == MAX_SIZE - 1) {
      pthread_mutex_unlock (&mutex);
      usleep (0);                    // Make sure the CPU is released
      pthread_mutex_lock (&mutex);
    }
    // Store the value
    shared_buf[n] = rand () % 100;
    printf ("\033[32m Producer %d : %d (%d items)\033[m\n", id, shared_buf[n], n+1);
    n++;
    pthread_mutex_unlock (&mutex);
    // We are done.... Now we produce next value that may take 100 usecs
    usleep (100);
  }
}

void *consumer (void *arg) {
  int id = (int)(long)arg;

  while (1) {
    pthread_mutex_lock (&mutex);
    while (n == 0) {                 // If buffer is empty we need to wait
      pthread_mutex_unlock (&mutex);
      usleep (0);
      pthread_mutex_lock (&mutex);
    }
    n_transactions++;
    n--;                             // shared_buf[n] is now the item being consumed
    printf ("\033[31m Consumer %d : %d (%d items-%d)\033[m\n", id, shared_buf[n], n, n_transactions);
    pthread_mutex_unlock (&mutex);
    // Whatever the consumer does with the value requires 100usec
    usleep (100);
  }
}
No surprises here. Basically, we first need to get mutually exclusive access to the resource, then wait for a condition, and finally do our stuff. If we run this program, we will get an output similar to the one below (remember that the exact values change depending on your HW, system load, cache state, etc.).
Yes, we added colours to make it look cooler :)
$ /usr/bin/time -v ./test01
Buffer size: 64 || 6 Producers || 2 Consumers
240300 transactions processed
Command being timed: "./test01"
User time (seconds): 4.61
System time (seconds): 12.35
Percent of CPU this job got: 84%
This implementation works pretty well, but the CPU usage looks a bit high. Before continuing, let's make a small digression regarding the mysterious usleep (0) in the code.
Testing Artefacts
Concurrent programming is tricky because things change a lot from simple tests like this one to real applications, and sometimes we believe that something works in a certain way when that is not really the case: it is just a testing artefact.
In our current case, the test is very simplistic. Let's look at the pseudo active wait at the beginning of the Consumer and Producer threads. That code runs very fast. In other words, the thread will release the mutex and acquire it again within its current CPU quantum, and therefore it will be impossible for the condition to change: no other Consumer/Producer will have a chance to change the buffer status until the current thread's quantum expires and a context switch is forced.
NOTE:
In multitasking OSes, processes and threads are scheduled by the so-called scheduler. Preemptive multitasking can be implemented in many different ways, but in general all implementations assign a minimal time to each process, known as quantum or time slice. This time has a minimal value in order to prevent starvation. Stopping a process/thread and starting another one has a cost. This is usually known as context switching. The kernel has to change internal structures to reflect the new state and may even have to bring memory pages back from disk. If the time the new process/thread is going to run (the quantum) is similar to or less than the context switching time, the OS will spend all its time just switching context and will not be able to give any time to the processes to run. This is known as Starvation: the CPU is working non-stop but no real processing is performed.
On the other hand, whenever a process gets its quantum, it will use the CPU for all that time unless it gives it away by starting an IO operation or running a syscall. In any other case, the CPU will be exclusively used by that process for that quantum.
In our test, the waiting loop will run non-stop for that quantum, consuming cycles that are not being used for any real processing. Remember that the mutex uses atomic operations before trying to go to sleep... if the thread already holds the lock and nobody interrupts it during its quantum, it will keep locking and unlocking the mutex using atomic operations, without any chance of a context switch until the quantum is over.
By adding the usleep(0) we are not really telling the OS that we want our process to sleep 0 microseconds... we are saying: "Hey guy, I need something else to happen to continue, so you better try to run some other task and see if that helps".
To verify this, just comment out the usleep (0) call in both functions. The result will be something like this:
Buffer size: 64 || 6 Producers || 2 Consumers
226758 transactions processed
Command being timed: "./test01"
User time (seconds): 28.85
System time (seconds): 30.11
Percent of CPU this job got: 294%
You can see the even higher CPU usage in this case, as we are basically doing an active wait for the whole process quantum.
Note that things are a bit more complicated in the real world. Multiprocessor and multicore systems make the picture described above more complex, but overall this kind of thing does happen.
Condition Variables
So, we have finally got to the condition variable concept. Condition Variables are just that: variables that allow a process/thread to wait on a specific condition. This means that when the process/thread is woken up, it is because the condition has been met (at least recently), so it will have a chance to do its processing.
Compare this to our previous implementation, where the different threads are competing the whole time to blindly acquire the mutex without knowing whether the condition has been satisfied. Also note that under certain conditions there is no big difference between both cases...
That said, this is how our implementation will look:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t  full  = PTHREAD_COND_INITIALIZER;   /* Producers wait here */
pthread_cond_t  empty = PTHREAD_COND_INITIALIZER;   /* Consumers wait here */

void *producer (void *arg) {
  int id = (int)(long)arg;

  while (1) {
    pthread_mutex_lock (&mutex);
    /* Sleep while the buffer is full; the mutex is released while waiting */
    while (n == MAX_SIZE - 1) pthread_cond_wait (&full, &mutex);
    shared_buf[n] = rand () % 100;
    printf ("\033[32m Producer %d : %d (%d items)\033[m\n", id, shared_buf[n], n+1);
    n++;
    pthread_cond_signal (&empty);    /* The buffer is not empty any more */
    pthread_mutex_unlock (&mutex);
    usleep (100);
  }
}

void *consumer (void *arg) {
  int id = (int)(long)arg;

  while (1) {
    pthread_mutex_lock (&mutex);
    /* Sleep while the buffer is empty; the mutex is released while waiting */
    while (n == 0) pthread_cond_wait (&empty, &mutex);
    n_transactions++;
    n--;
    printf ("\033[31m Consumer %d : %d (%d items-%d)\033[m\n", id, shared_buf[n], n, n_transactions);
    pthread_cond_signal (&full);     /* The buffer is not full any more */
    pthread_mutex_unlock (&mutex);
    usleep (100);
  }
}
Let's get back to the code in a sec. First let's check the outcome of this change:
Buffer size: 64 || 6 Producers || 2 Consumers
226471 transactions processed
Command being timed: "./test02"
User time (seconds): 2.87
System time (seconds): 7.52
Percent of CPU this job got: 52%
Elapsed (wall clock) time (h:mm:ss or m:ss): 0:20.00
This new implementation uses roughly 30 percentage points less CPU (52% versus 84%), but it is also true that it completes slightly fewer transactions. This is reasonable for a CPU-bound program like this... The CPU speed is what determines the number of transactions (there is no IO or any other syscall to wait on)... so the more CPU we use, the faster we go.
Using Condition Variables
Looking at the code above, you may have already realised how to use condition variables in your applications. Let's summarise it here for the reader's convenience.
The way to use condition variables associated with some shared resource is as follows:
- First, you have to acquire the mutex that provides mutually exclusive access to the resource.
- After that, you can safely check the condition to determine whether you have to wait or not. If you do not have to wait, just continue.
- If you have to wait, you have to call pthread_cond_wait on your condition variable and also pass the mutex as a parameter. This function sends the current thread to sleep until the indicated condition variable is signalled... but first the mutex has to be released, otherwise the resource would stay locked and no one would ever be able to signal the condition. pthread_cond_wait does all this as an atomic operation, and it re-acquires the mutex before returning.
- The wait call has to be executed in a loop, because the condition may have changed again between the moment the condition variable was signalled and the moment the waiting thread actually gets to run.
Then the actual logic on how to signal the condition variables (you do that using pthread_cond_signal) depends on the specific problem. In this case:
- Producers wait on condition variable full when the buffer is full.
- Consumers wait on condition variable empty when the buffer is empty.
- Whenever a new item is added to the buffer, the empty condition is signalled so a thread waiting because the buffer was empty can resume execution.
- Whenever an item is removed from the buffer, the full condition is signalled so a thread waiting because the buffer was full can resume execution.
As discussed when we looked into the use of futex, we can wake up just one thread using pthread_cond_signal or wake up all of them using pthread_cond_broadcast.
Final words
You may have noticed that we have coded our example in a way that lets us check different configurations. Also, for each operation we are printing the current number of items in the buffer. For the examples above we have been using 2 producers and 6 consumers with identical processing time. This leads to the results shown above. If you take a look at the program output you will see that the buffer is empty most of the time. That means that we can make it smaller and save some memory... or reduce the number of consumers so the buffer is used more.
If we set the number of producers to 1 (and therefore 7 consumers), the mutex version takes around 90% of CPU against 30% for the condition variable version, and our buffer holds at most 1 item all the time. If we reverse the configuration (1 consumer and 7 producers), the CPU usage is the same (in our implementation both functions are almost identical) but the buffer is full all the time...
As we make the number of producers and consumers similar (4 of each, for instance), the values obtained for both versions become pretty similar, both in CPU usage and in the use of the buffer, which is now more uniform.
You can see how, depending on the boundary conditions, the execution flow of the application is different and the parameters impacting performance also change. In case you want to explore this further (I have already done it, but this post is already too long and it is nicer for you to find out by yourself), I have created a new version of the test program with further instrumentation. You can get it on my github, but I will just include the condition variable version here... the mutex version can be easily modified using this code as a reference.
It is also interesting to modify the processing time for the producer and the consumer. Sometimes producing the data may take a long time (reading a remote sensor, for instance), or consuming it may take longer (commanding some remote IoT device based on the consumed data). That also has an impact on the waiting times of the threads as well as on the use of the buffer.
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>
#include <pthread.h>

#define RUN_TIME      20
#define MAX_SIZE      64
#define MAX_THREADS   8
#define MAX_PRODUCERS 3

/* Shared buffer */
int shared_buf[MAX_SIZE];
int n = 0;

/* Statistics */
int  n_transactions = 0;  /* Items consumed */
int  max_buf = 0;         /* Highest number of items seen in the buffer */
long acc_buf = 0;         /* Accumulated buffer occupancy, to compute an average */
int  prod_wait = 0;       /* Times a producer had to wait */
int  cons_wait = 0;       /* Times a consumer had to wait */

pthread_mutex_t mutex;
pthread_cond_t  full;
pthread_cond_t  empty;

void *producer (void *arg) {
  int id = (int)(long)arg;

  while (1) {
    pthread_mutex_lock (&mutex);
    while (n == MAX_SIZE - 1) { prod_wait++; pthread_cond_wait (&full, &mutex); }
    shared_buf[n] = rand () % 100;
    printf ("\033[32m Producer %d : %d (%d items)\033[m\n", id, shared_buf[n], n+1);
    n++;
    if (n > max_buf) max_buf = n;
    acc_buf += n;
    pthread_cond_signal (&empty);
    pthread_mutex_unlock (&mutex);
    usleep (100);
  }
}

void *consumer (void *arg) {
  int id = (int)(long)arg;

  while (1) {
    pthread_mutex_lock (&mutex);
    while (n == 0) { cons_wait++; pthread_cond_wait (&empty, &mutex); }
    n_transactions++;
    n--;
    printf ("\033[31m Consumer %d : %d (%d items-%d)\033[m\n", id, shared_buf[n], n, n_transactions);
    pthread_cond_signal (&full);
    pthread_mutex_unlock (&mutex);
    usleep (100);
  }
}

int main () {
  pthread_t tid[MAX_THREADS];
  int i;

  srand (time(NULL));
  /* Initialise synchronisation objects */
  pthread_mutex_init (&mutex, NULL);
  pthread_cond_init (&empty, NULL);
  pthread_cond_init (&full, NULL);
  /* Launch threads */
  for (i = 0; i < MAX_THREADS; i++)
    pthread_create (&tid[i], NULL, (i > MAX_PRODUCERS - 1 ? consumer : producer), (void *)(long)i);
  sleep (RUN_TIME);
  printf ("WAIT: Prod: %d || Cons: %d\n", prod_wait, cons_wait);
  /* Guard against a division by zero if nothing was consumed */
  printf ("Buffer size: %d (%d-%ld) || %d Producers || %d Consumers\n",
          MAX_SIZE, max_buf, n_transactions ? acc_buf / n_transactions : 0L,
          MAX_PRODUCERS, MAX_THREADS-MAX_PRODUCERS);
  printf ("%d transactions processed\n", n_transactions);
  return 0;
}
Conclusion
Condition variables are a very useful and elegant synchronisation mechanism to add to our concurrent programming arsenal... but there are more. Condition variables can be implemented using a mutex and a futex that takes care of putting the thread to sleep and of managing the waiting queue. It is an interesting exercise, but we will skip it for this instalment. Let me know in the comments if such content would be of interest.
Other posts in the series:
- Concurrency. Introduction
- Concurrency. Race Conditions
- Concurrency. Mutex and Futex