Concurrent Loops Using Java Closures
The java.util.concurrent package has very nice support for writing concurrent loops. I used it recently to improve the performance of some code in the Google Calendar Server. In the Calendar Server, we have a class representing an event on a calendar. The following method on an event computes the set of attendees of the event. The old code looked something like this:
public Collection<Principal> getAttendees() {
    List<Principal> result = new ArrayList<Principal>();
    for (EventResponse r : getResponses()) {
        if (r.mayAttend()) {
            result.add(r.getAttendee());
        }
    }
    return result;
}
The problem with this code, it turned out, was that getAttendee() has to talk to a remote server to look up the Principal in a database, and although the remote server is very fast, it is far enough away that there is some latency. Since this loop is sequential, when there are many responses it spends most of its time waiting for a response from the remote server, over and over again. Consequently the call to getAttendees() was very slow for events with many attendees. One solution to this kind of problem would be to send a single batch request, and that is probably the best long-term solution to the problem, but the server in this case doesn't yet know how to handle batch requests. So our fix to the immediate performance problem was to use java.util.concurrent and make the requests in parallel; this did speed things up significantly:
public Collection<Principal> getAttendees() {
    final List<Principal> result = new ArrayList<Principal>();
    CompletionService<Void> ecs =
        new ExecutorCompletionService<Void>(threadPool);
    int submitted = 0;
    for (final EventResponse r : getResponses()) {
        ecs.submit(new Callable<Void>() {
            public Void call() {
                if (r.mayAttend()) {
                    try {
                        Principal attendee = r.getAttendee();
                        synchronized (result) {
                            result.add(attendee);
                        }
                    } catch (Throwable ex) {
                        LOGGER.log(Level.SEVERE, "Uncaught Exception", ex);
                    }
                }
                return null;
            }
        });
        submitted++;
    }
    // wait for the tasks to complete: one take() per submitted task
    for (int i = 0; i < submitted; i++) {
        try {
            /*discard*/ ecs.take().get();
        } catch (InterruptedException ex) {
            throw new AssertionError(ex); // shouldn't happen
        } catch (ExecutionException ex) {
            LOGGER.log(Level.SEVERE, "Uncaught Exception", ex);
        }
    }
    return result;
}
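To experiment with this completion-service pattern outside the Calendar Server, here is a minimal self-contained sketch. The Response class, its mayAttend and getAttendee methods, and the 50 ms sleep standing in for the remote lookup are all hypothetical stand-ins, not the Calendar Server's actual classes. A synchronized list replaces the explicit synchronized block, and an interrupt is passed back to the caller instead of being turned into an AssertionError.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class CompletionServiceDemo {
    private CompletionServiceDemo() {}

    // Hypothetical stand-in for EventResponse; the remote lookup is simulated by a sleep.
    static final class Response {
        private final String attendee;
        private final boolean mayAttend;

        Response(String attendee, boolean mayAttend) {
            this.attendee = attendee;
            this.mayAttend = mayAttend;
        }

        boolean mayAttend() { return mayAttend; }

        String getAttendee() throws InterruptedException {
            Thread.sleep(50); // pretend this is a round trip to a remote server
            return attendee;
        }
    }

    static List<String> getAttendees(List<Response> responses, ExecutorService pool) {
        final List<String> result = Collections.synchronizedList(new ArrayList<String>());
        CompletionService<Void> ecs = new ExecutorCompletionService<Void>(pool);
        int submitted = 0;
        for (final Response r : responses) {
            ecs.submit(new Callable<Void>() {
                public Void call() throws InterruptedException {
                    if (r.mayAttend()) {
                        result.add(r.getAttendee()); // lookups overlap across pool threads
                    }
                    return null;
                }
            });
            submitted++;
        }
        // Wait for one completion per submitted task.
        for (int i = 0; i < submitted; i++) {
            try {
                ecs.take().get();
            } catch (ExecutionException ex) {
                System.err.println("lookup failed: " + ex.getCause());
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt(); // let the caller see the interrupt
                break;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(8);
        List<Response> responses = Arrays.asList(
                new Response("alice", true), new Response("bob", false), new Response("carol", true));
        List<String> attendees = getAttendees(responses, pool);
        pool.shutdown();
        // Order depends on which lookup finishes first.
        System.out.println(attendees.size() + " attendees");
    }
}
```

Because results are added as tasks finish, the returned list is in completion order, not response order.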
When I find code like this I have a few reactions. First, my eyes glaze over at the complexity. Then, if I'm interested, I look carefully at the code to try to understand it. After all, I'm likely to want to do something like this again someday. Finally, I bookmark it so I can add it to my bag of tricks.
I don't think writing a concurrent loop should have to be so complex. Here is what I would like to have written:
public Collection<Principal> getAttendees() {
    List<Principal> result = new ArrayList<Principal>();
    for eachConcurrently(EventResponse r : getResponses(), threadPool) {
        if (r.mayAttend()) {
            Principal attendee = r.getAttendee();
            synchronized (result) {
                result.add(attendee);
            }
        }
    }
    return result;
}
You might think that in order to do this kind of thing we would need to add a concurrent looping statement to the language. Actually, it is possible to add the concurrent looping construct as a library method if you have closures in the language! It isn't trivial to write, but that's why we have people like Doug Lea in the world. An API like "for eachConcurrently" should be written once by an expert and placed into the JDK for everyone to use.
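Java 8 later added lambda expressions, which cover part of what a closures proposal offers. Here is a minimal sketch of a library method in that style. The class name ConcurrentLoops and the method eachConcurrently with this signature are hypothetical, not a JDK API. It runs the body once per element on the given executor and blocks until every iteration has finished. A Java lambda cannot use break or return to leave the enclosing method, so this sketch covers only the plain wait-for-all loop, not the control-flow semantics discussed below.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Consumer;

final class ConcurrentLoops {
    private ConcurrentLoops() {}

    // Hypothetical helper: runs body.accept(item) for each item on the executor,
    // blocks until all iterations finish, then rethrows the first failure
    // in submission order (wrapped in ExecutionException).
    static <T> void eachConcurrently(Iterable<? extends T> items,
                                     ExecutorService executor,
                                     Consumer<? super T> body)
            throws InterruptedException, ExecutionException {
        List<Callable<Void>> tasks = new ArrayList<>();
        for (T item : items) {
            tasks.add(() -> { body.accept(item); return null; });
        }
        // invokeAll returns only after every task has completed, normally or not.
        for (Future<Void> f : executor.invokeAll(tasks)) {
            f.get(); // throws ExecutionException if that iteration threw
        }
    }
}
```

With a helper like this, the body of getAttendees could be passed roughly as r -> { if (r.mayAttend()) result.add(r.getAttendee()); }, with result a concurrent collection. Note that invokeAll waits for every iteration even after one has failed, so this version does not cancel outstanding work on an exception.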
What should happen if you use continue, break, or return within the body of this loop, or throw an exception? The continue case is easy: it just completes execution of that one iteration, and the other iterations proceed on their merry way. The semantics of break are a bit subtle, but obvious once you realize this is supposed to act like a loop: it completes the current iteration and cancels any other iterations that have not completed, and control returns to the caller of for eachConcurrently. Handling a return statement is similar: it cancels any uncompleted iterations and returns from the enclosing method, which in this case would be getAttendees. Finally, any exception that propagates out of a loop iteration cancels uncompleted iterations and propagates from the loop.
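The exception rule (the first failure cancels uncompleted iterations and propagates from the loop) can be approximated today with an ExecutorCompletionService. This is a minimal sketch under stated assumptions: FailFastLoop and its eachConcurrently method are hypothetical names, cancellation relies on Future.cancel(true) interrupting running tasks, and the failure surfaces wrapped in an ExecutionException rather than as the original exception. break and return are not modeled, since lambdas cannot transfer control out of the enclosing method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.function.Consumer;

final class FailFastLoop {
    private FailFastLoop() {}

    // Hypothetical helper: the first iteration to throw (in completion order)
    // cancels every iteration that has not yet completed, and its exception
    // propagates to the caller wrapped in ExecutionException.
    static <T> void eachConcurrently(Iterable<? extends T> items,
                                     Executor executor,
                                     Consumer<? super T> body)
            throws InterruptedException, ExecutionException {
        CompletionService<Void> ecs = new ExecutorCompletionService<>(executor);
        List<Future<Void>> futures = new ArrayList<>();
        try {
            for (T item : items) {
                futures.add(ecs.submit(() -> { body.accept(item); return null; }));
            }
            for (int i = 0; i < futures.size(); i++) {
                ecs.take().get(); // rethrows the first failure to complete
            }
        } finally {
            // After a normal exit every future is already done, so this is a no-op;
            // after a failure or interrupt it cancels (and interrupts) the stragglers.
            for (Future<Void> f : futures) {
                f.cancel(true);
            }
        }
    }
}
```

Iterations that have not started yet never run once cancelled; iterations already running only stop if their body responds to interruption.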
p.s.: Martin Buchholz offered the following improved version of the code using java.util.concurrent:
public Collection<Principal> getAttendees() {
    final Collection<Principal> result
        = new ConcurrentLinkedQueue<Principal>();
    final Collection<Callable<Object>> tasks
        = new ArrayList<Callable<Object>>();
    for (final EventResponse r : getResponses())
        tasks.add(Executors.callable(new Runnable() { public void run() {
            try {
                if (r.mayAttend())
                    result.add(r.getAttendee());
            } catch (Throwable ex) {
                LOGGER.log(Level.SEVERE, "Uncaught Exception", ex);
            }}}));
    try {
        threadPool.invokeAll(tasks);
    } catch (InterruptedException ex) {
        Thread.currentThread().interrupt(); // preserve the interrupt for the caller
    }
    return new ArrayList<Principal>(result);
}
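For comparison, on Java 8 or later the same fan-out can also be written with CompletableFuture. This is a sketch of that alternative, not part of the original discussion; FutureAttendees and its Response class are hypothetical stand-ins for the Calendar Server types, and getAttendee here just returns a field where the real code would make the remote call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

final class FutureAttendees {
    private FutureAttendees() {}

    // Hypothetical stand-in for EventResponse.
    static final class Response {
        private final String attendee;
        private final boolean mayAttend;

        Response(String attendee, boolean mayAttend) {
            this.attendee = attendee;
            this.mayAttend = mayAttend;
        }

        boolean mayAttend() { return mayAttend; }

        String getAttendee() { return attendee; } // the real version would call a remote server
    }

    static List<String> getAttendees(List<Response> responses, Executor pool) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (Response r : responses) {
            // Start every lookup on the pool; non-attendees yield null.
            futures.add(CompletableFuture.supplyAsync(
                    () -> r.mayAttend() ? r.getAttendee() : null, pool));
        }
        List<String> result = new ArrayList<>();
        for (CompletableFuture<String> f : futures) {
            String attendee = f.join(); // waits; throws CompletionException on failure
            if (attendee != null) {
                result.add(attendee);
            }
        }
        return result;
    }
}
```

Unlike the queue-based version, this returns attendees in response order, because the futures are joined in submission order. It also lets a failed lookup propagate as a CompletionException instead of logging it.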