I want to create an ExecutorService that increases or decreases the number of threads depending on the task status and sets an upper limit on the number of threads.

In the Java world you can easily create threads with the Thread class, but managing their life cycle properly is hard, so using Executor and friends is recommended instead.

Item 68 of Effective Java says "Prefer executors and tasks to threads".

By the way, if you need an instance of Executor (in practice you will probably use the [ExecutorService](https://docs.oracle.com/javase/jp/8/docs/api/java/util/concurrent/ExecutorService.html) or [ScheduledExecutorService](https://docs.oracle.com/javase/jp/8/docs/api/java/util/concurrent/ScheduledExecutorService.html) interface), the [Executors](https://docs.oracle.com/javase/jp/8/docs/api/java/util/concurrent/Executors.html) class provides factory methods, so it is easy to create one through them, and for most purposes that alone solves the problem.

Here I will explain how to create an "ExecutorService that increases or decreases the number of threads depending on the task status and sets an upper limit on the number of threads", which Executors does not provide. I will walk through my trial and error along the way, so if you just want the final answer, scroll to the last section.

ExecutorService factory methods

Aside from ScheduledExecutorService, there are the following factory methods to create an instance of ExecutorService.

public static ExecutorService newSingleThreadExecutor()
public static ExecutorService newFixedThreadPool(int nThreads)
public static ExecutorService newCachedThreadPool()

SingleThreadExecutor
Executes tasks in a single thread. Used when you want to execute tasks sequentially.
FixedThreadPool
Executes tasks in the specified fixed number of threads. Used to parallelize CPU-bound processing.
CachedThreadPool
Creates threads as needed; created threads are cached and reused for a certain period, and threads that have been idle for a while are terminated. Used for IO-bound processing with long waits, where the required degree of parallelism changes over time.
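
As a quick reminder of how these factory methods are used, here is a minimal sketch (the class name ExecutorsUsageExample and the printed messages are made up for illustration):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecutorsUsageExample {
    public static void main(String[] args) throws InterruptedException {
        // Create a pool of 4 threads via the factory method.
        ExecutorService executor = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            executor.execute(() -> System.out.println(
                    "task " + taskId + " on " + Thread.currentThread().getName()));
        }
        // Shut down when done, or the non-daemon pool threads keep the JVM alive.
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
    }
}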

Factory method implementation

Let's take a look at the implementation of these factory methods.

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

First of all, it is curious that only SingleThreadExecutor is wrapped in FinalizableDelegatedExecutorService, but looking at the implementation:

static class FinalizableDelegatedExecutorService
    extends DelegatedExecutorService {
    FinalizableDelegatedExecutorService(ExecutorService executor) {
        super(executor);
    }
    protected void finalize() {
        super.shutdown();
    }
}

it is just a wrapper class that calls shutdown() from finalize(). But why only SingleThreadExecutor? (My guess is that the delegating wrapper also keeps callers from casting the result back to ThreadPoolExecutor and reconfiguring away the single-thread guarantee, but that is a digression.) It is not the main subject, so I will leave it aside.

You can see that all of these factory methods use the ThreadPoolExecutor class; the behavior changes depending on the constructor parameters.

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

I want to create an ExecutorService that increases or decreases the number of threads depending on the task status and sets an upper limit on the number of threads.

What I ultimately want is to run many tasks that mostly sit in IO waits, so CachedThreadPool looks like the one to use. However, looking at the definition above, the maximum number of threads is `Integer.MAX_VALUE`. For threads waiting on IO it makes sense to start additional threads, because the CPU is idle. Still, costs such as memory grow with the number of threads, so you may want to avoid starting too many.

Then is FixedThreadPool the answer? But I want a certain maximum parallelism during peak times and fewer threads when they are not needed... I think that is a perfectly normal request. In short, I want a CachedThreadPool with a configurable upper limit on threads.

Try changing the arguments of ThreadPoolExecutor

Can I get there just by tweaking the arguments?

CachedThreadPool looks like this...

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

so if I just replace `Integer.MAX_VALUE` with the limit I want:

new ThreadPoolExecutor(0, 4, 60L, TimeUnit.SECONDS,
                       new SynchronousQueue<Runnable>());

The 3rd and 4th arguments are the thread keep-alive (cache) time, so I will shorten them here and test.

Test

@Test
public void test() throws Exception {
    final int poolSize = 4;
    final ThreadPoolExecutor executor = new ThreadPoolExecutor(0, poolSize,
            1L, TimeUnit.SECONDS,
            new SynchronousQueue<>());
    final CountDownLatch endLatch = new CountDownLatch(poolSize);
    for (int i = 0; i < poolSize; i++) {
        final CountDownLatch startLatch = new CountDownLatch(1);
        executor.execute(() -> {
            startLatch.countDown();
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            endLatch.countDown();
        });
        startLatch.await();
        assertThat(executor.getActiveCount(), is(i + 1));
        assertThat(executor.getPoolSize(), is(i + 1));
    }
    endLatch.await();
    Thread.sleep(100);
    assertThat(executor.getActiveCount(), is(0));
    assertThat(executor.getPoolSize(), is(poolSize));
    Thread.sleep(1000);
    assertThat(executor.getActiveCount(), is(0));
    assertThat(executor.getPoolSize(), is(0));
}

The number of threads grows with the load; when the tasks complete, the idle threads remain pooled for a while, and once the keep-alive time is exceeded, the pooled threads are terminated too. So far so good.

Next, let's confirm that the degree of parallelism stays within the upper limit.

@Test
public void test2() throws Exception {
    final int poolSize = 4;
    final ThreadPoolExecutor executor = new ThreadPoolExecutor(0, poolSize,
            1L, TimeUnit.SECONDS,
            new SynchronousQueue<>());
    final CountDownLatch startLatch = new CountDownLatch(poolSize);
    for (int i = 0; i < poolSize + 1; i++) {
        executor.execute(() -> {
            startLatch.countDown();
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
    startLatch.await();
    assertThat(executor.getActiveCount(), is(poolSize));
    assertThat(executor.getPoolSize(), is(poolSize));
}

java.util.concurrent.RejectedExecutionException: Task example.ThreadTest$$Lambda$1/739498517@311d617d rejected from java.util.concurrent.ThreadPoolExecutor@7c53a9eb[Running, pool size = 4, active threads = 4, queued tasks = 0, completed tasks = 0]

……what……

Let's think again

Looking back at FixedThreadPool, the last argument is different.

CachedThreadPool specifies SynchronousQueue. I had been using it without knowing what SynchronousQueue actually is. According to the Javadoc, it is a blocking queue with zero capacity, where offer succeeds only when another thread is already waiting to take an element.

That makes sense: since CachedThreadPool has no upper limit on the number of threads, the queue never needs any capacity in the first place.
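
To see this offer behavior concretely, here is a small sketch (mine, not from the original post):

import java.util.concurrent.SynchronousQueue;

public class SynchronousQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<>();

        // No consumer is waiting, so offer fails immediately.
        System.out.println(queue.offer("a")); // false

        // Start a consumer that blocks in take().
        Thread consumer = new Thread(() -> {
            try {
                System.out.println("took: " + queue.take());
            } catch (InterruptedException ignored) {
            }
        });
        consumer.start();
        Thread.sleep(100); // crude wait until the consumer is (probably) blocked in take()

        // Now a consumer is waiting, so offer succeeds by handing off directly.
        System.out.println(queue.offer("b")); // true
        consumer.join();
    }
}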

In other words, what if I use the LinkedBlockingQueue that FixedThreadPool uses...

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

java.lang.AssertionError: 
Expected: is <2>
     but: was <1>

orz This time the number of threads does not increase at all.

Don't use things on intuition alone

ThreadPoolExecutor Javadoc has a proper explanation.

**Core and maximum pool sizes** A ThreadPoolExecutor will automatically adjust the pool size (see getPoolSize()) according to the bounds set by corePoolSize (see getCorePoolSize()) and maximumPoolSize (see getMaximumPoolSize()). When a new task is submitted in method execute(Runnable), and fewer than corePoolSize threads are running, a new thread is created to handle the request, even if other worker threads are idle. If there are more than corePoolSize but less than maximumPoolSize threads running, a new thread will be created only if the queue is full. By setting corePoolSize and maximumPoolSize the same, you create a fixed-size thread pool. By setting maximumPoolSize to an essentially unbounded value such as Integer.MAX_VALUE, you allow the pool to accommodate an arbitrary number of concurrent tasks. Most typically, core and maximum pool sizes are set only upon construction, but they may also be changed dynamically using setCorePoolSize(int) and setMaximumPoolSize(int).

Let's see the implementation

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

Relationship between ThreadPoolExecutor queue and thread pool

To summarize briefly:

- If the pool size is smaller than corePoolSize, a new thread is created regardless of the queue state.
- If the pool size is at or above corePoolSize, the task is offered to the queue.
- If the task was queued, a new thread is created only when the pool size is zero; if the pool size is 1 or more, no thread is created.
- If the task was not queued, a new thread is created. If a new thread cannot be created, for example because the pool size has reached maximumPoolSize, the task is rejected.

In other words, with corePoolSize set to 0 and maximumPoolSize set to a meaningful upper limit:

- With SynchronousQueue, the number of threads grows up to maximumPoolSize, but if a task is submitted while all maximumPoolSize threads are active, it is rejected.
- With LinkedBlockingQueue, no thread is created after the first one.

That is exactly what the two experiments showed.
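
The LinkedBlockingQueue case is easy to reproduce. A minimal sketch (mine, not one of the original tests; the class name QueueBehaviorDemo is made up):

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class QueueBehaviorDemo {
    public static void main(String[] args) throws InterruptedException {
        // corePoolSize = 0, maximumPoolSize = 4, unbounded LinkedBlockingQueue
        ThreadPoolExecutor executor = new ThreadPoolExecutor(0, 4,
                60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        for (int i = 0; i < 4; i++) {
            executor.execute(() -> {
                try { Thread.sleep(1000); } catch (InterruptedException ignored) {}
            });
        }
        Thread.sleep(100);
        // Every offer succeeds, so only the first submission creates a worker;
        // the remaining tasks just sit in the queue.
        System.out.println("pool size = " + executor.getPoolSize());   // 1
        System.out.println("queued    = " + executor.getQueue().size()); // 3
        executor.shutdown();
    }
}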

Sorting out the desired behavior and the queue to match it

The Executor I want to create behaves like this:

- If there is an idle thread, the task is executed on that thread.
- If there is no idle thread and the pool size is less than the maximum number of threads, a new thread is created.
- If there is no idle thread and the pool size is at the maximum number of threads, the task is queued.

To get this behavior out of ThreadPoolExecutor, the queue needs these characteristics:

- offer succeeds if a thread is waiting to read from the queue.
- offer fails if no thread is waiting and the pool size is less than the maximum number of threads.
- offer succeeds if no thread is waiting and the pool size is at the maximum number of threads.

That is, it is a queue whose behavior changes according to the state of the ThreadPoolExecutor. Building that directly would couple the two tightly, with both the Executor and the Queue modified to cross-reference each other.

However, ThreadPoolExecutor has a constructor that takes a RejectedExecutionHandler as an additional argument.

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}

As the name implies, RejectedExecutionHandler is a handler whose `void rejectedExecution(Runnable r, ThreadPoolExecutor executor);` is called when a task is rejected. The default handler simply throws a RejectedExecutionException. This looks usable. That is, implement both the BlockingQueue<Runnable> and RejectedExecutionHandler interfaces so that:

- offer succeeds if a thread is waiting to read from the queue.
- offer fails if no thread is waiting to read.
- when rejectedExecution is called, the task is pushed onto the queue after all.

and it should behave exactly as desired.

ExecutorService that increases or decreases the number of threads depending on the task status and sets an upper limit on the number of threads

The class built from the considerations so far is shown below. I originally wrote it in Java, but the delegation needed a pile of boilerplate code, so I rewrote it in Kotlin to paste here.

import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

class ThreadWorkQueue(
        private val delegate: BlockingQueue<Runnable> = LinkedBlockingQueue()
) : BlockingQueue<Runnable> by delegate,
        RejectedExecutionHandler {
    private val idleThreads = AtomicInteger(0)

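    // Fail unless at least one worker is idle (blocked in take()/poll());
    // a failed offer makes ThreadPoolExecutor try to add a new thread.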
    override fun offer(runnable: Runnable): Boolean {
        return if (idleThreads.get() == 0) {
            false
        } else delegate.offer(runnable)
    }

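    // Count workers blocked waiting for a task, so offer() can tell
    // whether an idle thread exists.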
    override fun take(): Runnable {
        idleThreads.incrementAndGet()
        try {
            return delegate.take()
        } finally {
            idleThreads.decrementAndGet()
        }
    }

    override fun poll(timeout: Long, unit: TimeUnit): Runnable? {
        idleThreads.incrementAndGet()
        try {
            return delegate.poll(timeout, unit)
        } finally {
            idleThreads.decrementAndGet()
        }
    }

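    // Called when the executor could not create a new thread (pool at max);
    // unless shut down, put the task onto the queue after all.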
    override fun rejectedExecution(r: Runnable, executor: ThreadPoolExecutor) {
        if (executor.isShutdown) {
            throw RejectedExecutionException("Task $r rejected from $executor")
        }
        delegate.offer(r)
    }
}

By passing an instance of this class to ThreadPoolExecutor as both the workQueue and the handler, I was able to create an ExecutorService with the desired behavior. The final factory method looks like this. The maximum number of threads here is the number of available processors, with a floor of 2; the right settings will vary with the characteristics of the tasks you want to run.

private static ExecutorService createParallelExecutor() {
    final ThreadWorkQueue queue = new ThreadWorkQueue();
    return new ThreadPoolExecutor(0, calculateMaximumPoolSize(),
            1L, TimeUnit.MINUTES, queue, queue);
}

private static int calculateMaximumPoolSize() {
    return Math.max(2, Runtime.getRuntime().availableProcessors());
}
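
As a quick sanity check along the lines of the earlier tests (a sketch of mine, not from the original post; it assumes the Kotlin ThreadWorkQueue class above is compiled on the classpath), submitting more tasks than the maximum no longer triggers a rejection:

import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadWorkQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        ThreadWorkQueue queue = new ThreadWorkQueue();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(0, 4,
                1L, TimeUnit.MINUTES, queue, queue);
        // Submit more tasks than the maximum pool size.
        for (int i = 0; i < 6; i++) {
            executor.execute(() -> {
                try { Thread.sleep(500); } catch (InterruptedException ignored) {}
            });
        }
        Thread.sleep(100);
        // The pool grows to the maximum instead of stopping at 1, and the
        // overflow tasks are queued via rejectedExecution instead of throwing.
        System.out.println("pool size = " + executor.getPoolSize());   // 4
        System.out.println("queued    = " + executor.getQueue().size()); // 2
        executor.shutdown();
    }
}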

The above is how to create an "ExecutorService that increases or decreases the number of threads depending on the task status and sets an upper limit on the number of threads".

To be honest, while I was able to create an ExecutorService that behaves as desired, I have not evaluated how efficient it is compared to the existing `Executors` factory methods. A large part of why executors are recommended is that the hard parts of thread management are left to a thoroughly tested library with a proven track record, and tinkering carelessly throws that benefit away. Unless you really need this, I think it is better to stick with the `Executors` factory methods.
