Use Bulk API with RestHighLevelClient

Background

Building on the sample application from the previous article, Migrating from Transport Client to Rest High Level Client, this post summarizes bulk insert processing from the Java client to Elasticsearch using the Bulk API.

Environment

macOS, Elasticsearch 6.5.2, Java 8, Spring Boot 2.1.1

Sample app

The application I created is available on GitHub: https://github.com/ohanamisan/Elasticsearch_on_Java

If your Elasticsearch version differs, adjust the jar dependencies in the Gradle file accordingly. See the README for other details.

Implementation

This time we will implement data insertion using the BulkProcessor from the Bulk API. The implementation follows the official documentation closely, so treat this as a memo comparing the Transport Client and the Rest High Level Client. The original BulkProcessor documentation is linked below.

RestHighLevelClient https://www.elastic.co/guide/en/elasticsearch/client/java-rest/master/java-rest-high-document-bulk.html

TransportClient https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-bulk-processor.html

Generating a BulkProcessor with TransportClient


BulkProcessor processor = BulkProcessor.builder(transportClient, new BulkProcessor.Listener() {

        @Override
        public void beforeBulk(long executionId, BulkRequest bulkRequest) {
            System.out.println("bulkRequest = " + bulkRequest.numberOfActions());
        }

        @Override
        public void afterBulk(long executionId, BulkRequest bulkRequest, Throwable throwable) {
            throwable.printStackTrace();
        }

        @Override
        public void afterBulk(long executionId, BulkRequest bulkRequest, BulkResponse bulkResponse) {
            System.out.println(
                    "bulkResponse = " + bulkResponse.hasFailures() + " " + bulkResponse.buildFailureMessage());
        }
    }).setBulkActions(20)
      .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
      .setFlushInterval(TimeValue.timeValueSeconds(5))
      .setConcurrentRequests(0)
      .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
      .build();

Generating a BulkProcessor with RestHighLevelClient


BulkProcessor processor = BulkProcessor.builder((request, bulkListener) ->
        client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), new BulkProcessor.Listener() {

        @Override
        public void beforeBulk(long executionId, BulkRequest bulkRequest) {
            System.out.println("bulkRequest = " + bulkRequest.numberOfActions());
        }

        @Override
        public void afterBulk(long executionId, BulkRequest bulkRequest, Throwable throwable) {
            throwable.printStackTrace();
        }

        @Override
        public void afterBulk(long executionId, BulkRequest bulkRequest, BulkResponse bulkResponse) {
            System.out.println(
                    "bulkResponse = " + bulkResponse.hasFailures() + " " + bulkResponse.buildFailureMessage());
        }
    }).setBulkActions(20)
      .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
      .setFlushInterval(TimeValue.timeValueSeconds(5))
      .setConcurrentRequests(0)
      .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
      .build();

There is no big difference between the two. With TransportClient, you pass the generated client itself as the first argument of BulkProcessor's builder method; with RestHighLevelClient, you instead pass a lambda that forwards the request to the client's bulkAsync method. In both cases the second argument is a Listener that implements the processing to run before and after each bulk execution.

Insert processing

The insert processing itself is unchanged: you can keep using the add method as before.

processor.add(new IndexRequest("Input destination index").source("Input data"));
processor.add(new IndexRequest("Input destination index", "Input destination type").source("Input data"));
processor.add(new IndexRequest("Input destination index", "Input destination type", "Unique id").source("Input data"));
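As a concrete sketch of the placeholders above (the index name, type, id, and field names here are made up for illustration), a document can be inserted from a Map source, and the processor should be closed on shutdown so any buffered requests are flushed:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.elasticsearch.action.index.IndexRequest;

// Hypothetical index ("articles"), type ("_doc"), id ("1"), and fields.
Map<String, Object> source = new HashMap<>();
source.put("title", "sample");
source.put("views", 10);
processor.add(new IndexRequest("articles", "_doc", "1").source(source));

// BulkProcessor buffers requests until a flush condition is hit
// (setBulkActions / setBulkSize / setFlushInterval above), so close it
// when done; awaitClose flushes the remaining requests first.
processor.awaitClose(30, TimeUnit.SECONDS);
```

This fragment assumes a running Elasticsearch cluster and the `processor` built earlier, so it is not runnable on its own.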

In closing

Of course, as the documentation shows, you can also create the Listener as a separate instance and pass it to the BulkProcessor builder, keeping the two definitions apart. When the pre- and post-execution processing carries real logic, separating it out this way makes the responsibilities clearer.
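A minimal sketch of that separation, assuming a made-up class name `LoggingBulkListener` (not part of the sample app), might look like this:

```java
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;

// A standalone Listener keeps the before/after hooks out of the builder call,
// so the builder chain only configures flush conditions and backoff.
class LoggingBulkListener implements BulkProcessor.Listener {

    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        System.out.println("about to send " + request.numberOfActions() + " actions");
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        if (response.hasFailures()) {
            System.out.println("failures: " + response.buildFailureMessage());
        }
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        failure.printStackTrace();
    }
}
```

The builder call then becomes `BulkProcessor.builder(..., new LoggingBulkListener())`, which reads more clearly once the hook logic grows beyond a few lines. This depends on the Elasticsearch client jars, so it only compiles inside the sample project.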

Next, I would like to add a paging function to the sample application using the Scroll API.
