ThreadPoolExecutor

0

ThreadPoolExecutor consists of a pool of threads where the the size of pool is automatically adjusted based on the boundary defined using  core pool size and maximum pool size.

In this article we will look into the attributes that influence the way ThreadPoolExecutor maintains the thread pool. If we understand these parameters we will know how to tweak the thread pool’s behaviour to suit our own specific purpose.

Execute a task

First lets see what happens when a task is executed. When a new task is submitted, a worker thread is created to handle it. 

Core pool size

Should the ThreadPoolExecutor create a new worker or use an existing idle worker thread depends on the core pool size. If the no. of workers in pool are fewer than corePoolSize threads are running, a new worker thread would be created to handle the request. It doesn’t matter if the pool has idle worker threads, a new worker thread is created till the pool grows to the size of core pool set.

Worker Queue

If no. of workers exceed the core pool size then no further thread will be assigned instead the task will be queued till one of the core worker becomes free to handle the cached task.

Maximum pool size

If the task cannot be queued, a new thread is created to handle the task. The maximum no. of threads that can be created is bounded by the maximum pool size configured. If no. of threads would exceed maximumPoolSize, in which case, the task will be rejected.

When all the core threads are busy, any new task submitted is added to the cache. The task will remain in cache till one of the worker thread becomes free to consume the cached task. What happens if there are no new tasks cached but the worker thread is now free? keepAliveTime will decide the time till which the worker thread will wait for the new cached task.

KeepAliveTime

If the pool currently has more than corePoolSize threads, excess threads will be terminated if they have been idle for more than the keepAliveTime. This provides a means of reducing resource consumption when the pool is not being actively used. If the pool becomes more active later, new threads will be constructed. By default, the keep-alive policy applies only when there are more than corePoolSize threads.

ThreadPoolExecutor Example

In the below example, we create many tasks and submit it to the ThreadPoolExecutor. Number of core pool size used is set to three and the maximum pool size is set to four. We also have used a queue of capacity two.

ThreadPoolExecutorExample:

package com.javarticles.threads;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import static com.javarticles.threads.PrintUtils.*;

public class ThreadPoolExecutorExample {
    private static final AtomicLong id = new AtomicLong(0);
    public static void main(String[] args) {
        ExecutorService executorService = createDefaultExecutor();
        try {
            for (int i = 0; i < 10; i++) {
                executorService.execute(new MyTask(i+1));
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            print("Done");
        } finally {
            executorService.shutdown();
        }
    }

    private static ExecutorService createDefaultExecutor() {
        ThreadPoolExecutor rc = new ThreadPoolExecutor(3, 4, 100, 
                TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(2), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable runnable) {
                String threadName = "ThreadPoolExec" + "-" + id.incrementAndGet();
                Thread thread = new Thread(runnable, threadName);
                thread.setDaemon(true);
                thread.setPriority(Thread.NORM_PRIORITY);
                thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
                    @Override
                    public void uncaughtException(final Thread t, final Throwable e) {
                        print("Error in thread '{}'" + t.getName() + e);
                    }
                });

                print("Created thread:" + thread + " for " + runnable);
                return thread;
            }
        }){
            protected void afterExecute(Runnable r, Throwable t) {
                print("executed");
            }

            protected void terminated() {
                print(this + " worker terminated");
            }
        };

        rc.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        return rc;
    }


    private static class MyTask implements Runnable {
        int i;
        MyTask(int i) {
            this.i = i;
        }
        @Override
        public void run() {
            print(toString());
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        public String toString() {
            return "MyTask(" + i + ")";
        }
    }

}

As you can see from the output below, a thread is created as soon as a task is submitted. Thread ThreadPoolExec-1 is reused to handle MyTask(4) after executing MyTask(1). Later at some point fourth thread is created to handle task MyTask(9) as all the other three threads are still busy with their task.
Output:

main:Created thread:Thread[ThreadPoolExec-1,5,main] for [email protected][State = -1, empty queue]
ThreadPoolExec-1:MyTask(1)
main:Created thread:Thread[ThreadPoolExec-2,5,main] for [email protected][State = -1, empty queue]
ThreadPoolExec-2:MyTask(2)
main:Created thread:Thread[ThreadPoolExec-3,5,main] for [email protected][State = -1, empty queue]
ThreadPoolExec-3:MyTask(3)
ThreadPoolExec-1:executed
ThreadPoolExec-1:MyTask(4)
ThreadPoolExec-2:executed
ThreadPoolExec-2:MyTask(5)
ThreadPoolExec-3:executed
ThreadPoolExec-3:MyTask(6)
main:Created thread:Thread[ThreadPoolExec-4,5,main] for [email protected][State = -1, empty queue]
ThreadPoolExec-4:MyTask(9)
main:MyTask(10)
ThreadPoolExec-1:executed
ThreadPoolExec-1:MyTask(7)
ThreadPoolExec-2:executed
ThreadPoolExec-2:MyTask(8)
ThreadPoolExec-3:executed
ThreadPoolExec-4:executed
ThreadPoolExec-1:executed
main:Done

Download the source code

This was an example about internals of ThreadPoolExecutor.

You can download the source code here: javaThreadPoolExecutorExample.zip
Share.

Comments are closed.