Na wstępie można było by sobie zadać pytanie, po co wymyślono egzekutory. Przecież możemy odpalić wątki, pozarządzać, poczekać na wykonanie pracy, itd. Ale nie ma co „wymyślać koła na nowo”, a najlepszy kod to ten którego nie trzeba pisać. Należy również wspomnieć o tym, że tworzenie wątków w ramach puli jest mniej, zasobożerne niż tworzenie kilku lub kilkunastu (a niekiedy nawet kilkaset), pojedynczych instancji wątków i pilnowanie ich działania.

Egzekutory pozwalają nam na logiczne odseparowanie zadań od wątków, nie musimy wtedy zarządzać wątkami, uruchamiać nowe. Tym zajmują się pule wątków, które wraz z egzekutorami pozwalają na różne strategie uruchamiania i zarządzania wątkami. Myślimy wtedy bardziej nie o wątkach jako osobnych zadaniach, nie zajmując się ich zarządzaniem.

Podstawową implementacją puli wątków jest klasa ThreadPoolExecutor, która pozwala na określenie:

  • jak mają być synchronizowane i wykonywane zadania,
  • ile ma być odpalonych wątków,
  • ile ma czekać.
  • co należy zrobić gdy kolejka puli wątków się zapełni, możemy wtedy:
    • odrzucić nowe rzeczy do wykonania ThreadPoolExecutor.AbortPolicy
    • zignorować nowe zadania – ThreadPoolExecutor.DiscardPolicy
    • zakończyć najstarsze zadania – hreadPoolExecutor.DiscardOldestPolicy
    • wywołać wątek wywołujący aby on zdecydował co zrobić – ThreadPoolExecutor.CallerRunsPolicy

Do tego mamy również klasę Executors, która zawiera statyczne metody zwracające ExecutorService z gotowymi strategiami uruchamiania wątków, i tam mamy:

  • newSingleThreadExecutor – zwraca egzekutora, który uruchamia jeden wątek
  • newFixedThreadPool – pula wątków która ma określony maksymalny stały rozmiar. Nowe wątki są tworzone aż do uzyskania maksymalnego rozmiaru. Po jego osągnięciu nowe są tworzone tylko wtedy, gdy jakiś wątek zakończy swoją pracę.
  • newCachedThreadPool – pula wątków która dynamicznie dostosowuje swój rozmiar do obciążenia. Wykonawca będzie usuwał wątki kiedy zmaleje obciążenie, a dodawał nowe gdy wzrośnie
  • newSingleThreadScheduledExecutor – zwraca egzekutora, który odpala jeden wątek uruchamiany z opóźnieniem lub uruchamiany cykliczne, co jakiś czas
  • newScheduledThreadPool – zwraca egzekutora, który ma ustaloną liczbę wątków, uruchamianych z opóźnieniem lub co jakiś czas

Warto trochę powiedzieć na temat interfejsu Future<V> i Callable<V>. Takiej konstrukcji używamy głównie, gdy mamy bardzo długie obliczenia i nie wiemy kiedy się zakończą. Wtedy możemy użyć operacji asynchronicznych i przy pomocy konstrukcji interfejsu Future i Callable wywołać zadanie, a jego rezultat zwrócić w przyszłości. Przy pomocy metody get. Różni się sposób uruchomienia wątku w egekutorze, wątek zwracający wynik uruchamiany metodą submit(Runnable task), a nie execute(Runnable command).

Przykład wykorzystania puli wątków:

import java.util.concurrent.*;
 
class SuperWorker implements Runnable{
 
   private int counter = 0;
   private int countTo = 0;
   private String name;
 
   public SuperWorker(int countTo, String name){
 
      this.countTo = countTo;
      this.name = name;
   }
 
   @Override
   public void run(){
 
      System.out.println("Start wątku: " + name);
      System.out.flush();
 
      try{
         while (counter < countTo){
            counter++;
            System.out.println("Wątek " + name + " liczy " + counter);
            System.out.flush();
            Thread.sleep(100);
         }
      } catch (InterruptedException e){
         e.printStackTrace();
      }
 
      System.out.println("Koniec wątku: " + name);
      System.out.flush();
   }
}
 
public class Thread5ThreadPool{
 
