Building on the sample application created in the previous article, Migrating from Transport Client to Rest High Level Client, this time I will summarize bulk insert processing from a Java client to Elasticsearch using the Bulk API.
Environment: macOS, Elasticsearch 6.5.2, Java 8, Spring Boot 2.1.1
The application I created is published on GitHub. https://github.com/ohanamisan/Elasticsearch_on_Java
If you are using a different version of Elasticsearch, change the dependencies in the Gradle file as appropriate. Please refer to the README for other details.
This time, we will implement data insert processing using the BulkProcessor from the Bulk API. You should be able to follow the official documentation without much trouble, so please treat this as a memo comparing the Transport Client and Rest High Level Client implementations. The original BulkProcessor documentation is here ↓
RestHighLevelClient https://www.elastic.co/guide/en/elasticsearch/client/java-rest/master/java-rest-high-document-bulk.html
TransportClient https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-bulk-processor.html
// Transport Client version
BulkProcessor processor = BulkProcessor.builder(transportClient, new BulkProcessor.Listener() {
    @Override
    public void beforeBulk(long executionId, BulkRequest bulkRequest) {
        System.out.println("bulkRequest = " + bulkRequest.numberOfActions());
    }

    @Override
    public void afterBulk(long executionId, BulkRequest bulkRequest, Throwable throwable) {
        throwable.printStackTrace();
    }

    @Override
    public void afterBulk(long executionId, BulkRequest bulkRequest, BulkResponse bulkResponse) {
        System.out.println(
                "bulkResponse = " + bulkResponse.hasFailures() + " " + bulkResponse.buildFailureMessage());
    }
}).setBulkActions(20)
        .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
        .setFlushInterval(TimeValue.timeValueSeconds(5))
        .setConcurrentRequests(0)
        .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
        .build();
// Rest High Level Client version
BulkProcessor processor = BulkProcessor.builder((request, bulkListener) ->
        client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), new BulkProcessor.Listener() {
    @Override
    public void beforeBulk(long executionId, BulkRequest bulkRequest) {
        System.out.println("bulkRequest = " + bulkRequest.numberOfActions());
    }

    @Override
    public void afterBulk(long executionId, BulkRequest bulkRequest, Throwable throwable) {
        throwable.printStackTrace();
    }

    @Override
    public void afterBulk(long executionId, BulkRequest bulkRequest, BulkResponse bulkResponse) {
        System.out.println(
                "bulkResponse = " + bulkResponse.hasFailures() + " " + bulkResponse.buildFailureMessage());
    }
}).setBulkActions(20)
        .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
        .setFlushInterval(TimeValue.timeValueSeconds(5))
        .setConcurrentRequests(0)
        .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
        .build();
There is no big difference between the two. With the Transport Client, you pass the created client directly as the first argument of BulkProcessor's builder method, while with the Rest High Level Client you pass the client through a lambda that calls its bulkAsync method. In both cases, the second argument is a Listener that implements the processing before and after each bulk execution.
The code that adds documents at insert time does not need to change at all; you can use the add method in the same way with either client.
processor.add(new IndexRequest("Input destination index").source("Input data"));
processor.add(new IndexRequest("Input destination index", "Input destination type").source("Input data"));
processor.add(new IndexRequest("Input destination index", "Input destination type", "Unique id").source("Input data"));
Of course, as shown in the documentation, you can also create the Listener instance separately and pass it to the BulkProcessor builder. If you implement substantial processing before and after execution, separating it out this way keeps the responsibilities clear and feels cleaner.
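As a rough sketch, assuming the same client variable as in the Rest High Level Client block above, separating the Listener definition could look like this:

// Define the Listener separately so the before/after processing has its own home
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        // Processing before the bulk request is executed
        System.out.println("about to execute " + request.numberOfActions() + " actions");
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        // Processing after the bulk request completes
        System.out.println("hasFailures = " + response.hasFailures());
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        // Processing when the bulk request fails with an exception
        failure.printStackTrace();
    }
};

// Pass the separated Listener to the builder
BulkProcessor processor = BulkProcessor.builder(
        (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
        listener)
        .build();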
Next, I would like to add a paging function to the sample application using the Scroll API.