Concurrency. Mutex and Futex
2021-02-09
By David "DeMO" Martínez Oliveira

In the previous instalment we introduced the concept of the mutex and dived into the basics of its implementation, that is, atomic memory modification. As we announced at the end of that article, that is just half of the trick, and now it's time to get the other half sorted out.

Before improving our mutex implementation, let's illustrate the issue with the current one. For that, we will need to make a small modification to our test program.

Improving our test program

To be able to see what we want to show, we need to force the threads to find the mutex busy (or locked, if you prefer), so what we are going to do is add a small delay in the thread function. We will also add a third thread... This is probably not needed but, in general, when testing, having only two of a kind sometimes produces degenerate cases that seem to work while actually masking a real problem.

That said, this is the new test program:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <pthread.h>

#define MAX_ITER 1000

int             shared_var = 0;
pthread_mutex_t shared_var_mutex = PTHREAD_MUTEX_INITIALIZER;

typedef struct tpar {
  int id;
  int cnt;
} TPAR;

void * task (void *_p)
{
  int v;
  TPAR  *p = (TPAR*)_p;

  while (1) {
    printf ("%02d: %d\n", p->id, shared_var);
    pthread_mutex_lock (&shared_var_mutex);
    
    v = shared_var;
    shared_var = v + 1;
    
    usleep (100);

    pthread_mutex_unlock (&shared_var_mutex);
    p->cnt++;

  }
}

int main ()
{
  int        i;
  void       *r;
  TPAR       tp1, tp2, tp3;
  pthread_t  tid1, tid2, tid3;

  tp1.id = 1;
  tp1.cnt = 0;
  if (pthread_create (&tid1, NULL, task, &tp1) != 0) exit (EXIT_FAILURE);

  tp2.id = 2;
  tp2.cnt = 0;
  if (pthread_create (&tid2, NULL, task, &tp2) != 0)  exit (EXIT_FAILURE);

  tp3.id = 3;
  tp3.cnt = 0;
  if (pthread_create (&tid3, NULL, task, &tp3) != 0)  exit (EXIT_FAILURE);
  
  i = 0;
  while (1) {
    i++;
    usleep (100);
    if (i >= MAX_ITER)  break;
  }
  
  pthread_cancel (tid1);
  pthread_cancel (tid2);
  pthread_cancel (tid3);

  pthread_join (tid1, &r);
  pthread_join (tid2, &r);
  pthread_join (tid3, &r);
  
  printf ("Thread %d  run %d times\n", tp1.id, tp1.cnt);
  printf ("Thread %d  run %d times\n", tp2.id, tp2.cnt);
  printf ("Thread %d  run %d times\n", tp3.id, tp3.cnt);
  printf ("Shared Value : %d  (total threads : %d) \n",
      shared_var, tp1.cnt+tp2.cnt+tp3.cnt);
  if (shared_var != tp1.cnt+tp2.cnt+tp3.cnt)
    printf ("\n*** FAIL ***\n");
  else
    printf ("\n*** SUCCESS ***\n");
  return 0;
}

Pretty straightforward, isn't it? We just have another thread and a usleep call in the critical region. (Note that pthread_create returns 0 on success or a positive error number, so we check against 0.) Now let's run the program:

(...)
03: 92992
01: 92993
02: 92994
03: 92995

What the ...! The program is not finishing!... What's going on?

Abrupt cancelling is not a great idea

So, let's divert from our main topic for a second to analyse this problem. Sure, just cancelling a thread without any further control is not a good idea; in general, abruptly finishing anything is not a good idea unless there is a critical error... but the question is why our test program hangs when it was working just fine before the change.

I have to say that it took me a while and some research to figure out what the problem was... Why was pthread_join waiting forever on an, in principle, already cancelled thread?

I added full error checking to the program, but it didn't help, so I will not include that code here. It would not shed any light on the problem and would take quite some space.

So, the problem arises because of the way threads are cancelled. When we call pthread_cancel and the function returns, it doesn't mean that the thread has been cancelled; it just means that we have told the thread that we want to cancel it and the thread has received that request... The actual moment when the thread finishes its execution depends on the cancellation semantics, which are set via the pthread_setcanceltype function.

pthreads supports two semantics, PTHREAD_CANCEL_ASYNCHRONOUS and PTHREAD_CANCEL_DEFERRED, the latter being the default. When the deferred cancellation type is selected, the thread is informed about the cancellation, but the cancellation will only happen at the so-called cancellation points.

Cancellation points are points inside the thread code where the cancellation request can be honoured. A full list of cancellation points can be found in section 7 of the manual for pthreads (man 7 pthreads).
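By the way, a thread that never calls any of those functions can create a cancellation point explicitly with pthread_testcancel. A minimal sketch (this hypothetical busy_task is not part of our test program):

#include <pthread.h>

/* A compute-only thread never hits a cancellation point by itself, so it
   creates one explicitly on every iteration. */
void * busy_task (void *arg)
{
  while (1) {
    /* ... pure computation, no cancellation points ... */
    pthread_testcancel ();  /* a pending deferred cancel takes effect here */
  }
  return NULL;
}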

So, the problem is that pthread_mutex_lock is not a cancellation point, but usleep is. What happens when cancelling is that the first thread in the critical region gets the cancel request while running usleep... the other threads are at that point waiting on the mutex, but, as the cancelled thread never releases it, they just wait forever in pthread_mutex_lock, which is not a cancellation point. In other words, even though they received the cancellation request, since it is a deferred one it will never take effect, because they are blocked in a function that is not a cancellation point.

To verify this we just need to add a pthread_mutex_unlock call before the joins... However, depending on the order in which the threads got blocked, this may still fail some times... Well, this is my best guess without going deeper into the problem... If I'm wrong and you know why, please do not hesitate to leave a comment.

Summing up, we can make our program work just by changing the cancel type to PTHREAD_CANCEL_ASYNCHRONOUS, or we can write a better test program where we really control how the threads stop.
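The first option is literally a one-line change at the beginning of the thread function. Just as a sketch (the name task_async is made up for illustration), and keeping in mind that asynchronous cancellation is dangerous in general, since a thread may be cancelled while holding the mutex:

#include <pthread.h>

/* Quick fix, not a clean one: the thread may be cancelled in the middle
   of the critical region, leaving the mutex locked behind it. */
void * task_async (void *_p)
{
  int oldtype;

  /* Allow cancellation at any instruction, not only at cancellation points. */
  pthread_setcanceltype (PTHREAD_CANCEL_ASYNCHRONOUS, &oldtype);

  /* ... same body as the original task() ... */
  return _p;
}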

A better test. Take 2

So, this is the new version of our test program:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>

#define MAX_ITER 100000

int             shared_var = 0;
pthread_mutex_t shared_var_mutex = PTHREAD_MUTEX_INITIALIZER;

typedef struct tpar {
  int id;
  int cnt;
  int flag;
} TPAR;

void * task (void *_p)
{
  int v;
  TPAR  *p = (TPAR*)_p;
  
  while (p->flag == 0) {
    printf ("%02d: %d\n", p->id, shared_var);
    
    pthread_mutex_lock (&shared_var_mutex);
    v = shared_var;
    shared_var = v + 1;
    usleep (100);
    pthread_mutex_unlock (&shared_var_mutex);

    p->cnt++;
  }
  return NULL;
}

int main ()
{
  int        i;
  void       *r;
  TPAR       tp1, tp2,tp3;
  pthread_t  tid1, tid2, tid3;

  tp1.id = 1;
  tp1.flag = tp1.cnt = 0;
  if (pthread_create (&tid1, NULL, task, &tp1) != 0) exit (EXIT_FAILURE);

  tp2.id = 2;
  tp2.flag = tp2.cnt = 0;
  if (pthread_create (&tid2, NULL, task, &tp2) != 0)  exit (EXIT_FAILURE);

  tp3.id = 3;
  tp3.flag = tp3.cnt = 0;
  if (pthread_create (&tid3, NULL, task, &tp3) != 0)  exit (EXIT_FAILURE);
  
  i = 0;
  while (1) {
    i++;
    usleep (100);
    if (i >= MAX_ITER)  break;      
  }
  
  printf ("******************************************\n");
  tp1.flag = 1;
  tp2.flag = 1;
  tp3.flag = 1;

  pthread_join (tid1, &r);
  pthread_join (tid2, &r);
  pthread_join (tid3, &r);
   
  printf ("Thread %d  run %d times\n", tp1.id, tp1.cnt);
  printf ("Thread %d  run %d times\n", tp2.id, tp2.cnt);
  printf ("Thread %d  run %d times\n", tp3.id, tp3.cnt);
  
  printf ("Shared Value : %d  (total threads : %d) \n",
      shared_var, tp1.cnt+tp2.cnt+tp3.cnt);
      
  if (shared_var != tp1.cnt+tp2.cnt+tp3.cnt)
    printf ("\n*** FAIL ***\n");
  else
    printf ("\n*** SUCCESS ***\n");
  return 0;
}

The change is very simple. We just control the infinite loop in the thread using an external variable that we pass as a parameter. If you try this version you will see that it just works. One caveat, though: the flag is written by main and read by the threads without any synchronisation, so, strictly speaking, it should be an atomic variable.
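Just to make that explicit, this is what the flag would look like using C11 atomics; a sketch, since the plain int happens to work for this simple test:

#include <stdatomic.h>

typedef struct tpar {
  int        id;
  int        cnt;
  atomic_int flag;   /* written by main, read by the worker threads */
} TPAR;

/* The thread loop condition becomes:
     while (atomic_load (&p->flag) == 0) { ... }
   and main stops a thread with:
     atomic_store (&tp1.flag, 1);  */

With that caveat noted, we can go further in our understanding of mutexes.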

For that purpose, let's write a version of this program using a naive mutex implementation based on the atomic built-in functions. The main function doesn't change; it is only the thread function that changes slightly. This is the added/modified code:

#define MUTEX_BUSY  1
#define MUTEX_FREE  0

typedef struct mmux_t {
  int  v;
} MY_MUTEX;

MY_MUTEX my_mutex_var = {MUTEX_FREE};

int my_mutex_lock (MY_MUTEX *mmux) {
  while (!(__sync_bool_compare_and_swap (&mmux->v, MUTEX_FREE, MUTEX_BUSY)));
  return 0;
}

int my_mutex_unlock (MY_MUTEX *mmux) {
  mmux->v = MUTEX_FREE;
  return 0;
}

void * task (void *_p)
{
  int v;
  TPAR  *p = (TPAR*)_p;

  while (p->flag == 0) {
    printf ("%02d: %d\n", p->id, shared_var);
    my_mutex_lock (&my_mutex_var);
    v = shared_var;
    shared_var = v + 1;

    usleep (100);
    my_mutex_unlock (&my_mutex_var);
    
    p->cnt++;
  }
  return NULL;
}

So we just use __sync_bool_compare_and_swap for our lock implementation, and for the unlock we just reset the value. For the reset we could also use an atomic operation; a plain aligned int store is likely atomic on common platforms, but it provides no memory-ordering guarantees, so it would probably be better to use an atomic operation here too. However, it doesn't have any impact on what we want to discuss, so let's keep it simple. (I should investigate this further in the future.)
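For reference, this is what an atomic unlock could look like; a sketch using another GCC builtin, __sync_lock_release, which stores a constant 0 (our MUTEX_FREE) with release memory-ordering semantics:

int my_mutex_unlock (MY_MUTEX *mmux) {
  /* Atomically write 0 (MUTEX_FREE) with a release barrier, so writes made
     inside the critical region are visible before the mutex appears free. */
  __sync_lock_release (&mmux->v);
  return 0;
}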

If we run this program, it works the same as the previous version using pthread_mutex... so what is the difference? Let's assume the pthread_mutex version is named test02 and the my_mutex version is named test03.

$ /usr/bin/time -v ./test02
(...)
*** SUCCESS ***
        Command being timed: "./test02"
        User time (seconds): 0.14
        System time (seconds): 0.23
        Percent of CPU this job got: 23%
        Elapsed (wall clock) time (h:mm:ss or m:ss): 0:01.61
(...)
        Exit status: 0
$ /usr/bin/time -v ./test03
*** SUCCESS ***
        Command being timed: "./test03"
        User time (seconds): 3.09
        System time (seconds): 0.16
        Percent of CPU this job got: 206%
        Elapsed (wall clock) time (h:mm:ss or m:ss): 0:01.58
        (...)
        Exit status: 0

NOTE: For some reason, on my system I have to use the whole path for time to work with the -v flag

As you can see, total execution time is roughly the same (0:01.60), but the program using my_mutex spends a lot more time in user space, and its CPU usage is almost 10 times higher.

The my_mutex implementation uses what is known as active wait (busy-waiting). Basically, the thread is continuously polling the mutex value; in other words, the program is running all the time even when it is doing nothing. This is sometimes needed in special circumstances, and in those cases we talk about spinlocks, very common in kernel code, but for user-space applications, wasting CPU doing nothing is, in general, a bad thing to do.
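A common middle ground in user space, shown here only as an illustration (our final implementation will not use it, and the function name is made up), is to yield the CPU between polls:

#include <sched.h>

/* Still an active wait, but the thread offers the CPU back to the
   scheduler after every failed attempt instead of burning its whole
   time slice polling the mutex word. */
int my_mutex_lock_yield (MY_MUTEX *mmux) {
  while (!__sync_bool_compare_and_swap (&mmux->v, MUTEX_FREE, MUTEX_BUSY))
    sched_yield ();
  return 0;
}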

The real difference, though, is that the pthread_mutex version puts the thread to sleep, effectively releasing the CPU to do other tasks. This is way more efficient and the right way to implement this synchronisation mechanism.

Note: What comes next is heavily based on the classic paper Futexes Are Tricky by Ulrich Drepper. It is mandatory reading on this topic.

Meet the futex

It may be possible to do a user-space implementation of a mutex that forces the threads to sleep, but I do not really know how. Processes and threads usually go to sleep as commanded by the kernel... and the way we ask the kernel to do things for us is through system calls...

So, the system call that we want to use is the so-called futex (Fast Userspace muTEX) system call. futex will allow us to do exactly what we want: it tries to do most of the work in user space (as we have already seen) and only goes into kernel mode to put threads to sleep or to wake them up.

In general, the futex system call provides multiple operations that enable the implementation of other types of synchronisation (not just simple mutual exclusion), but, for now, we will focus on just two of them: FUTEX_WAIT and FUTEX_WAKE.

The futex system call is one of those with a bunch of parameters that are used or not depending on the operation to be performed (the operation being one of the parameters itself), pretty similar to ptrace for instance. So we had better write two wrapper functions for our operations, in order to make the code shorter and more readable.

Also, note that glibc does not provide a wrapper for the futex syscall, so we have to provide our own. See the futex man page for details and an example; somebody has already done this for us.

So, we need to add the following code at the beginning of our program to be able to use futexes:

#include <sys/syscall.h>
#include <linux/futex.h>
#include <sys/time.h>

static int
futex(int *uaddr, int futex_op, int val,
      const struct timespec *timeout, int *uaddr2, int val3)
{
  return syscall(SYS_futex, uaddr, futex_op, val,
         timeout, uaddr2, val3);
}


int futex_wait (int *addr, int val) {
  return futex (addr, FUTEX_WAIT, val, NULL, NULL, 0);
}
int futex_wake (int *addr, int val) {
  return futex (addr, FUTEX_WAKE, val, NULL, NULL, 0);
}

Yes, a wrapper around the system call plus two functions for easy access to the operations we want (we could also have just used macros here). With these new functions we can modify the lock and unlock operations of our mutex as follows:

int my_mutex_lock (MY_MUTEX *mmux) {
  while (1) {
    if (__sync_bool_compare_and_swap (&mmux->v, MUTEX_FREE, MUTEX_BUSY)) break;
    futex_wait (&mmux->v, MUTEX_BUSY);
  }
  return 0;
}

int my_mutex_unlock (MY_MUTEX *mmux) {
  mmux->v = MUTEX_FREE;
  futex_wake (&mmux->v, 1);
  return 0;
}

The unlock function is pretty obvious. We have just added a call to futex_wake in order to wake up any process or thread waiting on the futex. The second parameter indicates that we want the kernel to wake up just one thread; FUTEX_WAKE is usually given either 1 or INT_MAX.
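For instance, a hypothetical broadcast helper (not needed for our mutex, but the kind of primitive condition variables are built on) would just pass INT_MAX:

#include <limits.h>

/* Wake every thread currently sleeping on this futex word. */
int futex_wake_all (int *addr) {
  return futex (addr, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
}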

The lock counterpart is a bit trickier. It starts as our previous implementation, atomically setting the mutex to the MUTEX_BUSY state. In case the mutex was free, we are done. Otherwise, we call the futex_wait function, which sends the thread to sleep. This way we remove the active wait of our previous implementation.

FUTEX_WAIT will compare the value passed to the function with the current content of the futex word and will send the thread to sleep only if they match. This way, we ensure that, when we go to sleep, the mutex is still set to BUSY... It may happen that the mutex changed in between. In that case, the futex call returns immediately (setting errno to EAGAIN) and the thread will have a chance to lock it in the next loop iteration.

This is done in a loop in order to force the thread to try to acquire the mutex again when woken up by another thread.
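To make those cases explicit, this is the same lock with the error handling spelled out; a more verbose sketch of the function above, with a made-up name to avoid a clash:

#include <errno.h>

int my_mutex_lock_verbose (MY_MUTEX *mmux) {
  while (1) {
    /* Try to grab the mutex: FREE -> BUSY. */
    if (__sync_bool_compare_and_swap (&mmux->v, MUTEX_FREE, MUTEX_BUSY))
      return 0;                      /* acquired */
    /* Sleep only while the futex word still holds MUTEX_BUSY. */
    if (futex_wait (&mmux->v, MUTEX_BUSY) == -1
        && errno != EAGAIN           /* value changed under us; retry  */
        && errno != EINTR)           /* interrupted by a signal; retry */
      return -1;                     /* unexpected error */
  }
}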

Now we can run this version and see what we get:

$ /usr/bin/time -v ./test04
*** SUCCESS ***
        Command being timed: "./test04"
        User time (seconds): 0.13
        System time (seconds): 0.41
        Percent of CPU this job got: 33%
        Elapsed (wall clock) time (h:mm:ss or m:ss): 0:01.64

With this change it looks like we have got pretty close to the performance of the pthread implementation.

Let's do some proper measurements.

Comparing versions

In order to compare our versions we will have to run each application a few times and calculate the average value of the parameters we are interested in. For that purpose I wrote the following bash script:

#!/bin/bash

if [ $# -ne 2 ]
then
   echo "Usage: stats.sh PROGRAM NUM_ITERATIONS"
   exit 1
fi

PRG=$1
N=$2

echo "Running $PRG $N times..."

truncate --size 0 /tmp/res
for ((i = 1; i <= $N; i++))
do
    echo -n "."
     /usr/bin/time -f "%E %U %S %P" $PRG 1>/dev/null 2>> /tmp/res

done

echo
awk '{ sub(/%$/,"",$4); total1 += $2; total2 += $3; total4 += $4; count++} END { print total1/count, total2/count, total4/count}' /tmp/res

The script just runs the program passed as the first parameter the number of times indicated by the second parameter, storing the elapsed time, user and system time, and CPU usage in a temporary file.

Finally, we use the all-mighty awk to calculate the average user time, system time and CPU usage.

To get our values we run the script on the pthread version and on our latest version, with 30 iterations each. We repeat each measurement five times to verify that the output of each run is similar, so that our averages make some sense.

            test05                          test02
 User (s)   Sys (s)   CPU (%)     User (s)   Sys (s)   CPU (%)
 0.102069   0.291724  24.2069     0.0958621  0.291379  23.7931
 0.0972414  0.294828  24.1034     0.102414   0.304138  24.8966
 0.0924138  0.287586  23.3103     0.0948276  0.284483  23.2759
 0.0962069  0.297586  24          0.10069    0.285517  23.6552
 0.106552   0.295172  24.5862     0.0955172  0.296207  23.8621

The results are very similar, maybe slightly better for the pthread implementation. We still have some variance after the data smoothing done by our script, which may be due to interactions between caches and cores, or just the load on my computer, which is running quite a few processes right now (over 2000 according to gkrellm).

For a more detailed test we should try to remove as much interference as possible (running the minimal number of processes) and try to avoid I/O operations, which is pretty difficult on a normal desktop operating system. Nevertheless, the values look good enough for our comparison.

Improving our mutex

As explained in the Drepper paper (Futexes Are Tricky), there is one further step to improve the mutex implementation, which consists of adding a third state, so the mutex can now be FREE, BUSY or WAITING. This way we can save some system calls in the unlock function... however, the lock gets a bit less optimal.

Back to the change: the difference between BUSY and WAITING is that in the former the mutex is locked but there is no thread waiting for it; therefore, when unlocking the mutex we do not have to call futex_wake, because there is nobody to wake up. Note that, in the previous implementation, we were always calling futex_wake even when there was no thread to be woken up.

To take advantage of this we will use the atomic built-in function __sync_fetch_and_sub. This function returns the current value of the provided variable and then subtracts the indicated quantity, all atomically. Applied to the mutex value on unlock, this leads us to the following cases:

INITIAL STATE   FINAL STATE   ACTION
WAITING         BUSY          There are waiting threads. Wake one up.
BUSY            FREE          Just leave. Nothing to do.

With this information the my_mutex_unlock function will look like this:

#define MUTEX_WAITING 2  /* locked, and there may be threads waiting */

int my_mutex_unlock (MY_MUTEX *mmux) {
  if ((__sync_fetch_and_sub (&mmux->v, 1)) != MUTEX_BUSY) {
    mmux->v = MUTEX_FREE;
    futex_wake (&mmux->v, 1);
  }
  return 0;
}

Note that we have to set the mutex value back to MUTEX_FREE so the waiting threads can acquire it again. Also note the new MUTEX_WAITING constant: its value has to be 2, so that subtracting 1 from a WAITING mutex leaves it in the BUSY state, as the table above shows.

In order for this to work, we also need to change the my_mutex_lock function to take this new state into account. Let's take a look at the code first:

int my_mutex_lock (MY_MUTEX *mmux) {
  int v;

  if ((v = __sync_val_compare_and_swap (&mmux->v,
                                        MUTEX_FREE, MUTEX_BUSY)) != 0)
    {
      do
        {
          if (v == MUTEX_WAITING ||
              __sync_val_compare_and_swap
                        (&mmux->v, MUTEX_BUSY, MUTEX_WAITING) != 0)
            {
              futex_wait (&mmux->v, MUTEX_WAITING);
            }
        } while ((v = __sync_val_compare_and_swap
                        (&mmux->v, MUTEX_FREE, MUTEX_WAITING)) != 0);
    }
  return 0;
}

The lock function is a bit more complicated, but we will go through it in detail.

The first thing it does is try to acquire the mutex assuming it is FREE. If this succeeds (the first __sync_val_compare_and_swap), it means that the mutex was available and the thread has just locked it (it is now BUSY)... so we are done.

Otherwise, the variable v will contain the state the mutex had at that moment, which will be either BUSY or WAITING. In any case we will have to wait, and that is what the next statement does. If the state is already WAITING, we just need to wait. If it is BUSY, we need to change it to WAITING (because now at least one thread will be waiting for the mutex) and then go to sleep.

Then, whenever the thread currently holding the mutex releases it, the mutex state can be FREE or BUSY, so the woken-up thread needs to check the mutex value again. If it is FREE, we are done: we have acquired the mutex. Otherwise, the mutex is still busy and we have to wait again.

With all these changes, when we run the latest version of the program we get the following result:

$ /usr/bin/time -v ./test05
(...)
*** SUCCESS ***
        Command being timed: "./test05"
        User time (seconds): 0.16
        System time (seconds): 0.40
        Percent of CPU this job got: 34%
        Elapsed (wall clock) time (h:mm:ss or m:ss): 0:01.64

These are pretty much the same values we already got with the pthread version. The main reason is that this new version optimises the unlock function for the case when there is no thread waiting, and in our specific test, after adding the usleep in the thread code and a third thread, there is almost always a thread waiting, so the optimised path is not taken very often.

Conclusion

We have learned that using cancel operations on threads is not a wise approach, and that there is a lot more involved than just stopping the thread and returning control. We also learned about futexes and how to use them to implement a simple mutex abstraction to synchronise thread access to shared resources. The futex system call lets us ask the kernel to put threads to sleep and wake them up just when needed. Finally, we have seen how reducing the number of syscalls in a program can have an important impact on its performance.

Continue reading:

Concurrency. Condition Variables

Other posts in the series:

Concurrency. Introduction

Concurrency. Race Conditions

Header Image Credits: Syed Hussaini on Unsplash

 