   public static void main(String[] args){
 
 
      //-------------------------------------------------
      // Przykład uruchamiania pojedyńczego wątku
      //-------------------------------------------------
      System.out.println("--- Start pojedynczego egzekutora -------------");
      System.out.flush();
 
      final ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
      newSingleThreadExecutor.execute(new SuperWorker(10, "SingleThread"));
 
      //
      // Zakończenie pracy wątku
      //
      newSingleThreadExecutor.shutdown();
      try{
         newSingleThreadExecutor.awaitTermination(10, TimeUnit.SECONDS);
      } catch (InterruptedException e){
         e.printStackTrace();
      }
      newSingleThreadExecutor.shutdown();
 
      System.out.println("--- Koniec pojedynczego egzekutora -------------");
      System.out.flush();
 
      //-------------------------------------------------
      // Przykład uruchamiania pojedyńczego
      // opóżnionego wątku
      //-------------------------------------------------
      System.out.println("--- Start pojedynczego opóżnionego egzekutora -------------");
      System.out.flush();
 
      final ScheduledExecutorService singleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
      singleThreadScheduledExecutor.schedule(new SuperWorker(10, "SingleScheduledThread"), 1, TimeUnit.SECONDS);
 
      singleThreadScheduledExecutor.shutdown();
      try{
         singleThreadScheduledExecutor.awaitTermination(10, TimeUnit.SECONDS);
      } catch (InterruptedException e){
         e.printStackTrace();
      }
      singleThreadScheduledExecutor.shutdown();
 
      System.out.println("--- Koniec pojedynczego opóżnionego egzekutora -------------");
      System.out.flush();
 
 
      //-------------------------------------------------
      // Przykład uruchamiania puli wątków,
      // gdzie uruchamiamy więcej wątków niż wielkość puli
      // Widać ze wątki powyżej
      // 5 startują wtedy gdy "stare" zakończą swoją pracę
      //-------------------------------------------------
      System.out.println("--- Start egzekutora o stałej liczbie wątków -------------");
      System.out.flush();
 
      final ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(5);
      for (int i = 0; i < 10; i++){
         newFixedThreadPool.execute(new SuperWorker(5, "newFixedThreadPool_" + i));
      }
 
      newFixedThreadPool.shutdown();
      try{
         newFixedThreadPool.awaitTermination(10, TimeUnit.SECONDS);
      } catch (InterruptedException e){
         e.printStackTrace();
      }
      newFixedThreadPool.shutdown();
 
      System.out.println("--- Koniec egzekutora o stałej liczbie wątków -------------");
      System.out.flush();
 
 
      //-------------------------------------------------
      // Przykład uruchamiania puli wątków,
      // indywidualnie skonfigurowanej
      //-------------------------------------------------
      System.out.println("--- Start własnej pili wątków -------------");
      System.out.flush();
 
      final ExecutorService executorService= new ThreadPoolExecutor(
      2,
      4,
      0L, TimeUnit.MILLISECONDS,
      new ArrayBlockingQueue<Runnable>(20),
      new ThreadPoolExecutor.CallerRunsPolicy());
 
      for (int i = 0; i < 10; i++){
         executorService.execute(new SuperWorker(5, "newFixedThreadPool_" + i));
      }
 
      executorService.shutdown();
      try{
         executorService.awaitTermination(10, TimeUnit.SECONDS);
      } catch (InterruptedException e){
         e.printStackTrace();
      }
      executorService.shutdown();
 
      System.out.println("--- Koniec własnej pili wątków -------------");
      System.out.flush();
 
   }
}

Przykład pobrania rezultatów pracy wątku przy pomocy interfejsu Callable i Future:

import java.util.concurrent.*;
 
/**
 * Klasa przetwarzająca dane i zwracająca je do głównego wątku
 * poprzez wywoałnie metody get()
 */
class SuperFutureWorker implements Callable<Integer>{
 
   private int counter = 0;
   private int countTo = 0;
   private int result = 1;
   private String name;
 
   public SuperFutureWorker(int countTo, String name){
 
      this.countTo = countTo;
      this.name = name;
   }
 
   @Override
   public Integer call(){
 
      System.out.println("Start wątku: " + name);
      System.out.flush();
 
      try{
         while (counter < countTo){
            counter++;
            this.result = this.result + this.result * counter;
            System.out.println("Wątek " + name + " liczy " + counter);
            System.out.flush();
            Thread.sleep(100);
         }
      } catch (InterruptedException e){
         e.printStackTrace();
      }
 
      System.out.println("Koniec wątku: " + name + " zwracam " + result);
      System.out.flush();
 
      return result;
   }
}
 
public class Thread6Future{
 
   public static void main(String[] args){
 
      //-------------------------------------------------
      // Przykład uruchamiania pojedyńczego wątku
      //-------------------------------------------------
      System.out.println("--- Start pojedynczego egzekutora -------------");
      System.out.flush();
 
      final ExecutorService executorService = Executors.newSingleThreadExecutor();
      final Future<Integer> integerFuture = executorService.submit(new SuperFutureWorker(5, "Worker1"));
      executorService.shutdown();
 
      try{
         final Integer integer = integerFuture.get();
         System.out.println("Wynik wątku: " + integer);
      } catch (InterruptedException | ExecutionException e){
         e.printStackTrace();
      }
 
      System.out.println("--- Koniec pojedynczego egzekutora -------------");
 
   }
}

Inne artykuły z tej serii: