Getting Started with Reactive Streams and the JDK 9 Flow API

Overview

These are my notes from exploring Reactive Streams and the JDK Flow API.

The Flow API (java.util.concurrent.Flow) was introduced in JDK 9 (JEP 266). It corresponds to the specification ([Reactive Streams](https://www.reactive-streams.org/)) created by a working group called the Reactive Streams Special Interest Group (SIG). JVM libraries that support this specification include Akka Streams (Lightbend, Inc.) and ReactiveX / RxJava. Project Reactor (Pivotal Software, Inc.), which is used in Spring WebFlux, also supports it.

About Reactive Streams

**What is Reactive Streams?**

The following is taken from the opening of the Reactive Streams site.

Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure. This encompasses efforts aimed at runtime environments (JVM and JavaScript) as well as network protocols.

The phrase "asynchronous stream processing with non-blocking back pressure" in this passage clearly describes the characteristics of Reactive Streams. The explanations of each term below are quoted from the Reactive Manifesto glossary.

**What is non-blocking?**

Non-Blocking

The API will access the resource if it is available, otherwise it will return immediately to tell the caller that the resource is not currently available or that the operation has started and has not yet completed. The non-blocking API for resources allows callers to do other work instead of blocking and waiting for resources to become available.
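As a small illustration of the definition above, the following sketch uses a bounded queue from the JDK rather than anything from Reactive Streams itself. `ArrayBlockingQueue.offer` returns `false` immediately when the queue is full, while `put` would block the caller. The class name `NonBlockingSketch` and the helper `offerAll` are hypothetical names chosen for this example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class NonBlockingSketch {

  // Tries to add the numbers 1..count without ever waiting; returns how many were accepted.
  static int offerAll(BlockingQueue<Integer> queue, int count) {
    int accepted = 0;
    for (int i = 1; i <= count; i++) {
      // offer returns false right away when the queue is full,
      // whereas put would block the calling thread until space is free.
      if (queue.offer(i)) {
        accepted++;
      }
    }
    return accepted;
  }

  public static void main(String... args) {
    BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
    System.out.println("accepted: " + offerAll(queue, 5));
    // The caller is free to do other work and try again later.
    System.out.println("remaining capacity: " + queue.remainingCapacity());
  }
}
```

Because the caller learns immediately that the resource is unavailable, it can decide for itself whether to retry, drop the item, or do something else in the meantime.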

**What is back pressure?**

Back Pressure

It is not acceptable for an overloaded component to fail catastrophically or to drop messages in an uncontrolled way. If it cannot keep up and cannot afford to fail, the component should tell its upstream components that it is overloaded so that they reduce the load. This mechanism, called back pressure, is an important feedback mechanism that lets a system keep responding gracefully under load instead of collapsing.

**What is asynchronous?**

Asynchronous

In the context of the Reactive Manifesto, it means that a request sent from a client to a service is processed at some arbitrary point in time after it is sent. The client cannot directly observe, or synchronize with, the execution of the request inside the destination service.

Reactive Streams Specification for the JVM

The specification for the JVM created by the SIG was at version 1.0.3 as of November 2019.

**Deliverables**

The artifacts published to Maven include the following. Because these are the specification, the TCK (Technology Compatibility Kit), and implementation examples, I expect that typical projects will not use them directly and will instead use libraries such as Akka Streams, ReactiveX / RxJava, and Reactor.

<!-- https://mvnrepository.com/artifact/org.reactivestreams/reactive-streams -->
<dependency>
    <groupId>org.reactivestreams</groupId>
    <artifactId>reactive-streams</artifactId>
    <version>1.0.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.reactivestreams/reactive-streams-tck -->
<dependency>
    <groupId>org.reactivestreams</groupId>
    <artifactId>reactive-streams-tck</artifactId>
    <version>1.0.3</version>
    <scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.reactivestreams/reactive-streams-tck-flow -->
<dependency>
    <groupId>org.reactivestreams</groupId>
    <artifactId>reactive-streams-tck-flow</artifactId>
    <version>1.0.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.reactivestreams/reactive-streams-examples -->
<dependency>
    <groupId>org.reactivestreams</groupId>
    <artifactId>reactive-streams-examples</artifactId>
    <version>1.0.3</version>
</dependency>

API Components

The following four interfaces are defined in the Reactive Streams specification for JVM version 1.0.3.

Publisher

A Publisher is a provider of a potentially unbounded number of sequenced elements (that is, it publishes a data stream). It publishes elements according to the demand it receives from its Subscribers (through a Subscription).

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

| Method | Description |
| --- | --- |
| subscribe | A factory method that asks the Publisher to start streaming data. It can be called multiple times, and each call starts a new Subscription. |
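To make the relationship between `subscribe` and demand concrete, here is a minimal sketch of a hand-written Publisher. It is deliberately simplified: it is synchronous, single-threaded, and not thread-safe, so it does not satisfy the full Reactive Streams specification (for example, an empty range completes only after the first request). It uses the JDK's java.util.concurrent.Flow interfaces, which have the same shape as the org.reactivestreams ones, so that it runs without extra dependencies. The names `RangePublisher`, `RangeSubscription`, and `collect` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class RangePublisherSketch {

  // Hypothetical Publisher that emits the integers from..to on the caller's thread.
  static final class RangePublisher implements Flow.Publisher<Integer> {
    private final int from;
    private final int to;

    RangePublisher(int from, int to) {
      this.from = from;
      this.to = to;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
      // Each call hands the subscriber its own Subscription with its own cursor.
      subscriber.onSubscribe(new RangeSubscription(subscriber, from, to));
    }
  }

  static final class RangeSubscription implements Flow.Subscription {
    private final Flow.Subscriber<? super Integer> subscriber;
    private final int to;
    private long next;
    private long demand;
    private boolean emitting;
    private boolean done;

    RangeSubscription(Flow.Subscriber<? super Integer> subscriber, int from, int to) {
      this.subscriber = subscriber;
      this.next = from;
      this.to = to;
    }

    @Override
    public void request(long n) {
      if (done) {
        return;
      }
      if (n <= 0) {
        done = true;
        subscriber.onError(new IllegalArgumentException("request must be positive"));
        return;
      }
      demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n; // cap on overflow
      if (emitting) {
        return; // re-entrant call from onNext: the loop below picks up the new demand
      }
      emitting = true;
      // Emit only as many items as have been requested.
      while (!done && demand > 0 && next <= to) {
        demand--;
        subscriber.onNext((int) next);
        next++;
      }
      emitting = false;
      if (!done && next > to) {
        done = true;
        subscriber.onComplete();
      }
    }

    @Override
    public void cancel() {
      done = true; // stop emitting; no onComplete is signalled
    }
  }

  // Subscribes once and returns the signals that the subscriber observed.
  static List<String> collect(Flow.Publisher<Integer> publisher) {
    List<String> events = new ArrayList<>();
    publisher.subscribe(new Flow.Subscriber<Integer>() {
      private Flow.Subscription subscription;

      @Override public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(1);
      }
      @Override public void onNext(Integer item) {
        events.add("onNext:" + item);
        subscription.request(1);
      }
      @Override public void onError(Throwable t) { events.add("onError"); }
      @Override public void onComplete() { events.add("onComplete"); }
    });
    return events;
  }

  public static void main(String... args) {
    RangePublisher publisher = new RangePublisher(1, 3);
    System.out.println(collect(publisher)); // first Subscription
    System.out.println(collect(publisher)); // second, independent Subscription
  }
}
```

Both calls to `collect` see the full sequence, because every `subscribe` call creates a fresh Subscription. Real publishers such as AsyncIterablePublisher also handle concurrency, which this sketch leaves out.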

Subscriber

A Subscriber consumes the elements it receives from the Publisher it subscribed to. The onXxx methods of this interface are callbacks that correspond to signals from the Publisher.

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

| Method | Description |
| --- | --- |
| onSubscribe | Invoked after Publisher#subscribe is called. The Subscriber uses the Subscription received as an argument to request data or to cancel. |
| onNext | Invoked with the next item after Subscription#request is called. |
| onError | Invoked when the Publisher fails to send data. |
| onComplete | Invoked when the Publisher has finished sending data successfully. Under the specification, cancelling a Subscription does not trigger onComplete. |

Subscription

A Subscription represents the one-to-one relationship between a Publisher and a Subscriber that subscribes to it. The Subscriber uses the Subscription's methods to request data from the Publisher or to cancel.

public interface Subscription {
    public void request(long n);
    public void cancel();
}

| Method | Description |
| --- | --- |
| request | Requests the Publisher to send data. |
| cancel | Requests the Publisher to stop sending data and clean up resources. |
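The following sketch shows `request` and `cancel` working together. It is only an illustration under a few assumptions: it uses the JDK's java.util.concurrent.Flow interfaces and SubmissionPublisher (which mirror the interfaces above) so that it runs without extra dependencies, and `TakeSubscriber` and `run` are hypothetical names. The subscriber asks for one item at a time and cancels after it has received `limit` items.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class CancelSketch {

  // Hypothetical subscriber that takes at most `limit` items, then cancels.
  static final class TakeSubscriber implements Flow.Subscriber<Integer> {
    final List<Integer> received = new CopyOnWriteArrayList<>();
    final CountDownLatch finished = new CountDownLatch(1);
    volatile boolean completed;
    private final int limit;
    private Flow.Subscription subscription;

    TakeSubscriber(int limit) {
      this.limit = limit;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
      this.subscription = subscription;
      subscription.request(1); // signal demand for exactly one item
    }

    @Override
    public void onNext(Integer item) {
      received.add(item);
      if (received.size() >= limit) {
        subscription.cancel(); // no more items wanted
        finished.countDown();
      } else {
        subscription.request(1);
      }
    }

    @Override
    public void onError(Throwable throwable) {
      finished.countDown();
    }

    @Override
    public void onComplete() {
      completed = true;
      finished.countDown();
    }
  }

  // Publishes 1..total to a subscriber that cancels after `limit` items.
  static TakeSubscriber run(int limit, int total) {
    TakeSubscriber subscriber = new TakeSubscriber(limit);
    try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
      publisher.subscribe(subscriber);
      for (int i = 1; i <= total; i++) {
        publisher.submit(i);
      }
      subscriber.finished.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return subscriber;
  }

  public static void main(String... args) {
    TakeSubscriber subscriber = run(3, 10);
    System.out.println("received=" + subscriber.received + " completed=" + subscriber.completed);
  }
}
```

Because the subscriber never requests more than one item ahead, no item arrives after the cancellation, and `completed` stays false since a cancelled subscription receives no onComplete signal.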

Processor

A Processor is a component that acts as both a Subscriber and a Publisher. It sits between the Publisher at the head of the stream and the Subscriber at the end, and multiple Processors can be chained together rather than just one.

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

A Processor is optional. Without one, the Publisher and Subscriber are connected directly, as shown in the figure below.

+-----------+              +------------+
|           | <-subscribe- |            |
| Publisher |              | Subscriber |
|           | <--request-- |            |
+-----------+              +------------+

The figure below shows two Processors (A and B) chained together. A Processor is needed in the middle like this when you want to filter or transform data partway through the stream.

+-----------+              +-----------+              +-----------+              +------------+
|           | <-subscribe- |           | <-subscribe- |           | <-subscribe- |            |
| Publisher |              | Processor |              | Processor |              | Subscriber |
|           | <--request-- |    (A)    | <--request-- |    (B)    | <--request-- |            |
+-----------+              +-----------+              +-----------+              +------------+
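The two-Processor chain in the figure can be sketched as follows. This is an illustrative sketch, not the specification's reference code: it builds the Processor by extending the JDK's SubmissionPublisher (a pattern that the SubmissionPublisher Javadoc also describes) and uses the java.util.concurrent.Flow interfaces so that it runs without extra dependencies. `MapProcessor` and `run` are hypothetical names. Processor A doubles each number and Processor B converts it to a String.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class ProcessorSketch {

  // Hypothetical Processor: a Subscriber of T that republishes mapped values of type R.
  static final class MapProcessor<T, R> extends SubmissionPublisher<R>
      implements Flow.Processor<T, R> {
    private final Function<? super T, ? extends R> mapper;
    private Flow.Subscription upstream;

    MapProcessor(Function<? super T, ? extends R> mapper) {
      this.mapper = mapper;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
      this.upstream = subscription;
      subscription.request(1);
    }

    @Override
    public void onNext(T item) {
      submit(mapper.apply(item)); // publish the converted item downstream
      upstream.request(1);        // then ask upstream for the next one
    }

    @Override
    public void onError(Throwable throwable) {
      closeExceptionally(throwable); // propagate the failure downstream
    }

    @Override
    public void onComplete() {
      close(); // propagate completion downstream
    }
  }

  // Wires Publisher -> Processor A -> Processor B -> Subscriber and returns what arrived.
  static List<String> run() {
    List<String> received = new CopyOnWriteArrayList<>();
    CountDownLatch done = new CountDownLatch(1);

    MapProcessor<Integer, Integer> a = new MapProcessor<>(x -> x * 2);
    MapProcessor<Integer, String> b = new MapProcessor<>(x -> "item-" + x);

    b.subscribe(new Flow.Subscriber<String>() {
      @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
      @Override public void onNext(String item) { received.add(item); }
      @Override public void onError(Throwable t) { done.countDown(); }
      @Override public void onComplete() { done.countDown(); }
    });
    a.subscribe(b);

    // Closing the source sends onComplete through A and B to the final subscriber.
    try (SubmissionPublisher<Integer> source = new SubmissionPublisher<>()) {
      source.subscribe(a);
      for (int i = 1; i <= 3; i++) {
        source.submit(i);
      }
    }

    try {
      done.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return List.copyOf(received);
  }

  public static void main(String... args) {
    System.out.println(run());
  }
}
```

Only the source is closed explicitly. The Processors close themselves when they receive onComplete, which avoids submitting to a Processor that has already been closed.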

Implementation example

An example implementation can be found on GitHub (reactive-streams / reactive-streams-jvm). Below is a demo program that uses the AsyncIterablePublisher class, which is one of the Publisher implementation examples.

import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.reactivestreams.example.unicast.AsyncIterablePublisher;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

@Slf4j
public class Demo {

  public static void main(String ... args) {
    List<Integer> elements = IntStream.rangeClosed(1, 20).boxed().collect(Collectors.toList());
    ExecutorService executor = Executors.newFixedThreadPool(3);

    AsyncIterablePublisher<Integer> pub = new AsyncIterablePublisher<>(elements, executor);

    MySub mySub1 = new MySub("sub_1");
    MySub mySub2 = new MySub("sub_2");
    MySub mySub3 = new MySub("sub_3");

    log.info("start");

    // Calling Publisher#subscribe triggers a callback to
    // the Subscriber's onSubscribe method
    pub.subscribe(mySub1);
    pub.subscribe(mySub2);
    pub.subscribe(mySub3);

    log.info("end");

    try {
      // Processing is asynchronous, so wait 30 seconds for it to finish
      TimeUnit.SECONDS.sleep(30);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      // Shut down the executor even if the wait was interrupted
      executor.shutdown();
    }
  }

  static class MySub implements Subscriber<Integer> {
    private final String name;
    private Subscription s;

    public MySub(String name) {
      this.name = name;
    }

    private Long getId() {
      return Thread.currentThread().getId();
    }

    @Override
    public void onSubscribe(Subscription s) {
      log.info("({}) onSubscribe:[{}]", getId(), name);
      this.s = s;
      // Request data from the Publisher as soon as the subscription is established.
      // Requesting inside onSubscribe starts data delivery right away.
      s.request(1);
    }

    @Override
    public void onNext(Integer integer) {
      // onNext is called back when the Publisher publishes an item
      log.info("({}) onNext:[{}] item:{}", getId(), name, integer);

      // Process the received item here

      // Request the next item from the Publisher
      s.request(1);

      // Or cancel the subscription instead
      // s.cancel();
    }

    @Override
    public void onError(Throwable t) {
      // Called back when the Publisher fails while publishing data
      log.error("onError:[{}]", name, t);
    }

    @Override
    public void onComplete() {
      // Called back when the Publisher has finished publishing all data
      log.info("({}) onComplete:[{}]", getId(), name);
    }

  }

}

Execution result

[main] INFO Demo - start
[main] INFO Demo - end
[pool-1-thread-2] INFO Demo - (15) onSubscribe:[sub_2]
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:1
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:2
[pool-1-thread-3] INFO Demo - (16) onSubscribe:[sub_3]
[pool-1-thread-1] INFO Demo - (14) onSubscribe:[sub_1]
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:1
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:3
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:1
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:4
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:2
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:2
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:5
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:3
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:3
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:6
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:4
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:4
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:7
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:5
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:5
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:8
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:6
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:6
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:7
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:9
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:8
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:10
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:7
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:11
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:9
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:8
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:12
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:10
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:9
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:13
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:11
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:10
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:14
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:12
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:15
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:13
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:16
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:17
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:18
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:19
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:14
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:20
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:15
[pool-1-thread-2] INFO Demo - (15) onComplete:[sub_2]
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:16
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:11
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:12
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:13
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:17
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_3] item:14
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:18
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_3] item:15
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:19
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_3] item:16
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_3] item:17
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:20
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_3] item:18
[pool-1-thread-1] INFO Demo - (14) onComplete:[sub_1]
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_3] item:19
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_3] item:20
[pool-1-thread-2] INFO Demo - (15) onComplete:[sub_3]

JDK Flow API

java.util.concurrent.Flow

The Flow class declares four interfaces that correspond to the Reactive Streams specification. You need to implement these interfaces when developing applications that support reactive streams.

public final class Flow {

    @FunctionalInterface
    public static interface Publisher<T> {
        public void subscribe(Subscriber<? super T> subscriber);
    }

    public static interface Subscriber<T> {
        public void onSubscribe(Subscription subscription);
        public void onNext(T item);
        public void onError(Throwable throwable);
        public void onComplete();
    }

    public static interface Subscription {
        public void request(long n);
        public void cancel();
    }

    public static interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
    }

}

SubmissionPublisher<T>

For Publisher, there is an implementation class called SubmissionPublisher<T>, which can be used as is or extended to implement custom processing.

Constructors

SubmissionPublisher()
SubmissionPublisher(Executor executor, int maxBufferCapacity)
SubmissionPublisher(Executor executor, int maxBufferCapacity, BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> handler)

// Default: ForkJoinPool.commonPool() and a buffer of Flow.defaultBufferSize()
try (SubmissionPublisher<Integer> pub = new SubmissionPublisher<>()) {
  // omitted
}
// Explicit executor and maximum buffer capacity per subscriber
try (SubmissionPublisher<Integer> pub = new SubmissionPublisher<>(ForkJoinPool.commonPool(), 8)) {
  // omitted
}
// With a handler that is invoked when a subscriber's onNext throws an exception
ExecutorService executor = Executors.newFixedThreadPool(3);
try (SubmissionPublisher<Integer> pub = new SubmissionPublisher<>(executor, 8, (subscriber, throwable) -> {
})) {
  // omitted
}

Publishing data

The SubmissionPublisher class has submit and offer methods for publishing data.

Data publishing methods
public int submit(T item)
public int offer(T item, BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop)
public int offer(T item, long timeout, TimeUnit unit, BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop)

submit

submit blocks while any subscriber's buffer is full, and returns once the item has been handed to every current subscriber.

int lag = pub.submit(value);

if (lag < 0) {
  // Never reached: submit does not drop items, so the result is never negative
} else {
  // Estimated maximum lag (number of items submitted but not yet consumed)
}
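As a runnable illustration of submit, the sketch below publishes a sequence through a SubmissionPublisher with a deliberately small buffer, so submit may have to wait for the subscriber. The class name `SubmitSketch`, the method `publish`, and the buffer size of 2 are assumptions made for this example.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class SubmitSketch {

  // Publishes 1..count with submit and returns the items the subscriber received.
  static List<Integer> publish(int count) {
    List<Integer> received = new CopyOnWriteArrayList<>();
    CountDownLatch done = new CountDownLatch(1);
    ExecutorService executor = Executors.newSingleThreadExecutor();

    // A small buffer, so submit may have to wait until the subscriber catches up.
    try (SubmissionPublisher<Integer> pub = new SubmissionPublisher<>(executor, 2)) {
      pub.subscribe(new Flow.Subscriber<Integer>() {
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) {
          subscription = s;
          s.request(1);
        }
        @Override public void onNext(Integer item) {
          received.add(item);
          subscription.request(1);
        }
        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
      });

      for (int i = 1; i <= count; i++) {
        // Blocks while the buffer is full; items are never dropped.
        pub.submit(i);
      }
    } // close() signals onComplete after the buffered items are delivered

    try {
      done.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      executor.shutdown();
    }
    return List.copyOf(received);
  }

  public static void main(String... args) {
    System.out.println(publish(20));
  }
}
```

Even though the buffer is much smaller than the number of items, every item arrives in order, because the publishing thread is slowed down instead of items being discarded.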

offer

offer does not block when publishing. When an item cannot be sent, the onDrop handler decides what to do (for example, whether to retry). In this example, the item is dropped without retrying.

int lag = pub.offer(value, (subscriber, integer) -> {
  subscriber.onError(new RuntimeException("drop item:[" + integer + "]"));
  return false; //Do not resend
});

if (lag < 0) {
  //Number of drops
} else {
  //Maximum delay estimate(Number of items sent but not yet consumed)
}
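The drop behavior can be observed with the following sketch. It assumes a subscriber that never requests any items, so the buffer fills up and later offers are dropped. The names `OfferDropSketch` and `offerWithoutDemand` are hypothetical. The exact number of accepted items depends on the enforced buffer capacity (the implementation may round it up), so the example only reports the counts instead of assuming specific values.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class OfferDropSketch {

  // Offers 1..total to a subscriber with no demand; returns {accepted, dropped}.
  static int[] offerWithoutDemand(int total, int maxBufferCapacity) {
    int accepted = 0;
    int dropped = 0;
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try (SubmissionPublisher<Integer> pub =
             new SubmissionPublisher<>(executor, maxBufferCapacity)) {
      pub.subscribe(new Flow.Subscriber<Integer>() {
        @Override public void onSubscribe(Flow.Subscription s) { /* never calls request */ }
        @Override public void onNext(Integer item) { }
        @Override public void onError(Throwable t) { }
        @Override public void onComplete() { }
      });

      for (int i = 1; i <= total; i++) {
        // Returning false from onDrop means "do not retry": the item is discarded.
        int lag = pub.offer(i, (subscriber, item) -> false);
        if (lag < 0) {
          dropped += -lag; // a negative result is the number of drops
        } else {
          accepted++;      // a non-negative result is the estimated maximum lag
        }
      }
    } finally {
      executor.shutdown();
    }
    return new int[] {accepted, dropped};
  }

  public static void main(String... args) {
    int[] result = offerWithoutDemand(100, 4);
    System.out.println("accepted=" + result[0] + " dropped=" + result[1]);
  }
}
```

Unlike submit, the publishing loop never waits here. The cost is that items are lost once the buffer is full, which is why the onDrop handler exists to decide how to react.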

offer (with timeout)

You can also specify a timeout. In this example, if the item cannot be sent, offer waits up to 1 second before dropping it.

int lag = pub.offer(value, 1, TimeUnit.SECONDS, (subscriber, integer) -> {
  subscriber.onError(new RuntimeException("drop item:[" + integer + "]"));
  return false; //Do not resend
});

if (lag < 0) {
  //Number of drops
} else {
  //Maximum delay estimate(Number of items sent but not yet consumed)
}

Implementation example

Below is a demo program that uses the SubmissionPublisher class.

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

@Slf4j
public class Demo {

  public static void main(String ... args) {
    log.info("start");

    MySub<Integer> mySub1 = new MySub<>("sub_1");
    MySub<Integer> mySub2 = new MySub<>("sub_2");
    MySub<Integer> mySub3 = new MySub<>("sub_3");

    ExecutorService executor = Executors.newFixedThreadPool(3);

    try (SubmissionPublisher<Integer> pub = new SubmissionPublisher<>(executor, 256)) {

      pub.subscribe(mySub1);
      pub.subscribe(mySub2);
      pub.subscribe(mySub3);

      log.info("NumberOfSubscribers:{}", pub.getNumberOfSubscribers());
      log.info("MaxBufferCapacity:{}", pub.getMaxBufferCapacity());

      IntStream.rangeClosed(1, 100000).forEach(value -> {
        log.info("publish:{} estimateMinimumDemand:{} estimateMaximumLag:{}", value, pub.estimateMinimumDemand(), pub.estimateMaximumLag());

        int lag = pub.offer(value, 1, TimeUnit.SECONDS, (subscriber, integer) -> {
          log.info("publish offer on drop:{}", integer);
          subscriber.onError(new RuntimeException("drop item:[" + integer + "]"));
          return false; //Do not resend
        });

        if (lag < 0) {
          //Number of drops
          log.info("drops:{}", lag * -1);
        } else {
          //Maximum delay estimate(Number of items sent but not yet consumed)
          log.info("lag:{}", lag);
        }

      });

    }

    log.info("end");

    try {
      TimeUnit.SECONDS.sleep(10);

      mySub1.result();
      mySub2.result();
      mySub3.result();

      if (!executor.isShutdown()) {
        log.info("shutdown");
        executor.shutdown();
      }

    } catch (InterruptedException e) {
      e.printStackTrace();
    }

  }

  // The type parameter is named T so that it does not shadow java.lang.Integer
  static class MySub<T> implements Flow.Subscriber<T> {
    private final String name;
    private final AtomicInteger success = new AtomicInteger(0);
    private final AtomicInteger error = new AtomicInteger(0);
    private Flow.Subscription s;

    public MySub(String name) {
      this.name = name;
    }

    private long getId() {
      return Thread.currentThread().getId();
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
      log.info("({}) onSubscribe:[{}]", getId(), name);
      this.s = subscription;
      s.request(1);
    }

    @Override
    public void onNext(T item) {
      log.info("({}) onNext:[{}] item:{}", getId(), name, item);
      success.incrementAndGet();
      s.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
      log.info("({}) onError:[{}]", getId(), name);
      error.incrementAndGet();
    }

    @Override
    public void onComplete() {
      log.info("({}) onComplete:[{}]", getId(), name);
    }

    public void result() {
      log.info("result:[{}] success:{} error:{}", name, success.get(), error.get());
    }

  }

}
