Low-Level Threading

  • Creating and running threads. Refer. java.lang.Thread, java.lang.Runnable

    Runnable r = () -> System.out.println("job/task");
    Thread t = new Thread(r); //1. new state
    t.start();// 2. Runnable state
    
  • Thread States

    • Thread States
  • Thread Groups. java.lang.ThreadGroup
    • Example:

      ThreadGroup tg = new ThreadGroup("srk");
      Runnable task = () -> 
          System.out.println(
            Thread.currentThread().getName() + "-" + "Executing task");
      Thread t1 = new Thread(tg, task);
      t1.start();
      Thread t2 = new Thread(tg, task);
      t2.start();
      System.out.println(tg.activeCount()); // 2
      System.out.println(tg.getName()); // srk
      System.out.println(tg.getParent().getName()); // main
      
    • Excerpt from Do not invoke ThreadGroups

      Each thread in Java is assigned to a thread group upon the thread’s creation. These groups are implemented by the java.lang.ThreadGroup class. When the thread group name is not specified explicitly, the main default group is assigned by the Java Virtual Machine (JVM) [Java Tutorials]. The convenience methods of the ThreadGroup class can be used to operate on all threads belonging to a thread group at once. For instance, the ThreadGroup.interrupt() method interrupts all threads in the thread group. Thread groups also help reinforce layered security by confining threads into groups so that they avoid interference with threads in other groups

    • SONAR Rule: https://rules.sonarsource.com/java/RSPEC-3014/
  • volatile
    • Keyword in java
    • Used in multi-threading use cases
    • Can be applied on variables that ensures that data is read and written to main memory
    • Doesn’t work on non-atomic operations
    • Recommended applying on boolean data types
  • synchronized
    • keyword in java
    • Can be applied on methods and code blocks i.e., synchronized methods and synchronized blocks
    • Ensures that the critical-section of code is prevented from parallel execution by multiple threads by acquiring lock on class or object.
    • Class level lock: When synchronized applied on a static method
    • Object/instance level lock: When synchronized is applied on instance method
    • Example depicting the class level lock vs object level lock:

      class A {
        static synchronized void m1() {}
        synchronized void m2(){}
        static synchronized void m3() {}
        synchronized void m3(){}
      }
      public class LockDemo  {
        public static void main() {
          A a = new A();
          Thread t1 = new (() -> A.m1());
          Thread t2 = new (() -> a.m2());
          Thread t3 = new (() -> A.m3());
          Thread t4 = new (() -> a.m4());
      
          t1.start();
          t2.start();
          t3.start();
          t4.start();
      
          t1.join();
          t2.join();
          t3.join();
          t4.join();
        }
      }
      
    • While t1 is running, it acquires lock on class which means no other thread can execute other static synchronized methods of the class.
      • But other threads can execute non-static synchronized methods
    • Similarly, while t2 is running it acquires lock on object meaning no other thread can execute non-static synchronized methods of that object
      • But, other threads can execute static synchronized methods
    • In summary

      The static synchronized method holds class-level lock, preventing other threads from entering any static synchronized methods on the same class

      The non-static synchronized method holds the instance-level/object lock, preventing other threads from entering any synchronized methods on the same instance

      However, it’s important to note that these two locks are independent of each other.

      While one thread is executing a static synchronized method, it does not prevent another thread from executing non-static synchronized methods on the same instance

      Similarly, while one thread is executing a non-static synchronized method, it does not prevent another thread from executing static synchronized methods on the class

      Therefore, threads can acquire and release locks independently at both the class and instance levels.

  • synchronized block
    • The above example can be rewritten using synchronized blocks as shown below
      • Version1

        class A {
          private static final Object classLock = new Object();
          private final Object objectLock = new Object();
          static void m1() {
            synchronized(classLock){}
          }
          void m2(){
            synchronized(objectLock){}
          }
          static void m3() {
            synchronized(classLock){}
          }
          void m4(){
            synchronized(objectLock){}
          }
        }
        
        
      • Version2

        class A {
          static void m1() {
            synchronized(A.class) {}
          }
          void m2() {
            synchronized(this) {}
          }
          static void m3() {
            synchronized(A.class) {}
          }
          void m4() {
            synchronized(this) {}
          }
        }
        
      • Note: In both versions of the examples, the class level lock is applied to methods m1(), m3() whether they have static in method signature or not.

  • java.lang.ThreadLocal

    • Data stored in ThreadLocal variable is only accessible and modifiable by the thread that created it.
      • Creating ThreadLocal and adding state to it.

        ThreadLocal tLocal = new ThreadLocal();
        tLocal.set(employee); // add state to thread
        tLocal.get(); // to retrieve state of thread
        tLocal.remove(); // to cleanup state associated with thread
        
      • ThreadLocal with generics

        ThreadLocal<String> tLocal = new ThreadLocal<>();
        tLocal.set(employee); // add state to thread
        tLocal.get(); // to retrieve state of thread
        tLocal.remove(); // to cleanup state associated with thread
        
    • Note: ThreadLocal values should be removed or cleared when they no longer needed to prevent holding references to objects longer than necessary.

