I want to use Reactor to run blocking operations on multiple threads and aggregate the results. The way to implement this is described in the reference manual.
https://projectreactor.io/docs/core/release/reference/#faq.wrap-blocking
However, if you implement it exactly as shown there, Schedulers.elastic() creates a large number of threads, which can consume a lot of resources.
public static void main(String[] args) {
    final var webClient = WebClient.builder()
            .baseUrl("http://example.com")
            .build();
    Flux.fromStream(IntStream.range(1, 100).boxed())
            .flatMap(i -> Mono.fromCallable(() -> webClient.get()
                            .retrieve()
                            .bodyToMono(String.class)
                            .block()) // blocking on purpose, for the sake of explanation
                    .subscribeOn(Schedulers.elastic()))
            .blockLast();
}
Execution log
・
・
02:24:57.014 [elastic-77] DEBUG o.s.w.r.f.client.ExchangeFunctions - [11ca9c18] HTTP GET http://example.com
02:24:57.017 [elastic-15] DEBUG o.s.w.r.f.client.ExchangeFunctions - [3ee270fc] HTTP GET http://example.com
02:24:57.017 [elastic-61] DEBUG o.s.w.r.f.client.ExchangeFunctions - [272cc048] HTTP GET http://example.com
02:24:57.015 [elastic-45] DEBUG o.s.w.r.f.client.ExchangeFunctions - [12f4ca28] HTTP GET http://example.com
02:24:57.014 [elastic-17] DEBUG o.s.w.r.f.client.ExchangeFunctions - [3d44ed66] HTTP GET http://example.com
02:24:57.017 [elastic-57] DEBUG o.s.w.r.f.client.ExchangeFunctions - [6b5899a3] HTTP GET http://example.com
02:24:57.017 [elastic-92] DEBUG o.s.w.r.f.client.ExchangeFunctions - [7ec595f3] HTTP GET http://example.com
02:24:57.015 [elastic-94] DEBUG o.s.w.r.f.client.ExchangeFunctions - [70f26d87] HTTP GET http://example.com
・
・
To limit this, pass a concurrency value as the second argument of flatMap().
public static void main(String[] args) {
    final var webClient = WebClient.builder()
            .baseUrl("http://example.com")
            .build();
    Flux.fromStream(IntStream.range(1, 100).boxed())
            .flatMap(i -> Mono.fromCallable(() -> webClient.get()
                            .retrieve()
                            .bodyToMono(String.class)
                            .block()) // blocking on purpose, for the sake of explanation
                    .subscribeOn(Schedulers.elastic()),
                    10) // specify a concurrency of 10
            .blockLast();
}
Execution log
02:28:06.020 [elastic-4] DEBUG o.s.w.r.f.client.ExchangeFunctions - [1b6144e3] HTTP GET http://example.com
02:28:06.020 [elastic-9] DEBUG o.s.w.r.f.client.ExchangeFunctions - [64d61eb3] HTTP GET http://example.com
02:28:06.020 [elastic-5] DEBUG o.s.w.r.f.client.ExchangeFunctions - [1b00ce18] HTTP GET http://example.com
02:28:06.020 [elastic-2] DEBUG o.s.w.r.f.client.ExchangeFunctions - [590d0628] HTTP GET http://example.com
02:28:06.021 [elastic-3] DEBUG o.s.w.r.f.client.ExchangeFunctions - [504a226f] HTTP GET http://example.com
02:28:06.021 [elastic-6] DEBUG o.s.w.r.f.client.ExchangeFunctions - [3ace12f2] HTTP GET http://example.com
02:28:06.021 [elastic-10] DEBUG o.s.w.r.f.client.ExchangeFunctions - [4135ca0a] HTTP GET http://example.com
02:28:06.021 [elastic-8] DEBUG o.s.w.r.f.client.ExchangeFunctions - [badf622] HTTP GET http://example.com
02:28:06.020 [elastic-7] DEBUG o.s.w.r.f.client.ExchangeFunctions - [2dfed701] HTTP GET http://example.com
02:28:06.021 [elastic-11] DEBUG o.s.w.r.f.client.ExchangeFunctions - [753526d8] HTTP GET http://example.com
02:28:06.673 [reactor-http-nio-8] DEBUG o.s.w.r.f.client.ExchangeFunctions - [2dfed701] Response 200 OK
02:28:06.673 [reactor-http-nio-9] DEBUG o.s.w.r.f.client.ExchangeFunctions - [504a226f] Response 200 OK
02:28:06.687 [reactor-http-nio-7] DEBUG o.s.w.r.f.client.ExchangeFunctions - [590d0628] Response 200 OK
02:28:06.757 [reactor-http-nio-6] DEBUG o.s.w.r.f.client.ExchangeFunctions - [753526d8] Response 200 OK
・
・
With this change, the elastic scheduler uses no more than 10 threads.
In fact, a concurrency value is applied internally even when flatMap() is called without one. The default (Queues.SMALL_BUFFER_SIZE, 256 unless overridden) is simply so large that the first log looked as if there were no limit at all.
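The effect of the concurrency argument can be measured directly. The following is a minimal sketch, not the code from this article: it assumes reactor-core is on the classpath, uses Thread.sleep as a stand-in for the blocking HTTP call, and uses a cached thread pool as a stand-in for the elastic scheduler. The class name FlatMapConcurrencyDemo and the method peakInFlight are hypothetical names chosen for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class FlatMapConcurrencyDemo {

    // Runs 99 simulated blocking calls and returns the highest number
    // that were in flight at the same time.
    // concurrency <= 0 means "call flatMap without the concurrency argument".
    static int peakInFlight(int concurrency) {
        // Assumption: an unbounded cached pool stands in for the elastic scheduler.
        ExecutorService pool = Executors.newCachedThreadPool();
        Scheduler scheduler = Schedulers.fromExecutorService(pool);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            Function<Integer, Mono<Integer>> blockingCall = i ->
                    Mono.fromCallable(() -> {
                        int now = inFlight.incrementAndGet();
                        peak.accumulateAndGet(now, Math::max);
                        try {
                            Thread.sleep(50); // stand-in for the blocking HTTP request
                        } finally {
                            inFlight.decrementAndGet();
                        }
                        return i;
                    }).subscribeOn(scheduler);

            Flux<Integer> source = Flux.range(1, 99);
            Flux<Integer> results = concurrency > 0
                    ? source.flatMap(blockingCall, concurrency)
                    : source.flatMap(blockingCall);
            results.blockLast();
            return peak.get();
        } finally {
            scheduler.dispose();
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("peak with default concurrency: " + peakInFlight(0));
        System.out.println("peak with concurrency 10:      " + peakInFlight(10));
    }
}
```

With a concurrency of 10, flatMap subscribes to a new inner Mono only after an earlier one completes, so the measured peak should not exceed 10. With the default, all 99 calls can be started at once.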
Also, since a Scheduler created from an Executor can be passed to subscribeOn, the same processing can be done with a FixedThreadPool or a WorkStealingPool (ForkJoinPool), although the scheduling algorithms differ slightly. Specify .subscribeOn(Schedulers.fromExecutor(executorService)) instead of .subscribeOn(Schedulers.elastic()).
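As a rough illustration of that substitution, here is a sketch that runs the same kind of pipeline on a fixed thread pool and records which worker threads were used. It assumes reactor-core is on the classpath; FixedPoolSchedulerDemo, blockingCall, and workerThreadsUsed are hypothetical names, and Thread.sleep again stands in for the blocking request.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class FixedPoolSchedulerDemo {

    // Hypothetical blocking operation; replace with the real blocking call.
    static String blockingCall(int i) throws InterruptedException {
        Thread.sleep(20);
        return "result-" + i;
    }

    // Runs 99 blocking calls on a fixed pool and returns the names of the threads that ran them.
    static Set<String> workerThreadsUsed(int poolSize) {
        ExecutorService executorService = Executors.newFixedThreadPool(poolSize);
        Scheduler scheduler = Schedulers.fromExecutor(executorService);
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        try {
            Flux.range(1, 99)
                    .flatMap(i -> Mono.fromCallable(() -> {
                                threadNames.add(Thread.currentThread().getName());
                                return blockingCall(i);
                            })
                            .subscribeOn(scheduler))
                    .blockLast();
            return threadNames;
        } finally {
            // The pool's threads are non-daemon, so shut it down explicitly.
            executorService.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(workerThreadsUsed(4));
    }
}
```

Here the thread count is bounded by the pool size rather than by the flatMap concurrency argument: tasks beyond the pool size wait in the executor's queue.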
Also, if you want to make sure these worker threads are shut down when the application stops, using an ExecutorService seems to be an effective approach.
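One common way to do that shutdown with the plain JDK API is sketched below. This is only an illustration under assumptions: WorkerShutdown and shutdownGracefully are hypothetical names, and the timeout values are arbitrary.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class WorkerShutdown {

    // Stops accepting new tasks, waits up to timeoutMillis for running ones to finish,
    // and interrupts whatever is still running after that.
    // Returns true if the executor has terminated.
    static boolean shutdownGracefully(ExecutorService executorService, long timeoutMillis) {
        executorService.shutdown();
        try {
            if (executorService.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return true;
            }
            executorService.shutdownNow(); // interrupt tasks that did not finish in time
            return executorService.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        try {
            executorService.submit(() -> System.out.println("blocking work goes here"));
        } finally {
            boolean terminated = shutdownGracefully(executorService, 5_000);
            System.out.println("terminated: " + terminated);
        }
    }
}
```

In a long-running application, the same helper could be called from a JVM shutdown hook (Runtime.getRuntime().addShutdownHook) or from the framework's own lifecycle callback instead of a finally block.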