"Experience" reactive programming with NoSQL (touch the Couchbase Reactive API)

Introduction

A new programming paradigm is constantly emerging around programming languages. Here, the theme is a new paradigm called reactive programming (By the way, in Japanese Wikipedia about programming paradigm, reactive programming is listed at the end of the list-that is, as the newest one-. Was done).

Now, what kind of approach can a beginner who is interested in reactive programming think about when learning reactive programming?

As a starting point, you can think of a conceptual understanding. If you search the Internet for the keyword "reactive programming", you will first find articles such as "what is reactive programming (what)".

And I think the next thing that comes to mind is actually programming using the framework of reactive programming.

But I can imagine many people stumbling here. It's easy to end up in a situation where you don't know what to make, or you've read the framework tutorials and tried to write a few lines of programming, but you can't go any further. There may be.

Here, I would like to "experience" reactive programming by first using the API implemented using the reactive framework. Speaking of what kind of experience, by using data manipulation in a (NoSQL) database as a theme, you can experience the difference between reactive (non-blocking API) and non-reactive (blocking API) in terms of behavior when handling large-scale data. Is intended to be.

It may be a little derailed, but in the past there was a time when web applications (and web browsers) were new. At this time, if a person who has never used a web application as a user develops a web application, let alone learn the concept of a web application, it is a very special situation. You can see it. In that sense as well, I think the approach here has a certain meaning.

Selection of materials for the experience

You can use any database that provides a reactive (non-blocking) API, but here we will use Couchbase Server, a document-oriented NoSQL database. Java will be used as the programming language.

Techniques for experience

Use both the reactive (non-blocking) API and the blocking (non-reactive) API to do the same thing and observe the difference in behavior.

Program explanation

Base class

We have defined a process that is common to both the reactive and block APIs as a base class.

I think it's easy to imagine that someone who doesn't know Couchbase in particular connects to a database and gets a data storage destination (Collection in Bucket) similar to an RDB table.

Data registration, which will be used later, is also executed in this class (using the upsert method, new additions or overwrites are always performed at runtime). This time, we will focus on seeing the difference in data acquisition behavior, so we are using the old block API for this upsert process.

import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Collection;
import com.couchbase.client.java.json.JsonObject;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public abstract class Base {

    protected final Cluster cluster;
    protected final Bucket bucket;
    protected final Collection collection;

    public static final String bucketName = "default";
    public static final String userName = "Administrator";
    public static final String userPass = "password";
    public static final String seedNode = "127.0.0.1";
    
    public int NUM_KEY = 20000;
    protected List<String> keys = new ArrayList<String>(NUM_KEY);
    protected List<Object> listResults = new ArrayList<Object>(NUM_KEY);
    

    public Base() {
        cluster = Cluster.connect(seedNode, userName, userPass);
        bucket = cluster.bucket(bucketName);
        collection = bucket.defaultCollection();
        bucket.waitUntilReady(Duration.ofSeconds(30));
        prepKeys();
        prepDocuments();
    }

    private void prepKeys() {
        for (int i = 0; i < NUM_KEY; i++) {
            keys.add(String.valueOf(i));
        }
    }
    private void prepDocuments() {
        for (String key : keys) {
        	JsonObject content = JsonObject.create().put("key", String.valueOf(key));
            collection.upsert(key, content);
        }
    }
    private void disconnect() {
        cluster.disconnect();
    }

    public void exec() {
    	long startTime = System.currentTimeMillis();
    	List<Object> listResults = get();
        long endTime = System.currentTimeMillis();

        System.out.println("First data:" + listResults.get(0));
        System.out.println("End data:" + listResults.get(NUM_KEY -1));
        System.out.println("Acquisition size:" + listResults.size());
        System.out.println("Elapsed time (milliseconds):" + (endTime - startTime));
        disconnect();
    }

    protected abstract List<Object> get();

Block API usage class

It inherits the base class and implements the abstract method (get) using the block API.

import java.util.List;

import com.couchbase.client.java.kv.GetResult;

public class Sequential extends Base {

    @Override
    protected List<Object> get() {	
    	for (String key : keys) {
            GetResult result = collection.get(String.valueOf(key));
            listResults.add(result);
    	}
        return listResults;
    }

    public static void main(String[] args) {
        new Sequential().exec();
    }
}

Reactive API usage class

It inherits the base class and implements the abstract method (get) using the reactive API.

import java.util.List;

import com.couchbase.client.java.ReactiveCollection;
import reactor.core.publisher.Flux;

public class Concurrent extends Base {

    @Override
    protected List<Object>  get() {
        ReactiveCollection reactiveCollection = collection.reactive();
 
        Flux<Object> resultFlux = Flux.fromArray(keys.toArray())
            .flatMap( k -> reactiveCollection.get(String.valueOf(k)));

        List<Object> listResults = resultFlux.collectList().block();
        return listResults;
    }

    public static void main(String[] args) {
        new Concurrent().exec();
    }
}

Execution result

The following is the execution result of each class.

Sync API

First data:GetResult{content={"key":"0"}, flags=0x2000000, cas=0x165349f2729e0000, expiry=Optional.empty}
End data:GetResult{content={"key":"19999"}, flags=0x2000000, cas=0x165349f307a30000, expiry=Optional.empty}
Acquisition size:20000
Elapsed time (milliseconds): 1201

Asynchronous API

First data:GetResult{content={"key":"0"}, flags=0x2000000, cas=0x16534a84e29c0000, expiry=Optional.empty}
End data:GetResult{content={"key":"19999"}, flags=0x2000000, cas=0x16534a856f4e0000, expiry=Optional.empty}
Acquisition size:20000
Elapsed time (milliseconds): 302

I have Couchbase Server installed on my Mac laptop and I'm running the program on my laptop as well, but in my environment I've seen a roughly four times faster difference.

In this case, the acquired data is blocked when it is stored in the list, so if it is a use case that can make more use of non-blocking processing, even higher efficiency can be expected.

Finally

This time, I focused on the programming content. If you are interested in it, we would appreciate it if you could actually execute the source code posted this time and see it, or try more advanced data manipulation by yourself.

The explanation and installation of Couchbase Server itself is beyond the scope of this article, so I had to omit it, but if you want to get started, installing Couchbase Server and operating from the management console are not necessary. It's easy enough to beat, so please do it according to your interests.

Recommended Posts

"Experience" reactive programming with NoSQL (touch the Couchbase Reactive API)
Programming with ruby (on the way)
Solving the knapsack problem with dynamic programming
Easy JSON database experience with Docker ~ Try the latest version of Couchbase Server
Getting Started with Doma-Using Projection with the Criteira API
Getting Started with Doma-Using Joins with the Criteira API
Getting Started with Doma-Introduction to the Criteria API