High-Level Threading - Abstraction

  • java.util.concurrent package since java 1.5 or 5.0
    • Provides utility classes commonly used in concurrent programming.
      • java.util.concurrent.locks
      • java.util.concurrent.atomic Executor Framework

        java.lang.Runnable java.util.concurrent.Callable
        public interface Runnable { void run(); } public interface Callable<V> { V call() throws Exception;}
        void run() - When an object implementing interface Runnable is used to create a thread, starting a thread causes the object’s run method to be called in that separately executing thread V call() - Computes a result, or throws an exception if unable to do so
        java.util.concurrent.Executor java.util.concurrent.ExecutorService
        Frequently used methods: For more details refer Javadoc Frequently used methods: For more details refer Javadoc
        void execute(Runnable command) - Executes the given command at some time in the future. Future<?> submit(Runnable task) - Submits a runnable task for execution and returns a Future representing that task
          <T> Future<T> submit(Runnable task, T result) - Submits a Runnable task for execution and returns a Future representing that task
          <T> Future<T> submit(Callable<T> task) - Submits a value-returning task for execution and returns a Future representing the pending results of the task.
          void shutdown() - Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted.
          List<Runnable> shutdownNow() - Attempts to stop all actively executing tasks, halts the processing of waiting tasks, and returns a list of the tasks that were awaiting execution.
  • java.util.concurrent.ThreadPoolExecutor
    • All we have to do is just submit the jobs/tasks to an ExecutorService(single or fixed or cached or custom) that would internally take care of thread creation and management of threads within the pool.
    • Terminology
      • Excerpt from Javadoc of ThreadPoolExecutor Constructor

           public ThreadPoolExecutor(int corePoolSize,
                            int maximumPoolSize,
                            long keepAliveTime,
                            TimeUnit unit,
                            BlockingQueue<Runnable> workQueue)
        

        Note: It may be more convenient to use one of the Executors factory methods instead of this general purpose constructor.

        Paramaters:

        corePoolSize - the number of threads to keep in the pool, even if they are idle, unless allowCoreThreadTimeOut is set

        maximumPoolSize - the maximum number of threads to allow in the pool

        keepAliveTime - when the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating.

        unit - the time unit for the keepAliveTime argument

        workQueue - the queue to use for holding tasks before they are executed.

  • ThreadPoolExecutor Internals
    • ThreadPoolExecutor

    • java.util.concurrent.Executors class provides factory methods to create 4 standard thread pools like Single, Fixed, Cached, and Scheduled. However, we can create our own custom ThreadPoolExecutor config as well.

      1. Single Thread Pool Executor
        • Example:

          Runnable task = () -> System.out.println("Running a task or job");
          ExecutorService es = Executors.newSingleThreadExecutor();
          Future<?> result = es.submit(task); // accepts runnable or callable
          /*-
          submit returns Future<?> for Runnable task, 
          Future<T> for Callable task
          */
          try {
            task.get(); 
            /*- blocking-call waits till task completion, 
            throws checked-exceptions*/
          } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
          }
          /*- to submit multiple tasks use invokeAll(Collection<Callable<T>>) 
          that returns List<Future<T>>*/
          es.shutdown(); // blocking call
          
          /*- shutdown - No new tasks are accepted 
          and previously submitted tasks are executed. */
          /*-
          * Single Thread Executor
          *
          *  Creates 1 Thread - ThreadPoolExecutor with the 
          following internal impl or config
          *
          * new ThreadPoolExecutor(1,
          *                        1,
          *                        0L,
          *                        TimeUnit.MILLISECONDS,
          *                        new LinkedBlockingQueue<Runnable>())
          *
          * UnBounded Queue - No limit, will accept indefinite number of tasks
          * keepAliveTime - O - Milliseconds
          * CorePoolSize = MaxPoolSize = 1
          */
          
        • Use Case: Suitable when tasks to be executed sequentially

      2. Fixed Thread Pool Executor
        • Example:

          Runnable task = () -> System.out.println("Running a task or job");
          ExecutorService es = Executors.newFixedThreadPool(2);
          Future<?> result = es.submit(task); // accepts runnable or callable
          /*-
          submit returns Future<?> for Runnable task, 
          Future<T> for Callable task
          */
          try {
            task.get(); 
            /*- blocking-call waits till task completion, 
            throws checked-exceptions*/
          } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
          }
          /*- to submit multiple tasks use invokeAll(Collection<Callable<T>>) 
          that returns List<Future<T>>*/
          es.shutdown(); // blocking call
          
          /*- shutdown - No new tasks are accepted
          and previously submitted tasks are executed. */
          /*-
          * Fixed Thread Pool Executor
          *
          *  Creates Fixed Size ThreadPoolExecutor with the
          following internal impl or config
          *
          * new ThreadPoolExecutor(nThreads,
          *                        nThreads,
          *                        0L,
          *                        TimeUnit.MILLISECONDS,
          *                        new LinkedBlockingQueue<Runnable>())
          *
          * UnBounded Queue - No limit, will accept indefinite number of tasks
          * keepAliveTime - O - Milliseconds
          * CorePoolSize = MaxPoolSize = nThreads
          */
          
        • Use Case: Suitable when you are certain of the number of tasks or throughput.

      3. Cached Thread Pool Executor
        • Example:

          Runnable task = () -> System.out.println("Running a task or job");
          ExecutorService es = Executors.newCachedThreadPool();
          Future<?> result = es.submit(task); // accepts runnable or callable
          /*-
          submit returns Future<?> for Runnable task, 
          Future<T> for Callable task
          */
          try {
            task.get(); 
            /*- blocking-call waits till task completion, 
            throws checked-exceptions*/
          } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
          }
          /*- to submit multiple tasks use invokeAll(Collection<Callable<T>>) 
          that returns List<Future<T>>*/
          es.shutdown(); // blocking call
          
          /*- shutdown - No new tasks are accepted
          and previously submitted tasks are executed. */
          
          /*-
          * Cached Thread Pool Executor
          *
          *  Creates Cached ThreadPoolExecutor with the
          following internal impl or config
          *
          * new ThreadPoolExecutor(0,
          *                        Integer.MAX_VALUE,
          *                        60L,
          *                        TimeUnit.SECONDS,
          *                        new SynchronousQueue<Runnable>())
          *
          * UnBounded Queue - No limit, will accept indefinite number of tasks
            - Synchronous Queue:
              - It is a type of blocking queue
                in Java, but it has a capacity of zero. This means
                that it doesn't store elements like a traditional
                bounded queue (e.g., ArrayBlockingQueue or
                LinkedBlockingQueue) with a fixed capacity
              - In a SynchronousQueue, each put operation
                (task submission) blocks until there is a
                corresponding take operation (thread available
                to consume the task). It is often used for direct
                handoff of tasks between producer and consumer
                threads without queuing them.
              - Tt doesn't hold tasks for later processing.
                Tasks are handed off immediately to a
                waiting/consumer thread or result in the creation
                of a new thread if no threads are available.
              - When you submit a task to a cached thread pool
                with a SynchronousQueue, the task is either handed off
                to an existing idle thread for immediate execution or
                results in the creation of a new thread if all threads 
                are busy.
              - The use of SynchronousQueue in a cached thread pool
                ensures that tasks are not queued but are processed
                immediately by threads, making it suitable for scenarios
                where you want to minimize thread creation and handle
                tasks efficiently during bursts of workload.
          
          * keepAliveTime - 60 - seconds
          * CorePoolSize = 0
          * MaxPoolSize = nThreads
          
          * Threads in (non-core-pool) are reused after their task completion
          and killed if they are idle for 60 seconds(keepAliveTime)
          */
          
        • Use Case: When you are unsure of number of incoming tasks submissions, and suitable for short-lived dynamic number of task submissions. This threadpool can reuse, shrink, and expand as per the incoming task load.

      4. Scheduled Thread Pool Executor
        • java.util.concurrent.ScheduledExecutorService
          Excerpt From the javadoc
          An ExecutorService that can schedule commands to run after a given delay, or to execute periodically
          <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) Creates and executes a ScheduledFuture that becomes enabled after the given delay
          ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) Creates and executes a one-shot action that becomes enabled after the given delay.
          ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently with the given period; that is executions will commence after initialDelay then initialDelay+period, then initialDelay + 2 * period, and so on.
          ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently with the given delay between the termination of one execution and the commencement of the next.
          • fixedRate
            • Example: Assume fixedRate = 5000 ms = 5 sec
              • Case 1: Assume jobs are completed before the delay interval i.e., 5 sec
                • Assume a job started at 00:00:00(HH:mm:ss)
                  • 1st Run - 00:00:00(start time) - here fixed rate doesn’t care about end time
                  • 2nd Run - 00:00:05(start time) - here fixed rate doesn’t care about end time
                  • 3rd Run - 00:00:10(start time) - here fixed rate doesn’t care about end time
                  • And so on
              • Case2: Assume jobs takes more time than delay interval i.e., 5 sec in which case the jobs wait for the previous to finish
                • 1st Run - 00:00:00(start time) - 00:00:07(end time)
                • 2nd Run - 00:00:07(start time) - 00:00:15(end time)
                • 2nd Run - 00:00:15(start time) - 00:00:28(end time)
          • fixedDelay
            • Example: Assume fixedDelay = 5000 ms = 5 sec
              • Assume a job started at 00:00:00(HH:mm:ss)
                • 1st Run - 00:00:00(start time) - 00:00:02 (Assume end time) - task completed in 2 seconds
                • Waits for 5 ms after task 1 completion i.e., next Run start time = 00:00:02 + 5 ms = 00:00:07
                • 2nd Run - 00:00:07(start time) - 00:00:13*(Assume end time) - task completed in 6 seconds
                • Waits for 5 ms after task 2 completion i.e., next Run start time - 00:00:13 + 6 ms = 00:00:19
                • And so on
          • Applicability
            • fixedRate
              • Ensures that each run’s start time is at the consistent time intervals. There might be time tasks overlap with time which means multiple tasks may be running at a give instance
              • When you want to do an activity every x secs, x mins, x hours irrespective of the previous task completion status
              • Example: Log cleanup every day at 00:00:00.
              • Another example could be to find the health of a service by hitting the status endpoint for every x seconds.
            • fixedDelay
              • No Overlap of tasks. Each run will be started only after the previous run has finished + a fixedDelay time so, tasks are executed sequentially.
              • Example: Retries after each failed operation. In other words, no time overlap of retries. Each retry is done after a fixed delay and all retries are made sequentially.
      • Example of fixedRate case1 - when tasks are completed before the delay time:

        •    Runnable fixedRateTask =
                  () -> {
                    System.out.println(Thread.currentThread().getName() 
                    + " startTime: " + Instant.now());
                    try {
                      Thread.sleep(1000); // 1 second
                    } catch (InterruptedException e) {
                      e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() 
                    + " endTime: " + Instant.now());
                  };
            ScheduledExecutorService ses = Executors.newScheduledThreadPool(5);
            ses.scheduleAtFixedRate(fixedRateTask, 0, 5, TimeUnit.SECONDS);
          
          • Output

              pool-1-thread-1 startTime: 2023-09-25T20:47:07.385706Z
              pool-1-thread-1 endTime: 2023-09-25T20:47:08.422220Z
                        
              pool-1-thread-1 startTime: 2023-09-25T20:47:12.381992Z
              pool-1-thread-1 endTime: 2023-09-25T20:47:13.382415Z
                        
              pool-1-thread-2 startTime: 2023-09-25T20:47:17.381134Z
              pool-1-thread-2 endTime: 2023-09-25T20:47:18.381550Z
            
      • Example of fixedRate case2 - when tasks take more than the delay time, then each run waits for previous task to be completed.

        •   Runnable fixedRateTask =
                  () -> {
                    System.out.println(Thread.currentThread().getName() 
                    + " startTime: " + Instant.now());
                    try {
                      Thread.sleep(5000); // 5 seconds
                    } catch (InterruptedException e) {
                      e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() 
                    + " endTime: " + Instant.now());
                  };
            ScheduledExecutorService ses = Executors.newScheduledThreadPool(5);
            ses.scheduleAtFixedRate(fixedRateTask, 0, 1, TimeUnit.SECONDS);
          
          • Output

              pool-1-thread-1 startTime: 2023-09-25T20:49:26.340729Z
              pool-1-thread-1 endTime: 2023-09-25T20:49:31.372340Z
                        
              pool-1-thread-1 startTime: 2023-09-25T20:49:31.373885Z
              pool-1-thread-1 endTime: 2023-09-25T20:49:36.374391Z
                        
              pool-1-thread-2 startTime: 2023-09-25T20:49:36.375383Z
              pool-1-thread-2 endTime: 2023-09-25T20:49:41.375770Z
            
      • Example of fixedDelay

          Runnable fixedDelayTask =
                () -> {
                  System.out.println(Thread.currentThread().getName() 
                  + " startTime: " + Instant.now());
                  try {
                    Thread.sleep(5000); // 5 seconds
                  } catch (InterruptedException e) {
                    e.printStackTrace();
                  }
                  System.out.println(Thread.currentThread().getName() 
                  + " endTime: " + Instant.now());
                };
          ScheduledExecutorService ses = Executors.newScheduledThreadPool(5);
          ses.scheduleAtFixedRate(fixedDelayTask, 0, 1, TimeUnit.SECONDS);
        
        • Output

            pool-1-thread-1 startTime: 2023-09-25T20:57:29.455850Z
            pool-1-thread-1 endTime: 2023-09-25T20:57:34.492649Z
                    
            pool-1-thread-1 startTime: 2023-09-25T20:57:35.495405Z
            pool-1-thread-1 endTime: 2023-09-25T20:57:40.495860Z
                    
            pool-1-thread-2 startTime: 2023-09-25T20:57:41.496456Z
            pool-1-thread-2 endTime: 2023-09-25T20:57:46.496884Z
          

