Anyhow, in this first instalment we will explore the basic language support and take a sneak peek into the concurrency package that provides more advanced constructs.
Synchronised methods
There are not many languages out there (I mean mainstream) that provide built-in support for concurrent programming. Ada, Erlang and, to some extent, Go are some of them, but most programming languages rely on external libraries that interface with the OS, which is the one in charge of the actual concurrent behaviour.
Java provides two elements for concurrent programming: one (actually there are a bunch of options for this) to create threads and another one for simple synchronisation. Thread creation is supported through the standard class Thread and the standard interface Runnable.
Starting with Java 8 and the introduction of lambda expressions, fancier ways to start a thread are possible, but overall they are just different ways of using the Thread class or the Runnable interface.
Just for completeness (it would be useful for me to have simple code examples available at some place)... and this looks as good a place as any other.
Java Thread sub-classing
public class MyThread extends Thread {
@Override
public void run () {
// This is the thread code
}
}
This technique just creates a new class that inherits from the Thread class. Then we just need to override the run method with the code we want to run in our thread.
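To use it, we just create an instance and start it (a minimal usage sketch; the class name is the one defined above):
MyThread myThread = new MyThread ();
myThread.start (); // start() spawns the new thread and invokes run() in it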
Java Runnable Interface
public class MyFunc implements Runnable {
    @Override
    public void run () {
        // This is the thread code
    }
}

MyFunc myFunc = new MyFunc ();
Thread myThread = new Thread (myFunc);
Which is the same as the previous example, but specifying the function using an interface.
Java Thread Anonymous sub-classing
Thread myThread = new Thread () {
    @Override
    public void run () {
        // This is the thread code
    }
};
This is a more compact way to declare a one-time-use function. This kind of construction is common in GUI development to code event handlers for widgets, replacing the listener implementations of older Java GUI libraries.
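As an illustration of that GUI pattern (a minimal Swing sketch; the button is hypothetical and only here for the example):
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;
import javax.swing.JButton;

JButton myButton = new JButton ("Click me");

// One-time-use handler declared as an anonymous class, just like the Thread above
myButton.addActionListener (new ActionListener () {
    @Override
    public void actionPerformed (ActionEvent e) {
        System.out.println ("Button pressed");
    }
});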
Java Lambda Runnable
Runnable myfunc = () -> {
// This is the thread code
};
Thread myThread = new Thread (myfunc);
myThread.start ();
That is equivalent to the code:
Thread myThread = new Thread (new Runnable () {
public void run () {
// This is the thread code
}
});
myThread.start();
Java lambda Thread
Thread myThread = new Thread (() -> {
// This is the thread code
});
myThread.start ();
Or, using a more compact format and without creating a variable:
new Thread (() -> {
// This is the thread code
}).start ();
This code is equivalent to:
Thread myThread = new Thread () {
    @Override
    public void run () {
        // This is the thread code
    }
};
myThread.start();
Note that the code to be run by the thread goes into the run method, but the thread is started by calling the start method of the Thread class.
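A quick sketch of why that distinction matters (the variable name is just for illustration): calling run directly executes the code in the current thread, while start spawns a new thread and calls run there.
Thread myThread = new Thread (() -> {
    System.out.println ("Running in: " + Thread.currentThread().getName());
});

myThread.run ();   // Plain method call: runs in the caller's thread (e.g. "main")
myThread.start (); // Spawns a new thread: prints something like "Thread-0"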
Our mutex example in Java
The mutex example we used in previous instalments simply launches several threads that try to increase a shared counter as fast as they can. Each time a thread manages to successfully increase the shared counter, it also increases its own internal one. After running for a while, as long as everything is properly synchronised, the value in the shared counter should equal the sum of the values of the internal counters of each thread.
In the C version we just needed a function and some parameters to pass to that function. The run method on Java threads doesn't accept parameters, so we need to create a class and add some fields to store our thread's internal counter.... Well, at least this is the solution I came up with :).
This is our initial code:
public class ttest1 {
static int sharedVar = 0; // Shared counter
static class Task extends Thread {
int cnt; // Thread internal counter
boolean flag; // Thread stop flag
Task (String name) { // Constructor
super (name); // Uses Thread (superclass) constructor to store name
cnt = 0;
flag = true;
}
@Override
public void run () { // The actual thread code
while (flag) {
try {
System.out.println ("B:" + getName() + " : " + sharedVar+ " Local: " + cnt);
int tmp = sharedVar;
tmp = tmp + 1;
Thread.sleep (1);
sharedVar = tmp;
cnt++;
} catch (Exception e) {
e.printStackTrace(System.out);
}
}
}
}
public static void main (String[] arg) {
Task t1, t2;
t1 = new Task("Thread1");
t2 = new Task("Thread2");
t1.start ();
t2.start ();
try {
Thread.sleep (2*1000);
t1.flag = false;
t2.flag= false;
t1.join();
t2.join();
System.out.println ("Shared Var : " + sharedVar + " Cycles: " + (t1.cnt+t2.cnt));
if (sharedVar == t1.cnt+t2.cnt)
System.out.println ("*** SUCCESS ***");
else
System.out.println ("*** FAIL ***");
} catch (Exception e) {
e.printStackTrace(System.out);
}
}
}
The code should look very familiar to you, loyal reader of this humble blog :). We add a small delay while updating the shared counter in order to force a misbehaviour that may not happen if we just do a sharedVar++. If we run the application like this, it will fail, and we already know the reason.
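Just to make the failure concrete, this is the kind of interleaving the sleep provokes (a hypothetical trace; the exact numbers will vary from run to run):
// Thread1: tmp = sharedVar;      // reads 5
// Thread2: tmp = sharedVar;      // also reads 5 while Thread1 sleeps
// Thread1: sharedVar = tmp + 1;  // writes 6, its local cnt becomes 1
// Thread2: sharedVar = tmp + 1;  // writes 6 again, its local cnt becomes 1
// Result: sharedVar == 6, but the sum of the local counters is 2 -> FAIL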
For our C version we used a mutex to ensure mutually exclusive access to the shared variable, but instead of that, we are going to use the second Java element provided by the language, the so-called synchronised blocks. (Actually the syntax is different, but in essence we are still using a mutex.)
Synchronised blocks ensure mutually exclusive access to the indicated block, where a block can be either a method or a piece of code between curly brackets.
Because of the way our test program was written, we are forced to use a code block. The only existing method is the thread's run method, and synchronising that will have no effect, as each thread has its own instance... and therefore access to the code is always granted (you can try).
// Task
@Override
public synchronized void run () {
while (flag) {
(...)
Ensuring mutual exclusion with a code block
If you have quickly gone over some Java concurrent programming tutorial, your first idea to solve this problem could be to use some code like this:
synchronized (this) {
int tmp = sharedVar;
tmp = tmp + 1;
Thread.sleep (1);
sharedVar = tmp;
}
But this will miserably fail. The reason is that we are synchronising against the object this, which, in our example, is the thread itself. So the code above will use a different this (a different object) in each thread to control access to the critical section and, therefore, access will always be granted.
We will see later the proper way to use this type of code (making good use of this), but in order to make our current program work properly we need to synchronise against an object that is the same for all the threads.
So, the first thing is to create such an object:
public class ttest1 {
static int sharedVar = 0;
static Object objSync = new Object ();
(...)
This is a generic object reference (just what synchronized expects), stored in a static variable of the class so it is accessible to all the inner classes (note the static in the Task class definition). Now, we can rewrite our run method as follows:
synchronized (objSync) {
int tmp = sharedVar;
tmp = tmp + 1;
Thread.sleep (1);
sharedVar = tmp;
}
And we will get the expected result.
Creating a thread-safe counter
In order to illustrate the other forms of using synchronized
in a Java program, let's introduce our SyncCounter
class. It looks like this:
public class SyncCounter {
    int cnt = 0;

    public void inc () {
        int tmp;
        try {
            tmp = cnt;
            tmp = tmp + 1;
            Thread.sleep (1);
            cnt = tmp;
        } catch (Exception e) {
            e.printStackTrace(System.out);
        }
    }
}
So, we have put the sharedVar from our previous example, and the code to manipulate it (to increment it), into a class. Sure, that is a heavy refactoring!!! Now our main program will look like this:
public class MutexMetSync {
static SyncCounter sharedVar = new SyncCounter ();
static class Task extends Thread {
int cnt;
boolean flag;
Task (String name) { // Constructor
super (name);
cnt = 0;
flag = true;
}
@Override
public void run () { // Task
while (flag) {
try {
System.out.println ("B:" + getName() + " : " + sharedVar.cnt + " Local: " + cnt);
sharedVar.inc ();
cnt++;
} catch (Exception e) {
e.printStackTrace(System.out);
}
}
}
}
public static void main (String[] arg) {
Task t1, t2;
System.out.println ("Mutex example with Synchronisation blocks");
t1 = new Task("Thread1");
t2 = new Task("Thread2");
t1.start ();
t2.start ();
try {
Thread.sleep (2*1000);
t1.flag = false;
t2.flag= false;
t1.join();
t2.join();
System.out.println ("Shared Var : " + sharedVar.cnt + " Cycles: " + (t1.cnt+t2.cnt));
if (sharedVar.cnt == t1.cnt+t2.cnt)
System.out.println ("*** SUCCESS ***");
else
System.out.println ("*** FAIL ***");
} catch (Exception e) {
e.printStackTrace(System.out);
}
}
}
We have just replaced our sharedVar, together with our synchronisation object, with the new class.
You may have noticed that there is no synchronized anywhere in the code.... Yes, this code, as it is, fails. In order to fix it we can do two things (actually three, but the third one I leave up to you to figure out; hint: we have already done it):
First, we can use a synchronized block in the inc method, like this:
public void inc () {
    int tmp;
    try {
        synchronized (this) {
            tmp = cnt;
            tmp = tmp + 1;
            Thread.sleep (1);
            cnt = tmp;
        }
    } catch (Exception e) {
        e.printStackTrace(System.out);
    }
}
Now, this references the SyncCounter instance, which we ensured is the same for all threads by storing it in a static field shared by all the Task objects.
Second, we can just synchronize the whole method, as in this case all the code in the method has to be executed in mutual exclusion. Something like this:
public synchronized void inc () {
(....)
}
So, this is it for the built-in Java capabilities. This works fine for simple cases, but it is not enough in general. For that reason, Java provides a very complete package named java.util.concurrent, with all kinds of ready-to-use classes to satisfy most of your concurrent programming cravings.
Java Locks
Even though the aforementioned package has lots of classes, there is no specific mutex class. That is fine, as the built-in synchronized keyword allows us to code critical regions as we would do with a mutex.
However, java.util.concurrent.locks provides ways to deal with generic locks (a kind of generic mutex) and java.util.concurrent.atomic provides classes to atomically modify values.
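Just as a small taste of the atomic package (a minimal sketch, not the code used in this post), a shared counter can also be made thread-safe without any explicit lock or synchronized block:
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicCounter {
    private final AtomicInteger cnt = new AtomicInteger (0);

    public void inc () {
        cnt.incrementAndGet (); // Atomic read-modify-write, no lock required
    }

    public int get () {
        return cnt.get ();
    }
}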
To illustrate the use of locks (I have seen those used more often than the atomic ones; we'll see an example of the latter later), let's rewrite our test program using a lock. The code will look like this:
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ttest2 {
static int sharedVar = 0;
static Lock lock = new ReentrantLock();
static class Task extends Thread {
int cnt;
boolean flag;
Task (String name) { // Constructor
super (name);
cnt = 0;
flag = true;
}
public void run () {
int tmp;
while (flag) {
try {
System.out.println ("B:" + id + " : " + sharedVar + " Local: " + cnt);
lock.lock(); // Critical Region Starts
tmp = sharedVar;
tmp = tmp + 1;
Thread.sleep (1);
sharedVar = tmp;
cnt++;
lock.unlock(); // Critical Region Ends
} catch (Exception e) {
e.printStackTrace(System.out);
}
}
}
}
public static void main (String[] arg) {
Task t1, t2;
System.out.println ("Mutex example with Synchronisation blocks");
t1 = new Task("Thread1");
t2 = new Task("Thread2");
t1.start ();
t2.start ();
try {
Thread.sleep (2*1000);
t1.flag = false;
t2.flag= false;
t1.join();
t2.join();
System.out.println ("Shared Var : " + sharedVar + " Cycles: " + (t1.cnt+t2.cnt));
if (sharedVar == t1.cnt+t2.cnt)
System.out.println ("*** SUCCESS ***");
else
System.out.println ("*** FAIL ***");
} catch (Exception e) {
e.printStackTrace(System.out);
}
}
}
We can easily identify the so-called ReentrantLock and how to lock and unlock it. This code looks much more similar to the C code we used in previous instalments. The reason is that this is a lower-level construct, closer to the pthread interface we used in our C version.
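One caveat worth mentioning: in the code above, if anything between lock() and unlock() throws, the lock is never released. The idiom recommended by the Java documentation is to unlock in a finally block; a sketch of how the critical region inside the existing try/catch of run would look, using the same variables as above:
lock.lock (); // Critical Region Starts
try {
    tmp = sharedVar;
    tmp = tmp + 1;
    Thread.sleep (1);
    sharedVar = tmp;
    cnt++;
} finally {
    lock.unlock (); // Always released, even if sleep() is interrupted
}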
Producer/consumer
Another class provided by the java.util.concurrent package is Condition, which allows us to implement condition variables and solve the producer/consumer problem the same way we did in C.
The Java implementation of the producer/consumer is pretty straightforward; however, there are some caveats to discuss. In this case we have defined a BoundedBuffer class that will do all the synchronisation stuff... (this code is taken from the Java documentation for the Condition interface).
package com.mycompany.producer.consumer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final int[] items = new int[100];
int putptr, takeptr, count;
public void add(int x) throws InterruptedException {
lock.lock();
try {
while (count == items.length) {
notFull.await();
}
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
public int get() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
notEmpty.await();
}
int x = items[takeptr];
items[takeptr] = 0;
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
void clear() {
for (int i = 0; i < items.length; i++) items[(i+putptr)%items.length] = 0;
}
}
The code is almost the same as the one we used in our C program. We have a mutex (a ReentrantLock in this case) plus two conditions for the buffer-empty and buffer-full cases. Note how the Condition variables are associated with the lock at creation time. When the buffer is full the producer has to wait in a loop, and when it is empty, the consumer has to wait.
Encapsulating all the synchronisation logic in this class makes the producer/consumer code very simple:
package com.mycompany.producer.consumer;
import java.util.concurrent.atomic.AtomicBoolean;
public class Producer extends Thread{
BoundedBuffer q;
int id;
int v = 1;
private final AtomicBoolean flag = new AtomicBoolean(false);
public Producer (BoundedBuffer q, int id) {
this.q = q;
this.id = id;
}
public void stop_thread () {
System.out.println ("Stopping Producer");
flag.set(false);
}
@Override
public void run () {
flag.set (true);
while (flag.get()) {
try {
v++;
System.out.println ("Producer " + id+ " -> " + v);
q.add (v);
Thread.sleep (800); // Let's make some noise
} catch (InterruptedException e) {
e.printStackTrace (System.out);
}
}
System.out.println ("Producer " + id + " ends...");
}
};
And the Consumer
will look the same but using the get
method on our BoundedBuffer
object.
package com.mycompany.producer.consumer;
import java.util.concurrent.atomic.AtomicBoolean;
public class Consumer extends Thread {
BoundedBuffer q;
int id;
int v;
private final AtomicBoolean flag = new AtomicBoolean(false);
public Consumer (BoundedBuffer q, int id) {
this.q = q;
this.id = id;
}
public void stop_thread () {
System.out.println ("Stopping COnsumer.....");
flag.set(false);
}
@Override
public void run () {
flag.set (true);
while (flag.get()) {
try {
v = q.get();
System.out.println ("COnsumer " + id + " <- " + v);
Thread.sleep (800);
} catch (InterruptedException e) {
e.printStackTrace (System.out);
}
}
System.out.println ("Consumer " + this.id + " ends...");
}
};
In this case, as an example, we have used the java.util.concurrent.atomic package to get an AtomicBoolean value to control the threads' main loops. Other than that, the producer produces increasing integers that the consumers consume.
So, with all these classes, we can now write our test program:
BoundedBuffer q = new BoundedBuffer ();
q.clear();
Producer p1 = new Producer (q, 1);
Consumer c1 = new Consumer (q, 1);
Consumer c2 = new Consumer (q, 2);
p1.start ();
c1.start ();
c2.start ();
Thread.sleep (10000);
// Stop producer
p1.stop_thread();
p1.join();
// Stop consumers
c1.stop_thread();
c2.stop_thread();
c1.join();
c2.join();
When we run this program, it hangs at the end, never returning control to the console. The solution works fine for infinite loops, but when we want to stop the threads, things become tricky. This is actually a kind of deadlock with a twist.
Issue with Consumer/Producer Implementation
So, what is really happening with our implementation? Let's see it step by step. The first thing to note is that we were using one producer and two consumers, with similar processing times. This means that the buffer will be empty most of the time, as the consumers consume items twice as fast as the producer produces them (because we have two consumers).
Then this is what happens. The producer is signalled to stop, which happens smoothly. At that point, the buffer may contain one or zero items (see the previous paragraph). Then we signal the consumers to stop, but most of the time the consumers will be waiting in the get method for items from the producer (see the previous paragraph). As the producer is finished, there is nobody adding items to the buffer and, therefore, the consumers will wait forever on the empty condition variable. This is the relevant code:
while (count == 0) {
notEmpty.await();
}
There are different ways of solving this. And to some extent this is a non-issue, as you will likely want such a system to run forever, but it is a nice exercise to make the application stop in a neat way.
My solution was to add a timeout in the BoundedBuffer class, which now looks like this:
package com.mycompany.producer.consumer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.TimeUnit;
public class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final int[] items = new int[100];
int putptr, takeptr, count;
public void add(int x) throws InterruptedException {
lock.lock();
try {
while (count == items.length) {
if (notFull.await(3000, TimeUnit.MILLISECONDS) == false)
throw new InterruptedException ("FULL condition time-out");
}
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
public int get() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
if (notEmpty.await(3000, TimeUnit.MILLISECONDS) == false)
throw new InterruptedException ("EMPTY condition time-out");
}
int x = items[takeptr];
items[takeptr] = 0;
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
void clear() {
for (int i = 0; i < items.length; i++) items[(i+putptr)%items.length] = 0;
}
}
In this implementation we wait on each condition for 3 seconds at most, and we throw an InterruptedException if that timeout expires. The await method will return false if the indicated timeout has expired. We still need to run this in a loop in order to deal with spurious wakeups, as indicated by the Java documentation.
This is it for now. I'll come back to Java when we go further in our concurrent programming journey!
Conclusions
We went quickly through the Java built-in support for concurrent programming and we also took a glimpse at the java.util.concurrent package and some of its low-level synchronisation classes. To finish this post, I would just like to point out that, as indicated in the Java docs, there is an ArrayBlockingQueue class that already implements this functionality.... but I always believe it is good to know how things work under the hood, even when you will not usually do things that way.
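For completeness, this is roughly what the producer and consumer sides look like with that class (a minimal sketch; the variable names are just for illustration):
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// A bounded buffer of 100 integers; put/take block when the queue is full/empty
// (both throw InterruptedException, so real code has to handle it)
BlockingQueue<Integer> queue = new ArrayBlockingQueue<> (100);

queue.put (42);           // Producer side: blocks while the queue is full
int item = queue.take (); // Consumer side: blocks while the queue is empty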
■