Architecture of a Highly Scalable NIO-Based Server
- Contents
- Threading Architecture
- The Reactor Pattern
- Component Architecture
- Acceptor
- Dispatcher
- Dispatcher-Level EventHandler
- Application-Level EventHandler
- Conclusion
- Resources
If you are asked to write a highly scalable Java-based server, it won't take long to decide to use the Java NIO package. To get your server running, you will probably spend a lot of time reading blogs and tutorials to understand the thread synchronization needs of the NIO `Selector` class and to deal with common pitfalls. This article describes the basic architecture of a connection-oriented NIO-based server. It takes a look at a preferred threading model and discusses the basic components of such a server.
Threading Architecture
The first and most intuitive way to implement a multi-threaded server is to follow the _thread-per-connection_ approach. This is the traditional pre-Java-1.4 solution, necessitated by the lack of non-blocking I/O support in older Java versions. The thread-per-connection approach uses an exclusive worker thread for each connection. Within the handling loop, a worker thread waits for new incoming data, processes the request, returns the response data, and calls the blocking socket's `read` method again.
```java
public class Server {
    private ExecutorService executors = Executors.newFixedThreadPool(10);
    private boolean isRunning = true;

    public static void main(String... args) throws ... {
        new Server().launch(Integer.parseInt(args[0]));
    }

    public void launch(int port) throws ... {
        ServerSocket sso = new ServerSocket(port);
        while (isRunning) {
            Socket s = sso.accept();
            executors.execute(new Worker(s));
        }
    }

    private class Worker implements Runnable {
        private LineNumberReader in = null;
        private Writer out = null;
        ...

        Worker(Socket s) throws ... {
            in = new LineNumberReader(new InputStreamReader(...));
            out = ...
        }

        public void run() {
            while (isRunning) {
                try {
                    // blocking read of a request (line)
                    String request = in.readLine();

                    // processing the request
                    ...
                    String response = ...

                    // return the response
                    out.write(response);
                    out.flush();
                } catch (Exception e) {
                    ...
                }
            }
            in.close();
            ...
        }
    }
}
```
There is always a one-to-one relationship between simultaneous client connections and the number of concurrent worker threads. Because each connection has an associated thread waiting on the server side, very good response times can be achieved. However, higher loads require a higher number of running, concurrent threads, which limits scalability. In particular, long-lived connections like persistent HTTP connections lead to a lot of concurrent worker threads that spend most of their time waiting for new client requests. In addition, hundreds or even thousands of concurrent threads can waste a great deal of stack space. Note, for example, that the default Java thread stack size for Solaris/Sparc is 512 KB, so 1,000 mostly idle threads reserve roughly 500 MB of address space for stacks alone.
If the server has to handle a high number of simultaneous clients and tolerate slow, unresponsive clients, an alternative threading architecture is needed. The _thread-on-event_ approach implements such requirements in a very efficient way. The worker threads are independent of the connections and are only used to handle specific events. For instance, if a _data received_ event occurs, a worker thread processes the application-specific encoding and service tasks (or at least starts them). Once this job is complete, the worker is returned to the thread pool. This approach requires that the socket I/O operations be performed in a non-blocking manner: the socket's `read` or `write` method calls must not block. Additionally, an event system is required that signals when new data is available and in turn initiates the socket `read` call. This removes the one-to-one relationship between waiting reads and occupied threads. The design of such an event-driven I/O system is described by the Reactor pattern.
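As a minimal sketch (not taken from the article's own code), this is what a non-blocking read looks like: once the channel has been put into non-blocking mode, `read` returns immediately whether or not data has arrived. The buffer size here is an arbitrary choice.

```java
// sketch: a non-blocking socket read
SocketChannel channel = ...
channel.configureBlocking(false);

ByteBuffer buf = ByteBuffer.allocate(8192);
int count = channel.read(buf);  // returns at once: 0 if no data is available,
                                // -1 if the peer has closed the connection
```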
The Reactor Pattern
The Reactor pattern, illustrated in Figure 1, separates the detection of events (such as readiness for read or readiness to accept) from the processing of those events. If a readiness event occurs, an event handler is notified to perform the appropriate processing within dedicated worker threads.
Figure 1. A NIO-based Reactor pattern implementation
To participate in the event architecture, the connection's `Channel` has to be registered on a `Selector`. This is done by calling the `register` method. Although this method is part of `SocketChannel`, the channel is registered on the `Selector`, not the other way around.
```java
...
SocketChannel channel = serverChannel.accept();
channel.configureBlocking(false);

// register the connection
SelectionKey sk = channel.register(selector, SelectionKey.OP_READ);
...
```
To detect new events, the `Selector` provides the capability to ask the registered channels for their readiness events. By calling the `select` method, the `Selector` collects the readiness events of the registered channels. This method call blocks until at least one event has occurred, and then returns the number of connections that have become ready for I/O operations since the last `select` call. The selected connections can be retrieved by calling the Selector's `selectedKeys` method, which returns a set of `SelectionKey` objects, each holding the I/O event status and a reference to the connection's `Channel`.
A `Selector` is held by the `Dispatcher`, a single-threaded active class that surrounds the `Selector`. The `Dispatcher` is responsible for retrieving the events and dispatching the handling of the consumed events to the `EventHandler`. Within the dispatch loop, the `Dispatcher` calls the `Selector`'s `select` method to wait for new events. Once at least one event has occurred, the method call returns, and the associated channel for each event can be acquired by calling the `selectedKeys` method.
```java
...
while (isRunning) {
    // blocking call, waiting for new readiness events
    int eventCount = selector.select();

    // get the events
    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
    while (it.hasNext()) {
        SelectionKey key = it.next();
        it.remove();

        // readable event?
        if (key.isValid() && key.isReadable()) {
            eventHandler.onReadableEvent(key.channel());
        }

        // writable event?
        if (key.isValid() && key.isWritable()) {
            key.interestOps(SelectionKey.OP_READ);  // reset to read only
            eventHandler.onWriteableEvent(key.channel());
        }
        ...
    }
    ...
}
```
Based on an event like readiness for read or readiness for write, the `EventHandler` is called by the `Dispatcher` to process the event. The `EventHandler` decodes the request data, processes the required service activities, and encodes the response data. Because worker threads no longer waste time waiting for new requests on an open connection, the scalability and throughput of this approach are conceptually limited only by system resources like CPU or memory. That said, response times won't be as good as with the thread-per-connection approach, because of the required thread switches and synchronization. The challenge of the event-driven approach is therefore to minimize synchronization and optimize thread management, so that this overhead becomes negligible.
Component Architecture
Most highly scalable Java servers are built on top of the _Reactor pattern_. In such a design, the classes of the Reactor pattern are extended by additional classes for connection management, buffer management, and load balancing. The entry class of such a server is the `Acceptor`. This arrangement is shown in Figure 2.
Figure 2. Major components of a connection-oriented server
Acceptor
Every new client connection to the server is accepted by the single `Acceptor`, which is bound to the server port. The `Acceptor` is a single-threaded active class. Because it is only responsible for handling the very short-running client connection request, it is often sufficient to implement the `Acceptor` using the blocking I/O model. The `Acceptor` gets the handle of a new connection by calling the `ServerSocketChannel`'s blocking `accept` method. The new connection is then registered with a `Dispatcher`, after which the connection participates in event handling.
Because the scalability of a single `Dispatcher` is limited, a small pool of `Dispatcher`s is often used. One reason for this limitation is the operating-system-specific implementation of the `Selector`. Most popular operating systems map a `SocketChannel` to a file handle in a one-to-one relationship, and the maximum number of file handles per `Selector` is limited in a system-specific way.
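Such a pool can be a simple round-robin hand-out over a fixed set of dispatchers. The following sketch shows one possible shape of the `DispatcherPool` used by the `Acceptor` code below; the implementation details are assumptions, not taken from a concrete framework.

```java
// sketch: a minimal round-robin dispatcher pool (details assumed)
class DispatcherPool {
    private final Dispatcher[] dispatchers;
    private final AtomicInteger next = new AtomicInteger();  // java.util.concurrent.atomic

    DispatcherPool(int size) {
        dispatchers = new Dispatcher[size];
        for (int i = 0; i < size; i++) {
            dispatchers[i] = new Dispatcher();
            new Thread(dispatchers[i]).start();  // each dispatcher runs its own select loop
        }
    }

    // hand out the dispatchers in round-robin order
    Dispatcher nextDispatcher() {
        int i = (next.getAndIncrement() & Integer.MAX_VALUE) % dispatchers.length;
        return dispatchers[i];
    }
}
```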
```java
class Acceptor implements Runnable {
    private ServerSocketChannel serverChannel;  // field, so that run() can use it
    ...

    void init() throws ... {
        serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(true);
        serverChannel.socket().bind(new InetSocketAddress(serverPort));
    }

    public void run() {
        while (isRunning) {
            try {
                SocketChannel channel = serverChannel.accept();

                Connection con = new Connection(channel, appHandler);
                dispatcherPool.nextDispatcher().register(con);
            } catch (...) {
                ...
            }
        }
    }
}
```
In the example code, a `Connection` object holds the `SocketChannel` and an application-level event handler. These classes will be described below.
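A `Connection` along these lines could look like the following sketch. The accessor names match the example code; the handler type and the `Queue` class are assumptions for illustration (a queue sketch follows in the Dispatcher-Level EventHandler section).

```java
// sketch: the Connection class used in the examples (details assumed)
class Connection {
    private final SocketChannel channel;
    private final IDataHandler appHandler;         // application-level event handler (type assumed)
    private final Queue readQueue = new Queue();   // thread-safe queues, sketched below
    private final Queue writeQueue = new Queue();
    private volatile boolean closed = false;

    Connection(SocketChannel channel, IDataHandler appHandler) {
        this.channel = channel;
        this.appHandler = appHandler;
    }

    SocketChannel getChannel()   { return channel; }
    IDataHandler getAppHandler() { return appHandler; }
    Queue getReadQueue()         { return readQueue; }
    Queue getWriteQueue()        { return writeQueue; }
    boolean isClosed()           { return closed; }
}
```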
Dispatcher
By calling the `Dispatcher`'s `register` method, the `SocketChannel` is registered on the underlying `Selector`. Here is where the trouble comes in. The `Selector` manages the registered channels internally by using key sets: registering a channel creates an associated `SelectionKey`, which is added to the Selector's registered-key set. At the same time, the concurrent dispatcher thread could call the `Selector`'s `select` method, which also accesses the key set. Because the key sets are not thread-safe, an unsynchronized registration in the context of the `Acceptor` thread can lead to deadlocks and race conditions. This can be solved by implementing the selector guard object idiom, which allows suspending the dispatcher thread temporarily. See "How to Build a Scalable Multiplexed Server with NIO" (PDF) for an explanation of this approach.
```java
class Dispatcher implements Runnable {
    private Object guard = new Object();
    ...

    void register(Connection con) {
        // acquire the guard lock and wake up the dispatcher thread
        // to register the connection's channel
        synchronized (guard) {
            selector.wakeup();
            con.getChannel().register(selector, SelectionKey.OP_READ, con);
        }

        // notify the application EventHandler about the new connection
        ...
    }

    void announceWriteNeed(Connection con) {
        SelectionKey key = con.getChannel().keyFor(selector);
        synchronized (guard) {
            selector.wakeup();
            key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
        }
    }

    public void run() {
        while (isRunning) {
            synchronized (guard) {
                // suspend the dispatcher thread if the guard is locked
            }
            int eventCount = selector.select();

            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();

                // read event?
                if (key.isValid() && key.isReadable()) {
                    Connection con = (Connection) key.attachment();
                    dispatcherEventHandler.onReadableEvent(con);
                }

                // write event?
                ...
            }
        }
    }
}
```
After a connection has been registered, the `Selector` listens for readiness events on that connection. If an event occurs, the appropriate callback method of the `Dispatcher`'s event handler is called, passing the associated connection.
Dispatcher-Level EventHandler
The first activity performed while processing a readiness-for-read event is to call the channel's `read` method. In contrast to the streaming interface, the `Channel` interface requires that a read buffer be passed in. Often direct-allocated `ByteBuffer`s are used. Direct buffers reside in native memory, bypassing the Java heap space; by using direct buffers, socket I/O operations can be performed without the need to create internal intermediate buffers.
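For illustration, allocating such a direct buffer is a single call; because direct allocation is comparatively expensive, real servers typically pool and reuse these buffers. The size below is an arbitrary choice.

```java
// sketch: a direct buffer lives in native memory outside the Java heap,
// so the socket read can fill it without an intermediate copy
ByteBuffer readBuffer = ByteBuffer.allocateDirect(8192);
```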
Normally the `read` call completes very quickly. Depending on the operating system, the socket read operation often merely copies the received data from kernel memory space into the read buffer, which resides in user-controlled memory space.
The received data is appended to the connection's _thread-safe_ read queue for further processing. Based on the result of the I/O operation, application-specific tasks have to be processed. Such tasks are handled by the assigned application-level event handler, which is typically called by a worker thread.
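The read (and write) queue can be a thin, thread-safe wrapper around a linked list. The following sketch uses the method names that appear in the example code (`add`, `getSize`, `isEmpty`, `drain`); the implementation details are assumptions.

```java
// sketch: a minimal thread-safe buffer queue (details assumed)
class Queue {
    private final LinkedList<ByteBuffer> buffers = new LinkedList<ByteBuffer>();

    synchronized void add(ByteBuffer data) {
        buffers.add(data);
    }

    synchronized int getSize() {
        return buffers.size();
    }

    synchronized boolean isEmpty() {
        return buffers.isEmpty();
    }

    // remove and return all queued buffers, e.g. for a gathering write
    synchronized ByteBuffer[] drain() {
        ByteBuffer[] data = buffers.toArray(new ByteBuffer[buffers.size()]);
        buffers.clear();
        return data;
    }
}
```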
```java
class DispatcherEventHandler {
    ...

    void onReadableEvent(final Connection con) {
        // get the received data
        ByteBuffer readBuffer = allocateMemory();
        con.getChannel().read(readBuffer);
        ByteBuffer data = extractReadAndRecycleRemaining(readBuffer);

        // append it to the read queue
        con.getReadQueue().add(data);
        ...

        // perform further operations (decode, process, encode)
        // by a worker thread
        if (con.getReadQueue().getSize() > 0) {
            workerPool.execute(new Runnable() {
                public void run() {
                    synchronized (con) {
                        con.getAppHandler().onData(con);
                    }
                }
            });
        }
    }

    void onWriteableEvent(Connection con) {
        ByteBuffer[] data = con.getWriteQueue().drain();
        con.getChannel().write(data);  // write the data
        ...

        if (con.getWriteQueue().isEmpty()) {
            if (con.isClosed()) {
                dispatcher.deregister(con);
            }
        } else {
            // there is remaining data to write
            dispatcher.announceWriteNeed(con);
        }
    }
}
```
Within the application-specific tasks, data is encoded, services are performed, and response data is written. Writing data appends it to the write queue and calls the `Dispatcher`'s `announceWriteNeed` method, which causes the `Selector` to listen for _readiness for write_ events. If such an event occurs, the `Dispatcher`-level event handler's `onWriteableEvent` method is performed. It gets the data from the connection's write queue and performs the required write I/O operation. Trying to write data directly, bypassing this event approach, can end in deadlocks and race conditions.
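In other words, a connection-level write could be sketched as follows. The wrapping method is an assumption; only `getWriteQueue` and `announceWriteNeed` come from the example code.

```java
// sketch: application writes never touch the channel directly;
// they enqueue the data and announce the need to write
void write(Connection con, ByteBuffer data) {
    con.getWriteQueue().add(data);      // append to the connection's write queue
    dispatcher.announceWriteNeed(con);  // make the Selector listen for OP_WRITE
}
```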
Application-Level EventHandler
In contrast to the `Dispatcher`'s event handler, the application-specific event handler listens for higher-level connection-oriented events, like _connection established_, _data received_, or _connection disconnected_. The concrete event-handler design is one of the major differences between NIO server frameworks like SEDA, MINA, or emberIO. Such frameworks often implement a multi-staged architecture, where chains of event handlers can be used. This allows adding handlers like an `SSLHandler` or a `DelayedWriteHandler`, which intercept the request/response processing. The following example shows an application-level handler based on the xSocket framework, which supports different handler interfaces that define callback methods to be implemented by application-specific code.
```java
class POP3ProtocolHandler implements IConnectHandler, IDataHandler, ... {
    private static final String DELIMITER = ...
    private Mailbox mailbox = ...
    private String user = null;

    public static void main(String... args) throws ... {
        new MultithreadedServer(110, new POP3ProtocolHandler()).run();
    }

    public boolean onConnect(INonBlockingConnection con) throws ... {
        if (gatekeeper.isSuspiciousAddress(con.getRemoteAddress())) {
            con.setWriteTransferRate(5);  // reduce transfer rate: 5 bytes/sec
        }

        con.write("+OK My POP3-Server" + DELIMITER);
        return true;
    }

    public boolean onData(INonBlockingConnection con) throws ... {
        String request = con.readStringByDelimiter(DELIMITER);

        if (request.startsWith("QUIT")) {
            mailbox.close();
            con.write("+OK POP3 server signing off" + DELIMITER);
            con.close();
        } else if (request.startsWith("USER")) {
            this.user = request.substring(4, request.length()).trim();
            con.write("+OK enter password" + DELIMITER);
        } else if (request.startsWith("PASS")) {
            String pwd = request.substring(4, request.length()).trim();
            boolean isAuthenticated = authenticator.check(user, pwd);
            if (isAuthenticated) {
                mailbox = Mailbox.openAndLock(user);
                con.write("+OK mailbox locked and ready" + DELIMITER);
            } else {
                ...
            }
        } else if (...) {
            ...
        }

        return true;
    }
}
```
To ease access to the underlying read and write queues, the `Connection` object provides several convenience `read` and `write` methods for stream- and channel-oriented operations.
When a connection is closed, the underlying implementation initiates a writable-event round trip to flush the write queue; the connection is terminated after the remaining data has been written. Besides such controlled termination, connections can be disconnected for other reasons. For instance, a hardware malfunction could terminate a TCP-based connection. Such a situation can only be detected by performing read or write operations on the socket, or by idle timeouts. Most NIO frameworks provide a built-in function to handle such uncontrolled terminations.
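An idle timeout is typically implemented as a periodic watchdog task. The following sketch is one assumed way such a check could look; `openConnections`, `getLastActivityTime`, `close`, and the timeout values are hypothetical names, not part of the example code.

```java
// sketch: periodically close connections that have been idle too long
// (assumes each connection records the time of its last read/write)
Timer watchdog = new Timer(true);
watchdog.schedule(new TimerTask() {
    public void run() {
        long now = System.currentTimeMillis();
        for (Connection con : openConnections) {
            if (now - con.getLastActivityTime() > idleTimeoutMillis) {
                con.close();  // triggers the controlled termination described above
            }
        }
    }
}, checkPeriodMillis, checkPeriodMillis);
```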
Conclusion
An event-driven non-blocking architecture is a fundamental layer for implementing highly efficient, scalable, and reliable servers. The challenge is to minimize the thread synchronization overhead and to optimize connection and buffer management; this is the hardest part to program.
But there is no need to reinvent the wheel. Server frameworks like xSocket, emberIO, SEDA, or MINA abstract the low-level event handling and thread management to ease the creation of highly scalable servers. Most of these server frameworks also support features like SSL or UDP, which haven't been discussed in this article.
Resources
- "Scalable IO in Java" (PDF) describes event-driven processing using Java NIO.
- "Tricks and Tips with NIO, Part 2: Why SelectionKey.attach() Is Evil" describes how a memory leak can occur through unwary use of the SelectionKey's `attach` method.
- "Pico Threads: Lightweight Threads in Java" shows the problems with large-scale threaded programming and event-based techniques.
- A Reactor pattern description by Douglas C. Schmidt (PDF)
- Unix Network Programming: The Sockets Networking API gives a good overview of network programming in general, and a good impression of what happens behind the Java I/O operations at the operating-system level.
- xSocket is an LGPL, NIO-based library for building network applications. Most of the example code in this article is based on xSocket.
Gregor Roth works as a software architect at United Internet group, a leading European Internet Service Provider to which GMX, 1&1, and Web.de belong. His areas of interest include software and system architecture, enterprise architecture management, object-oriented design, distributed computing, and development methodologies.