CompletableFuture - Since Java1.8

  • Terminology
    • async(asynchronous)
      • Refers to the programming style that enables the initiation of tasks without blocking the calling thread
      • Common mechanisms for async programming includes callbacks(in Java8’s CompletabeFuture has callback methods like thenRun, thenAccept), promise(javascript), async/await constructs in languages like Javascript, Python and c#
      • Excerpt from this StackOverflow answer which I agree with

        Asynchronous means that the execution happens outside the flow of the caller, and is potentially deferred. The execution typically occurs in another thread.

    • non-blocking
      • Refers to the program execution flow which can execute operations sequentially without blocking or waiting.
  • CompletableFuture is used to run tasks asynchronously and also chain multiple asynchronous operations with callback methods like thenRun, thenAccept, thenApply, thenCompose

  • Run a task asynchronously
    • Using Executor/ExecutorService Pattern - Java5

      Runnable r = () -> System.out.println("Hello");
      ExecutorService es = Executors.newSingleThreadExecutor();
      Future<?> task = es.submit(r);
      try {
        task.get();
        // blocking-call waits till task completion, throws checked-exceptions
      } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
      }
      es.shutdown();
      /*-blocking-call, doesn't accept new tasks, waits till current tasks 
      are completed */
      
    • Using CompletableFuture Pattern - Java8

      Runnable r = () -> System.out.println("Hello");
      CompletableFuture<Void> asyncTask = CompletableFuture.runAsync(r);
      asyncTask.join();
      // blocking-call waits till task completion, throws un-checked exceptions
      
      • All async methods without an explicit Executor argument are performed using the ForkJoinPool.commonPool()

      • In the above example by default, it executes the runnable in default common Fork Join Pool

      • To pass explicit thread pool and run task in that thread pool

        • There is an overloaded version of runAsync i.e., runAsync(runnable, executor)
  • Run a task that returns a value asynchronously
    • Using Executor/ExecutorService Pattern - Java5

      // we use callable to run a job/task that returns result
      Callable<String> c = () -> "Hello";
      // callable call() returns a value and  throws Checked Exception
      ExecutorService es = Executors.newSingleThreadExecutor();
      Future<String> task = es.submit(c); // callable returns a future
      try {
        task.get();
        // blocking-call waits till task completion, throws checked-exceptions
      } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
      }
      es.shutdown();
      /*-blocking-call, doesn't accept new tasks, waits till current tasks 
      are completed */
      
    • Using CompletableFuture Pattern - Java8

      /* CompletableFuture does not work with Callable, rather the 
      equivalent here is Supplier*/
      Supplier<String> supplier = () -> "Hello";
      // Supplier cannot throw any checked exceptions
      CompletableFuture<String> asyncTask = 
          CompletableFuture.supplyAsync(supplier);
      String taskResult = asyncTask.join();
      /* blocking-call waits till task completion, 
        throws un-checked exceptions */
      System.out.println(taskResult); // Prints Hello
      
java.util.concurrent.Future java.util.concurrent.CompletableFuture
Since Java1.5 Since Java 1.8
Interface. Known Implementations: FutureTask, ForkJoinTask, RecursiveAction, RecursiveTask class that implements CompletionStage and Future
Refer Javadoc: java.util.concurrent.Future Refer javadoc: CompletableFuture and CompletionStage
Frequently used methods: For more details refer Javadoc Frequently used methods: For more details refer Javadoc
v get() - Waits if necessary or the computation to complete, and then retrieve its result - Blocking call get()Implemented
v get(long timeout, TimeUnit unit) - Waits if necessary or the computation to complete, and then retrieve its result - Blocking call get(timeout, unit) Implemented
boolean isCancelled() - Returns true if this task was cancelled before it completed normally isCancelled() Implemented
boolean isDone() - Returns true if this task is completed isDone() Implemented
  static methods
  static CompletableFuture<Void> runAsync(Runnable runnable) - Returns a new CompletableFuture that is asynchronously completed by a task running in the ForkJoinPool.commonPool() after it runs the given action.
  static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) - Returns a new CompletableFuture that is asynchronously completed by a task running in the given executor after it runs the given action.
  static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) - Returns a new CompletableFuture that is asynchronously completed by a task running in the ForkJoinPool.commonPool() with the value obtained by calling the given Supplier.
  static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) - Returns a new CompletableFuture that is asynchronously completed by a task running in the given executor with the value obtained by calling the given Supplier.
  static <U> CompletableFuture<U> completedFuture(U value) - Returns a new CompletableFuture that is already completed with the given value.
  static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) - Returns a new CompletableFuture that is completed when any of the given CompletableFutures complete, with the same result.
  static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) - Returns a new CompletableFuture that is completed when all of the given CompletableFutures complete.
  Contains other instance methods used for chaining the CompletableFuture like .thenRun(runnable), .thenRunAsync(runnable), .thenAccept(Consumer<? super T> action), .thenAcceptAsync(Consumer<? super T> action), .thenApply(Function<I,O> f), .thenApplyAsync(Function<I,O> f), .thenCompose(Function<? super T, ? extends CompletionStage<U>> fn), .thenComposeAsync(Functon<? super T, ? extends CompletionStage<U>> fn)
  • Chaining
    • .thenRun(runnable) - returns CompletableFuture<Void> - Executes a Runnable once the future completes
      • Run after the previous stage of CompletableFuture completion
      • Example:

        CompletableFuture<Integer> future = 
            CompletableFuture.supplyAsync(() -> 99);
        CompletableFuture<Void> actionFuture = 
            future.thenRun(() -> {
                // Post completion activities
                System.out.println("Action successful.");
            });
        
    • .thenAccept(consumer) - returns CompletableFuture<Void>
      • Consume previous stage or CompletableFuture result
      • Example:

        CompletableFuture<Integer> future = 
            CompletableFuture.supplyAsync(() -> 99);
        
        CompletableFuture<Void> actionFuture = 
            future.thenAccept(result -> {
              System.out.println("Result: " + result);
          });
        
    • .thenApply(function) - returns <U> CompletableFuture<U>
      • Transform the result of previous stage or CompletableFuture into different result type
      • It’s a like a map which transforms an input type to another output type
      • Example:

        CompletableFuture<Integer> future = 
              CompletableFuture.supplyAsync(() -> 99);
        CompletableFuture<String> transformedFuture = 
              future.thenApply(result -> "Result: " + result);
        
    • .thenCompose() - Used for chaining multiple asynchronous operations together in a sequential manner, often in situations where one operation depends on the result of a previous one.
      • thenCompose creates a nested chain of CompletableFutures, and the result of one CompletableFuture influences the execution of the next.
      • It’s like a flatMap

      • Example:

        CompletableFuture<Integer> cf = 
            CompletableFuture.supplyAsync(() -> getEmployeeId());
        CompletableFuture<String> composedFuture =
          cf.thenCompose(empId -> 
            // Perform another asynchronous operation based on the result
            CompletableFuture.supplyAsync(() -> getEmployeeName(empId));
              
        composedFuture.thenAccept(empName -> 
            System.out.println("Employee Name = " + empName));
        composedFuture.join(); // wait for the chain of actions to be completed
        
  • non-async and async versions of instance methods for chaining
    • Note: Both are non-blocking calls, it’s just that Async versions run in a separate thread unlike non-async versions which run task in the calling thread itself.
    • Example: thenRun vs thenRunAsync
      • thenRun

        System.out.println("1. Launcher thread -> " 
              + Thread.currentThread().getName()); // 1
        ExecutorService es = Executors.newFixedThreadPool(2);
        CompletableFuture<Integer> future =
            CompletableFuture.supplyAsync(
                () -> {
                  try {
                    Thread.sleep(3000);
                  } catch (InterruptedException e) {
                    e.printStackTrace();
                  }
                  System.out.println(
                      "3. In supply Async thread -> " 
                      + Thread.currentThread().getName()); // 3
                  return 99;
                },
                es);
        
        future.thenRun(
            () -> {
              // Runs in the same thread of the previous stage or calling thread
              System.out.println("4. In thenRun thread -> " 
                  + Thread.currentThread().getName()); // 4
              System.out.println("5. Action completed synchronously."); // 5
            });
        System.out.println(
            "2. If this is printed before thenRun execution "
                + "-- thenRun is non-blocking call"); // 2
        
        future.join(); 
        // throws un-checked exceptions, and waits/blocks for future to complete
        es.shutdown(); 
        // stops accepting new tasks, waits for currently executing tasks
        

        Output:

        1. Launcher thread -> main
        2. If this is printed before thenRun execution – thenRun is non-blocking call
        3. In supply Async thread -> pool-1-thread-1
        4. In thenRun thread -> pool-1-thread-1
        5. Action completed synchronously.
      • thenRunAsync

        System.out.println("1. Launcher thread -> " 
              + Thread.currentThread().getName()); // 1
        ExecutorService es = Executors.newFixedThreadPool(2);
        CompletableFuture<Integer> future =
            CompletableFuture.supplyAsync(
                () -> {
                  try {
                    Thread.sleep(3000);
                  } catch (InterruptedException e) {
                    e.printStackTrace();
                  }
                  System.out.println(
                      "3. In supply Async thread -> " 
                      + Thread.currentThread().getName()); // 3
                  return 99;
                },
                es);
        
        future.thenRunAsync(
            () -> {
              // Runs in another thread from the thread pool
              System.out.println("4. In thenRunAsync thread -> " 
                  + Thread.currentThread().getName()); // 4
              System.out.println("5. Action completed asynchronously."); // 5
            }, es);
        System.out.println(
            "2. If this is printed before thenRunAsync execution "
                + "-- thenRunAsync is non-blocking call"); // 2
        
        future.join(); 
        // throws un-checked exceptions, and waits/blocks for future to complete
        es.shutdown(); 
        // stops accepting new tasks, waits for currently executing tasks
        

        Output:

        1. Launcher thread -> main
        2. If this is printed before thenRunAsync execution – thenRunAsync is non-blocking call
        3. In supply Async thread -> pool-1-thread-1
        4. In thenRunAsync thread -> pool-1-thread-2
        5. Action completed asynchronously.
    non-Async versions Async versions
    Runs the task in calling thread or thread of the previous stage Run the task in another from pool if threadpool is passsed explicitly otherwise runs it in another thread from common fork-join pool
    non-blocking non-blocking
    thenRun() thenRunAsync()
    thenAccept() thenAcceptAsync()
    thenApply() thenApplyAsync()
    thenCompose() thenComposeAsync()

In summary, I’ve tried to shed some light on the evolution of concurrency in Java and the corresponding language constructs. To explore more on this, refer to Javadocs, and for a deeper understanding, I would recommend applying these concurrency principles and language constructs in real-time use cases.

References

  1. Multithreading in Java
  2. Java Concurrency
  3. Concurrency in Java
  4. java.util.concurrent.ThreadPoolExecutor
  5. scheduleAtFixedRate vs scheduleWithFixedDelay