Saltearse al contenido

Introducción a Java ThreadPools

Cómo funciona un Pool de Hilos (Thread Pool)

En lugar de iniciar un nuevo hilo para cada tarea que se ejecutará concurrentemente, la tarea puede pasarse a un pool de hilos. Tan pronto como el pool tenga hilos inactivos, la tarea se asigna a uno de ellos y se ejecuta. Internamente, las tareas se insertan en una Cola Bloqueante de la cual los hilos en el pool están extrayendo elementos. Cuando se inserta una nueva tarea en la cola, uno de los hilos inactivos la extraerá con éxito y la ejecutará. El resto de los hilos inactivos en el pool quedarán bloqueados esperando para extraer tareas.

Diagrama de Thread Pool

Casos de uso del Thread Pool

Los pools de hilos se utilizan a menudo en servidores multihilo. Cada conexión que llega al servidor a través de la red se envuelve como una tarea y se pasa a un pool de hilos. Los hilos en el pool procesarán las solicitudes en las conexiones de forma concurrente.

Thread Pool incorporado en Java

Java viene con pools de hilos incorporados en el paquete java.util.concurrent, por lo que no es necesario implementar tu propio pool de hilos. Podemos leer más sobre esto en la documentación de java.util.concurrent.ExecutorService. Aun así, puede ser útil conocer un poco sobre la implementación de un pool de hilos.

Implementación de un Thread Pool en Java

Acá vemos una implementación simple de un pool de hilos. La implementación utiliza la BlockingQueue estándar de Java que viene incluida desde Java 5.

1
import java.util.ArrayList;
2
import java.util.List;
3
import java.util.concurrent.ArrayBlockingQueue;
4
import java.util.concurrent.BlockingQueue;
5
6
public class ThreadPool {
7
8
private BlockingQueue<Runnable> colaDetareas = null;
9
private List<PoolThreadRunnable> runnables = new ArrayList<>();
10
private boolean estaDetenido = false;
11
12
public ThreadPool(int numDeHilos, int maxNumDeTareas){
13
colaDetareas = new ArrayBlockingQueue<Runnable>(maxNumDeTareas);
14
15
for(int i=0; i<numDeHilos; i++){
16
PoolThreadRunnable poolThreadRunnable =
17
new PoolThreadRunnable(colaDetareas);
18
19
runnables.add(poolThreadRunnable);
20
}
21
for(PoolThreadRunnable ejecutable : runnables){
22
new Thread(ejecutable).start();
23
}
24
}
25
26
public synchronized void execute(Runnable tarea) throws Exception{
27
if(this.estaDetenido) throw
28
new IllegalStateException("ThreadPool está detenido");
29
30
this.colaDetareas.offer(tarea);
31
}
32
33
public synchronized void stop(){
34
this.estaDetenido = true;
35
for(PoolThreadRunnable ejecutable : runnables){
36
ejecutable.doStop();
37
}
38
}
39
40
public synchronized void esperarHastaQueTodasLasTareasTerminen() {
41
while(this.colaDetareas.size() > 0) {
42
try {
43
Thread.sleep(1);
44
} catch (InterruptedException e) {
45
e.printStackTrace();
46
}
47
}
48
}
49
}

Ahora veamos la clase PoolThreadRunnable que implementa la interfaz Runnable:

1
import java.util.concurrent.BlockingQueue;
2
3
public class PoolThreadRunnable implements Runnable {
4
5
private Thread hilo = null;
6
private BlockingQueue<Runnable> colaDetareas = null;
7
private boolean estaDetenido = false;
8
9
public PoolThreadRunnable(BlockingQueue<Object> cola){
10
colaDetareas = cola;
11
}
12
13
public void run(){
14
this.hilo = Thread.currentThread();
15
while(!isStopped()){
16
try{
17
Runnable runnable = colaDetareas.take();
18
runnable.run();
19
} catch(Exception e){
20
// Registrar o reportar la excepción,
21
// pero mantener el hilo del pool vivo.
22
}
23
}
24
}
25
26
public synchronized void doStop(){
27
estaDetenido = true;
28
// Sacar al hilo del pool de la llamada a dequeue().
29
this.hilo.interrupt();
30
}
31
32
public synchronized boolean isStopped(){
33
return estaDetenido;
34
}
35
}

Y finalmente, un ejemplo de cómo usar el ThreadPool:

1
public class App {
2
public static void main(String[] args) throws Exception {
3
int numTareas = 20;
4
int numHilos = 4;
5
ThreadPool threadPool = new ThreadPool(numHilos, numTareas);
6
7
for(int i=0; i<numTareas; i++) {
8
9
int numTarea = i;
10
threadPool.execute( () -> {
11
String mensaje = Thread.currentThread().getName() + ": Tarea " + numTarea;
12
System.out.println(mensaje);
13
});
14
}
15
16
threadPool.esperarHastaQueTodasLasTareasTerminen();
17
threadPool.stop();
18
19
}
20
}

La implementación del pool de hilos consta de dos partes. Una clase ThreadPool que es la interfaz pública del pool de hilos, y una clase PoolThreadRunnable que implementa los hilos que ejecutan las tareas.

Para ejecutar una tarea, se llama al método ThreadPool.execute(Runnable r) con una implementación de Runnable como parámetro. El Runnable se encola en la BlockingQueue internamente, esperando ser desencolado.

El Runnable será desencolado por un PoolThreadRunnable inactivo y ejecutado. Podemos ver esto en el método PoolThreadRunnable.run(). Después de la ejecución, el PoolThreadRunnable hace un ciclo e intenta desencolar otra tarea de nuevo, hasta que se detenga.

Para detener el ThreadPool, se llama al método ThreadPool.stop(). La llamada a stop se anota internamente en el atributo estaDetenido. Luego, cada hilo en el pool se detiene llamando a doStop() en cada hilo. Observemos cómo el método execute() lanzará una IllegalStateException si se llama después de que se haya llamado a stop().

Los hilos se detendrán después de terminar cualquier tarea que estén ejecutando actualmente. Observemos la llamada a this.interrupt() en PoolThreadRunnable.doStop(). Esto asegura que un hilo bloqueado en una llamada wait() dentro de la llamada colaDetareas.take() salga de la llamada de wait() y salga del método take() con una InterruptedException lanzada. Esta excepción se captura en el método PoolThreadRunnable.run(), se reporta, y luego se verifica la variable estaDetenido. Como estaDetenido es ahora verdadero, PoolThreadRunnable.run() saldrá y el hilo morirá.