Introducción a Java ThreadPools
Cómo funciona un Pool de Hilos (Thread Pool)
En lugar de iniciar un nuevo hilo para cada tarea que se ejecutará concurrentemente, la tarea puede pasarse a un pool de hilos. Tan pronto como el pool tenga hilos inactivos, la tarea se asigna a uno de ellos y se ejecuta. Internamente, las tareas se insertan en una Cola Bloqueante de la cual los hilos en el pool están extrayendo elementos. Cuando se inserta una nueva tarea en la cola, uno de los hilos inactivos la extraerá con éxito y la ejecutará. El resto de los hilos inactivos en el pool quedarán bloqueados esperando para extraer tareas.
Casos de uso del Thread Pool
Los pools de hilos se utilizan a menudo en servidores multihilo. Cada conexión que llega al servidor a través de la red se envuelve como una tarea y se pasa a un pool de hilos. Los hilos en el pool procesarán las solicitudes en las conexiones de forma concurrente.
Thread Pool incorporado en Java
Java viene con pools de hilos incorporados en el paquete java.util.concurrent
, por lo que no es necesario implementar tu propio pool de hilos. Podemos leer más sobre esto en la documentación de java.util.concurrent.ExecutorService
. Aun así, puede ser útil conocer un poco sobre la implementación de un pool de hilos.
Implementación de un Thread Pool
en Java
Acá vemos una implementación simple de un pool de hilos. La implementación utiliza la BlockingQueue
estándar de Java que viene incluida desde Java 5
.
1import java.util.ArrayList;2import java.util.List;3import java.util.concurrent.ArrayBlockingQueue;4import java.util.concurrent.BlockingQueue;5
6public class ThreadPool {7
8 private BlockingQueue<Runnable> colaDetareas = null;9 private List<PoolThreadRunnable> runnables = new ArrayList<>();10 private boolean estaDetenido = false;11
12 public ThreadPool(int numDeHilos, int maxNumDeTareas){13 colaDetareas = new ArrayBlockingQueue<Runnable>(maxNumDeTareas);14
15 for(int i=0; i<numDeHilos; i++){16 PoolThreadRunnable poolThreadRunnable =17 new PoolThreadRunnable(colaDetareas);18
19 runnables.add(poolThreadRunnable);20 }21 for(PoolThreadRunnable ejecutable : runnables){22 new Thread(ejecutable).start();23 }24 }25
26 public synchronized void execute(Runnable tarea) throws Exception{27 if(this.estaDetenido) throw28 new IllegalStateException("ThreadPool está detenido");29
30 this.colaDetareas.offer(tarea);31 }32
33 public synchronized void stop(){34 this.estaDetenido = true;35 for(PoolThreadRunnable ejecutable : runnables){36 ejecutable.doStop();37 }38 }39
40 public synchronized void esperarHastaQueTodasLasTareasTerminen() {41 while(this.colaDetareas.size() > 0) {42 try {43 Thread.sleep(1);44 } catch (InterruptedException e) {45 e.printStackTrace();46 }47 }48 }49}
Ahora veamos la clase PoolThreadRunnable
que implementa la interfaz Runnable:
1import java.util.concurrent.BlockingQueue;2
3public class PoolThreadRunnable implements Runnable {4
5 private Thread hilo = null;6 private BlockingQueue<Runnable> colaDetareas = null;7 private boolean estaDetenido = false;8
9 public PoolThreadRunnable(BlockingQueue<Object> cola){10 colaDetareas = cola;11 }12
13 public void run(){14 this.hilo = Thread.currentThread();15 while(!isStopped()){16 try{17 Runnable runnable = colaDetareas.take();18 runnable.run();19 } catch(Exception e){20 // Registrar o reportar la excepción,21 // pero mantener el hilo del pool vivo.22 }23 }24 }25
26 public synchronized void doStop(){27 estaDetenido = true;28 // Sacar al hilo del pool de la llamada a dequeue().29 this.hilo.interrupt();30 }31
32 public synchronized boolean isStopped(){33 return estaDetenido;34 }35}
Y finalmente, un ejemplo de cómo usar el ThreadPool
:
1public class App {2 public static void main(String[] args) throws Exception {3 int numTareas = 20;4 int numHilos = 4;5 ThreadPool threadPool = new ThreadPool(numHilos, numTareas);6
7 for(int i=0; i<numTareas; i++) {8
9 int numTarea = i;10 threadPool.execute( () -> {11 String mensaje = Thread.currentThread().getName() + ": Tarea " + numTarea;12 System.out.println(mensaje);13 });14 }15
16 threadPool.esperarHastaQueTodasLasTareasTerminen();17 threadPool.stop();18
19 }20}
La implementación del pool de hilos consta de dos partes. Una clase ThreadPool
que es la interfaz pública del pool de hilos, y una clase PoolThreadRunnable
que implementa los hilos que ejecutan las tareas.
Para ejecutar una tarea, se llama al método ThreadPool.execute(Runnable r)
con una implementación de Runnable
como parámetro. El Runnable
se encola en la BlockingQueue
internamente, esperando ser desencolado.
El Runnable
será desencolado por un PoolThreadRunnable
inactivo y ejecutado. Podemos ver esto en el método PoolThreadRunnable.run()
. Después de la ejecución, el PoolThreadRunnable
hace un ciclo e intenta desencolar otra tarea de nuevo, hasta que se detenga.
Para detener el ThreadPool
, se llama al método ThreadPool.stop()
. La llamada a stop
se anota internamente en el atributo estaDetenido
. Luego, cada hilo en el pool se detiene llamando a doStop()
en cada hilo. Observemos cómo el método execute()
lanzará una IllegalStateException
si se llama después de que se haya llamado a stop()
.
Los hilos se detendrán después de terminar cualquier tarea que estén ejecutando actualmente. Observemos la llamada a this.interrupt()
en PoolThreadRunnable.doStop()
. Esto asegura que un hilo bloqueado en una llamada wait()
dentro de la llamada colaDetareas.take()
salga de la llamada de wait()
y salga del método take()
con una InterruptedException
lanzada. Esta excepción se captura en el método PoolThreadRunnable.run()
, se reporta, y luego se verifica la variable estaDetenido
. Como estaDetenido
es ahora verdadero, PoolThreadRunnable.run()
saldrá y el hilo morirá.