AtomicReferenceArray Example

0

In this article, we will look into an example of AtomicReferenceArray. We will use it to build a trackable Future task which can also be disposed anytime we want.
In order to submit a task asynchronously we call

Future<?> future = ExecutorService.submit(Callable)

If we want to cancel the future we call:

future.cancel(mayInterruptIfRunning);

If the request for future cancellation was called from another thread, we want the thread executing the task to be interrupted.
Assuming we have a pool of executor services, we also want the capability to track the how many of them are running using a parent that contains all the tasks currently running.
Thus to build a trackable and disposable task, we need to know:

  1. The thread in which the task is getting executed so we know whether the cancellation request has come from within or externally.
  2. The related parent object so we can remove the task from the parent on disposable.
  3. The future object created so we can cancel it if requested for.

We build a wrapper around the actual task and use AtomicReferenceArray to hold the above array of object references so that we can update the individual elements atomically.

In the below example, we create the actual task and wrap it in a TrackableTask by passing the parent. We then submit the trackable task to an ExecutorService which returns us the Future object. This future object we have to explicitly set to the trackable task so it can then atomically update its future element.
After few seconds before the task gets completed, we cancel it and then verify whether the task got canceled and removed from its parent.

AtomicReferenceArrayExample:

package com.javarticles.threads;

import static com.javarticles.threads.PrintUtils.print;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

 
public class AtomicReferenceArrayExample {
    private static final AtomicLong id = new AtomicLong(0);
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, new MyThreadFactory());
        try {
            MyTask actualTask = new MyTask(1);
            Parent parent = new Parent();
            TrackableTask trackableTask = new TrackableTask(actualTask, parent);
            Future<?> future = exec.submit((Callable<Object>)trackableTask);
            trackableTask.setFuture(future);
            
            System.out.println("Parent has the future?" + parent.contains(trackableTask));
            Thread.sleep(3000);
            
            System.out.println("Cancel the task");
            trackableTask.dispose();
            Thread.sleep(15000);
            
            if (future.isCancelled()) {
                System.out.println("Task is cancelled");
                System.out.println("After cancellation does parent has the future?" + parent.contains(trackableTask));
            }            
        } finally {
            exec.shutdown();
        }
    }

   private static class MyThreadFactory implements ThreadFactory {
       @Override
       public Thread newThread(Runnable runnable) {
           String threadName = "ThreadPoolExec" + "-" + id.incrementAndGet();
           Thread thread = new Thread(runnable, threadName);
           thread.setDaemon(true);
           thread.setPriority(Thread.NORM_PRIORITY);
           thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
               @Override
               public void uncaughtException(final Thread t, final Throwable e) {
                   print("Error in thread '{}'" + t.getName() + e);
               }
           });

           print("Created thread:" + thread + " for " + runnable);
           return thread;
       }
   }

    private static class MyTask implements Runnable {
        int i;
        MyTask(int i) {
            this.i = i;
        }
        
        @Override
        public void run() {
            print(toString());
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        public String toString() {
            return "MyTask(" + i + ")";
        }        
    }

}

TrackableTask extends AtomicReferenceArray. It consists of three elements so we create an array of three.

    public TrackableTask(Runnable actual, Parent parent) {
        super(3);
        ....
    }

In order to set an element against an index we will use:

   void lazySet(int i, E newValue);

To fetch an array element, we will use:

   E get(int i);

TrackableTask:

package com.javarticles.threads;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReferenceArray;

public final class TrackableTask extends AtomicReferenceArray<Object> implements Runnable, Callable<Object>, Disposable {

    private static final long serialVersionUID = 4814435011827980193L;

    final Runnable actual;

    static final Object PARENT_DISPOSED = new Object();
    static final Object SYNC_DISPOSED = new Object();
    static final Object ASYNC_DISPOSED = new Object();
    static final Object DONE = new Object();

    static final int PARENT_INDEX = 0;
    static final int FUTURE_INDEX = 1;
    static final int THREAD_INDEX = 2;


    public TrackableTask(Runnable actual, Parent parent) {
        super(3);
        this.actual = actual;
        this.lazySet(0, parent);
        parent.add(this);
    }

    @Override
    public Object call() {
        run();
        return null;
    }

    @Override
    public void run() {
        stashCurrentThread();
        try {
            actual.run();
        } finally {
            nullifyCurrentThread();
            removeFromParentIfNeeded();
            setFutureDone();
        }
    }

    private void setFutureDone() {
        for (;;) {
            Object o = getFuture();
            if (isDisposed(o) || futureDone(o)) {
                break;
            }
        }
    }

    public void setFuture(Future<?> f) {
        for (;;) {
            Object o = getFuture();
            if (isDone(o)) {
                return;
            }
            if (isSyncDisposed(o)) {
                f.cancel(false);
                return;
            }
            if (isAsyncDisposed(o)) {
                f.cancel(true);
                return;
            }
            if (setFuture(o, f)) {
                return;
            }
        }
    }

