Java vs C Network Programming. Select and Selectors
 
2021-05-26 | by David "DeMO" Martínez Oliveira

Single-client servers work fine for simple applications, but in more complex cases, when multiple connections need to be managed at the same time, you will end up writing a multithreaded application that takes up many more resources than needed. Let's see how to write this kind of application in an efficient way.

Sometimes multithreaded/multiprocess applications are indeed required, but in many cases we can take a simpler route using the select system call or the Java NIO Selectors. We will see in a second that they are roughly the same thing.

All the echo servers we have written so far in the previous instalments can only handle a single connection. Actually, for the version that keeps replying to the client until it closes the connection, no other client will be able to connect in the meantime, basically because the accept syscall will not be executed again until the current client disconnects.

The easiest way to deal with this problem is to just fire a new thread for each connection. After accepting the connection, a new thread is created to serve the just connected socket. This is a very simple implementation but, depending on the kind of server we are writing it may be a waste of resources.

In these cases, using select will allow us to deal with multiple connections with a single thread and also in an easier way.

The select system call

The select system call allows us, given a list of file descriptors (actually three lists, but let's get to that later), to make our process sleep until something happens with those file descriptors. Actually, select does more than that.

As I have just mentioned, select allows us to wait for something to happen in three sets of file descriptors. That's right, we are talking about file descriptors, which may represent a file, a socket, a pipe... That is the magic of Unix: everything is a file.

The three sets that select manages are:

  • A read set. In this set we add the file descriptors that we want to read from. Whenever data is available in any of the file descriptors in this set, select will return and mark that file descriptor, ensuring that we can perform a read operation without blocking.
  • A write set. This works the same as the read set but for writing. In most cases you may not need it and can just write straight away. For a simple network application like this one you won't usually need to care about it.
  • An exception set. This will watch for exceptional conditions, which, once again, are very unlikely to happen in a normal network application (unless we use OOB traffic).

select will wait until something happens in those sets of file descriptors or until a given timeout expires. In the first case, select returns the total number of file descriptors that need to be checked (across all three sets). In the second case it returns 0.
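The behaviour just described can be tried in isolation before touching the echo server. The following is a minimal sketch, not part of the original server: wait_readable and demo_wait_readable are hypothetical helper names, and a pipe stands in for a socket simply because it is easy to create.

```c
#include <sys/select.h>
#include <unistd.h>

/* Waits up to timeout_ms milliseconds for fd to become readable.
 * Returns 1 if it is readable, 0 if the timeout expired, -1 on error. */
int wait_readable (int fd, int timeout_ms) {
  fd_set         rfds;
  struct timeval tv;
  int            n;

  FD_ZERO (&rfds);
  FD_SET (fd, &rfds);
  tv.tv_sec  = timeout_ms / 1000;
  tv.tv_usec = (timeout_ms % 1000) * 1000;

  /* Only the read set is used; the write and exception sets are NULL */
  n = select (fd + 1, &rfds, NULL, NULL, &tv);
  if (n < 0) return -1;
  return (n > 0 && FD_ISSET (fd, &rfds)) ? 1 : 0;
}

/* Demo on a pipe: returns 1 when the empty pipe times out and the same
 * pipe is reported as readable once a byte has been written to it. */
int demo_wait_readable (void) {
  int p[2], before, after;

  if (pipe (p) < 0) return -1;
  before = wait_readable (p[0], 50);   /* nothing to read yet */
  if (write (p[1], "x", 1) != 1) {
    close (p[0]);
    close (p[1]);
    return -1;
  }
  after = wait_readable (p[0], 50);    /* one byte is pending */
  close (p[0]);
  close (p[1]);
  return (before == 0 && after == 1) ? 1 : 0;
}
```

The same helper works unchanged on a socket descriptor, since select does not care what kind of file the descriptor refers to.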

With all this information, let's modify our last C echo server to use select and support multiple simultaneous connections.

Setting up the File Descriptor Sets

The file descriptor sets we described in the previous section are manipulated using the following macros:

  • FD_ZERO(&fds). This clears the file descriptor set fds
  • FD_SET(fd,&fds). This adds file descriptor fd to the file descriptor set fds
  • FD_CLR(fd,&fds). This removes file descriptor fd from the file descriptor set fds
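As a quick illustration of these macros, here is a small sketch that never calls select at all. count_fds is a hypothetical helper written for this example; it relies on FD_ISSET, the companion macro that tests whether a descriptor is a member of a set.

```c
#include <sys/select.h>

/* Counts how many descriptors in the range [0, maxfd] are members of fds.
 * Descriptors handled by these macros must be smaller than FD_SETSIZE. */
int count_fds (fd_set *fds, int maxfd) {
  int i, n = 0;

  for (i = 0; i <= maxfd && i < FD_SETSIZE; i++)
    if (FD_ISSET (i, fds)) n++;
  return n;
}
```

Starting from FD_ZERO, adding descriptors 3 and 7 with FD_SET makes count_fds report two members, and removing 3 with FD_CLR leaves only 7 in the set.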

So, the first change we have to make is to declare our file descriptor set and initialise it:

  fd_set               rfds;
  CHANNEL              *c, *c1; 

  (...)
  if ((c = channel_new (CHANNEL_SERVER, NULL, 1234)) == NULL) FAIL("channel_new:");
 
  while (1)
    { 
      FD_ZERO(&rfds);
      FD_SET(c->s,&rfds); // Add accept socket

As you can see we are initialising the file descriptor set inside the infinite loop of the server. The reason is that we have to reset the set on each call to select. This is how the man page explains it:

On exit, each of the file descriptor sets is modified in place to indicate which file descriptors actually changed status. (Thus, if using select() within a loop, the sets must be reinitialized before each call.)
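To see this in-place modification in action, consider the following sketch. ready_mask and demo_ready_mask are hypothetical names used only for this example, and two pipes play the role of two client connections, one with pending data and one idle.

```c
#include <sys/select.h>
#include <unistd.h>

/* Polls two descriptors once (zero timeout) and reports which ones are
 * still members of the read set afterwards: bit 0 for fd_a, bit 1 for
 * fd_b. Returns -1 if select fails. */
int ready_mask (int fd_a, int fd_b) {
  fd_set         rfds;
  struct timeval tv = { 0, 0 };
  int            max = fd_a > fd_b ? fd_a : fd_b;
  int            mask = 0;

  FD_ZERO (&rfds);
  FD_SET (fd_a, &rfds);
  FD_SET (fd_b, &rfds);
  if (select (max + 1, &rfds, NULL, NULL, &tv) < 0) return -1;

  /* select has overwritten rfds: only ready descriptors remain in it */
  if (FD_ISSET (fd_a, &rfds)) mask |= 1;
  if (FD_ISSET (fd_b, &rfds)) mask |= 2;
  return mask;
}

/* Demo: two pipes, data is written only to the second one. */
int demo_ready_mask (void) {
  int pa[2], pb[2], mask = -1;

  if (pipe (pa) < 0) return -1;
  if (pipe (pb) < 0) { close (pa[0]); close (pa[1]); return -1; }
  if (write (pb[1], "x", 1) == 1)
    mask = ready_mask (pa[0], pb[0]);
  close (pa[0]); close (pa[1]);
  close (pb[0]); close (pb[1]);
  return mask;
}
```

Both descriptors were added to the set before the call, but the idle one is gone when select returns. Reusing the same set in the next iteration without rebuilding it would silently stop watching that descriptor.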

For this reason, we also need to keep track of the current connections we are serving so we can properly initialise the file descriptor set on each loop. For that we will just declare a CHANNEL array and we will initialise it with NULLs. So the code above will be modified like this:

#define MAX_CONNECTIONS 64
CHANNEL *con[MAX_CONNECTIONS]; // List of current connections

void add_connection (CHANNEL *c) {
  int  i;
  if (!c) return;
  
  // Look for a hole in the array (check the bound before reading con[i])
  for (i = 0; i < MAX_CONNECTIONS && con[i]; i++);
  
  if (i == MAX_CONNECTIONS) FAIL ("Too many connections");
  con[i] = c;

  return;
}

int main () {
  fd_set               rfds;
  CHANNEL              *c, *c1; 
  (...)
  for (i = 0; i < MAX_CONNECTIONS; con[i++] = NULL); // Initialise connections
  if ((c = channel_new (CHANNEL_SERVER, NULL, 1234)) == NULL) FAIL("channel_new:");
  
  while (1)
    { 
      FD_ZERO(&rfds);
      FD_SET(c->s,&rfds); // Add accept socket
      
      // Add all client sockets. max starts at the listen socket so that
      // select also checks it when there are no clients yet
      for (max = c->s, i = 0; i < MAX_CONNECTIONS; i++)
        if (con[i]) {
          FD_SET(con[i]->s,&rfds);
          if (con[i]->s > max) max = con[i]->s;
        }

We have added a function to add a channel to the list. It just looks for the first NULL in the array and uses that entry. Then, in the main function, the first thing we have to do is to initialise the list of connections. We do that with the first for loop.

After that we can just create our listen socket and go into the loop. There we clear the read file descriptor set, add the listen socket and then go through the whole connection array, adding the socket descriptor of every channel stored there and keeping track of the highest descriptor in max (select will need it). We could also keep track of the number of channels currently in use so we could finish the loop a bit earlier... I leave it as an exercise to the reader ;).

Calling select

Now we can call select. The code will look like this:

  struct timeval       tv; 
  (...)
      /* Set timeout */
      tv.tv_sec = 0;
      tv.tv_usec = 100000;
      
      if ((n = select (max + 1, &rfds, NULL, NULL, &tv)) < 0)
        perror ("select:");
      else
        {
           if (n == 0) continue; // Timeout. You can do background task here
           // There is something to process.... Let's go

In this example we are setting a 100 ms timeout. It is specified using a struct timeval variable that we pass by reference. When select returns because a file descriptor has data to read, the tv variable will contain the time remaining from the requested period, at least on Linux systems (check the section about the timeout in the man page). That is one more reason to set it again on every iteration. Passing NULL makes select block (wait forever), and setting both fields of the timeout structure to 0 makes select return immediately: we will be polling the file descriptors.

I haven't described the select parameters so far. Now looks like the right time to do so:

The first parameter has to be the highest fd number in all sets plus 1. I haven't checked the select source code, but according to the man page only the descriptors below that number are examined in each set, so passing the smallest value that covers our descriptors ensures that we use the minimal resources possible.

Then we find the three file descriptor sets. In this case we are just using the read set and we have set the write and exception sets to NULL. You can set all three sets to NULL and then use select as a sleep with sub-second precision.

The final parameter is the timeout that we have already described.

select, like all other system calls, will return a negative value (-1) in case of error. Otherwise it will return the number of file descriptors affected in all three sets, or 0 in case nothing happened in the sets and the timeout expired.

This last case can be used to run background tasks, as for instance updating the widgets of a GUI application, while waiting for something to happen in the network layer.

Checking the file descriptors

select will tell us how many file descriptors can be read, can be written or have an exceptional condition, but we need to find out which specific ones they are. For that, we can use the FD_ISSET(fd,&fds) macro. This macro will let us know if the file descriptor fd is set in the file descriptor set fds.

For our simple echo server we need to check for two conditions. The first one is a connection attempt, so we can run accept and let the remote client use our service. The second one is data to be read, or in other words, the data sent by the client that we have to echo back.

Let's first look at the accept code:

      if (FD_ISSET(c->s,&rfds)) // Accept connection
        {
          if ((c1 = channel_accept (c)) == NULL) FAIL ("channel_accept:");
          add_connection (c1); // Add new socket to connection list
        }

This is easy. We just check whether there is something in our listen socket (whose descriptor we already know). Yes, a connection attempt is registered by select in the read file descriptor set. In that case we can run accept knowing that it won't block (because that is what select is for), and then we add the accepted channel to our list of current connections. Now you can check again the code that initialises the file descriptor set and the add_connection function above.
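The fact that a pending connection shows up in the read set can be checked with plain sockets, without the CHANNEL wrapper used in this series. In the sketch below all three function names are hypothetical, the server listens on the loopback interface only, and the kernel picks a free port.

```c
#include <arpa/inet.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <unistd.h>

/* Creates a TCP socket listening on 127.0.0.1 on a kernel-chosen port.
 * Stores that port in *port and returns the socket, or -1 on error. */
int listen_loopback (int *port) {
  struct sockaddr_in a;
  socklen_t          len = sizeof (a);
  int                s = socket (AF_INET, SOCK_STREAM, 0);

  if (s < 0) return -1;
  memset (&a, 0, sizeof (a));
  a.sin_family      = AF_INET;
  a.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
  a.sin_port        = 0;                       /* let the kernel choose */
  if (bind (s, (struct sockaddr *) &a, sizeof (a)) < 0 ||
      listen (s, 8) < 0 ||
      getsockname (s, (struct sockaddr *) &a, &len) < 0) {
    close (s);
    return -1;
  }
  *port = ntohs (a.sin_port);
  return s;
}

/* Connects a (blocking) client socket to 127.0.0.1:port. */
int connect_loopback (int port) {
  struct sockaddr_in a;
  int                s = socket (AF_INET, SOCK_STREAM, 0);

  if (s < 0) return -1;
  memset (&a, 0, sizeof (a));
  a.sin_family      = AF_INET;
  a.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
  a.sin_port        = htons ((unsigned short) port);
  if (connect (s, (struct sockaddr *) &a, sizeof (a)) < 0) {
    close (s);
    return -1;
  }
  return s;
}

/* Returns 1 if the listen socket ls has a connection waiting to be
 * accepted, 0 if the timeout expired first, -1 on error. */
int has_pending_connection (int ls, int timeout_ms) {
  fd_set         rfds;
  struct timeval tv;
  int            n;

  FD_ZERO (&rfds);
  FD_SET (ls, &rfds);
  tv.tv_sec  = timeout_ms / 1000;
  tv.tv_usec = (timeout_ms % 1000) * 1000;
  n = select (ls + 1, &rfds, NULL, NULL, &tv);
  if (n < 0) return -1;
  return (n > 0 && FD_ISSET (ls, &rfds)) ? 1 : 0;
}
```

Before any client connects, has_pending_connection reports nothing to do. Once connect_loopback has completed, the listen socket is flagged in the read set and accept can be called on it.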

For the rest of the active connections we actually have to check all of them to know which ones are the ones actually providing data to be read. Again, if you keep the number of current connections, you can end the loop earlier. In this case I'm just always checking the whole array (it is just 64 connections max so it is not a big deal).

The code for this will look like:

      for (i = 0; i < MAX_CONNECTIONS; i++)
        {
          if ((con[i] && FD_ISSET(con[i]->s, &rfds)))
            {
              // Do the echo thingy
            }
        }

This way, we can process all the client requests one after the other. In this case, the service is very simple and it won't take long. In a more complex application you will likely just get your data here and then feed it into some queue to be processed by some thread pool, for instance. This always depends on the actual task to be performed.
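For readers who do not have the CHANNEL and BUFFER helpers from the previous instalments at hand, here is a sketch of the same "check every connection and echo" pass written with plain read and write. The names echo_once and serve_ready are hypothetical, client sockets are kept in a plain int array where -1 marks a free slot, and partial writes are ignored for brevity.

```c
#include <string.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <unistd.h>

/* Reads what is pending on fd and writes it back prefixed with "ECHO : ".
 * Returns the number of bytes read, 0 if the peer closed, -1 on error. */
int echo_once (int fd) {
  char    in[1024], out[1024 + 7];
  ssize_t len = read (fd, in, sizeof (in));

  if (len <= 0) return (int) len;
  memcpy (out, "ECHO : ", 7);
  memcpy (out + 7, in, (size_t) len);
  if (write (fd, out, (size_t) len + 7) < 0) return -1;
  return (int) len;
}

/* One pass of the server loop over n client sockets (-1 = free slot).
 * Clients that closed the connection are closed here and set to -1.
 * Returns how many sockets were handled, or -1 if select fails. */
int serve_ready (int *fds, int n, int timeout_ms) {
  fd_set         rfds;
  struct timeval tv;
  int            i, max = -1, handled = 0;

  /* The set has to be rebuilt on every pass */
  FD_ZERO (&rfds);
  for (i = 0; i < n; i++)
    if (fds[i] >= 0) {
      FD_SET (fds[i], &rfds);
      if (fds[i] > max) max = fds[i];
    }
  if (max < 0) return 0;                 /* no clients to watch */

  tv.tv_sec  = timeout_ms / 1000;
  tv.tv_usec = (timeout_ms % 1000) * 1000;
  if (select (max + 1, &rfds, NULL, NULL, &tv) < 0) return -1;

  for (i = 0; i < n; i++)
    if (fds[i] >= 0 && FD_ISSET (fds[i], &rfds)) {
      if (echo_once (fds[i]) <= 0) {     /* closed or failed: drop it */
        close (fds[i]);
        fds[i] = -1;
      }
      handled++;
    }
  return handled;
}
```

A disconnect is reported exactly like incoming data: the descriptor becomes readable and read returns 0, which is why the closing logic lives in the same loop as the echo.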

For your convenience, here is the new code added to the previous version of the C echo server:

#include <sys/select.h>  // select(), fd_set and the FD_* macros
#include <sys/time.h>    // struct timeval
#define MAX_CONNECTIONS 64
CHANNEL *con[MAX_CONNECTIONS];

void add_connection (CHANNEL *c) {
  int  i;
  if (!c) return;
  
  // Look for a hole in the array (check the bound before reading con[i])
  for (i = 0; i < MAX_CONNECTIONS && con[i]; i++);
  if (i == MAX_CONNECTIONS) FAIL ("Too many connections");
  con[i] = c;
  
  return;
}

int main () {
  fd_set               rfds;
  struct timeval       tv; 
  CHANNEL              *c, *c1; 
  BUFFER               *buf;
  int                  i, max, n, ops=1;
  unsigned char        *msg = NULL; // NULL so that free (msg) is always safe

  for (i = 0; i < MAX_CONNECTIONS; con[i++] = NULL); // Initialise connections
  if ((c = channel_new (CHANNEL_SERVER, NULL, 1234)) == NULL) FAIL("channel_new:");
  
  while (1)
    { 
      FD_ZERO(&rfds);
      FD_SET(c->s,&rfds); // Add accept socket
      
      // Add all other sockets. max starts at the listen socket so that
      // select also checks it when there are no clients yet
      for (max = c->s, i = 0; i < MAX_CONNECTIONS; i++)
        if (con[i]) {
          FD_SET(con[i]->s,&rfds);
          if (con[i]->s > max) max = con[i]->s;
        }
      
      /* Set timeout */
      tv.tv_sec = 0;
      tv.tv_usec = 100000;
      
      if ((n = select (max + 1, &rfds, NULL, NULL, &tv)) < 0)
        perror ("select:");
      else
        {
          if (FD_ISSET(c->s,&rfds)) // Accept connection
            {
              if ((c1 = channel_accept (c)) == NULL) FAIL ("channel_accept:");

              add_connection (c1); // Add new socket to connection list
            }
          // Check if there is anything to read from the connections
          for (i = 0; i < MAX_CONNECTIONS; i++)
            {
              if ((con[i] && FD_ISSET(con[i]->s, &rfds)))
                {
                  buf = buffer_new (1024);
                  int len = channel_read (con[i], buf);
                  if (len > 0)
                    {
                      // Copy the received data into a NUL-terminated string
                      buffer_flip (buf);
                      msg = malloc (len + 1);
                      memset (msg, 0, len + 1);
                      buffer_get (buf, msg, len);
                      printf ("RECV (%d) : %s", len, msg);
                      // Build the reply in the same buffer and send it back
                      buf->clear(buf)->put(buf, "ECHO : ", 7)->put(buf, msg,len)->flip(buf);
                      channel_write (con[i], buf);
                      free (msg);
                      msg = NULL;
                      buffer_free (buf);
                      continue;
                    }
                  // len <= 0: the client closed the connection or the read failed
                  printf ("Connection closed\n");
                  channel_free (con[i]);
                  con[i] = NULL;
                  buffer_free (buf);
                }
              }
          }
    }
  channel_free (c);
  return 0;
}

Java Selectors

Now it is time to see how we can implement this in Java. Java NIO added the so-called Selector class that actually works like the select system call... Well, there are small differences but overall, as we will see, it is the same thing.

Let's start by seeing how to create the selector:

  public static void main (String[] args) {
    try {
      ServerSocketChannel s;    // Socket Server
      Selector            sel;  // Selector
      
      // Create server socket and bind
      s = ServerSocketChannel.open();
      s.socket().bind(new InetSocketAddress(port));

      // Create selector and add server socket to accept connections
      sel = Selector.open ();
      s.configureBlocking (false);
      s.register (sel, SelectionKey.OP_ACCEPT);

      // Main loop
      while(true)
        {
          (...)

The beginning of the program is the same as in the previous version. We just create the server socket and bind it to a port. Then we create the Selector and we register the listen socket with it. This is equivalent to adding the server socket to the read file descriptor set.

However, there are two differences. The first one is that, in order to register a Channel with a Selector, it has to be configured as non-blocking. In the C version we could also have done that, but we did not really need to, as select ensures that the operation will not block... in other words, it will work the same.

NOTE:

When writing a client things are slightly different. The connect system call has a default timeout of several seconds. This value is a combination of retries and increasing timeout values. In these cases you may want to set your socket non-blocking and manage the connection timeout yourself. Otherwise, for instance in a GUI application, the GUI will block for several seconds (unless you run the connection on a separate thread) when trying to connect to a machine that doesn't respond (behind a firewall dropping packets, for instance)... And this is a case you are likely to forget during your development testing and that may pop up in the future at an inconvenient time.
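Managing the connection timeout yourself usually means combining a non-blocking connect with select on the write set. The sketch below shows one common way to do it; connect_timeout is a hypothetical name, only IPv4 dotted addresses are accepted, and the socket is left in non-blocking mode when the function succeeds.

```c
#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <unistd.h>

/* Tries to connect to ip:port, giving up after timeout_ms milliseconds.
 * Returns the connected (non-blocking) socket, or -1 on error/timeout. */
int connect_timeout (const char *ip, int port, int timeout_ms) {
  struct sockaddr_in a;
  struct timeval     tv;
  fd_set             wfds;
  int                s, flags, err = 0;
  socklen_t          elen = sizeof (err);

  memset (&a, 0, sizeof (a));
  a.sin_family = AF_INET;
  a.sin_port   = htons ((unsigned short) port);
  if (inet_pton (AF_INET, ip, &a.sin_addr) != 1) return -1;

  if ((s = socket (AF_INET, SOCK_STREAM, 0)) < 0) return -1;
  if ((flags = fcntl (s, F_GETFL, 0)) < 0 ||
      fcntl (s, F_SETFL, flags | O_NONBLOCK) < 0) goto connect_failed;

  /* A non-blocking connect normally returns at once with EINPROGRESS */
  if (connect (s, (struct sockaddr *) &a, sizeof (a)) == 0) return s;
  if (errno != EINPROGRESS) goto connect_failed;

  /* The socket becomes writable when the attempt finishes, either way */
  FD_ZERO (&wfds);
  FD_SET (s, &wfds);
  tv.tv_sec  = timeout_ms / 1000;
  tv.tv_usec = (timeout_ms % 1000) * 1000;
  if (select (s + 1, NULL, &wfds, NULL, &tv) <= 0) goto connect_failed;

  /* SO_ERROR tells us whether the connection actually succeeded */
  if (getsockopt (s, SOL_SOCKET, SO_ERROR, &err, &elen) < 0 || err != 0)
    goto connect_failed;
  return s;

connect_failed:
  close (s);
  return -1;
}
```

This is one of the few places in a simple network program where the write set is needed: writability is how the kernel signals that a pending connect has completed.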

The second difference is that, when registering s (our listening channel), we pass SelectionKey.OP_ACCEPT. In Java NIO we have different operation flags for accepting connections and for reading data. When using select both cases are actually the same: they show up in the read set.

Running the selector

Now we can start running the selector. Everything will work the same as in the C version, but the selector will return a Set that we can navigate using an Iterator. This is what the code looks like:

      while (true)
        {
          int n = sel.select (100);
          if (n == 0) continue; // Timeout/idle/background function here

          Set<SelectionKey> keys = sel.selectedKeys (); // Get the selected keys
          Iterator<SelectionKey> it = keys.iterator (); // Create an Iterator to go through them

          // Process Selection
          while (it.hasNext ()) // Navigate the keys using the iterator
            {
              SelectionKey key = it.next (); // Current Key
              if (key.isAcceptable ()) // Is it a connection attempt to our server?
                {
                  System.out.println ("! Connection accepted");

                  ServerSocketChannel ss = (ServerSocketChannel) key.channel (); // Retrieve the channel
                  SocketChannel sc = ss.accept();
                  // In order to add the new socket to the selector it has to be non-blocking
                  sc.configureBlocking (false);
                  sc.register (sel, SelectionKey.OP_READ);
                  System.out.println ("+ Connection from " + sc.getRemoteAddress());
                }
              else if (key.isReadable ()) // Is there data to read?
                {
                  (...)
                }

At the beginning we can see the call to select with a timeout of 100 milliseconds. You can use the selectNow method instead, which returns immediately without waiting, but calling it in a tight loop will take all your CPU (you will be polling, as described when talking about select), so only do that if you really need it.

Then the sel.selectedKeys() method will return a set containing all the keys selected by select. The keys are roughly the Java equivalent of our file descriptors. In order to process all those keys, as they are returned as a Set, we can just create an Iterator<SelectionKey> to go through each one of them. This is what the second while loop does.

In this loop, for each key (returned by our iterator after calling it.next()), we can use the isAcceptable() method to check if we can run accept on the channel associated with that key, the isReadable() method to check if we can read data from it, the isConnectable() method to check if a connection to a remote server can be completed, and the isWritable() method, whose purpose you should already be able to guess.

In the code fragment above I kept the accept code. It takes the actual channel from the key being tested (key.channel()), which is handy if, for instance, your server is listening on more than one port (you have more than one ServerSocketChannel object). Once we have the Channel we just need to call accept and register the new SocketChannel it returns with the selector (sel) just after setting it as non-blocking... otherwise the register method will fail.

The code for the isReadable block is actually the same we used in the last instalment, but also calling key.cancel() in case the connection is dropped by the client. That method will remove the key from the selector so it won't be tested in future calls to select. It is roughly equivalent to FD_CLR.

The complete Java code

As reference, this is the complete Java code in case you missed some pieces in the step by step explanation:

import java.net.*;
import java.util.Set;
import java.util.Iterator;
import java.nio.channels.*;
import java.nio.ByteBuffer;

class TCPServerNIOSelector {
  public static final int port = 1234;

  public static void main (String[] args) {
      
    System.out.println ("NIO Server");

    try {
      ServerSocketChannel s;    // Socket Server
      Selector            sel;  // Selector
      
      // Create server socket and bind
      s = ServerSocketChannel.open();
      s.socket().bind(new InetSocketAddress(port));

      // Create selector and add server socket to accept connections
      sel = Selector.open ();
      s.configureBlocking (false);
      s.register (sel, SelectionKey.OP_ACCEPT);

      // Main loop
      while(true)
    {
      int n = sel.select (100);
      //int n = sel.selectNow (); // 100% CPU!
      if (n == 0) continue;

      Set<SelectionKey> keys = sel.selectedKeys (); // Get Keys selected
      Iterator<SelectionKey> it = keys.iterator (); // Create an Iterator to go through them

      // Process Selection
      while (it.hasNext ()) // Navigate the keys using the iterator
        {
          SelectionKey key = it.next (); // Current Key
          if (key.isAcceptable ()) // Is it a connection attempt to our server?
        {
          System.out.println ("! Connection accepted");
          
          ServerSocketChannel ss =(ServerSocketChannel) key.channel (); // Retrieve the channel
          SocketChannel sc = ss.accept();
          // In order to add the new socket to the selector it has to be non-blocking
          sc.configureBlocking (false);
          sc.register (sel, SelectionKey.OP_READ);
          System.out.println ("+ Connection from " + sc.getRemoteAddress());          
        }
          else if (key.isConnectable ()) // Not used.. Relevant for clients on non-blocking mode
        {
          System.out.println ("Connection established");
        }
          else if (key.isReadable ()) // Is there data to read?
        {
          SocketChannel sc =(SocketChannel) key.channel (); // Retrieve relevant channel
          ByteBuffer    buf = ByteBuffer.allocate (1024);   // Let's use NIO Buffers
          int           len = sc.read (buf);
          
          if (len <= 0)
            {
              System.out.println ("! Error Reading from " +  sc.getRemoteAddress());
              System.out.println ("+ Closing Connection");
              key.cancel (); // Remove channel from selector
              sc.close ();   // Close connection
              it.remove ();  // We are done with this key...
              continue;      // ...but the remaining keys still need processing
            }
          
          // The socket channel was writing into the buffer. Now we are going to read from it
          buf.flip ();   
          
          // Convert message into string.
          byte[]    msg = new byte [len];
          buf.get(msg);
          String   desc = new String (msg);
          
          
          System.out.println ("RECV ("+len+" bytes) >> "+ desc.trim());
          if (len > 0) // Did we receive something?
            {
              // Build response
              ByteBuffer    obuf = ByteBuffer.allocate ( len + 8);
              obuf.put ("ECHO : ".getBytes());
              obuf.put (msg);
              // We are done, ready to write back
              obuf.flip();
              sc.write (obuf);
            }
          buf.clear ();
          
        }
          it.remove ();
        }
    }
    }
    catch (Exception e) {
      e.printStackTrace ();
    }
  }
}

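Alternatively, we can call select passing a Consumer<SelectionKey> (this overload is available since Java 11). The selector then runs our action on each selected key, so we no longer have to iterate over the selected-key set ourselves. In this case the difference is minimal, but in more complex cases it may be handy. For reference, here is the new version using a lambda expression.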

import java.net.*;
import java.util.Set;
import java.util.Iterator;
import java.nio.channels.*;
import java.nio.ByteBuffer;

import java.util.function.Consumer;

class TCPServerNIOSelectorConsumer {
  public static final int port = 1234;

  public static void main (String[] args) {
    System.out.println ("NIO Server");

    try
      {
    ServerSocketChannel s;    // Socket Server
    Selector            sel;  // Selector
    // Create server socket and bind
    s = ServerSocketChannel.open();
    s.socket().bind(new InetSocketAddress(port));
    
    // Create selector and add server socket to accept connections
    sel = Selector.open ();
    s.configureBlocking (false);
    s.register (sel, SelectionKey.OP_ACCEPT);
    
    // Main loop
    while(true)
      {
        Consumer<SelectionKey> echo_srv = (SelectionKey key) -> 
          {
        try {
          if (key.isAcceptable ()) // Is it a connection attempt to our server?
            {
              System.out.println ("! Connection accepted");
              
              ServerSocketChannel ss =(ServerSocketChannel) key.channel (); // Retrieve the channel
              SocketChannel sc = ss.accept();
              // In order to add the new socket to the selector it has to be non-blocking
              sc.configureBlocking (false);
              sc.register (sel, SelectionKey.OP_READ);
              System.out.println ("+ Connection from " + sc.getRemoteAddress());          
            }
          else if (key.isReadable ()) // Is there data to read?
            {
              SocketChannel sc =(SocketChannel) key.channel (); // Retrieve relevant channel
              ByteBuffer    buf = ByteBuffer.allocate (1024);   // Let's use NIO Buffers
              int           len = sc.read (buf);
              
              if (len <= 0)
            {
              System.out.println ("! Error Reading from " +  sc.getRemoteAddress());
              System.out.println ("+ Closing Connection");
              key.cancel (); // Remove channel from selector
              sc.close ();   // Close connection
              return;
            }
              // The socket channel was writing into the buffer. Now we are going to read from it
              buf.flip ();   
              
              // Convert message into string.
              byte[]    msg = new byte [len];
              buf.get(msg);
              String   desc = new String (msg);
              
              System.out.println ("RECV ("+len+" bytes) >> "+ desc.trim());
              if (len > 0) // Did we receive something?
            {
              // Build response
              ByteBuffer    obuf = ByteBuffer.allocate ( len + 8);
              obuf.put ("ECHO : ".getBytes());
              obuf.put (msg);
              // We are done, ready to write back
              obuf.flip();
              sc.write (obuf);
            }
              buf.clear ();
              
            }
        } catch (Exception e) {e.printStackTrace ();}
          };
        sel.select (echo_srv, 100);
      }
      }
    catch (Exception e) { e.printStackTrace ();}
  }
}

Conclusions

In this instalment we have seen how to write network servers able to serve multiple clients simultaneously without using threads. For that we have used the select system call to build a C version and then we wrote a Java version using the Selector class that somehow encapsulates the same behaviour.


 