You can control the processing flow of Spring Batch with JavaConfig as well as with XML. This article uses tasklet-based steps as samples and organizes the configuration by processing pattern.
Sequential execution is the simplest processing pattern: register the steps in order on the job builder.
@Configuration
@EnableBatchProcessing
public class BatchConfig {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private Task1 task1;
@Autowired
private Task2 task2;
@Autowired
private Task3 task3;
@Bean
public Step step1() {
return stepBuilderFactory.get("step1").tasklet(task1).build();
}
@Bean
public Step step2() {
return stepBuilderFactory.get("step2").tasklet(task2).build();
}
@Bean
public Step step3() {
return stepBuilderFactory.get("step3").tasklet(task3).build();
}
@Bean
public Job job(Step step1, Step step2, Step step3) throws Exception {
// Executed in the order step1 -> step2 -> step3.
return jobBuilderFactory
.get("job")
.incrementer(new RunIdIncrementer())
.start(step1)
.next(step2)
.next(step3)
.build();
}
}
Alternatively, combine the steps into a single flow and register that flow on the job builder.
......
@Bean
public Job job(Step step1, Step step2, Step step3) throws Exception {
// Create a flow that runs step1 -> step2 -> step3
Flow flow = new FlowBuilder<Flow>("flow")
.from(step1)
.next(step2)
.next(step3)
.build();
return jobBuilderFactory
.get("job")
.incrementer(new RunIdIncrementer())
.start(flow)
.end()
.build();
}
A task can report its processing result by setting an ExitStatus on the StepContribution while it runs.
@Component
public class Task1 implements Tasklet {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
if (isCheckOK()) {
//success
contribution.setExitStatus(ExitStatus.COMPLETED);
} else {
//Failure
contribution.setExitStatus(ExitStatus.FAILED);
}
return RepeatStatus.FINISHED;
}
......
}
Use on to read the exit status of the task and branch on it.
.....
@Bean
public Job job(Step step1, Step step2, Step step3) throws Exception {
// step1 -> COMPLETED -> step2
//       -> FAILED    -> step3
return jobBuilderFactory
.get("job")
.incrementer(new RunIdIncrementer())
.start(step1).on(ExitStatus.COMPLETED.getExitCode()).to(step2)
.from(step1).on(ExitStatus.FAILED.getExitCode()).to(step3)
.end()
.build();
}
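The routing that on and to set up can be pictured as a lookup from (step, exit code) to the next step. The following is a minimal plain-Java sketch of that idea only; the class ExitCodeRouting and its methods are hypothetical names for illustration and are not part of Spring Batch. The strings "COMPLETED" and "FAILED" are the exit codes returned by ExitStatus.COMPLETED.getExitCode() and ExitStatus.FAILED.getExitCode().

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical plain-Java model of exit-code routing; not Spring Batch API.
final class ExitCodeRouting {

    // Key is "stepName:exitCode", value is the name of the next step.
    private final Map<String, String> transitions = new LinkedHashMap<>();

    // Registers a transition, mirroring from(step).on(exitCode).to(next).
    ExitCodeRouting on(String from, String exitCode, String to) {
        transitions.put(from + ":" + exitCode, to);
        return this;
    }

    // Returns the next step name, or null when no transition is registered.
    String next(String from, String exitCode) {
        return transitions.get(from + ":" + exitCode);
    }

    public static void main(String[] args) {
        ExitCodeRouting routing = new ExitCodeRouting()
                .on("step1", "COMPLETED", "step2")
                .on("step1", "FAILED", "step3");
        System.out.println(routing.next("step1", "COMPLETED")); // prints "step2"
        System.out.println(routing.next("step1", "FAILED"));    // prints "step3"
    }
}
```

The model returns null for an exit code with no registered transition, which is a simplification made for this sketch.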
If no step should follow under a given condition, use fail to end the job at that point.
.....
@Bean
public Job job(Step step1, Step step2) throws Exception {
// step1 -> COMPLETED -> step2
//       -> FAILED    -> end the job as failed
return jobBuilderFactory
.get("job")
.incrementer(new RunIdIncrementer())
.start(step1).on(ExitStatus.COMPLETED.getExitCode()).to(step2)
.from(step1).on(ExitStatus.FAILED.getExitCode()).fail()
.end()
.build();
}
This pattern runs steps in parallel on separate threads. Use the split method of the flow builder as follows.
......
@Bean
public Job job(Step step1, Step step2, Step step3) throws Exception {
// Register step1 in flow1
Flow flow1 = new FlowBuilder<Flow>("flow1").start(new FlowBuilder<Flow>("step1").from(step1).end()).build();
// Register step2 and step3 in flow2 so that they run in parallel
Flow flow2 = new FlowBuilder<Flow>("flow2").start(new FlowBuilder<Flow>("step2").from(step2).end())
.split(new SimpleAsyncTaskExecutor()).add(new FlowBuilder<Flow>("step3").from(step3).end()).build();
// Register flow1 and then flow2 on the job builder, in that order
return jobBuilderFactory
.get("job")
.incrementer(new RunIdIncrementer())
.start(flow1)
.next(flow2)
.end()
.build();
}
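The shape of this job is "run step1, then fan out to step2 and step3, then wait for both". As a rough illustration of that shape only, here is a plain java.util.concurrent sketch. It does not use Spring Batch; the class SplitSketch and the use of a fixed thread pool are assumptions made for the example, and the steps are reduced to writing their names into a log.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical plain-Java model of "step1, then step2 and step3 in parallel".
final class SplitSketch {

    // Runs the three placeholder steps and returns the order in which they finished.
    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            // Corresponds to flow1: runs first, on the calling thread.
            log.add("step1");
            // Corresponds to flow2: two branches submitted to the executor.
            CompletableFuture<Void> step2 = CompletableFuture.runAsync(() -> log.add("step2"), executor);
            CompletableFuture<Void> step3 = CompletableFuture.runAsync(() -> log.add("step3"), executor);
            // Wait until both branches have finished before continuing.
            CompletableFuture.allOf(step2, step3).join();
        } finally {
            executor.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        // step1 is always first; the order of step2 and step3 may differ between runs.
        System.out.println(run());
    }
}
```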
Unlike split, partition cannot define different processing for each thread. The work to be parallelized is defined as a single step, and that step is duplicated according to the amount of work and run on multiple threads. The required pieces are a master step, a slave step, a partitioner, and a partition handler:
@Configuration
@EnableBatchProcessing
public class BatchConfig {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private DemoPartitioner demoPartitioner;
@Autowired
private SlaveTask slaveTask;
@Bean
public Step slaveStep() {
return stepBuilderFactory.get("slaveStep").tasklet(slaveTask).build();
}
@Bean
public Step masterStep() {
// Set the slave step name, the partitioner, and the handler on the master step
return stepBuilderFactory.get("masterStep").partitioner(slaveStep().getName(), demoPartitioner)
.partitionHandler(handler()).build();
}
@Bean
public Job job() {
return jobBuilderFactory.get("job")
.incrementer(new RunIdIncrementer())
.start(masterStep())
.build();
}
@Bean
public PartitionHandler handler() {
TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
handler.setGridSize(10);
handler.setTaskExecutor(taskExecutor());
handler.setStep(slaveStep());
try {
handler.afterPropertiesSet();
} catch (Exception e) {
e.printStackTrace();
}
return handler;
}
@Bean
public SimpleAsyncTaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor();
}
}
In the partitioner class, set the input information for each thread in an ExecutionContext. In the example below, thread1 handles 1 to 10, thread2 handles 11 to 20, and so on.
@Component
public class DemoPartitioner implements Partitioner {
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> map = new HashMap<String, ExecutionContext>();
int range = 10;
int from = 1;
int to = range;
for (int i = 1; i <= gridSize; i++) {
ExecutionContext context = new ExecutionContext();
context.putString("name", "thread" + i);
context.putInt("from", from);
context.putInt("to", to);
map.put("partition" + i, context);
from = to + 1;
to += range;
}
return map;
}
}
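To check the range arithmetic without starting a job, the same loop can be run on its own. The sketch below repeats the from/to calculation of DemoPartitioner in plain Java; the class PartitionRanges, the compute method, and the int[] {from, to} representation are assumptions made for this illustration, standing in for the ExecutionContext entries.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that mirrors the range arithmetic of DemoPartitioner.
final class PartitionRanges {

    // Returns partition name -> {from, to} (both inclusive), in creation order.
    static Map<String, int[]> compute(int gridSize, int range) {
        Map<String, int[]> ranges = new LinkedHashMap<>();
        int from = 1;
        int to = range;
        for (int i = 1; i <= gridSize; i++) {
            ranges.put("partition" + i, new int[] {from, to});
            // The next partition starts right after the current one ends.
            from = to + 1;
            to += range;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Prints:
        // partition1: 1~10
        // partition2: 11~20
        // partition3: 21~30
        compute(3, 10).forEach((name, r) -> System.out.println(name + ": " + r[0] + "~" + r[1]));
    }
}
```

With gridSize 10 and range 10, as in the configuration above, the last partition covers 91 to 100.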
In the slave task, read the input information for its partition from the ExecutionContext.
@Component
public class SlaveTask implements Tasklet {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
String name = (String)chunkContext.getStepContext().getStepExecution().getExecutionContext().get("name");
int fromId = (int)chunkContext.getStepContext().getStepExecution().getExecutionContext().get("from");
int toId = (int)chunkContext.getStepContext().getStepExecution().getExecutionContext().get("to");
System.out.println(name + ":" + fromId + "~" + toId);
return RepeatStatus.FINISHED;
}
}
Execution result of the above. The order of the lines varies between runs because the partitions run in parallel.
thread1:1~10
thread4:31~40
thread7:61~70
thread6:51~60
thread3:21~30
thread10:91~100
thread9:81~90
thread2:11~20
thread8:71~80
thread5:41~50
Reference: https://sites.google.com/site/soracane/home/springnitsuite/spring-batch