    private boolean setFuture(Object o, Future<?> f) {
        return compareAndSet(FUTURE_INDEX, o, f);
    }

    private boolean isSyncDisposed(Object o) {
        return o == SYNC_DISPOSED;
    }
    
    private boolean isAsyncDisposed(Object o) {
        return o == ASYNC_DISPOSED;
    }

    private boolean isDone(Object o) {
        return o == DONE;
    }
    
    private boolean futureDone(Object o) {
        return o == SYNC_DISPOSED || o == ASYNC_DISPOSED;
    }

    private boolean isDisposed(Object o) {
        return o == SYNC_DISPOSED || o == ASYNC_DISPOSED;
    }

    private Object getFuture() {
        return get(FUTURE_INDEX);
    }

    private void removeFromParentIfNeeded() {
        Object o = getParent();
        if (setParentDone(o)
                && o != null) {
            removeFromParent(o);
        }
    }

    private boolean setParentDone(Object o) {
        return o != PARENT_DISPOSED && compareAndSet(PARENT_INDEX, o, DONE);
    }

    private void removeFromParent(Object o) {
        ((Parent) o).delete(this);
    }

    private void nullifyCurrentThread() {
        lazySet(THREAD_INDEX, null);
    }

    private void stashCurrentThread() {
        lazySet(THREAD_INDEX, Thread.currentThread());
    }

    @Override
    public void dispose() {
        for (;;) {
            Object o = getFuture();
            if (isDone(o) || isDisposed(o)) {
                break;
            }
            if (setDisposed(o)) {               
                break;
            }
        }

        for (;;) {
            Object o = getParent();
            if (isParentDisposedOrDone() || o == null) {
                return;
            }
            if (isParentDisposed(o)) {
                removeFromParent(o);
                return;
            }
        }
    }

    private boolean isParentDisposed(Object o) {
        return compareAndSet(PARENT_INDEX, o, PARENT_DISPOSED);
    }

    private Object getParent() {
        return get(PARENT_INDEX);
    }

    private boolean setDisposed(Object o) {
        boolean async = getTaskThread() != Thread.currentThread();
        if (compareAndSet(FUTURE_INDEX, o,
                async ? ASYNC_DISPOSED : SYNC_DISPOSED)) {
            if (o != null) {
                ((Future<?>) o).cancel(async);
            }
            return true;
        }
        return false;
    }

    private Thread getTaskThread() {
        return (Thread) get(THREAD_INDEX);
    }

    public boolean isParentDisposedOrDone() {
        Object o = get(PARENT_INDEX);
        return o == PARENT_DISPOSED || o == DONE;
    }
}

Parent contains an internal set of disposable tasks. We have add, delete and dispose methods.

Parent:

package com.javarticles.threads;

import java.util.HashSet;
import java.util.Set;

public class Parent implements Disposable {
    volatile boolean disposed;
    private Set<Disposable> disposableTasks = new HashSet<>();
    
    public void delete(Disposable disposableTask) {
        disposableTasks.remove(disposableTask);
    }
    
    public boolean add(Disposable d) {
        if (!disposed) {
            synchronized (this) {
                if (!disposed) {
                    disposableTasks.add(d);
                    return true;
                }
            }
        }
        d.dispose();
        return false;
    }
    
    @Override
    public void dispose() {
        if (disposed) {
            return;
        }
        Set<Disposable> set;
        synchronized (this) {
            if (disposed) {
                return;
            }
            disposed = true;
            set = disposableTasks;
            disposableTasks = null;
        }

        dispose(set);
    }
    
    void dispose(Set<Disposable> set) {
        if (set == null) {
            return;
        }
        for (Disposable disposable : set) {
            disposable.dispose();
        }
        
    }

    public boolean contains(Disposable d) {
        return disposableTasks.contains(d);
    }

}

Disposable:

package com.javarticles.threads;

public interface Disposable {
    void dispose();
}

PrintUtils:

package com.javarticles.threads;

public class PrintUtils {
    public static void print(String s) {
        System.out.println(Thread.currentThread().getName() + ":" + s);
    }
}

As you can see from the output, as soon as we cancel a task, the thread running it gets interrupted.

Output:

main:Created thread:Thread[ThreadPoolExec-1,5,main] for [email protected][State = -1, empty queue]
Parent has the future?true
ThreadPoolExec-1:MyTask(1)
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
	at com.javarticles.threads.AtomicReferenceArrayExample$MyTask.run(AtomicReferenceArrayExample.java:66)
	at com.javarticles.threads.TrackableTask.run(TrackableTask.java:39)
	at com.javarticles.threads.TrackableTask.call(TrackableTask.java:31)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Task is cancelled
After cancellation does parent has the future?false

Download the source code

This was an example about AtomicReferenceArray.

You can download the source code here: javaAtomicReferenceArray.zip
Share.

Comments are closed.