JetCracker

Life-time learner's blog

Tag Archives: thread-safe

[Java] Worker thread – thread-safe processing items one by one

Often we face the problem of online collecting and handling some data, abtained from various external systems and devices. In some cases it is more convenient to execute the processing code in a different thread. Here is how I usually solve this problem.

Assume the data are divided into chunks which are obtained one by one. The following class contains all necessary logic to process these chunks (*** updated according to the improvements suggested by Valeriy Ovchinnikov).

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public abstract class WorkerThread<T> extends Thread {

    public static final long TIMEOUT = 200;
    private final BlockingQueue<T> queue = new LinkedBlockingQueue();

    @Override
    public void run() {
        while (!isInterrupted()) {
            try {
                T item = queue.poll(TIMEOUT, TimeUnit.MILLISECONDS);
                if (!isInterrupted() && item != null) {
                    processItem(item);
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public void put(T e) {
        queue.add(e);
    }

    public abstract void processItem(T item);
}

All you need is to extend this class and implement its abstract method processItem(T item). The example of it follows.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public abstract class WorkerThread extends Thread {

    public static final long TIMEOUT = 200;
    private final BlockingQueue queue = new LinkedBlockingQueue();

    @Override
    public void run() {
        while (!isInterrupted()) {
            try {
                T item = queue.poll(TIMEOUT, TimeUnit.MILLISECONDS);
                if (!isInterrupted() && item != null) {
                    processItem(item);
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public void put(T e) {
        queue.add(e);
    }

    public abstract void processItem(T item);
}

To use this class you need to create and start the thread and call put() whenever you want to process next data item.

HRDBWorker hrDBWorker = new HRDBWorker();
hrDBWorker.start();
...
htDBWorker.put(nextItem);

Hopefully, this is the useful piece of code. You are welcome to comment.

Advertisements