Architecture of a Highly Scalable NIO-Based Server

If you are asked to write a highly scalable Java-based server, it won't take long to decide to use the Java NIO package. To get your server running, you will probably spend a lot of time reading blogs and tutorials to understand the thread synchronization needs of the NIO Selector class and to deal with common pitfalls. This article describes the basic architecture of a connection-oriented NIO-based server. It takes a look at a preferred threading model and discusses the basic components of such a server.

Threading Architecture

The first and most intuitive way to implement a multi-threaded server is to follow the _thread-per-connection_ approach. This is the traditional pre-Java-1.4 solution, a consequence of the lack of non-blocking I/O support in older Java versions. The thread-per-connection approach uses an exclusive worker thread for each connection. Within its handling loop, a worker thread waits for new incoming data, processes the request, returns the response data, and then calls the blocking socket's read method again.

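```java
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Server {
  private final ExecutorService executors = Executors.newFixedThreadPool(10);
  private volatile boolean isRunning = true;

  public static void main(String... args) throws IOException {
    new Server().launch(Integer.parseInt(args[0]));
  }

  public void launch(int port) throws IOException {
    try (ServerSocket sso = new ServerSocket(port)) {
      while (isRunning) {
        Socket s = sso.accept();            // blocks until a client connects
        executors.execute(new Worker(s));   // hand the connection to its own worker thread
      }
    }
  }

  private class Worker implements Runnable {
    private final Socket socket;
    private final LineNumberReader in;
    private final Writer out;

    Worker(Socket s) throws IOException {
      socket = s;
      in = new LineNumberReader(new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8));
      out = new OutputStreamWriter(s.getOutputStream(), StandardCharsets.UTF_8);
    }

    public void run() {
      try {
        while (isRunning) {
          // blocking read of a request (line)
          String request = in.readLine();
          if (request == null) {
            break; // the client has closed the connection
          }

          // processing the request (application-specific; placeholder below)
          String response = process(request);

          // return the response
          out.write(response);
          out.flush();
        }
      } catch (IOException e) {
        // ... connection error handling
      } finally {
        try { socket.close(); } catch (IOException ignored) { }
      }
    }

    private String process(String request) {
      return request + "\r\n"; // placeholder service: echo the request line
    }
  }
}
```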

There is always a one-to-one relationship between simultaneous client connections and concurrent worker threads. Because each connection has a dedicated thread waiting on the server side, very good response times can be achieved. However, higher loads require a higher number of concurrently running threads, which limits scalability. In particular, long-lived connections such as persistent HTTP connections lead to many concurrent worker threads that spend most of their time waiting for new client requests. In addition, hundreds or even thousands of concurrent threads can waste a great deal of stack space. Note, for example, that the default Java thread stack size for Solaris/SPARC is 512 KB.
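To put the stack-space cost in numbers, the following sketch multiplies the thread count by the per-thread stack size. It assumes each connection owns exactly one thread and that every thread reserves the 512 KB default mentioned above; how much of that reservation is actually committed depends on the OS and JVM, so read the result as an upper bound on reserved address space rather than measured memory use. The class name StackEstimate is hypothetical.

```java
/** Rough upper bound on the stack address space reserved by a thread-per-connection server. */
final class StackEstimate {

    private StackEstimate() { }

    /** Total stack bytes reserved when every connection owns one thread. */
    static long reservedBytes(int threads, long stackSizeBytes) {
        return threads * stackSizeBytes; // int * long is computed as long, avoiding int overflow
    }

    public static void main(String[] args) {
        long perThread = 512L * 1024;                  // assumed 512 KB per thread (Solaris/SPARC default)
        long total = reservedBytes(1_000, perThread);  // 1,000 concurrent connections
        System.out.println(total / (1024 * 1024) + " MB"); // prints "500 MB"
    }
}
```

At 10,000 connections the same arithmetic gives 5,000 MB of reserved stack, before a single request has been processed.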

If the server has to handle a high number of simultaneous clients and tolerate slow, unresponsive clients, an alternative threading architecture is needed. The _thread-on-event_ approach meets these requirements very efficiently. The worker threads are independent of the connections and are used only to handle specific events. For instance, if a _data received_ event occurs, a worker thread is used to perform the application-specific decoding and service tasks (or at least to start them). Once this job is complete, the worker is returned to the thread pool. This approach requires socket I/O to be performed in a non-blocking manner: the socket's read and write calls must not block. Additionally, an event system is required that signals when new data is available, which in turn triggers the socket read call. This removes the one-to-one relationship between waiting reads and occupied threads. The design of such an event-driven I/O system is described by the Reactor pattern.
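The key property of non-blocking I/O is that a read on a channel with no pending data returns immediately instead of parking the calling thread. The sketch below demonstrates this on a loopback connection; the class and method names are hypothetical, and the OS picks an ephemeral port.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

/** Shows that a non-blocking read returns at once, even when no data has arrived. */
final class NonBlockingReadDemo {

    /** Returns the result of reading from a connected, non-blocking channel with no pending data. */
    static int readWithoutData() throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); // ephemeral port
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel accepted = server.accept()) {
                accepted.configureBlocking(false);   // server side of the connection is now non-blocking
                ByteBuffer buffer = ByteBuffer.allocate(64);
                return accepted.read(buffer);        // 0: nothing to read, and no thread was parked
            }
        }
    }
}
```

A blocking read in the same situation would stall until the client sent something, which is exactly the idle waiting the thread-per-connection model pays for.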

The Reactor Pattern

The Reactor pattern, illustrated in Figure 1, separates the detection of events, such as readiness for reading or readiness for accepting, from the processing of these events. When a readiness event occurs, an event handler is notified to perform the appropriate processing within dedicated worker threads.

Figure 1. A NIO-based Reactor pattern implementation

To participate in the event architecture, the connection's Channel has to be registered on a Selector. This is done by calling the register method. Although this method belongs to the channel (it is declared by SelectableChannel, which SocketChannel extends), the channel is registered on the Selector, not the other way around.

```java
...
SocketChannel channel = serverChannel.accept();
channel.configureBlocking(false);

// register the connection
SelectionKey sk = channel.register(selector, SelectionKey.OP_READ);
...
```
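The object returned by register is a SelectionKey that ties the channel to the selector. The following sketch uses an in-process Pipe in place of a socket connection so that it runs without a network, and attaches a placeholder object (a hypothetical stand-in for per-connection state, comparable to what the Dispatcher code later retrieves with key.attachment()).

```java
import java.io.IOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

/** Registers a channel on a Selector and inspects the resulting SelectionKey. */
final class RegisterDemo {

    static boolean registerAndInspect() throws IOException {
        Pipe pipe = Pipe.open();                        // in-process channel pair standing in for a socket
        try (Selector selector = Selector.open()) {
            Pipe.SourceChannel source = pipe.source();
            source.configureBlocking(false);            // register() rejects channels in blocking mode
            Object connectionState = "conn-1";          // hypothetical per-connection object
            SelectionKey key = source.register(selector, SelectionKey.OP_READ, connectionState);

            return key.selector() == selector           // the key links this selector...
                && key.channel() == source              // ...to this channel
                && key.interestOps() == SelectionKey.OP_READ
                && key.attachment() == connectionState  // attached state travels with the key
                && source.keyFor(selector) == key;      // the channel can look its key up again
        } finally {
            pipe.source().close();
            pipe.sink().close();
        }
    }
}
```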

To detect new events, the Selector provides the capability to ask the registered channels for their readiness events. By calling the select method, the Selector collects the readiness events of the registered channels. This call blocks until at least one channel is ready (or until wakeup is called or the thread is interrupted). It returns the number of keys whose ready sets were updated since the last select call, that is, the number of connections that have newly become ready for I/O operations. The selected connections can then be retrieved by calling the Selector's selectedKeys method. This method returns a set of SelectionKey objects, each of which holds the I/O readiness status and a reference to the connection's Channel.
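The select/selectedKeys cycle can be observed in isolation with a Pipe standing in for a client connection: once data has been written to the sink, select returns and the source's key appears in the selected-key set. This is a minimal sketch with hypothetical names. Note that the selector never removes keys from the selected-key set on its own, so the caller must clear it or iterate and remove, as the dispatch loop does with it.remove().

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Set;

/** One select() round: write data, wait for readiness, inspect the selected keys. */
final class SelectDemo {

    /** Returns the value of select() after one byte has been written into the pipe. */
    static int selectAfterWrite() throws IOException {
        Pipe pipe = Pipe.open();
        try (Selector selector = Selector.open()) {
            pipe.source().configureBlocking(false);
            SelectionKey key = pipe.source().register(selector, SelectionKey.OP_READ);

            pipe.sink().write(ByteBuffer.wrap(new byte[] {42})); // makes the source readable

            int ready = selector.select();              // blocks until at least one channel is ready
            Set<SelectionKey> selected = selector.selectedKeys();
            if (!selected.contains(key) || !key.isReadable()) {
                throw new IllegalStateException("expected a readable key");
            }
            selected.clear();                           // the caller is responsible for removing keys
            return ready;
        } finally {
            pipe.source().close();
            pipe.sink().close();
        }
    }
}
```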

A Selector is held by the Dispatcher, a single-threaded active class that wraps the Selector. The Dispatcher is responsible for retrieving the events and dispatching their handling to the EventHandler. Within the dispatch loop, the Dispatcher calls the Selector's select method to wait for new events. Once at least one event has occurred, the call returns, and the associated channel for each event can be acquired via the selectedKeys method.

```java
...
while (isRunning) {
  // blocking call, to wait for new readiness events
  int eventCount = selector.select();

  // get the events
  Iterator<SelectionKey> it = selector.selectedKeys().iterator();
  while (it.hasNext()) {
    SelectionKey key = it.next();
    it.remove();

    // readable event?
    if (key.isValid() && key.isReadable()) {
      eventHandler.onReadableEvent(key.channel());
    }

    // writable event?
    if (key.isValid() && key.isWritable()) {
      key.interestOps(SelectionKey.OP_READ); // reset to read only
      eventHandler.onWriteableEvent(key.channel());
    }
    ...
  }
  ...
}
```

Based on an event such as readiness for read or readiness for write, the Dispatcher calls the EventHandler to process the event. The EventHandler decodes the request data, performs the required service activities, and encodes the response data. Because worker threads are not tied up waiting for new requests on an open connection, the scalability and throughput of this approach are conceptually limited only by system resources such as CPU and memory. That said, response times will not be as good as with the thread-per-connection approach, because of the required thread switches and synchronization. The challenge of the event-driven approach is therefore to minimize synchronization and optimize thread management so that this overhead becomes negligible.

Component Architecture

Most highly scalable Java servers are built on top of the _Reactor pattern_. In such servers, the classes of the Reactor pattern are complemented by additional classes for connection management, buffer management, and load balancing. The entry class of such a server is the Acceptor. This arrangement is shown in Figure 2.

Figure 2. Major components of a connection-oriented server

Acceptor

Every new client connection is accepted by a single Acceptor, which is bound to the server port. The Acceptor is a single-threaded active class. Because it is only responsible for handling the very short-lived client connection requests, it is often sufficient to implement the Acceptor using the blocking I/O model. The Acceptor obtains the handle of a new connection by calling the ServerSocketChannel's blocking accept method. The new connection is then registered with a Dispatcher, after which it participates in event handling.

Because the scalability of a single Dispatcher is limited, a small pool of Dispatchers is often used. One reason for this limit is the operating-system-specific implementation of the Selector. Most popular operating systems map a SocketChannel to a file handle in a one-to-one relationship, and, depending on the system, the maximum number of file handles a single Selector can manage is limited in different ways.
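The Acceptor code that follows obtains a Dispatcher via dispatcherPool.nextDispatcher() without showing the pool. One plausible implementation is simple round-robin; the sketch below assumes that policy (the article does not specify one) and uses a type parameter in place of the Dispatcher class so that it compiles on its own. The class name DispatcherPool is taken from the variable name and is otherwise hypothetical.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical round-robin pool; D stands in for the Dispatcher type. */
final class DispatcherPool<D> {
    private final List<D> dispatchers;
    private final AtomicInteger counter = new AtomicInteger();

    DispatcherPool(List<D> dispatchers) {
        if (dispatchers.isEmpty()) {
            throw new IllegalArgumentException("at least one dispatcher required");
        }
        this.dispatchers = List.copyOf(dispatchers); // immutable snapshot, safe to share across threads
    }

    /** Thread-safe; floorMod keeps the index non-negative after the counter overflows. */
    D nextDispatcher() {
        int index = Math.floorMod(counter.getAndIncrement(), dispatchers.size());
        return dispatchers.get(index);
    }
}
```

A least-loaded policy (for example, choosing the dispatcher with the fewest registered connections) is an alternative when connection lifetimes vary widely.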

```java
class Acceptor implements Runnable {
  // a field, so that run() can use the channel opened in init()
  private ServerSocketChannel serverChannel;
  ...

  void init() throws IOException {
    serverChannel = ServerSocketChannel.open();
    serverChannel.configureBlocking(true);
    serverChannel.socket().bind(new InetSocketAddress(serverPort));
  }

  public void run() {
    while (isRunning) {
      try {
        SocketChannel channel = serverChannel.accept();

        Connection con = new Connection(channel, appHandler);
        dispatcherPool.nextDispatcher().register(con);
      } catch (IOException e) {
        ...
      }
    }
  }
}
```

In the example code, a Connection object holds the SocketChannel and an application-level event handler. These classes are described below.
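The Connection class itself is not shown in the article's code. The following is a hypothetical sketch that supplies just the accessors the Dispatcher and its event handler call (getChannel, getAppHandler, getReadQueue, getWriteQueue, isClosed). The BufferQueue and AppHandler types and the markClosed method are assumptions made for illustration, not part of any framework.

```java
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical sketch of the Connection object used by the Acceptor and Dispatcher. */
final class Connection {

    /** Hypothetical application-level callback. */
    interface AppHandler {
        void onData(Connection con);
    }

    /** Thread-safe FIFO of buffers; dispatcher and worker threads both access it. */
    static final class BufferQueue {
        private final ConcurrentLinkedQueue<ByteBuffer> buffers = new ConcurrentLinkedQueue<>();

        void add(ByteBuffer data) { buffers.add(data); }

        boolean isEmpty() { return buffers.isEmpty(); }

        /** Total number of unread bytes currently queued. */
        int getSize() {
            int size = 0;
            for (ByteBuffer b : buffers) {
                size += b.remaining();
            }
            return size;
        }

        /** Removes and returns all queued buffers, oldest first. */
        ByteBuffer[] drain() {
            List<ByteBuffer> drained = new ArrayList<>();
            ByteBuffer b;
            while ((b = buffers.poll()) != null) {
                drained.add(b);
            }
            return drained.toArray(new ByteBuffer[0]);
        }
    }

    private final SocketChannel channel;
    private final AppHandler appHandler;
    private final BufferQueue readQueue = new BufferQueue();
    private final BufferQueue writeQueue = new BufferQueue();
    private volatile boolean closed;   // set by the application, read by the dispatcher thread

    Connection(SocketChannel channel, AppHandler appHandler) {
        this.channel = channel;
        this.appHandler = appHandler;
    }

    SocketChannel getChannel() { return channel; }
    AppHandler getAppHandler() { return appHandler; }
    BufferQueue getReadQueue() { return readQueue; }
    BufferQueue getWriteQueue() { return writeQueue; }
    boolean isClosed() { return closed; }
    void markClosed() { closed = true; }
}
```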

Dispatcher

Calling the Dispatcher's register method registers the SocketChannel on the underlying Selector. This is where the trouble comes in. The Selector manages the registered channels internally by using key sets: registering a channel creates an associated SelectionKey and adds it to the Selector's registered key set. At the same time, the dispatcher thread may be inside the Selector's select method, which also accesses the key set. Because the key sets are not thread-safe, an unsynchronized registration from the Acceptor thread can lead to deadlocks and race conditions; in older JDK releases, register synchronizes on the selector's key set and can simply block until the concurrent select call returns. This can be solved by implementing the selector guard object idiom, which allows the dispatcher thread to be suspended temporarily. See "How to Build a Scalable Multiplexed Server with NIO" (PDF) for an explanation of this approach.

```java
class Dispatcher implements Runnable {
  private final Object guard = new Object();
  ...

  void register(Connection con) throws IOException {
    // a channel must be in non-blocking mode before it can be registered
    con.getChannel().configureBlocking(false);

    // retrieve the guard lock and wake up the dispatcher thread
    // to register the connection's channel
    synchronized (guard) {
      selector.wakeup();
      con.getChannel().register(selector, SelectionKey.OP_READ, con);
    }

    // notify the application EventHandler about the new connection
    ...
  }

  void announceWriteNeed(Connection con) {
    SelectionKey key = con.getChannel().keyFor(selector);
    synchronized (guard) {
      selector.wakeup();
      key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
    }
  }

  public void run() {
    while (isRunning) {
      try {
        synchronized (guard) {
          // suspend the dispatcher thread if the guard is locked
        }
        int eventCount = selector.select();

        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
        while (it.hasNext()) {
          SelectionKey key = it.next();
          it.remove();

          // read event?
          if (key.isValid() && key.isReadable()) {
            Connection con = (Connection) key.attachment();
            dispatcherEventHandler.onReadableEvent(con);
          }

          // write event?
          ...
        }
      } catch (IOException e) {
        ...
      }
    }
  }
}
```

After a connection has been registered, the Selector listens for readiness events of this connection. If an event occurs, the appropriate callback method of the Dispatcher's event handler is called with the associated connection.
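The guard idiom is not the only way to keep registration safe. A commonly used alternative, sketched below and distinct from the article's approach, is to hand new channels to the dispatcher through a concurrent queue and let the dispatcher thread perform the actual register call itself, so that only one thread ever touches the key sets. The class and method names are hypothetical, and event dispatching is omitted.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Alternative to the guard idiom: only the dispatcher thread registers channels. */
final class QueueingDispatcher implements Runnable {
    private final Selector selector;
    private final Queue<SelectableChannel> pending = new ConcurrentLinkedQueue<>();
    private volatile boolean isRunning = true;

    QueueingDispatcher() throws IOException {
        selector = Selector.open();
    }

    Selector selector() {
        return selector;
    }

    /** Called from the Acceptor thread; never touches the key set directly. */
    void register(SelectableChannel channel) {
        pending.add(channel);
        selector.wakeup(); // a blocked select() returns, or the next one returns immediately
    }

    /** Runs on the dispatcher thread; returns the number of channels registered. */
    int processPendingRegistrations() throws IOException {
        int count = 0;
        SelectableChannel channel;
        while ((channel = pending.poll()) != null) {
            channel.configureBlocking(false);
            channel.register(selector, SelectionKey.OP_READ);
            count++;
        }
        return count;
    }

    void stop() {
        isRunning = false;
        selector.wakeup();
    }

    @Override
    public void run() {
        try {
            while (isRunning) {
                processPendingRegistrations();
                selector.select();
                selector.selectedKeys().clear(); // event dispatching omitted in this sketch
            }
            selector.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because wakeup makes the next select return immediately when no selection is in progress, a channel enqueued just before the dispatcher enters select is still registered on the next loop iteration.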

Dispatcher-Level EventHandler

The first activity performed while processing a readiness-for-read event is to call the channel's read method. In contrast to the stream interface, the Channel interface requires a read buffer to be passed in. Often, directly allocated ByteBuffers are used. Direct buffers reside in native memory, outside the Java heap. With direct buffers, socket I/O operations can be performed without the JVM having to copy the data through an internal intermediate buffer.

Normally the read call completes very quickly. Depending on the operating system, the socket read operation often does little more than copy the received data from kernel space into the read buffer, which resides in user space.
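The following sketch shows the read side with a direct buffer: the channel fills the buffer, flip switches it from filling to draining, and the bytes are then copied out. A Pipe stands in for the socket, and the class name is hypothetical. Direct buffers typically cost more to allocate and release than heap buffers, which is one reason servers tend to allocate them up front and recycle them rather than allocating one per read.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.charset.StandardCharsets;

/** Reads data from a channel into a direct ByteBuffer and extracts it as a String. */
final class DirectBufferRead {
    private static final int CAPACITY = 1024;

    static String roundTrip(String message) throws IOException {
        byte[] payload = message.getBytes(StandardCharsets.US_ASCII);
        if (payload.length > CAPACITY) {
            throw new IllegalArgumentException("message larger than the read buffer");
        }
        Pipe pipe = Pipe.open(); // stands in for a connected SocketChannel
        try {
            pipe.sink().write(ByteBuffer.wrap(payload));

            ByteBuffer readBuffer = ByteBuffer.allocateDirect(CAPACITY); // native memory, outside the heap
            while (readBuffer.position() < payload.length) {
                if (pipe.source().read(readBuffer) < 0) { // the channel writes into the buffer
                    break;                                // end of stream
                }
            }

            readBuffer.flip();                            // switch from filling to draining
            byte[] received = new byte[readBuffer.remaining()];
            readBuffer.get(received);                     // copy out of native memory
            return new String(received, StandardCharsets.US_ASCII);
        } finally {
            pipe.sink().close();
            pipe.source().close();
        }
    }
}
```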

The received data is appended to the connection's _thread-safe_ read queue for further processing. Based on the result of the I/O operation, application-specific tasks have to be performed. These tasks are handled by the assigned application-level event handler, which is typically called on a worker thread.

```java
class DispatcherEventHandler {
  ...

  void onReadableEvent(final Connection con) throws IOException {
    // get the received data
    ByteBuffer readBuffer = allocateMemory();
    int bytesRead = con.getChannel().read(readBuffer);
    if (bytesRead == -1) {
      // end of stream: the peer has closed the connection
      ...
      return;
    }
    ByteBuffer data = extractReadAndRecycleRenaming(readBuffer);

    // append it to read queue
    con.getReadQueue().add(data);
    ...

    // perform further operations (decode, process, encode)
    // by a worker thread
    if (con.getReadQueue().getSize() > 0) {
      workerPool.execute(new Runnable() {
        public void run() {
          // serialize the application callbacks of this connection
          synchronized (con) {
            con.getAppHandler().onData(con);
          }
        }
      });
    }
  }

  void onWriteableEvent(Connection con) throws IOException {
    ByteBuffer[] data = con.getWriteQueue().drain();
    con.getChannel().write(data); // gathering write; may write only part of the data
    ... // buffers with remaining bytes must be put back at the head of the write queue

    if (con.getWriteQueue().isEmpty()) {
      if (con.isClosed()) {
        dispatcher.deregister(con);
      }
    } else {
      // there is remaining data to write
      dispatcher.announceWriteNeed(con);
    }
  }
}
```

Within the application-specific tasks, requests are decoded, services are performed, and responses are encoded and written. Writing data appends it to the write queue and calls the Dispatcher's announceWriteNeed method. This method causes the Selector to listen for _readiness for write_ events. When such an event occurs, the Dispatcher-level event handler's onWriteableEvent method is called. It takes the data from the connection's write queue and performs the required write I/O operation. Trying to write data directly, bypassing this event path, will end in deadlocks and race conditions.
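A write queue is needed in the first place because a non-blocking write may accept only part of the data offered to it. The sketch below makes that visible by writing a large buffer into a Pipe whose reading end is never drained: once the kernel buffer is full, write returns 0 and bytes remain. The 4 MB payload is an arbitrary size assumed to exceed typical pipe buffers, and the class name is hypothetical.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;

/** Shows why a write queue is needed: a non-blocking write may take only part of a buffer. */
final class PartialWriteDemo {

    /** Returns the number of bytes the channel could not accept without blocking. */
    static int unwrittenBytes(int payloadSize) throws IOException {
        Pipe pipe = Pipe.open();
        try {
            Pipe.SinkChannel sink = pipe.sink();
            sink.configureBlocking(false);
            ByteBuffer data = ByteBuffer.allocate(payloadSize); // the reading end is never drained

            while (data.hasRemaining()) {
                int written = sink.write(data);  // may be less than data.remaining()
                if (written == 0) {
                    break;                       // buffer full: a blocking write would stall here
                }
            }
            // In the server, this remainder stays in the connection's write queue and OP_WRITE
            // interest is registered, so the dispatcher resumes writing once the peer reads.
            return data.remaining();
        } finally {
            pipe.sink().close();
            pipe.source().close();
        }
    }
}
```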

Application-Level EventHandler

In contrast to the Dispatcher's event handler, the application-specific event handler listens for higher-level connection-oriented events, such as _connection established_, _data received_, or _connection disconnected_. The concrete event handler design is one of the major differences between NIO server frameworks such as SEDA, MINA, and emberIO. Such frameworks often implement a multi-staged architecture in which chains of event handlers can be used. This allows adding handlers such as an SSLHandler or a DelayedWriteHandler that intercept request/response processing. The following example shows an application-level handler based on the xSocket framework, which supports several handler interfaces that define callback methods to be implemented by application-specific code.

```java
class POP3ProtocolHandler implements IConnectHandler, IDataHandler, ... {
  private static final String DELIMITER = ...;
  private Mailbox mailbox = ...;
  private String user;
  ...

  public static void main(String... args) throws ... {
    new MultithreadedServer(110, new POP3ProtocolHandler()).run();
  }

  public boolean onConnect(INonBlockingConnection con) throws ... {
    if (gatekeeper.isSuspiciousAddress(con.getRemoteAddress())) {
      con.setWriteTransferRate(5); // throttle the connection to 5 bytes/sec
    }

    con.write("+OK My POP3-Server" + DELIMITER);
    return true;
  }

  public boolean onData(INonBlockingConnection con) throws ... {
    String request = con.readStringByDelimiter(DELIMITER);

    if (request.startsWith("QUIT")) {
      mailbox.close();
      con.write("+OK POP3 server signing off" + DELIMITER);
      con.close();

    } else if (request.startsWith("USER")) {
      this.user = request.substring(4).trim(); // drop the space after the command
      con.write("+OK enter password" + DELIMITER);

    } else if (request.startsWith("PASS")) {
      String pwd = request.substring(4).trim();
      boolean isAuthenticated = authenticator.check(user, pwd);
      if (isAuthenticated) {
        mailbox = Mailbox.openAndLock(user);
        con.write("+OK mailbox locked and ready" + DELIMITER);
      } else {
        ...
      }
    } else if (...) {
      ...
    }
    return true;
  }
}
```
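The multi-staged handler chains mentioned above can be illustrated independently of any framework. In the sketch below, each stage may transform the data and decides whether to pass it on to the next stage, which is how interceptors such as an SSL or delayed-write handler can sit in front of the application handler. The HandlerChain, Handler, and Next types are hypothetical and do not mirror the API of xSocket, MINA, or SEDA.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical handler chain; names are illustrative, not taken from any framework. */
final class HandlerChain {

    /** One stage; it may transform the data and decides whether to call the next stage. */
    interface Handler {
        void onData(String data, Next next);
    }

    /** Continuation to the remaining stages of the chain. */
    interface Next {
        void proceed(String data);
    }

    private final List<Handler> handlers = new ArrayList<>();

    HandlerChain add(Handler handler) {
        handlers.add(handler);
        return this;
    }

    void fire(String data) {
        invoke(0, data);
    }

    private void invoke(int index, String data) {
        if (index < handlers.size()) {
            handlers.get(index).onData(data, d -> invoke(index + 1, d));
        }
    }
}
```

A real stage would operate on bytes and might hold data back (for example, until a complete TLS record has arrived) instead of calling proceed immediately.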

To ease access to the underlying read and write queues, the Connection object provides several convenience read and write methods for stream- and channel-oriented operations.
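Convenience methods such as readStringByDelimiter have to cope with TCP delivering data in arbitrary chunks: a request may arrive split across several reads, or several requests may arrive in one. The following hypothetical sketch shows delimiter-based framing over a per-connection receive buffer. It illustrates the technique and is not xSocket's implementation; it decodes each chunk as US-ASCII, which is only safe for ASCII protocols such as POP3.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;

/** Hypothetical delimiter-based framing over a per-connection receive buffer. */
final class DelimiterFramer {
    private final String delimiter;
    private final StringBuilder pending = new StringBuilder();

    DelimiterFramer(String delimiter) {
        this.delimiter = delimiter;
    }

    /** Appends newly received data, e.g. the bytes returned by one read() call. */
    void append(byte[] chunk) {
        pending.append(new String(chunk, StandardCharsets.US_ASCII));
    }

    /** Returns the next complete request without its delimiter, or empty if none is complete yet. */
    Optional<String> nextRequest() {
        int end = pending.indexOf(delimiter);
        if (end < 0) {
            return Optional.empty();          // wait for more data
        }
        String request = pending.substring(0, end);
        pending.delete(0, end + delimiter.length());
        return Optional.of(request);
    }
}
```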

When the connection is closed, the underlying implementation initiates a writeable-event round trip to flush the write queue; the connection is terminated after the remaining data has been written. Besides such a controlled termination, connections can be disconnected for other reasons. For instance, hardware malfunctions can terminate a TCP-based connection. Such a situation can only be detected by performing read or write operations on the socket, or by idle timeouts. Most NIO frameworks provide built-in functions to handle such uncontrolled terminations.
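Idle timeouts can be implemented by recording the time of the last read or write per connection and periodically collecting connections that have been quiet for too long. The sketch below is hypothetical; timestamps are passed in explicitly so that the logic is testable, and a dispatcher could, for example, call expire after each select(timeout) call and close the connections it returns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical idle-timeout tracker; C is the connection type. */
final class IdleTimeoutTracker<C> {
    private final long idleTimeoutMillis;
    private final Map<C, Long> lastActivity = new ConcurrentHashMap<>();

    IdleTimeoutTracker(long idleTimeoutMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    /** Call whenever data is read from or written to the connection. */
    void touch(C connection, long nowMillis) {
        lastActivity.put(connection, nowMillis);
    }

    void remove(C connection) {
        lastActivity.remove(connection);
    }

    /** Returns, and stops tracking, the connections idle for longer than the timeout. */
    List<C> expire(long nowMillis) {
        List<C> expired = new ArrayList<>();
        for (Map.Entry<C, Long> e : lastActivity.entrySet()) {
            long last = e.getValue();
            // conditional remove: skip connections touched since their timestamp was read
            if (nowMillis - last > idleTimeoutMillis && lastActivity.remove(e.getKey(), last)) {
                expired.add(e.getKey());
            }
        }
        return expired;
    }
}
```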

Conclusion

An event-driven, non-blocking architecture is a fundamental layer for implementing highly efficient, scalable, and reliable servers. The challenge is to minimize thread synchronization overhead and to optimize connection and buffer management; this is the hardest part to program.

But there is no need to reinvent the wheel. Server frameworks like xSocket, emberIO, SEDA, or MINA abstract the low-level event handling and thread management to ease the creation of highly scalable servers. Most of these server frameworks also support features like SSL or UDP, which haven't been discussed in this article.


Gregor Roth works as a software architect at United Internet group, a leading European Internet Service Provider to which GMX, 1&1, and Web.de belong. His areas of interest include software and system architecture, enterprise architecture management, object-oriented design, distributed computing, and development methodologies.