Monday, December 16, 2013

An Executor is not a Thread - or: correct ThreadPoolExecutor error handling

Java 1.5 introduced the Executor framework. In summary: if you have some tasks (let's say 20) and you want them to be processed in parallel by a couple of threads (let's say 6), then an Executor is the solution to look for. But there are some surprising caveats which may lead to problems, and those problems are hard to diagnose.

Introduction

Executor is an interface. The ExecutorService interface extends Executor. One of the standard implementations of ExecutorService is ThreadPoolExecutor. It manages a pool of threads and executes the tasks you give it. You pass the tasks in as Runnable instances. A typical Runnable implementation looks like this:

public class MyWorker implements Runnable {
    private final Object data;

    public MyWorker(final Object data) {
        this.data = data;
    }

    @Override
    public void run() {
        process();
    }

    private void process() {
        // process data
    }
}

You initialize a ThreadPoolExecutor like this:

int nThreads = 8;
Executor executor = new ThreadPoolExecutor(nThreads, nThreads, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());

And since this is so cumbersome, there is a helper class for that:

int nThreads = 8;
Executor executor = Executors.newFixedThreadPool(nThreads);

This is how to use an Executor, wrapped in a method:

public void executeRunnables(final List<Runnable> runnables) {
    int nThreads = 8;
    Executor executor = Executors.newFixedThreadPool(nThreads);
    for (final Runnable command : runnables) {
        executor.execute(command);
    }
}

This method returns to the caller immediately after handing the tasks to the executor, without waiting for them to run. But usually you want to wait until all the tasks have been processed before returning to the caller. For this purpose ExecutorService defines the methods shutdown() and awaitTermination():

public void executeRunnables(final List<Runnable> runnables) throws InterruptedException {
    final int nThreads = 8;
    final ExecutorService executor = Executors.newFixedThreadPool(nThreads);

    for (final Runnable command : runnables) {
        executor.execute(command);
    }

    executor.shutdown();

    executor.awaitTermination(2, TimeUnit.SECONDS);
}

(Notice the change of the executor's type from Executor to ExecutorService.) shutdown() puts the executor in "finish your work" mode. In this mode the executor does not accept new tasks, but it still processes the ones it already has. awaitTermination() blocks until all tasks have been processed or the timeout elapses, whichever comes first; it returns false if the timeout came first. The timeout makes sure that your program doesn't wait forever if something goes wrong.
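
The return value of awaitTermination() is easy to overlook, so here is a minimal sketch of how it might be used. The class name TimeoutAwareRunner, the extra timeout parameters and the decision to call shutdownNow() after a timeout are assumptions made for this illustration, not part of the example above.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class TimeoutAwareRunner {
    /**
     * Runs all tasks on a fixed pool.
     * Returns true if the executor terminated, false if the timeout elapsed first.
     */
    static boolean executeRunnables(final List<Runnable> runnables,
                                    final long timeout, final TimeUnit unit)
            throws InterruptedException {
        final int nThreads = 8;
        final ExecutorService executor = Executors.newFixedThreadPool(nThreads);

        for (final Runnable command : runnables) {
            executor.execute(command);
        }

        // no new tasks are accepted; already submitted tasks still run
        executor.shutdown();

        final boolean finished = executor.awaitTermination(timeout, unit);
        if (!finished) {
            // assumption for this sketch: give up and interrupt the remaining tasks
            executor.shutdownNow();
        }
        return finished;
    }
}
```

A caller can then decide for itself whether a false result is an error or just a reason to retry.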

Error handling

So what if something goes wrong? What if one of your tasks throws an exception? How do you handle that? How do you even know something has gone wrong? One possible approach is to catch and collect all exceptions inside the Runnable implementation:

public class MyRunnable implements Runnable {
    private final Object            data;
    // same element type as the constructor parameter; the list is shared, so it must be thread-safe
    private final List<Throwable>   exceptions;

    public MyRunnable(final Object data, final List<Throwable> exceptions) {
        this.data = data;
        this.exceptions = exceptions;
    }

    @Override
    public void run() {
        try {
            process();
        } catch (final Exception ex) {
            exceptions.add(ex);
        }
    }

    private void process() {
        // process data
    }
}

But this approach has two major drawbacks. First, it would also catch an InterruptedException, which may be used to control the thread under normal conditions. Second, the responsibility for error handling is moved into each Runnable implementation. You are going to implement a lot of Runnables, and it's easy to forget something. Executor error handling calls for a generic solution.
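
The first drawback could be softened by treating InterruptedException separately, roughly as in the following sketch. It assumes that process() does blocking work and therefore declares InterruptedException; the class name InterruptAwareRunnable and the Thread.sleep() call standing in for real work are hypothetical. The second drawback remains: every Runnable would have to repeat this pattern.

```java
import java.util.List;

final class InterruptAwareRunnable implements Runnable {
    // shared between tasks, so the caller has to pass a thread-safe list
    private final List<Throwable> exceptions;

    InterruptAwareRunnable(final List<Throwable> exceptions) {
        this.exceptions = exceptions;
    }

    @Override
    public void run() {
        try {
            process();
        } catch (final InterruptedException ex) {
            // not an error: restore the interrupt flag so the owner of the thread can react
            Thread.currentThread().interrupt();
        } catch (final Exception ex) {
            exceptions.add(ex);
        }
    }

    private void process() throws InterruptedException {
        // stand-in for blocking work (assumption for this sketch)
        Thread.sleep(10000);
    }
}
```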

Java 1.5 added a method to register exception handlers for exactly this purpose: Thread.setUncaughtExceptionHandler(). Exceptions which are not handled by the run() method of the Thread are forwarded to this handler. Let's modify the above example to use an exception handler:

/**
 * Implementation of an UncaughtExceptionHandler, which stores all exceptions in a List.
 */
private static class ExceptionCollector implements UncaughtExceptionHandler {
    final List<Throwable> exceptions = Collections.synchronizedList(new LinkedList<Throwable>());

    @Override
    public void uncaughtException(final Thread t, final Throwable e) {
        exceptions.add(e);
    }
}

/**
 * A ThreadFactory, which registers an UncaughtExceptionHandler.
 */
private static class ThreadWithUncaughtExHandlerFactory implements ThreadFactory {
    private final UncaughtExceptionHandler    exHandler;

    public ThreadWithUncaughtExHandlerFactory(final UncaughtExceptionHandler exHandler) {
        this.exHandler = exHandler;
    }

    @Override
    public Thread newThread(final Runnable r) {
        final Thread t = new Thread(r);
        t.setUncaughtExceptionHandler(exHandler);
        return t;
    }
}

public void executeRunnables(final List<Runnable> runnables) throws InterruptedException {
    final int nThreads = 8;
    // UncaughtExceptionHandler which will collect exceptions:
    final ExceptionCollector exHandler = new ExceptionCollector();
    // create an executor with a custom thread factory:
    final ExecutorService executor = Executors.newFixedThreadPool(nThreads,
                                       new ThreadWithUncaughtExHandlerFactory(exHandler));

    for (final Runnable command : runnables) {
        executor.execute(command);
    }

    executor.shutdown();

    executor.awaitTermination(2, TimeUnit.SECONDS);

    if (exHandler.exceptions.size() > 0) {
        // rethrow first of the collected exceptions
        throw new RuntimeException(exHandler.exceptions.size() +
                    " exceptions occurred. First exception:", exHandler.exceptions.get(0));
    }
}


Now this looks good! If an exception is not handled by the Runnable implementation, the Thread gets it and passes it on to the uncaught exception handler. The handler stores it in the list for later use. executeRunnables() waits until all threads are done with their work and then checks the exception list. If there is an entry, it is wrapped in a RuntimeException and rethrown. Instead of just passing the first exception, it's also possible to attach the whole exception list to the thrown exception. This way the caller will be notified.

Or won't it?

Well, this article would probably not exist if everything were as simple as it looks. The truth is: it doesn't work. At least not always, which makes it even worse. Sometimes it works and sometimes it doesn't.

Problem analysis

In general, a Thread is capable of processing only one Runnable. When the Thread.run() method exits, the Thread dies. The ThreadPoolExecutor uses a trick to make a Thread process multiple Runnables: it has its own Runnable implementation. The threads are started with a Runnable implementation which fetches other Runnables (your Runnables) from the ExecutorService and executes them: ThreadPoolExecutor -> Thread -> Worker -> YourRunnable. When an uncaught exception occurs in your Runnable implementation, it ends up in the finally block of Worker.run(). In this finally block the Worker class tells the ThreadPoolExecutor that it "finished" the work. The exception has not yet arrived at the Thread class, but the ThreadPoolExecutor has already registered the worker as idle.
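
The ordering described above can be reproduced with a strongly simplified model. The Worker class below is a hypothetical stand-in written for this post, not the JDK source; it only shows that a finally block in the wrapping Runnable runs before the Thread hands the exception to its UncaughtExceptionHandler.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

final class WorkerSketch {
    /** Simplified stand-in for the pool's internal worker (assumption, not JDK code). */
    static final class Worker implements Runnable {
        private final BlockingQueue<Runnable> queue;
        private final List<String> events;

        Worker(final BlockingQueue<Runnable> queue, final List<String> events) {
            this.queue = queue;
            this.events = events;
        }

        @Override
        public void run() {
            try {
                Runnable task;
                // poll() returns null once the queue is empty, which ends the loop
                while ((task = queue.poll()) != null) {
                    task.run(); // an uncaught exception leaves the loop here
                }
            } finally {
                // the worker reports that it is done BEFORE the exception reaches the Thread
                events.add("worker signalled exit");
            }
        }
    }

    /** Runs one failing task and returns the order in which things happened. */
    static List<String> demo() throws InterruptedException {
        final List<String> events = new CopyOnWriteArrayList<String>();
        final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
        queue.add(new Runnable() {
            @Override
            public void run() {
                throw new IllegalStateException("task failed");
            }
        });

        final Thread thread = new Thread(new Worker(queue, events));
        thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(final Thread t, final Throwable e) {
                events.add("handler called");
            }
        });
        thread.start();
        thread.join(); // the handler runs on the dying thread, so it is done after join()
        return events; // [worker signalled exit, handler called]
    }
}
```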

And here's where the fun begins. awaitTermination() is invoked as soon as all Runnables have been passed to the Executor. This happens very quickly, so probably none of the Runnables have finished their work by then. A Worker switches to "idle" if an exception occurs, before the exception reaches the Thread class. If the situation is similar for the other threads (or if they have finished their work), all Workers signal "idle" and awaitTermination() returns. The main thread reaches the line where it checks the size of the collected exception list. And this may happen before any (or some) of the Threads have had the chance to call the UncaughtExceptionHandler. It depends on the order of execution whether and how many exceptions have been added to the list of uncaught exceptions before the main thread reads it.
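
Since the real race depends on timing, the following sketch forces the unlucky interleaving with latches. The latches, the class name HandlerRaceDemo and the single-thread pool are artificial devices chosen for this demonstration; in real code the same ordering happens only now and then. The handler is held back until the main thread has returned from awaitTermination() and has read the list.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

final class HandlerRaceDemo {
    /** Returns { exceptions seen right after awaitTermination(), exceptions seen once the handler ran }. */
    static int[] run() throws InterruptedException {
        final List<Throwable> exceptions = new CopyOnWriteArrayList<Throwable>();
        final CountDownLatch handlerEntered = new CountDownLatch(1);
        final CountDownLatch releaseHandler = new CountDownLatch(1);
        final CountDownLatch handlerDone = new CountDownLatch(1);

        final Thread.UncaughtExceptionHandler slowHandler = new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(final Thread t, final Throwable e) {
                handlerEntered.countDown();
                try {
                    // artificial: hold the handler until the main thread has read the list
                    releaseHandler.await();
                } catch (final InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
                exceptions.add(e);
                handlerDone.countDown();
            }
        };
        final ThreadFactory factory = new ThreadFactory() {
            @Override
            public Thread newThread(final Runnable r) {
                final Thread t = new Thread(r);
                t.setUncaughtExceptionHandler(slowHandler);
                return t;
            }
        };

        final ExecutorService executor = Executors.newFixedThreadPool(1, factory);
        executor.execute(new Runnable() {
            @Override
            public void run() {
                throw new IllegalStateException("task failed");
            }
        });

        handlerEntered.await(); // the task has failed, the handler has not stored anything yet
        executor.shutdown();
        if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("pool did not terminate in time");
        }
        final int seenEarly = exceptions.size(); // what executeRunnables() would see

        releaseHandler.countDown();
        handlerDone.await();
        final int seenLate = exceptions.size();
        return new int[] { seenEarly, seenLate }; // { 0, 1 } in this arrangement
    }
}
```

The executor reports termination although the failed thread is still busy in its handler, so the main thread sees an empty list.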

A very unexpected behaviour. But I won't leave you without a working solution. So let's make it work.

Correct solution

We are lucky that the ThreadPoolExecutor class was designed for extensibility. There is an empty protected method afterExecute(Runnable r, Throwable t). It is invoked directly after the run() method of our Runnable, before the worker signals that it has finished its work. The correct solution is to extend ThreadPoolExecutor to handle uncaught exceptions:

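public class ExceptionAwareThreadPoolExecutor extends ThreadPoolExecutor {
    private final List<Throwable> uncaughtExceptions =
                    Collections.synchronizedList(new LinkedList<Throwable>());

    // ThreadPoolExecutor has no default constructor, so one has to be declared;
    // this one uses the same parameters as Executors.newFixedThreadPool(nThreads)
    public ExceptionAwareThreadPoolExecutor(final int nThreads) {
        super(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void afterExecute(final Runnable r, final Throwable t) {
        super.afterExecute(r, t);
        // t is null if the task completed normally
        if (t != null) uncaughtExceptions.add(t);
    }

    public List<Throwable> getUncaughtExceptions() {
        return Collections.unmodifiableList(uncaughtExceptions);
    }
}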
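
To round things off, here is a sketch of how executeRunnables() might look with such an executor. To compile on its own, it carries a compact copy of the class under the hypothetical name CollectingExecutor, including a constructor, because ThreadPoolExecutor has no default constructor. Throwing an IllegalStateException on timeout is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class ExceptionAwareUsage {
    /** Compact copy of the executor shown above, so that this sketch is self-contained. */
    static final class CollectingExecutor extends ThreadPoolExecutor {
        private final List<Throwable> uncaughtExceptions =
                Collections.synchronizedList(new LinkedList<Throwable>());

        CollectingExecutor(final int nThreads) {
            // same parameters as Executors.newFixedThreadPool(nThreads)
            super(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
                  new LinkedBlockingQueue<Runnable>());
        }

        @Override
        protected void afterExecute(final Runnable r, final Throwable t) {
            super.afterExecute(r, t);
            if (t != null) uncaughtExceptions.add(t);
        }

        List<Throwable> getUncaughtExceptions() {
            // return a snapshot, so callers can iterate without extra locking
            synchronized (uncaughtExceptions) {
                return new ArrayList<Throwable>(uncaughtExceptions);
            }
        }
    }

    static void executeRunnables(final List<Runnable> runnables) throws InterruptedException {
        final CollectingExecutor executor = new CollectingExecutor(8);

        for (final Runnable command : runnables) {
            executor.execute(command);
        }

        executor.shutdown();
        if (!executor.awaitTermination(2, TimeUnit.SECONDS)) {
            // assumption for this sketch: a timeout is reported as an error
            executor.shutdownNow();
            throw new IllegalStateException("tasks did not finish within the timeout");
        }

        // afterExecute() has run for every task by the time the pool has terminated
        final List<Throwable> exceptions = executor.getUncaughtExceptions();
        if (!exceptions.isEmpty()) {
            throw new RuntimeException(exceptions.size()
                    + " exceptions occurred. First exception:", exceptions.get(0));
        }
    }
}
```

Two things to keep in mind: the exception still travels on to the Thread afterwards, so without a handler of your own the default one prints a stack trace as well. And this only covers tasks started with execute(); with submit() the exception is stored in the returned Future and afterExecute() receives null.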



