Concurrency. Java Mutex and Conditions
2021-03-06
By
David "DeMO" Martínez Oliveira

Before diving deeper into more complicated concurrent programming elements, let's take a quick look at what Java can do for the examples we have already seen. I was planning to do this at the very end, but I have realised that such an article would be too long, so it is better to start looking at it right now.

You may be wondering... why Java? Well, the answer is that I had to learn how to do concurrent programming in Java for reasons that don't really matter here. But, regardless of my own reasons, it is true that there is some need for concurrent programming in Java in industry... even if I do not really understand why.

Anyhow, in this first installment we will explore the basic language support and take a sneak peek into the concurrency package that provides more advanced constructions.

Synchronised methods

There are not many mainstream languages out there that provide built-in support for concurrent programming. Ada, Erlang and, to some extent, Go are some of them, but most programming languages rely on external libraries that interface with the OS, which is the one in charge of the actual concurrent behaviour.

Java provides two elements for concurrent programming: one to create threads (actually there are a bunch of options for this) and another one for simple synchronisation. Thread creation is supported through the standard class Thread and the standard interface Runnable.

Starting with Java 8 and the introduction of lambda expressions, fancier ways to start a thread are possible, but overall they are just different ways of using the Thread class or the Runnable interface.

Just for completeness (it is useful for me to have simple code examples available somewhere, and this looks like as good a place as any other), here are the different options.

Java Thread sub-classing

public class MyThread extends Thread {
  @Override
  public void run () {
    // This is the thread code
  }
}

This technique just creates a new class that inherits from the Thread class. Then we just need to override the run method with the code we want to run in our thread.

Java Runnable Interface

public class MyFunc implements Runnable {
  @Override
  public void run () {
    // This is the thread code
  }
}

// Elsewhere: wrap an instance of the Runnable in a Thread
Thread myThread = new Thread (new MyFunc ());

Which is the same as the previous example, but specifying the function using an interface.

Java Thread Anonymous sub-classing

Thread myThread = new Thread () {
  public void run () {
    // This is the thread code
  }
};

This is a more compact way to declare a one-time use function. This kind of construction is common in GUI development to code event handlers for widgets, replacing the separate listener classes used with older Java GUI libraries.

Java Lambda Runnable

  Runnable myfunc = () -> {
    // This is the thread code
    };
  Thread myThread = new Thread (myfunc);
  myThread.start ();

That is equivalent to the code:

  Thread myThread = new Thread (new Runnable () {
    public void run () {
      // This is the thread code
    }
  });
  myThread.start();

Java Lambda Thread

    Thread myThread = new Thread (() -> {
        // This is the thread code
      });
    myThread.start ();

Or using a more compact format and without creating a variable

    new Thread (() -> {
        // This is the thread code
    }).start ();

This code is equivalent to:

Thread myThread = new Thread () {
  public void run () {
    // This is the thread code
  }
};
myThread.start();

Note that the code to be run by the thread goes into the run method, but the thread is started by calling the start method of the Thread class.
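To make the difference between run and start concrete, here is a minimal sketch. The class and method names (StartVsRun, executedBy) are made up for this example; it just records which thread actually executed the code.

```java
// Sketch: run() executes in the calling thread, start() in a new one.
class StartVsRun {
  // Returns the name of the thread that actually executed the Runnable.
  static String executedBy (boolean useStart) throws InterruptedException {
    final String[] who = new String[1];
    Thread t = new Thread (() -> who[0] = Thread.currentThread ().getName (), "worker");
    if (useStart) {
      t.start ();  // New thread: the JVM invokes run() inside "worker"
      t.join ();   // Wait for it; join() also makes who[0] visible here
    } else {
      t.run ();    // Plain method call: executes in the caller's thread
    }
    return who[0];
  }

  public static void main (String[] args) throws InterruptedException {
    System.out.println ("start(): " + executedBy (true));
    System.out.println ("run()  : " + executedBy (false));
  }
}
```

Calling run directly compiles and works, but no new thread is created, which is an easy mistake to make.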

Our mutex example in Java

The mutex example we used in previous instalments simply launches several threads that try to increase a shared counter as fast as they can. Each time a thread manages to increase the shared counter, it also increases its own internal one. After running for a while, as long as everything is properly synchronised, the value in the shared counter should equal the sum of the internal counters of all the threads.

In the C version we just needed a function and some parameters to pass to that function. The run method of Java threads doesn't accept parameters, so we need to create a class and add some fields to store our thread internal counter... Well, at least this is the solution I came up with :).

This is our initial code:

public class ttest1 {
  static int sharedVar = 0;  // Shared counter

  static class Task extends Thread {
    int              cnt;   // Thread internal counter
    volatile boolean flag;  // Thread stop flag (volatile: it is written by main)
    
    Task (String name) { // Constructor
      super (name);      // Uses Thread (superclass) constructor to store name
      cnt = 0;
      flag = true;
    }
    
    @Override
    public void run () { // The actual thread code
      while (flag) {
        try {
          System.out.println ("B:" + getName() + " : " + sharedVar+ " Local: " + cnt);
          int tmp = sharedVar;
          tmp = tmp + 1;
          Thread.sleep (1);
          sharedVar = tmp;
          cnt++;
        } catch (Exception e) {
          e.printStackTrace(System.out);
        }
      }
    }
  }

  public static void main (String[] arg) {
    Task t1, t2;
    t1 = new Task("Thread1");
    t2 = new Task("Thread2");
    t1.start ();
    t2.start ();
    
    try {
      Thread.sleep (2*1000);
      t1.flag = false;
      t2.flag= false;
      t1.join();
      t2.join();
      System.out.println ("Shared Var : " + sharedVar + " Cycles: " + (t1.cnt+t2.cnt));
      if (sharedVar == t1.cnt+t2.cnt)
        System.out.println ("*** SUCCESS ***");
      else
        System.out.println ("*** FAIL ***");
      
    } catch (Exception e) {
      e.printStackTrace(System.out);
    }
  }
}

The code should look very familiar to you, loyal reader of this humble blog :). We add a small delay while updating the shared counter in order to force the misbehaviour, which may not show up if we just do a sharedVar++. If we run the application like this, it will fail, and we already know the reason: the read-modify-write sequence on sharedVar is not atomic, so updates from one thread get overwritten by the other.

For our C version we used a mutex to ensure mutually exclusive access to the shared variable. Here, instead, we are going to use the second element provided by the Java language, the so-called synchronised blocks. (The syntax is different, but in essence we are using a mutex.)

Synchronised blocks ensure mutually exclusive access to the indicated block, where block can be either a whole method or a code block between curly brackets.

Because of the way our test program was written, we are forced to use a code block. The only existing method is the thread run method, and synchronising it has no effect: a synchronised instance method locks on the object it is called on, and each thread is its own object, so access to the code is always granted (you can try).

    // Task
    @Override
    public synchronized void run () {
      while (flag) {
       (...)

Ensuring mutual exclusion with a code block

If you have quickly gone over some Java concurrent programming tutorial, your first idea to solve this problem could be to use some code like this:

      synchronized (this) {
        int tmp = sharedVar;
        tmp = tmp + 1;
        Thread.sleep (1);
        sharedVar = tmp;
      }

But this will fail miserably. The reason is that we are synchronising against the object this, which, in our example, is the thread itself. So each thread uses a different this (a different object) to control access to the critical section and, therefore, access is always granted.
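A quick way to convince yourself of this is to check which monitors a thread holds. The following sketch uses the standard Thread.holdsLock method; the class name ThisIsNotShared and the helper sameMonitor are hypothetical names introduced only for this illustration.

```java
// Sketch: every Thread object has its own monitor, so synchronized (this)
// inside run() locks a different object in each thread.
class ThisIsNotShared {
  static class Task extends Thread {
    @Override
    public void run () { /* not needed for this check */ }
  }

  // True only if holding a's monitor also means holding b's monitor.
  static boolean sameMonitor (Object a, Object b) {
    synchronized (a) {
      return Thread.holdsLock (b);
    }
  }

  public static void main (String[] args) {
    Task t1 = new Task ();
    Task t2 = new Task ();
    Object shared = new Object ();
    System.out.println ("t1 vs t2        : " + sameMonitor (t1, t2));
    System.out.println ("shared vs shared: " + sameMonitor (shared, shared));
  }
}
```

Locking t1 says nothing about t2, which is why two Task objects synchronising on their own this never exclude each other.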

We will see in a while the proper way to use this type of code (making good use of this), but in order to make our current program work properly we need to synchronise against an object that is the same for all the threads.

So, the first thing is to create such an object:

public class ttest1 {
  static int sharedVar = 0;
  static Object objSync = new Object ();
  (...)

This is a plain Object (all that synchronized needs is an object reference) referenced through a static variable of the class, so it is accessible from all the nested classes (note the static in the Task class definition). Now, we can rewrite the critical section of our run method as follows:

      synchronized (objSync) {
        int tmp = sharedVar;
        tmp = tmp + 1;
        Thread.sleep (1);
        sharedVar = tmp;
      }

And we will get the expected result.
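Putting the pieces together, a complete version of the fixed program could look like the sketch below. To keep the run deterministic it assumes a fixed number of iterations per thread (the hypothetical ITERATIONS constant) instead of the stop flag used above; the class name SharedLockDemo and the helper runOnce are also made up for this example.

```java
// Sketch: two threads updating a shared counter under one common lock object.
class SharedLockDemo {
  static int sharedVar = 0;                     // Shared counter
  static final Object objSync = new Object ();  // One lock for all threads
  static final int ITERATIONS = 100;            // Assumed fixed workload

  static class Task extends Thread {
    int cnt = 0;                                // Thread internal counter

    @Override
    public void run () {
      for (int i = 0; i < ITERATIONS; i++) {
        synchronized (objSync) {                // Same object in every thread
          try {
            int tmp = sharedVar;
            tmp = tmp + 1;
            Thread.sleep (1);                   // Widen the race window
            sharedVar = tmp;
            cnt++;
          } catch (InterruptedException e) {
            return;                             // Stop early if interrupted
          }
        }
      }
    }
  }

  // Runs two tasks to completion and returns { sharedVar, t1.cnt + t2.cnt }.
  static int[] runOnce () throws InterruptedException {
    sharedVar = 0;
    Task t1 = new Task ();
    Task t2 = new Task ();
    t1.start ();
    t2.start ();
    t1.join ();
    t2.join ();
    return new int[] { sharedVar, t1.cnt + t2.cnt };
  }

  public static void main (String[] args) throws InterruptedException {
    int[] r = runOnce ();
    System.out.println ("Shared Var : " + r[0] + " Cycles: " + r[1]);
    System.out.println (r[0] == r[1] ? "*** SUCCESS ***" : "*** FAIL ***");
  }
}
```

Replacing objSync with this in the synchronized statement should bring the failure back, since each Task would then lock on itself.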

Creating a thread-safe counter

In order to illustrate the other forms of using synchronized in a Java program, let's introduce our SyncCounter class. It looks like this:

public class SyncCounter {
  int cnt = 0;
  public void inc () {
    int tmp;
    try {
      tmp = cnt;
      tmp =tmp + 1;
      Thread.sleep (1);
      cnt =tmp;
    } catch (Exception e) {
      e.printStackTrace(System.out);
    }
  }
}

So, we have put the sharedVar in our previous example and the code to manipulate it (to increment it) in a class. Sure, that is a heavy refactoring!!! Now our main program will look like this:


public class MutexMetSync {
  static SyncCounter sharedVar = new SyncCounter ();

  static class Task extends Thread {
    int         cnt;
    volatile boolean flag; // volatile: written by main, read by the thread
      
    Task (String name) { // Constructor 
      super (name);
      cnt = 0;
      flag = true;
    }
    
    @Override
    public void run () {     // Task
      while (flag) {
        try {
          System.out.println ("B:" + getName() + " : " + sharedVar.cnt + " Local: " + cnt);
          sharedVar.inc ();
          cnt++;
        } catch (Exception e) {
          e.printStackTrace(System.out);
        }
      }
    }
    
  }

  public static void main (String[] arg) {
    Task t1, t2;

    System.out.println ("Mutex example with Synchronisation blocks");
    t1 = new Task("Thread1");
    t2 = new Task("Thread2");
    t1.start ();
    t2.start ();
    
    try {
      Thread.sleep (2*1000);
      t1.flag = false;
      t2.flag= false;
      t1.join();
      t2.join();
      System.out.println ("Shared Var : " + sharedVar.cnt + " Cycles: " + (t1.cnt+t2.cnt));
      if (sharedVar.cnt == t1.cnt+t2.cnt)
        System.out.println ("*** SUCCESS ***");
      else
        System.out.println ("*** FAIL ***");
      
    } catch (Exception e) {
      e.printStackTrace(System.out);
    }
  }
}

We have just substituted our sharedVar together with our synchronisation object for the new class.

You may have noticed that there is no synchronized anywhere in the code... Yes, this code, as it is, fails. In order to fix it we can do two things (actually three, but the third one I leave up to you to figure out; hint: we have already done it):

First, we can use a synchronized block in the inc function like this:

  public void inc () {
    int tmp;
    try {
      synchronized (this) {
        tmp = cnt;
        tmp =tmp + 1;
        Thread.sleep (1);
        cnt =tmp;
      }
    } catch (Exception e) {
      e.printStackTrace(System.out);
    }
  }

Now, this references the SyncCounter instance, which is the same for all threads because they all reach it through the static sharedVar field.

Second, we can just synchronise the whole method since, in this case, all the code in the method has to be executed in mutual exclusion. Something like this:

  public synchronized void inc () {
    (....)
  }
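As a self-contained illustration of the synchronised method variant, the sketch below hammers a counter from several threads and checks the total. It drops the artificial delay to keep the run short, and the names SyncCounterDemo, count and get are assumptions made for this example, not part of the program above.

```java
// Sketch: a counter whose methods are synchronized on the counter instance.
class SyncCounterDemo {
  static class SyncCounter {
    private int cnt = 0;
    public synchronized void inc () { cnt = cnt + 1; }  // Locks on this
    public synchronized int get () { return cnt; }      // Same lock for reads
  }

  // Starts the given number of threads, each calling inc() perThread times,
  // and returns the final value of the shared counter.
  static int count (int threads, int perThread) throws InterruptedException {
    SyncCounter counter = new SyncCounter ();  // One instance shared by all
    Thread[] workers = new Thread[threads];
    for (int i = 0; i < threads; i++) {
      workers[i] = new Thread (() -> {
        for (int j = 0; j < perThread; j++) counter.inc ();
      });
      workers[i].start ();
    }
    for (Thread w : workers) w.join ();
    return counter.get ();
  }

  public static void main (String[] args) throws InterruptedException {
    System.out.println ("Total: " + count (4, 10000));
  }
}
```

Here this is the right lock because every thread calls inc on the same SyncCounter object.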

So, this is it for the built-in Java capabilities. This works fine for simple cases, but it is not enough in general. For that reason, Java provides a very complete package named java.util.concurrent, with all kinds of ready-to-use classes to satisfy most of your concurrent programming cravings.

Java Locks

Even though the aforementioned package has lots of classes, there is no specific mutex class. That is fine, as the built-in synchronized keyword allows us to code critical regions as we would do with a mutex.

However, java.util.concurrent.locks provides ways to deal with generic locks (a kind of generic mutex), and java.util.concurrent.atomic provides classes to atomically modify values.
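Just to give a flavour of the atomic classes, a shared counter can be written without any explicit lock using AtomicInteger. This is only a sketch; the class name AtomicCounterDemo and the helper count are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: a lock-free shared counter based on AtomicInteger.
class AtomicCounterDemo {
  // Starts the given number of threads, each incrementing perThread times,
  // and returns the final counter value.
  static int count (int threads, int perThread) throws InterruptedException {
    AtomicInteger counter = new AtomicInteger (0);
    Thread[] workers = new Thread[threads];
    for (int i = 0; i < threads; i++) {
      workers[i] = new Thread (() -> {
        // incrementAndGet performs the read-modify-write as one atomic step
        for (int j = 0; j < perThread; j++) counter.incrementAndGet ();
      });
      workers[i].start ();
    }
    for (Thread w : workers) w.join ();
    return counter.get ();
  }

  public static void main (String[] args) throws InterruptedException {
    System.out.println ("Total: " + count (4, 10000));
  }
}
```

This only covers single-variable updates; when several values have to change together, a lock or a synchronised block is still needed.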

To illustrate the use of locks (I have seen those more often than the atomic classes, which we will meet later), let's rewrite our test program using a lock. The code will look like this:

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ttest2 {
  static int sharedVar = 0;
  static Lock lock = new ReentrantLock();
  
  static class Task extends Thread {
    int              cnt;
    volatile boolean flag; // volatile: written by main, read by the thread

    Task (String name) { // Constructor
      super (name);
      cnt = 0;
      flag = true;
    }

    @Override
    public void run () {
      int tmp;

      while (flag) {
        System.out.println ("B:" + getName() + " : " + sharedVar + " Local: " + cnt);

        lock.lock(); // Critical Region Starts
        try {
          tmp = sharedVar;
          tmp = tmp + 1;
          Thread.sleep (1);
          sharedVar = tmp;
          cnt++;
        } catch (Exception e) {
          e.printStackTrace(System.out);
        } finally {
          lock.unlock(); // Critical Region Ends, even if an exception is thrown
        }
      }
    }
  }

  public static void main (String[] arg) {
    Task t1, t2;

    System.out.println ("Mutex example with Locks");
    t1 = new Task("Thread1");
    t2 = new Task("Thread2");
    t1.start ();
    t2.start ();
    
    try {
      Thread.sleep (2*1000);
      t1.flag = false;
      t2.flag= false;
      t1.join();
      t2.join();
      System.out.println ("Shared Var : " + sharedVar + " Cycles: " + (t1.cnt+t2.cnt));
      if (sharedVar == t1.cnt+t2.cnt)
        System.out.println ("*** SUCCESS ***");
      else
        System.out.println ("*** FAIL ***");
      
    } catch (Exception e) {
      e.printStackTrace(System.out);
    }
  }
}

We can easily identify the so-called ReentrantLock and how to lock and unlock it. This code looks much more similar to the C code we used in the previous instalment. The reason is that this is a lower-level construct, closer to the pthread interface we used in our C version.
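In case the name ReentrantLock sounds odd: reentrant means that the thread that already owns the lock can acquire it again without blocking itself, as long as it unlocks it the same number of times. A minimal sketch, with made-up names (ReentrantDemo, outer, inner), using the standard getHoldCount and isLocked methods:

```java
import java.util.concurrent.locks.ReentrantLock;

// Sketch: the owner of a ReentrantLock can lock it again without deadlocking.
class ReentrantDemo {
  static final ReentrantLock lock = new ReentrantLock ();

  static int inner () {
    lock.lock ();                    // Second acquisition by the same thread
    try {
      return lock.getHoldCount ();   // Number of holds by the current thread
    } finally {
      lock.unlock ();                // Releases only the inner hold
    }
  }

  static int outer () {
    lock.lock ();                    // First acquisition
    try {
      return inner ();
    } finally {
      lock.unlock ();                // Releases the outer hold: lock is free
    }
  }

  public static void main (String[] args) {
    System.out.println ("Hold count inside inner(): " + outer ());
    System.out.println ("Still locked afterwards  : " + lock.isLocked ());
  }
}
```

Note also the try/finally around the critical region: it guarantees that the lock is released even if the code in between throws.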

Producer/consumer

Another class provided by the java.util.concurrent.locks package is Condition, which allows us to implement condition variables and solve the producer/consumer problem the same way we did in C.

The Java implementation of the producer/consumer is pretty straightforward; however, there are some caveats to discuss. In this case we have defined a BoundedBuffer class that will do all the synchronisation stuff... (this code is taken from the Java documentation for Condition).

package com.mycompany.producer.consumer;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class BoundedBuffer {
    final Lock lock = new ReentrantLock();
    final Condition notFull = lock.newCondition();
    final Condition notEmpty = lock.newCondition();
    
    final int[] items = new int[100];
    int   putptr, takeptr, count;
    
    public void add(int x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length) {
                notFull.await();
            }
            items[putptr] = x;
            if (++putptr == items.length) putptr = 0;
            ++count;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public int get() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                notEmpty.await();
            }
            int x = items[takeptr];
            items[takeptr] = 0;
            if (++takeptr == items.length) takeptr = 0;
            --count;
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }

    void clear() {
        for (int i = 0; i < items.length; i++) items[(i+putptr)%items.length] = 0;
    }
}

The code is almost the same as the one we used in our C program. We have a mutex (a ReentrantLock in this case) plus two conditions for the buffer empty and buffer full cases. Note how the Condition variables are associated with the lock at creation time. When the buffer is full the producer has to wait in a loop, and when it is empty the consumer has to wait.

Encapsulating all the synchronisation logic in this class makes the producer/consumer code very simple:

package com.mycompany.producer.consumer;

import java.util.concurrent.atomic.AtomicBoolean;

public class Producer extends Thread{
  BoundedBuffer q;
  int           id;
  int           v = 1;
  private final AtomicBoolean flag = new AtomicBoolean(false);
  
  public Producer (BoundedBuffer q, int id)  {
    this.q = q;
    this.id = id;
  }
  public  void stop_thread () {
    System.out.println ("Stopping Producer");
    flag.set(false);
  }
  @Override
  public void run () {
    flag.set (true);
    while (flag.get()) {
      try {
        v++;
        System.out.println ("Producer " + id+ " -> " + v);
        
        q.add (v);

        Thread.sleep (800); // Let's make some noise
      } catch (InterruptedException e) {
        e.printStackTrace (System.out);
      }
    }
    System.out.println ("Producer " + id +  " ends...");
  }
}

And the Consumer will look the same but using the get method on our BoundedBuffer object.

package com.mycompany.producer.consumer;

import java.util.concurrent.atomic.AtomicBoolean;

public class Consumer extends Thread {
  BoundedBuffer q;
  int           id;
  int           v;
  private final AtomicBoolean flag = new AtomicBoolean(false);

  public Consumer (BoundedBuffer q, int id)  {
    this.q = q;
    this.id = id;
  }
  public  void stop_thread () {
    System.out.println ("Stopping Consumer.....");
    flag.set(false);
  }
  @Override
  public void run () {
    flag.set (true);
    while (flag.get()) {
      try {
        v = q.get();
        System.out.println ("Consumer " + id + " <- " + v);
        Thread.sleep (800);
      } catch (InterruptedException e) {
        e.printStackTrace (System.out);
      }
    }
    System.out.println ("Consumer " + this.id +  " ends...");
  }
}

In this case, as an example, we have used the java.util.concurrent.atomic package to get an AtomicBoolean value to control the threads' main loop. Other than that, the producer produces increasing integers that the consumer consumes.
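The stop-flag pattern on its own, stripped of the buffer, can be sketched as follows. The names StopFlagDemo, Worker and stopThread are hypothetical. Unlike the classes above, this sketch initialises the flag to true in the field declaration, so that a stop request arriving before the thread has actually started running is not overwritten; that is a design choice of the sketch, not something the original code requires.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch: stopping a worker thread through an AtomicBoolean flag.
class StopFlagDemo {
  static class Worker extends Thread {
    private final AtomicBoolean flag = new AtomicBoolean (true);
    long loops = 0;

    void stopThread () { flag.set (false); }  // Called from another thread

    @Override
    public void run () {
      while (flag.get ()) {   // Always sees the latest value written
        loops++;
        Thread.yield ();      // Stand-in for real work
      }
    }
  }

  // Starts a worker, asks it to stop and reports whether it terminated.
  static boolean stops () throws InterruptedException {
    Worker w = new Worker ();
    w.start ();
    Thread.sleep (50);        // Let it spin for a little while
    w.stopThread ();
    w.join (2000);            // Wait at most 2 seconds for it to finish
    return !w.isAlive ();
  }

  public static void main (String[] args) throws InterruptedException {
    System.out.println ("Worker stopped: " + stops ());
  }
}
```

The flag is only checked between iterations, so a thread that is blocked inside a call never gets to look at it.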

So, with all these classes, we can now write our test program:

        BoundedBuffer q = new BoundedBuffer ();
        q.clear();
        Producer p1 = new Producer (q, 1);
        
        Consumer c1 = new Consumer (q, 1);
        Consumer c2 = new Consumer (q, 2);
        
        p1.start ();
        
        c1.start ();
        c2.start ();
        Thread.sleep (10000);
        // Stop producer
        p1.stop_thread();
        p1.join();
        // Stop consumers
        c1.stop_thread();
        c2.stop_thread();

        c1.join();
        c2.join();

When we run this program, it hangs at the end, never returning control to the console. The solution works fine for infinite loops, but when we want to stop the threads things become tricky. This is actually a kind of deadlock with a twist.

Issue with Consumer/Producer Implementation

So, what is really happening with our implementation? Let's see it step by step. The first thing to note is that we are using one producer and two consumers with similar processing times. This means that the buffer will be empty most of the time, as the consumers together consume items twice as fast as the producer produces them (because we have 2 consumers).

Then this is what happens. The producer is signalled to stop, which happens smoothly. At that point, the buffer may contain one or zero items (see previous paragraph). Then we signal the consumers to stop but, most of the time, the consumers will be waiting inside the get function for items from the producer. As the producer has finished, there is nobody adding items to the buffer and, therefore, the consumers will wait forever on the notEmpty condition variable. This is the relevant code:

       while (count == 0) {
         notEmpty.await();
       }

There are different ways of solving this. And to some extent this is a non-issue, as you will likely want such a system to run forever, but it is a nice exercise to make the application stop in a neat way.
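One of those ways, which is not the one used in this article, is to interrupt the waiting threads: Condition.await throws InterruptedException when the thread is interrupted, so a blocked consumer can be woken up and allowed to finish. The sketch below shows the idea in isolation; InterruptDemo and stopsWhenInterrupted are made-up names and the buffer is reduced to a counter that is never incremented.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Sketch: unblocking a thread stuck in Condition.await() with interrupt().
class InterruptDemo {
  static final Lock lock = new ReentrantLock ();
  static final Condition notEmpty = lock.newCondition ();
  static int count = 0;   // Never incremented: the "buffer" stays empty

  // Starts a consumer that blocks on the empty buffer, interrupts it
  // and reports whether it terminated.
  static boolean stopsWhenInterrupted () throws InterruptedException {
    Thread consumer = new Thread (() -> {
      lock.lock ();
      try {
        while (count == 0) notEmpty.await ();  // Throws when interrupted
      } catch (InterruptedException e) {
        // Taken as a request to stop: fall through and end the thread
      } finally {
        lock.unlock ();
      }
    });
    consumer.start ();
    Thread.sleep (100);      // Give the consumer time to block
    consumer.interrupt ();   // Wakes the thread out of await()
    consumer.join (2000);
    return !consumer.isAlive ();
  }

  public static void main (String[] args) throws InterruptedException {
    System.out.println ("Consumer stopped: " + stopsWhenInterrupted ());
  }
}
```

With this approach the stop method of each thread would call interrupt in addition to clearing its flag.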

My solution was to add a time-out to the BoundedBuffer class, which now looks like this:

package com.mycompany.producer.consumer;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.TimeUnit;

public class BoundedBuffer {
    final Lock lock = new ReentrantLock();
    final Condition notFull = lock.newCondition();
    final Condition notEmpty = lock.newCondition();
    
    final int[] items = new int[100];
    int         putptr, takeptr, count;
    
    public void add(int x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length) {
                if (!notFull.await(3000, TimeUnit.MILLISECONDS))
                    throw new InterruptedException ("FULL condition time-out");
            }
            items[putptr] = x;
            if (++putptr == items.length) putptr = 0;
            ++count;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public int get() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                if (!notEmpty.await(3000, TimeUnit.MILLISECONDS))
                    throw new InterruptedException ("EMPTY condition time-out");
            }
            int x = items[takeptr];
            items[takeptr] = 0;
            if (++takeptr == items.length) takeptr = 0;
            --count;
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }
    void clear() {
        for (int i = 0; i < items.length; i++) items[(i+putptr)%items.length] = 0;
    }

}

In this implementation we wait on the conditions for a maximum of 3 seconds at a time, and we throw an InterruptedException if that time-out expires. The await method returns false if the indicated time-out has elapsed. We still need to run this in a loop in order to deal with spurious wakeups, as indicated by the Java documentation.
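A variation on the same idea, following the pattern suggested in the Condition documentation, is to use awaitNanos, which returns the time still left after each wake-up. That way a spurious wakeup does not restart the full time-out. This is only a sketch with hypothetical names (TimedWaitDemo, awaitItem, put), and it returns false on time-out instead of throwing.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Sketch: waiting on a condition with an overall time budget.
class TimedWaitDemo {
  static final Lock lock = new ReentrantLock ();
  static final Condition notEmpty = lock.newCondition ();
  static int count = 0;

  // Waits until count > 0 or until timeoutMs have elapsed in total.
  // Returns true if an item is available, false on time-out.
  static boolean awaitItem (long timeoutMs) throws InterruptedException {
    long nanos = TimeUnit.MILLISECONDS.toNanos (timeoutMs);
    lock.lock ();
    try {
      while (count == 0) {
        if (nanos <= 0L) return false;        // Total budget used up
        nanos = notEmpty.awaitNanos (nanos);  // Time remaining after wake-up
      }
      return true;
    } finally {
      lock.unlock ();
    }
  }

  // Adds one item and wakes up one waiting thread.
  static void put () {
    lock.lock ();
    try {
      count++;
      notEmpty.signal ();
    } finally {
      lock.unlock ();
    }
  }

  public static void main (String[] args) throws InterruptedException {
    System.out.println ("Empty buffer : " + awaitItem (50));
    put ();
    System.out.println ("After put () : " + awaitItem (50));
  }
}
```

Whether a time-out should be reported with a return value or an exception is a matter of taste; the version above in the article throws so that the existing catch blocks in the producer and consumer get a chance to re-check the stop flag.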

This is it for now. I'll come back to Java when we go further in our concurrent programming journey!

Conclusions

We went quickly through the Java built-in support for concurrent programming and we also took a glimpse at the java.util.concurrent package and some of its low-level synchronisation classes. To finish this post I would just like to point out that, as indicated in the Java docs, there is an ArrayBlockingQueue that implements this functionality... but I always believe it is good to know how things work under the hood, even when you will not usually do things that way.
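For reference, this is roughly how the same producer/consumer exchange could look with ArrayBlockingQueue. It is a minimal sketch under my own assumptions: the names BlockingQueueDemo and transfer are made up, the producer sends a fixed number of items, and the consumer gives up when nothing arrives for 500 ms.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Sketch: producer/consumer on top of the standard ArrayBlockingQueue.
class BlockingQueueDemo {
  // The producer puts 1..n, the consumer polls with a time-out.
  // Returns the sum of the values consumed.
  static int transfer (int n) throws InterruptedException {
    BlockingQueue<Integer> q = new ArrayBlockingQueue<> (100);
    int[] sum = new int[1];
    Thread producer = new Thread (() -> {
      try {
        for (int v = 1; v <= n; v++) q.put (v);  // Blocks while the queue is full
      } catch (InterruptedException e) {
        // Stop producing
      }
    });
    Thread consumer = new Thread (() -> {
      try {
        Integer v;
        // poll returns null if nothing arrives within the time-out
        while ((v = q.poll (500, TimeUnit.MILLISECONDS)) != null) sum[0] += v;
      } catch (InterruptedException e) {
        // Stop consuming
      }
    });
    producer.start ();
    consumer.start ();
    producer.join ();
    consumer.join ();   // After join the consumer's writes are visible here
    return sum[0];
  }

  public static void main (String[] args) throws InterruptedException {
    System.out.println ("Sum consumed: " + transfer (10));
  }
}
```

The timed poll plays the same role as the time-out we added to BoundedBuffer: the consumer never blocks forever on an empty queue.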

Header Image Credits: Merlene Goulet

 