Use Bulk API with RestHighLevelClient

From the sample application created in the previous article Migrating from Transport Client to Rest High Level Client, this time I will summarize the Bulk insert processing from Java Client to Elastic saerch using Bulk API. ..


macOS Elasticsearch6.5.2 Java8 Spring Boot 2.1.1

Sample app

The application I created is listed on GitHub.

If the version of Elasticsearch is different, please change the jar import part of the gradle file as appropriate. Please refer to the README for other details.


This time, we will implement the data insert processing using the Bulk Processor in the Bulk API. I think that you can do it even if you refer to the document without any big difference, but please think that it is a memo about the comparison between Transport Client and Rest High Level Client. Click here for the original Bulk Processor document ↓



Generating Bulk Processor with Transport Client

BulkProcessor processor = BulkProcessor.builder(transportClient, new BulkProcessor.Listener() {

			public void beforeBulk(long l, BulkRequest bulkRequest) {
				System.out.println("bulkRequest = " + bulkRequest.numberOfActions());

			public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {

			public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {
						"bulkResponse = " + bulkResponse.hasFailures() + " " + bulkResponse.buildFailureMessage());
		  .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
		  .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))

Generating Bulk Processor with RestHighLevelClient

BulkProcessor processor =  BulkProcessor.builder((request, bulkListener) ->
                client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), new BulkProcessor.Listener() {

            public void beforeBulk(long l, BulkRequest bulkRequest) {
                System.out.println("bulkRequest = " + bulkRequest.numberOfActions());

            public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {

            public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {
                        "bulkResponse = " + bulkResponse.hasFailures() + " " + bulkResponse.buildFailureMessage());
          .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
          .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))

There is no big difference between them, and TransportClient passes the generated client as it is in the first argument of BulkProcessor's builder method, and RestHighLevelClient passes client information in lambda using bulkAsync method. In both of the second arguments, define a Listener that implements the processing before and after execution.

Insert processing

You can use the add method without changing the processing at the time of insertion.

processor.add(new IndexRequest("Input destination index").source("Input data"));
processor.add(new IndexRequest("Input destination index", "Input destination type").source("Input data"));
processor.add(new IndexRequest("Input destination index", "Input destination type", "Unique id").source("Input data"));

at the end

Of course, after creating an instance of Listener according to the document, you can separate the definition with the builder of Bulk Processor. When implementing the pre-execution and post-execution processes firmly, if you separate the processes around that, the responsibilities will be clear and it will feel good.

Next, I would like to add a paging function to the sample application using the Scroll API.

