CompletableFuture Getting Started 3 (Try issuing an asynchronous request)

This is a continuation of CompletableFuture Getting Started 2 (Try to make CompletableFuture).

Issue multiple requests for asynchronous processing

Suppose there is a list of String values as shown below, and each element is used as the argument for one asynchronous task.

private static final List<String> argsList 
= Arrays.asList("test1", "test2", "test3", "test4");

Asynchronous processing is performed using Stream.

public static List<String> getDoubles() {
    List<CompletableFuture<String>> doubleFutures = argsList.stream()
            // Create a CompletableFuture using the factory method
            .map(arg -> CompletableFuture.supplyAsync(
                    () -> String.format("value: %f", doSomeLongComputation(arg))
            ))
            .collect(Collectors.toList());

    List<String> strs = doubleFutures.stream()
            // Use join to get the CompletableFuture results
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
    return strs;
}

There are two points to note. First, as described in CompletableFuture Getting Started 2, the list of CompletableFuture is created with the CompletableFuture.supplyAsync factory method.

Second, the results are retrieved from the created list of CompletableFuture with the CompletableFuture.join method.

The difference between the get method and the join method is explained in detail on the following page: completablefuture join vs get. Simply put, join does not declare checked exceptions, so you don't have to write an explicit try-catch statement. (However, it can still throw unchecked exceptions such as CompletionException, so care is required.)
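
As a rough illustration of that difference, the sketch below runs a deliberately failing task through both methods. The class JoinVsGetSketch and the helper failingFuture are hypothetical names invented for this example; only get, join, and the exception types come from the JDK.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

class JoinVsGetSketch {

    // Hypothetical task that always fails, used only for illustration.
    static CompletableFuture<String> failingFuture() {
        return CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("boom");
        });
    }

    // get() declares checked exceptions, so a try-catch (or a throws clause) is mandatory.
    static String describeGet() {
        try {
            return failingFuture().get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (ExecutionException e) {
            return "get: " + e.getCause().getMessage();
        }
    }

    // join() compiles without any try-catch, but a failure still surfaces
    // as an unchecked CompletionException. It is caught here only to show it.
    static String describeJoin() {
        try {
            return failingFuture().join();
        } catch (CompletionException e) {
            return "join: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(describeGet());
        System.out.println(describeJoin());
    }
}
```

Both calls report the same underlying cause. The only difference is that the compiler forces you to handle the failure for get, while for join it is left to you.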

Chaining asynchronous processing

public static List<Double> getDoubleByTimes() {
    List<CompletableFuture<Double>> futures = argsList.stream() 
            .map(arg -> CompletableFuture.supplyAsync(
                    () -> doSomeLongComputation(arg)
            )) // Generates a Stream<CompletableFuture<Double>>
            // Pass the result of the first CompletableFuture to the second CompletableFuture
            .map(future -> future.thenCompose(value ->
                    CompletableFuture.supplyAsync(
                            () -> timeLongComputation(value)))) 
            .collect(toList());
    return futures.stream()
            .map(CompletableFuture::join)
            .collect(toList());
}

The processing flow is as follows:

List<String> // stream()
↓
Stream<CompletableFuture<Double>> // CompletableFuture.supplyAsync
↓
Stream<CompletableFuture<Double>> // CompletableFuture.thenCompose
↓
List<CompletableFuture<Double>> // toList()

By using thenCompose, two different CompletableFutures can be run in cascade, with the second one depending on the result of the first.

thenCompose(Function<? super T,? extends CompletionStage<U>> fn)

Returns a new CompletionStage that, when this stage completes normally, is executed with this stage's result as the argument to the supplied function. (thenCompose@Oracle)

Call thenCompose on the first CompletableFuture and pass it a function. This function takes the value returned by the first CompletableFuture as its argument and returns a second CompletableFuture that computes its value from that argument.
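
The hand-off described above can be sketched on a single chain. The bodies of doSomeLongComputation and timeLongComputation are not shown in this article, so the versions below are trivial stand-ins assumed only for illustration, as is the class name ThenComposeSketch.

```java
import java.util.concurrent.CompletableFuture;

class ThenComposeSketch {

    // Hypothetical stand-in: the real body is not shown in the article.
    static double doSomeLongComputation(String arg) {
        return arg.length();
    }

    // Hypothetical stand-in: simply doubles the value.
    static double timeLongComputation(double value) {
        return value * 2;
    }

    static double chain(String arg) {
        // First asynchronous step
        CompletableFuture<Double> first =
                CompletableFuture.supplyAsync(() -> doSomeLongComputation(arg));

        // The function receives the first result and returns the second future;
        // thenCompose exposes that second future's value directly.
        CompletableFuture<Double> second = first.thenCompose(value ->
                CompletableFuture.supplyAsync(() -> timeLongComputation(value)));

        return second.join();
    }

    public static void main(String[] args) {
        System.out.println(chain("test1"));
    }
}
```

With these stand-ins, chain("test1") computes the length 5.0 in the first step and doubles it in the second.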

Note (difference between thenApply and thenCompose)

A similar method is thenApply.

thenApply(Function<? super T,? extends U> fn)

Returns a new CompletionStage that, when this stage completes normally, is executed with this stage's result as the argument to the supplied function. (thenApply@Oracle)

CompletableFuture | thenApply vs thenCompose

The Stack Overflow explanation above is detailed. In particular, the following answer was very helpful.

thenApply() returned the nested futures as they were, but thenCompose() flattened the nested CompletableFutures so that it is easier to chain more method calls to it.

If you use thenApply instead of thenCompose in the code above, the return type becomes CompletableFuture<CompletableFuture<Double>>, so the CompletableFuture ends up nested.

List<String> // stream()
↓
Stream<CompletableFuture<Double>> // CompletableFuture.supplyAsync
↓
Stream<CompletableFuture<CompletableFuture<Double>>> // CompletableFuture.thenApply
↓
List<CompletableFuture<CompletableFuture<Double>>> // toList()
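
To see the nesting concretely, here is a small sketch that applies both methods to the same asynchronous step. timesTwoAsync and the class name ApplyVsComposeSketch are hypothetical helpers made up for this comparison, not part of the code above.

```java
import java.util.concurrent.CompletableFuture;

class ApplyVsComposeSketch {

    // Hypothetical asynchronous step, used only for illustration.
    static CompletableFuture<Double> timesTwoAsync(double value) {
        return CompletableFuture.supplyAsync(() -> value * 2);
    }

    static double viaThenApply(double start) {
        CompletableFuture<Double> first = CompletableFuture.supplyAsync(() -> start);
        // thenApply wraps whatever the function returns, so the future is nested
        CompletableFuture<CompletableFuture<Double>> nested =
                first.thenApply(v -> timesTwoAsync(v));
        // Two joins are needed to reach the value
        return nested.join().join();
    }

    static double viaThenCompose(double start) {
        CompletableFuture<Double> first = CompletableFuture.supplyAsync(() -> start);
        // thenCompose flattens the returned future, so one level remains
        CompletableFuture<Double> flat =
                first.thenCompose(v -> timesTwoAsync(v));
        return flat.join();
    }

    public static void main(String[] args) {
        System.out.println(viaThenApply(3.0));
        System.out.println(viaThenCompose(3.0));
    }
}
```

Both methods produce the same value. Only the declared type differs, which is what makes the thenCompose version easier to keep chaining.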

The Java 9 documentation describes these as analogous concepts:

CompletableFuture.thenApply ⇄ Stream.map
CompletableFuture.thenCompose ⇄ Stream.flatMap

As for map and flatMap, I summarized them briefly in "Toward understanding map and flatmap in Stream (1)".

When using the results of two independent CompletableFutures

thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)

Returns a new CompletionStage that, when this stage and the other given stage both complete normally, is executed with the two results as arguments to the supplied function.

List<CompletableFuture<Double>> futures = argsList.stream()
        .map(arg -> CompletableFuture.supplyAsync(
                () -> doSomeLongComputation(arg)
        ))
        .map(future -> future.thenCombine(
                CompletableFuture.supplyAsync(
                        () -> doSomeLongComputation("test")),
                (resultFromFirstFuture, resultFromSecondFuture) -> resultFromFirstFuture * resultFromSecondFuture
        ))
        .collect(toList());

return futures.stream()
        .map(CompletableFuture::join)
        .collect(toList());

Note on thenCombine / thenCombineAsync: with the Async suffix, the BiFunction passed as the second argument is submitted to the thread pool and executed asynchronously as a separate task.
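
A minimal sketch of both variants follows, assuming two independent futures that each just supply a number. The class name ThenCombineSketch and the method names are hypothetical. Only thenCombine and thenCombineAsync are the actual JDK methods.

```java
import java.util.concurrent.CompletableFuture;

class ThenCombineSketch {

    static double combine(double a, double b) {
        // Two futures that do not depend on each other
        CompletableFuture<Double> first = CompletableFuture.supplyAsync(() -> a);
        CompletableFuture<Double> second = CompletableFuture.supplyAsync(() -> b);

        // The BiFunction runs once both results are available
        CompletableFuture<Double> product = first.thenCombine(second, (x, y) -> x * y);
        return product.join();
    }

    static double combineAsync(double a, double b) {
        CompletableFuture<Double> first = CompletableFuture.supplyAsync(() -> a);
        CompletableFuture<Double> second = CompletableFuture.supplyAsync(() -> b);

        // Same result, but the BiFunction is submitted as its own task
        // to the default asynchronous executor
        CompletableFuture<Double> product = first.thenCombineAsync(second, (x, y) -> x * y);
        return product.join();
    }

    public static void main(String[] args) {
        System.out.println(combine(2.0, 3.0));
        System.out.println(combineAsync(2.0, 3.0));
    }
}
```

The two variants return the same value. The Async suffix changes only where the combining function runs, not what it computes.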

Reference: Java 8 in Action
