JetCracker

Life-time learner's blog

[Java] Worker thread – thread-safe processing items one by one

Often we face the problem of online collecting and handling some data, abtained from various external systems and devices. In some cases it is more convenient to execute the processing code in a different thread. Here is how I usually solve this problem.

Assume the data are divided into chunks which are obtained one by one. The following class contains all necessary logic to process these chunks (*** updated according to the improvements suggested by Valeriy Ovchinnikov).

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public abstract class WorkerThread<T> extends Thread {

    public static final long TIMEOUT = 200;
    private final BlockingQueue<T> queue = new LinkedBlockingQueue();

    @Override
    public void run() {
        while (!isInterrupted()) {
            try {
                T item = queue.poll(TIMEOUT, TimeUnit.MILLISECONDS);
                if (!isInterrupted() && item != null) {
                    processItem(item);
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public void put(T e) {
        queue.add(e);
    }

    public abstract void processItem(T item);
}

All you need is to extend this class and implement its abstract method processItem(T item). The example of it follows.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public abstract class WorkerThread extends Thread {

    public static final long TIMEOUT = 200;
    private final BlockingQueue queue = new LinkedBlockingQueue();

    @Override
    public void run() {
        while (!isInterrupted()) {
            try {
                T item = queue.poll(TIMEOUT, TimeUnit.MILLISECONDS);
                if (!isInterrupted() && item != null) {
                    processItem(item);
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public void put(T e) {
        queue.add(e);
    }

    public abstract void processItem(T item);
}

To use this class you need to create and start the thread and call put() whenever you want to process next data item.

HRDBWorker hrDBWorker = new HRDBWorker();
hrDBWorker.start();
...
htDBWorker.put(nextItem);

Hopefully, this is the useful piece of code. You are welcome to comment.

Advertisements

2 responses to “[Java] Worker thread – thread-safe processing items one by one

  1. Valeriy Ovchinnikov October 29, 2013 at 16:44

    Your should better use Thread.currentThread.interrupt() when catching InterruptedException in your run() method in order to keep state (see http://stackoverflow.com/questions/4906799/why-invoke-thread-currentthread-interrupt-when-catch-any-interruptexception).
    You should also take a look at Executor Framework. In your case using Executors.newSingleThreadExecutor() will provide more stability (in case of unexpected WorkerThread dieing it’ll be replaced by new WorkerThread) and more flexibility. I mean, you can change executor any time you want to tune you work process. Executors also provide you a graceful way to tear down WorkerThread. Take a look at lifecycle methods like shutdown(), shutdownNow() and awaitTermination().

    • jetcracker October 29, 2013 at 18:01

      Thank you, Valeriy, for you comment. I’ll add the call of interrupt() to the catch block.

      I will definitely look into Executor Framework.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

%d bloggers like this: