В качестве предыстории
Есть клиент-серверное приложение, где сервер обрабатывает задачи. Есть достаточно большой набор задач, которые может попросить выполнить клиент. Изначально не известно какое время займет выполнение задачи. Причем задачи теряют важность с течением времени.
В качестве исполнителя задач был выбран стандартный пул нитей(threadpool). Все да ничего, но в какой-то прекрасный день сервер стал отказываться выполнять задачи. Из-за того, что некоторые задачи исполнялись довольно долго и занимали собой место в пуле, отнимая тем самым это место у быстрых задач. Соответственно, когда места в пуле не хватало, были отказы от выполнения задач. Причем, при завершении выполнения долго живущей задачи ее результат был уже не актуален, т.е. в какой-то момент времени можно было просто оборвать задачу.
Решение
Что удивительно, интернет не подсказал ни одного решения, достойного или нет. Поэтому был придуман пул, который обрывает выполнение нитей после истечение заданного таймаута(timeouted thread pool).
К каждой задаче прикрепляется надсмотрщик, который выполняет единственное действие — оборвать выполнение задачи по истечению таймаута. Все задачи, которые успели выполнится до истечения таймаута завершаются стандартным образом, а те, что не успели — обрываются из нити-надсмотрщика.
Код класса TimeoutedThreadPool выглядит следующим образом
import java.util.concurrent.*;
/**
* Thread pool implementation that executes each submitted task using
* one of possibly several pooled threads with given timeout.
* Worker thread are interrupted after timeout expires.
*/
public class TimeoutedThreadPool extends ThreadPoolExecutor{
/**
* Thread pool for threads which control worker threads.
*/
private Executor supervisorThreadPool;
/**
* Time in milliseconds after which interrupt worker thread
*/
private int delay;
/**
* {@inheritDoc}
* @param timeout- timeout in milliseconds
*/
public TimeoutedThreadPool(int timeout,
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue) {
this(timeout, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory());
}
/**
* {@inheritDoc}
* @param timeout - timeout in milliseconds
*/
public TimeoutedThreadPool(int timeout,
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
init(timeout);
}
/**
* Do some initialization work
* @param timeout - timeout in milliseconds
*/
private void init(int timeout ){
this.timeout = timeout ;
supervisorThreadPool = Executors.newCachedThreadPool(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = Executors.defaultThreadFactory().newThread(r);
t.setName("supervisor-"+t.getName());
return t;
}
});
}
/**
* Attach supervisor thread to worker thread here
* {@inheritDoc}
*/
@Override
protected void beforeExecute(Thread t, Runnable r) {
supervisorThreadPool.execute(new SupervisorThread(t));
}
/**
* Thread which attaches to worker thread and interrupt it after given timeout expires
*/
private class SupervisorThread implements Runnable{
private Thread target;
public SupervisorThread(Thread target){
this.target = target;
}
@Override
public void run() {
try {
target.join(delay); // do attach here
} catch (InterruptedException e) {
target.interrupt(); // do interrupt here if supervisor interrupted
}
if(target.isAlive()) target.interrupt(); // do interrupt here if worker thread is still alive
}
}
}
Тестирование
Для тестирования пула использовался такой тестовый класс
import java.util.concurrent.*;
/**
* In this test will be created a set of tasks with incremented sleep time, eg. 1s, 2s, ..., 9s
* and timeout time 5s. Tasks with less sleep time should finish as usual, with bigger should be interrupted.
*
*/
public class PoolTest {
private static final int TIMEOUT = 5000;
/**
* Start progrum from here
*/
public static void main(String[] args){
new PoolTest().run();
}
/**
* Run test
*/
public void run(){
/*
* Create our thread pool
*/
TimeoutedThreadPool threadPool = new TimeoutedThreadPool(TIMEOUT, 1, 10, 5, TimeUnit.MINUTES, new SynchronousQueue());
/*
* Create 9 tasks with incremented sleep time
*/
for (int i = 1; i < 10; i++) {
threadPool.execute(new TaskThread(i * 1000));
}
}
/**
* Task thread which sleeps to emulate work
*/
private class TaskThread implements Runnable{
private int sleepTime;
public TaskThread(int sleepTime){
this.sleepTime = sleepTime;
}
@Override
public void run() {
long start = System.currentTimeMillis();
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
System.out.println("Thread-"+sleepTime+" interrupted after " + (System.currentTimeMillis() - start) + " ms");
}
System.out.println("Thread-"+sleepTime+" finished");
}
}
}
Результаты тестирования
Результаты очень даже хорошие, с поставленной задачей новый пул справляется отлично.
Thread-1000 finished
Thread-2000 finished
Thread-3000 finished
Thread-4000 finished
Thread-5000 finished
Thread-8000 interrupted after 5025 ms
Thread-9000 interrupted after 5010 ms
Thread-9000 finished
Thread-8000 finished
Thread-7000 interrupted after 5010 ms
Thread-7000 finished
Thread-6000 interrupted after 5025 ms
Thread-6000 finished