Concurrency in Java
Low-Level Threading
-
Creating and running threads. Refer. java.lang.Thread, java.lang.Runnable
Runnable r = () -> System.out.println("job/task"); Thread t = new Thread(r); //1. new state t.start();// 2. Runnable state
-
Thread States
- Thread Groups. java.lang.ThreadGroup
-
Example:
ThreadGroup tg = new ThreadGroup("srk"); Runnable task = () -> System.out.println( Thread.currentThread().getName() + "-" + "Executing task"); Thread t1 = new Thread(tg, task); t1.start(); Thread t2 = new Thread(tg, task); t2.start(); System.out.println(tg.activeCount()); // 2 System.out.println(tg.getName()); // srk System.out.println(tg.getParent().getName()); // main
- Excerpt from Do not invoke ThreadGroups
Each thread in Java is assigned to a thread group upon the thread’s creation. These groups are implemented by the java.lang.ThreadGroup class. When the thread group name is not specified explicitly, the main default group is assigned by the Java Virtual Machine (JVM) [Java Tutorials]. The convenience methods of the ThreadGroup class can be used to operate on all threads belonging to a thread group at once. For instance, the ThreadGroup.interrupt() method interrupts all threads in the thread group. Thread groups also help reinforce layered security by confining threads into groups so that they avoid interference with threads in other groups
- SONAR Rule: https://rules.sonarsource.com/java/RSPEC-3014/
-
volatile
- Keyword in java
- Used in multi-threading use cases
- Can be applied on variables that ensures that data is read and written to main memory
- Doesn’t work on non-atomic operations
- Recommended applying on boolean data types
synchronized
- keyword in java
- Can be applied on methods and code blocks i.e., synchronized methods and synchronized blocks
- Ensures that the critical-section of code is prevented from parallel execution by multiple threads by acquiring lock on class or object.
- Class level lock: When
synchronized
applied on a static method - Object/instance level lock: When
synchronized
is applied on instance method -
Example depicting the class level lock vs object level lock:
class A { static synchronized void m1() {} synchronized void m2(){} static synchronized void m3() {} synchronized void m3(){} } public class LockDemo { public static void main() { A a = new A(); Thread t1 = new (() -> A.m1()); Thread t2 = new (() -> a.m2()); Thread t3 = new (() -> A.m3()); Thread t4 = new (() -> a.m4()); t1.start(); t2.start(); t3.start(); t4.start(); t1.join(); t2.join(); t3.join(); t4.join(); } }
- While
t1
is running, it acquires lock on class which means no other thread can execute other static synchronized methods of the class.- But other threads can execute non-static synchronized methods
- Similarly, while
t2
is running it acquires lock on object meaning no other thread can execute non-static synchronized methods of that object- But, other threads can execute static synchronized methods
- In summary
The
static synchronized
method holds class-level lock, preventing other threads from entering anystatic synchronized
methods on the same classThe non-static synchronized method holds the instance-level/object lock, preventing other threads from entering any synchronized methods on the same instance
However, it’s important to note that these two locks are independent of each other.
While one thread is executing a
static synchronized
method, it does not prevent another thread from executing non-static synchronized methods on the same instanceSimilarly, while one thread is executing a non-static synchronized method, it does not prevent another thread from executing
static synchronized
methods on the classTherefore, threads can acquire and release locks independently at both the class and instance levels.
- synchronized block
- The above example can be rewritten using synchronized blocks as shown below
-
Version1
class A { private static final Object classLock = new Object(); private final Object objectLock = new Object(); static void m1() { synchronized(classLock){} } void m2(){ synchronized(objectLock){} } static void m3() { synchronized(classLock){} } void m4(){ synchronized(objectLock){} } }
-
Version2
class A { static void m1() { synchronized(A.class) {} } void m2() { synchronized(this) {} } static void m3() { synchronized(A.class) {} } void m4() { synchronized(this) {} } }
-
Note: In both versions of the examples, the class level lock is applied to methods m1(), m3() whether they have static in method signature or not.
-
- The above example can be rewritten using synchronized blocks as shown below
-
- Data stored in
ThreadLocal
variable is only accessible and modifiable by the thread that created it.-
Creating
ThreadLocal
and adding state to it.ThreadLocal tLocal = new ThreadLocal(); tLocal.set(employee); // add state to thread tLocal.get(); // to retrieve state of thread tLocal.remove(); // to cleanup state associated with thread
-
ThreadLocal
with genericsThreadLocal<String> tLocal = new ThreadLocal<>(); tLocal.set(employee); // add state to thread tLocal.get(); // to retrieve state of thread tLocal.remove(); // to cleanup state associated with thread
-
- Note:
ThreadLocal
values should be removed or cleared when they no longer needed to prevent holding references to objects longer than necessary.
- Data stored in
High-Level Threading - Abstraction
java.util.concurrent
package since java 1.5 or 5.0- Provides utility classes commonly used in concurrent programming.
java.util.concurrent.locks
-
java.lang.Runnable
java.util.concurrent.Callable
public interface Runnable { void run(); }
public interface Callable<V> { V call() throws Exception;}
void run()
- When an object implementing interface Runnable is used to create a thread, starting a thread causes the object’s run method to be called in that separately executing threadV call()
- Computes a result, or throws an exception if unable to do sojava.util.concurrent.Executor
java.util.concurrent.ExecutorService
Frequently used methods: For more details refer Javadoc Frequently used methods: For more details refer Javadoc void execute(Runnable command)
- Executes the given command at some time in the future.Future<?> submit(Runnable task)
- Submits a runnable task for execution and returns a Future representing that task<T> Future<T> submit(Runnable task, T result)
- Submits a Runnable task for execution and returns a Future representing that task<T> Future<T> submit(Callable<T> task)
- Submits a value-returning task for execution and returns a Future representing the pending results of the task.void shutdown()
- Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted.List<Runnable> shutdownNow()
- Attempts to stop all actively executing tasks, halts the processing of waiting tasks, and returns a list of the tasks that were awaiting execution.
- Provides utility classes commonly used in concurrent programming.
java.util.concurrent.ThreadPoolExecutor
- All we have to do is just submit the jobs/tasks to an ExecutorService(single or fixed or cached or custom) that would internally take care of thread creation and management of threads within the pool.
- Terminology
-
Excerpt from Javadoc of
ThreadPoolExecutor
Constructorpublic ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
Note: It may be more convenient to use one of the Executors factory methods instead of this general purpose constructor.
Paramaters:
corePoolSize - the number of threads to keep in the pool, even if they are idle, unless allowCoreThreadTimeOut is set
maximumPoolSize - the maximum number of threads to allow in the pool
keepAliveTime - when the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating.
unit - the time unit for the keepAliveTime argument
workQueue - the queue to use for holding tasks before they are executed.
-
- ThreadPoolExecutor Internals
-
java.util.concurrent.Executors
class provides factory methods to create 4 standard thread pools like Single, Fixed, Cached, and Scheduled. However, we can create our own custom ThreadPoolExecutor config as well.- Single Thread Pool Executor
-
Example:
Runnable task = () -> System.out.println("Running a task or job"); ExecutorService es = Executors.newSingleThreadExecutor(); Future<?> result = es.submit(task); // accepts runnable or callable /*- submit returns Future<?> for Runnable task, Future<T> for Callable task */ try { task.get(); /*- blocking-call waits till task completion, throws checked-exceptions*/ } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } /*- to submit multiple tasks use invokeAll(Collection<Callable<T>>) that returns List<Future<T>>*/ es.shutdown(); // blocking call /*- shutdown - No new tasks are accepted and previously submitted tasks are executed. */ /*- * Single Thread Executor * * Creates 1 Thread - ThreadPoolExecutor with the following internal impl or config * * new ThreadPoolExecutor(1, * 1, * 0L, * TimeUnit.MILLISECONDS, * new LinkedBlockingQueue<Runnable>()) * * UnBounded Queue - No limit, will accept indefinite number of tasks * keepAliveTime - O - Milliseconds * CorePoolSize = MaxPoolSize = 1 */
-
Use Case: Suitable when tasks to be executed sequentially
-
- Fixed Thread Pool Executor
-
Example:
Runnable task = () -> System.out.println("Running a task or job"); ExecutorService es = Executors.newFixedThreadPool(2); Future<?> result = es.submit(task); // accepts runnable or callable /*- submit returns Future<?> for Runnable task, Future<T> for Callable task */ try { task.get(); /*- blocking-call waits till task completion, throws checked-exceptions*/ } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } /*- to submit multiple tasks use invokeAll(Collection<Callable<T>>) that returns List<Future<T>>*/ es.shutdown(); // blocking call /*- shutdown - No new tasks are accepted and previously submitted tasks are executed. */ /*- * Fixed Thread Pool Executor * * Creates Fixed Size ThreadPoolExecutor with the following internal impl or config * * new ThreadPoolExecutor(nThreads, * nThreads, * 0L, * TimeUnit.MILLISECONDS, * new LinkedBlockingQueue<Runnable>()) * * UnBounded Queue - No limit, will accept indefinite number of tasks * keepAliveTime - O - Milliseconds * CorePoolSize = MaxPoolSize = nThreads */
-
Use Case: Suitable when you are certain of the number of tasks or throughput.
-
- Cached Thread Pool Executor
-
Example:
Runnable task = () -> System.out.println("Running a task or job"); ExecutorService es = Executors.newCachedThreadPool(); Future<?> result = es.submit(task); // accepts runnable or callable /*- submit returns Future<?> for Runnable task, Future<T> for Callable task */ try { task.get(); /*- blocking-call waits till task completion, throws checked-exceptions*/ } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } /*- to submit multiple tasks use invokeAll(Collection<Callable<T>>) that returns List<Future<T>>*/ es.shutdown(); // blocking call /*- shutdown - No new tasks are accepted and previously submitted tasks are executed. */ /*- * Cached Thread Pool Executor * * Creates Cached ThreadPoolExecutor with the following internal impl or config * * new ThreadPoolExecutor(0, * Integer.MAX_VALUE, * 60L, * TimeUnit.SECONDS, * new SynchronousQueue<Runnable>()) * * UnBounded Queue - No limit, will accept indefinite number of tasks - Synchronous Queue: - It is a type of blocking queue in Java, but it has a capacity of zero. This means that it doesn't store elements like a traditional bounded queue (e.g., ArrayBlockingQueue or LinkedBlockingQueue) with a fixed capacity - In a SynchronousQueue, each put operation (task submission) blocks until there is a corresponding take operation (thread available to consume the task). It is often used for direct handoff of tasks between producer and consumer threads without queuing them. - Tt doesn't hold tasks for later processing. Tasks are handed off immediately to a waiting/consumer thread or result in the creation of a new thread if no threads are available. - When you submit a task to a cached thread pool with a SynchronousQueue, the task is either handed off to an existing idle thread for immediate execution or results in the creation of a new thread if all threads are busy. - The use of SynchronousQueue in a cached thread pool ensures that tasks are not queued but are processed immediately by threads, making it suitable for scenarios where you want to minimize thread creation and handle tasks efficiently during bursts of workload. * keepAliveTime - 60 - seconds * CorePoolSize = 0 * MaxPoolSize = nThreads * Threads in (non-core-pool) are reused after their task completion and killed if they are idle for 60 seconds(keepAliveTime) */
-
Use Case: When you are unsure of number of incoming tasks submissions, and suitable for short-lived dynamic number of task submissions. This threadpool can reuse, shrink, and expand as per the incoming task load.
-
- Scheduled Thread Pool Executor
-
java.util.concurrent.ScheduledExecutorService Excerpt From the javadoc An ExecutorService that can schedule commands to run after a given delay, or to execute periodically <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
Creates and executes a ScheduledFuture that becomes enabled after the given delayScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
Creates and executes a one-shot action that becomes enabled after the given delay.ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently with the given period; that is executions will commence after initialDelay then initialDelay+period, then initialDelay + 2 * period, and so on.ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently with the given delay between the termination of one execution and the commencement of the next.- fixedRate
- Example: Assume fixedRate = 5000 ms = 5 sec
- Case 1: Assume jobs are completed before the delay interval i.e., 5 sec
- Assume a job started at 00:00:00(HH:mm:ss)
- 1st Run - 00:00:00(start time) - here fixed rate doesn’t care about end time
- 2nd Run - 00:00:05(start time) - here fixed rate doesn’t care about end time
- 3rd Run - 00:00:10(start time) - here fixed rate doesn’t care about end time
- And so on
- Assume a job started at 00:00:00(HH:mm:ss)
- Case2: Assume jobs takes more time than delay interval i.e., 5 sec in which case the jobs wait for the previous to finish
- 1st Run - 00:00:00(start time) - 00:00:07(end time)
- 2nd Run - 00:00:07(start time) - 00:00:15(end time)
- 2nd Run - 00:00:15(start time) - 00:00:28(end time)
- Case 1: Assume jobs are completed before the delay interval i.e., 5 sec
- Example: Assume fixedRate = 5000 ms = 5 sec
- fixedDelay
- Example: Assume fixedDelay = 5000 ms = 5 sec
- Assume a job started at 00:00:00(HH:mm:ss)
- 1st Run - 00:00:00(start time) - 00:00:02 (Assume end time) - task completed in 2 seconds
- Waits for 5 ms after task 1 completion i.e., next Run start time = 00:00:02 + 5 ms = 00:00:07
- 2nd Run - 00:00:07(start time) - 00:00:13*(Assume end time) - task completed in 6 seconds
- Waits for 5 ms after task 2 completion i.e., next Run start time - 00:00:13 + 6 ms = 00:00:19
- And so on
- Assume a job started at 00:00:00(HH:mm:ss)
- Example: Assume fixedDelay = 5000 ms = 5 sec
- Applicability
- fixedRate
- Ensures that each run’s start time is at the consistent time intervals. There might be time tasks overlap with time which means multiple tasks may be running at a give instance
- When you want to do an activity every x secs, x mins, x hours irrespective of the previous task completion status
- Example: Log cleanup every day at 00:00:00.
- Another example could be to find the health of a service by hitting the status endpoint for every x seconds.
- fixedDelay
- No Overlap of tasks. Each run will be started only after the previous run has finished + a fixedDelay time so, tasks are executed sequentially.
- Example: Retries after each failed operation. In other words, no time overlap of retries. Each retry is done after a fixed delay and all retries are made sequentially.
- fixedRate
- fixedRate
-
-
Example of fixedRate case1 - when tasks are completed before the delay time:
-
Runnable fixedRateTask = () -> { System.out.println(Thread.currentThread().getName() + " startTime: " + Instant.now()); try { Thread.sleep(1000); // 1 second } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " endTime: " + Instant.now()); }; ScheduledExecutorService ses = Executors.newScheduledThreadPool(5); ses.scheduleAtFixedRate(fixedRateTask, 0, 5, TimeUnit.SECONDS);
-
Output
pool-1-thread-1 startTime: 2023-09-25T20:47:07.385706Z pool-1-thread-1 endTime: 2023-09-25T20:47:08.422220Z pool-1-thread-1 startTime: 2023-09-25T20:47:12.381992Z pool-1-thread-1 endTime: 2023-09-25T20:47:13.382415Z pool-1-thread-2 startTime: 2023-09-25T20:47:17.381134Z pool-1-thread-2 endTime: 2023-09-25T20:47:18.381550Z
-
-
-
Example of fixedRate case2 - when tasks take more than the delay time, then each run waits for previous task to be completed.
-
Runnable fixedRateTask = () -> { System.out.println(Thread.currentThread().getName() + " startTime: " + Instant.now()); try { Thread.sleep(5000); // 5 seconds } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " endTime: " + Instant.now()); }; ScheduledExecutorService ses = Executors.newScheduledThreadPool(5); ses.scheduleAtFixedRate(fixedRateTask, 0, 1, TimeUnit.SECONDS);
-
Output
pool-1-thread-1 startTime: 2023-09-25T20:49:26.340729Z pool-1-thread-1 endTime: 2023-09-25T20:49:31.372340Z pool-1-thread-1 startTime: 2023-09-25T20:49:31.373885Z pool-1-thread-1 endTime: 2023-09-25T20:49:36.374391Z pool-1-thread-2 startTime: 2023-09-25T20:49:36.375383Z pool-1-thread-2 endTime: 2023-09-25T20:49:41.375770Z
-
-
-
Example of fixedDelay
Runnable fixedDelayTask = () -> { System.out.println(Thread.currentThread().getName() + " startTime: " + Instant.now()); try { Thread.sleep(5000); // 5 seconds } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " endTime: " + Instant.now()); }; ScheduledExecutorService ses = Executors.newScheduledThreadPool(5); ses.scheduleAtFixedRate(fixedDelayTask, 0, 1, TimeUnit.SECONDS);
-
Output
pool-1-thread-1 startTime: 2023-09-25T20:57:29.455850Z pool-1-thread-1 endTime: 2023-09-25T20:57:34.492649Z pool-1-thread-1 startTime: 2023-09-25T20:57:35.495405Z pool-1-thread-1 endTime: 2023-09-25T20:57:40.495860Z pool-1-thread-2 startTime: 2023-09-25T20:57:41.496456Z pool-1-thread-2 endTime: 2023-09-25T20:57:46.496884Z
-
- Single Thread Pool Executor
CompletableFuture - Since Java1.8
- Terminology
async(asynchronous)
- Refers to the programming style that enables the initiation of tasks without blocking the calling thread
- Common mechanisms for async programming includes callbacks(in Java8’s CompletabeFuture has callback methods like thenRun, thenAccept), promise(javascript), async/await constructs in languages like Javascript, Python and c#
- Excerpt from this StackOverflow answer which I agree with
Asynchronous means that the execution happens outside the flow of the caller, and is potentially deferred. The execution typically occurs in another thread.
non-blocking
- Refers to the program execution flow which can execute operations sequentially without blocking or waiting.
-
CompletableFuture is used to run tasks asynchronously and also chain multiple asynchronous operations with callback methods like thenRun, thenAccept, thenApply, thenCompose
- Run a task asynchronously
-
Using Executor/ExecutorService Pattern - Java5
Runnable r = () -> System.out.println("Hello"); ExecutorService es = Executors.newSingleThreadExecutor(); Future<?> task = es.submit(r); try { task.get(); // blocking-call waits till task completion, throws checked-exceptions } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } es.shutdown(); /*-blocking-call, doesn't accept new tasks, waits till current tasks are completed */
-
Using CompletableFuture Pattern - Java8
Runnable r = () -> System.out.println("Hello"); CompletableFuture<Void> asyncTask = CompletableFuture.runAsync(r); asyncTask.join(); // blocking-call waits till task completion, throws un-checked exceptions
-
All async methods without an explicit Executor argument are performed using the ForkJoinPool.commonPool()
-
In the above example by default, it executes the runnable in default common Fork Join Pool
-
To pass explicit thread pool and run task in that thread pool
- There is an overloaded version of runAsync i.e.,
runAsync(runnable, executor)
- There is an overloaded version of runAsync i.e.,
-
-
- Run a task that returns a value asynchronously
-
Using Executor/ExecutorService Pattern - Java5
// we use callable to run a job/task that returns result Callable<String> c = () -> "Hello"; // callable call() returns a value and throws Checked Exception ExecutorService es = Executors.newSingleThreadExecutor(); Future<String> task = es.submit(c); // callable returns a future try { task.get(); // blocking-call waits till task completion, throws checked-exceptions } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } es.shutdown(); /*-blocking-call, doesn't accept new tasks, waits till current tasks are completed */
-
Using CompletableFuture Pattern - Java8
/* CompletableFuture does not work with Callable, rather the equivalent here is Supplier*/ Supplier<String> supplier = () -> "Hello"; // Supplier cannot throw any checked exceptions CompletableFuture<String> asyncTask = CompletableFuture.supplyAsync(supplier); String taskResult = asyncTask.join(); /* blocking-call waits till task completion, throws un-checked exceptions */ System.out.println(taskResult); // Prints Hello
-
java.util.concurrent.Future | java.util.concurrent.CompletableFuture |
---|---|
Since Java1.5 | Since Java 1.8 |
Interface. Known Implementations: FutureTask, ForkJoinTask, RecursiveAction, RecursiveTask | class that implements CompletionStage and Future |
Refer Javadoc: java.util.concurrent.Future | Refer javadoc: CompletableFuture and CompletionStage |
Frequently used methods: For more details refer Javadoc | Frequently used methods: For more details refer Javadoc |
v get() - Waits if necessary or the computation to complete, and then retrieve its result - Blocking call |
get() Implemented |
v get(long timeout, TimeUnit unit) - Waits if necessary or the computation to complete, and then retrieve its result - Blocking call |
get(timeout, unit) Implemented |
boolean isCancelled() - Returns true if this task was cancelled before it completed normally |
isCancelled() Implemented |
boolean isDone() - Returns true if this task is completed |
isDone() Implemented |
static methods | |
static CompletableFuture<Void> runAsync(Runnable runnable) - Returns a new CompletableFuture that is asynchronously completed by a task running in the ForkJoinPool.commonPool() after it runs the given action. |
|
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) - Returns a new CompletableFuture that is asynchronously completed by a task running in the given executor after it runs the given action. |
|
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) - Returns a new CompletableFuture that is asynchronously completed by a task running in the ForkJoinPool.commonPool() with the value obtained by calling the given Supplier. |
|
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) - Returns a new CompletableFuture that is asynchronously completed by a task running in the given executor with the value obtained by calling the given Supplier. |
|
static <U> CompletableFuture<U> completedFuture(U value) - Returns a new CompletableFuture that is already completed with the given value. |
|
static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) - Returns a new CompletableFuture that is completed when any of the given CompletableFutures complete, with the same result. |
|
static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) - Returns a new CompletableFuture that is completed when all of the given CompletableFutures complete. |
|
Contains other instance methods used for chaining the CompletableFuture like .thenRun(runnable) , .thenRunAsync(runnable) , .thenAccept(Consumer<? super T> action) , .thenAcceptAsync(Consumer<? super T> action) , .thenApply(Function<I,O> f) , .thenApplyAsync(Function<I,O> f) , .thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) , .thenComposeAsync(Functon<? super T, ? extends CompletionStage<U>> fn) |
- Chaining
.thenRun(runnable)
- returnsCompletableFuture<Void>
- Executes a Runnable once the future completes- Run after the previous stage of CompletableFuture completion
-
Example:
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 99); CompletableFuture<Void> actionFuture = future.thenRun(() -> { // Post completion activities System.out.println("Action successful."); });
.thenAccept(consumer)
- returnsCompletableFuture<Void>
- Consume previous stage or CompletableFuture result
-
Example:
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 99); CompletableFuture<Void> actionFuture = future.thenAccept(result -> { System.out.println("Result: " + result); });
.thenApply(function)
- returns<U> CompletableFuture<U>
- Transform the result of previous stage or CompletableFuture into different result type
- It’s a like a map which transforms an input type to another output type
-
Example:
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 99); CompletableFuture<String> transformedFuture = future.thenApply(result -> "Result: " + result);
.thenCompose()
- Used for chaining multiple asynchronous operations together in a sequential manner, often in situations where one operation depends on the result of a previous one.thenCompose
creates a nested chain of CompletableFutures, and the result of one CompletableFuture influences the execution of the next.-
It’s like a flatMap
-
Example:
CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> getEmployeeId()); CompletableFuture<String> composedFuture = cf.thenCompose(empId -> // Perform another asynchronous operation based on the result CompletableFuture.supplyAsync(() -> getEmployeeName(empId)); composedFuture.thenAccept(empName -> System.out.println("Employee Name = " + empName)); composedFuture.join(); // wait for the chain of actions to be completed
- non-async and async versions of instance methods for chaining
- Note: Both are non-blocking calls, it’s just that Async versions run in a separate thread unlike non-async versions which run task in the calling thread itself.
- Example:
thenRun
vsthenRunAsync
-
thenRun
System.out.println("1. Launcher thread -> " + Thread.currentThread().getName()); // 1 ExecutorService es = Executors.newFixedThreadPool(2); CompletableFuture<Integer> future = CompletableFuture.supplyAsync( () -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println( "3. In supply Async thread -> " + Thread.currentThread().getName()); // 3 return 99; }, es); future.thenRun( () -> { // Runs in the same thread of the previous stage or calling thread System.out.println("4. In thenRun thread -> " + Thread.currentThread().getName()); // 4 System.out.println("5. Action completed synchronously."); // 5 }); System.out.println( "2. If this is printed before thenRun execution " + "-- thenRun is non-blocking call"); // 2 future.join(); // throws un-checked exceptions, and waits/blocks for future to complete es.shutdown(); // stops accepting new tasks, waits for currently executing tasks
Output:
- Launcher thread -> main
- If this is printed before thenRun execution – thenRun is non-blocking call
- In supply Async thread -> pool-1-thread-1
- In thenRun thread -> pool-1-thread-1
- Action completed synchronously.
-
thenRunAsync
System.out.println("1. Launcher thread -> " + Thread.currentThread().getName()); // 1 ExecutorService es = Executors.newFixedThreadPool(2); CompletableFuture<Integer> future = CompletableFuture.supplyAsync( () -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println( "3. In supply Async thread -> " + Thread.currentThread().getName()); // 3 return 99; }, es); future.thenRunAsync( () -> { // Runs in another thread from the thread pool System.out.println("4. In thenRunAsync thread -> " + Thread.currentThread().getName()); // 4 System.out.println("5. Action completed asynchronously."); // 5 }, es); System.out.println( "2. If this is printed before thenRunAsync execution " + "-- thenRunAsync is non-blocking call"); // 2 future.join(); // throws un-checked exceptions, and waits/blocks for future to complete es.shutdown(); // stops accepting new tasks, waits for currently executing tasks
Output:
- Launcher thread -> main
- If this is printed before thenRunAsync execution – thenRunAsync is non-blocking call
- In supply Async thread -> pool-1-thread-1
- In thenRunAsync thread -> pool-1-thread-2
- Action completed asynchronously.
-
non-Async versions Async versions Runs the task in calling thread or thread of the previous stage Run the task in another from pool if threadpool is passsed explicitly otherwise runs it in another thread from common fork-join pool non-blocking non-blocking thenRun()
thenRunAsync()
thenAccept()
thenAcceptAsync()
thenApply()
thenApplyAsync()
thenCompose()
thenComposeAsync()
In summary, I’ve tried to shed some light on the evolution of concurrency in Java and the corresponding language constructs. To explore more on this, refer to Javadocs, and for a deeper understanding, I would recommend applying these concurrency principles and language constructs in real-time use cases.