This article is part of "Implementing Asynchronous Processing in Tomcat".
-"Implementation of asynchronous processing in Tomcat" --"Implementation of asynchronous processing for single tenant in Tomcat" ← This article -"Implementation of multi-tenant asynchronous processing in Tomcat"
Here, in a single tenant, we will implement a Tomcat application that limits the flow of asynchronous tasks using a thread pool.
We have confirmed the operation in the following software and library environments. For other dependent libraries, please refer to pom.xml described later.
Create a Maven project using Eclipse or Maven and add the contents shown below to pom.xml. The dependency spring-boot-starter-actuator is a library for Tomcat monitoring, but it poses a security risk in the production environment, so remove it.
<project xmlns=...>
:
<build>
:
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.1.4.RELEASE</version>
</plugin>
</plugins>
:
</build>
<dependencies>
:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>1.5.22.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
<version>1.5.22.RELEASE</version>
</dependency>
:
</dependencies>
</project>
** Figure 3.2 Additions to pom.xml **
Create "application.properties" which is a configuration file of Spring Boot.
#Tomcat monitoring
#Settings for operation monitoring such as monitoring and shutting down the Tomcat container
#Remove it as it poses a security risk in the production environment
endpoints.shutdown.enabled=true
management.security.enabled=false
management.endpoints.web.exposure.include=**
#Thread pool
#Specify the number of accepted asynchronous tasks and the number of concurrent executions
#Queues as many tasks as queueCapacity and runs in the background as many as maxPoolSize
#Maximum "maxPoolSize+Accepts as many asynchronous tasks as "queueCapacity", and rejects when that number is exceeded(Not accepted)
# corePoolSize =Initial Thread number
# queueCapacity =Number to queue when corePoolSize is full
# maxPoolSize =Maximum number of Threads when queueCapacity is exceeded
threadPool.corePoolSize=1
threadPool.queueCapacity=2
threadPool.maxPoolSize=2
threadPool.keepAliveSeconds=1
#Specify logging log output level
#Specify the output level of Logging
# https://docs.oracle.com/javase/jp/8/docs/api/java/util/logging/Level.html
logging.level.root=INFO
logging.level.org.springframework.web=INFO
logging.level.web01=INFO
** Figure 3.3 Contents of application.properties **
--References -"Everyone is surprised at how Spring Boot reads property files !!"
Create a POJO class that holds the thread pool settings. The Spring Framework automatically injects settings in application.properties that start with "threadPool" into this class.
@Component
@ConfigurationProperties(prefix = "threadPool")
public class ThreadPoolSettings {
private String corePoolSize;
private String queueCapacity;
private String maxPoolSize;
private String keepAliveSeconds;
public void setCorePoolSize(String corePoolSize) {this.corePoolSize = corePoolSize;}
public void setQueueCapacity(String queueCapacity) {this.queueCapacity = queueCapacity;}
public void setMaxPoolSize(String maxPoolSize) {this.maxPoolSize = maxPoolSize;}
public void setKeepAliveSeconds(String keepAliveSeconds) {this.keepAliveSeconds = keepAliveSeconds;}
public int getCorePoolSize() {return Integer.valueOf(corePoolSize);}
public int getQueueCapacity() {return Integer.valueOf(queueCapacity);}
public int getMaxPoolSize() {return Integer.valueOf(maxPoolSize);}
public int getKeepAliveSeconds() {return Integer.valueOf(keepAliveSeconds);}
}
** Figure 3.4 Contents of ThreadPoolSettings class **
Create an Application class that launches the built-in Tomcat container.
@SpringBootApplication is automatically added to the Spring Framework @Configuration and [@Component](https://docs.spring.io/ It tells you to scan classes such as spring-framework / docs / current / javadoc-api / org / springframework / stereotype / Component.html). @Repository, @Service .io / spring-framework / docs / current / javadoc-api / org / springframework / stereotype / Service.html), @Controller -api / org / springframework / stereotype / Controller.html) is also @Component ), So these are also detected. However, please note that by default, the scanning target is limited to the same package or packages under it.
@SpringBootApplication
public class Application {
public static void main(final String[] args) {
SpringApplication.run(Application.class, args);
}
}
** Figure 3.5 Contents of Application class **
--References
Create a SampleConfig class that initializes the application when Tomcat starts.
Declare this class to be the default class @Configuration /Configuration.html) and @EnableAsync to enable asynchronous calls There is EnableAsync.html). @Bean in taskExecutor1 () is a Bean in Spring Framework. Is to register. By default, the bean name is the same as the method name, where "taskExecutor" is the bean name.
taskExecutor1 () creates a ThreadPoolTaskExecutor, which is a thread pool with queues. By changing the setting value of [application.properties](#creation of configuration file) mentioned above according to the number of cores and memory capacity of the production environment, the capacity of the queue and thread pool can be tuned.
In addition, the threads held by the thread pool should be "** non-daemon **" so that the running threads are not terminated when Tomcat is terminated. ThreadPoolTaskExecutor is non-daemon by default, but is explicitly specified here. However, shutting down the thread pool destroys all queued tasks. If you want the task that was stuck in the queue to be executed when you restart the Tomcat application, an additional implementation is required, but I will omit it here.
@Configuration
@EnableAsync
public class SampleConfig {
//application.of properties"threadPool"Variable that holds the setting value starting with
@Autowired
ThreadPoolSettings threadPoolSettings;
//@Register the thread pool for Async as a Spring bean
//The bean name will be "taskExecutor1" which is the same as the method name.
@Bean
public ThreadPoolTaskExecutor taskExecutor1() {
final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(threadPoolSettings.getCorePoolSize());
executor.setQueueCapacity(threadPoolSettings.getQueueCapacity());
executor.setMaxPoolSize(threadPoolSettings.getMaxPoolSize());
executor.setKeepAliveSeconds(threadPoolSettings.getKeepAliveSeconds());
executor.setThreadNamePrefix("Executor1-");
executor.setDaemon(false);
executor.initialize();
return executor;
}
}
** Figure 3.6 Contents of SampleConfig class **
Create the SampleService class shown below.
@Service of this class is DDD (Domain-Driven Design) It is one of the components and bears the debt to process the business logic.
cmd () starts the command specified by the argument and returns a Future object that can check the execution status of the command without blocking. @Async to declare that it is the target of an asynchronous call ) Is added, and the "taskExcecutor1" parameter is added to link to the thread pool defined in the SampleConfig class.
@Service
public class SampleService {
@Async("taskExecutor1")
public CompletableFuture<String> cmd(final String[] command) {
final int exitValue = run(command);
return CompletableFuture.completedFuture("exitValue="+exitValue);
}
//Execute external command
public static int run(final String[] command) {
try{
final Process p = new ProcessBuilder(command).redirectErrorStream(true).start();
try(final BufferedReader r = new BufferedReader(new InputStreamReader(p.getInputStream()))){
String str;
while((str = r.readLine()) != null) {
System.out.println("Command output: "+str);
}
}
return p.waitFor();
}catch(IOException | InterruptedException e) {
e.printStackTrace();
return -1;
}
}
}
** Figure 3.7 Contents of SampleService class **
Create the SampleController class shown below.
@Controller corresponds to the Controller of the MVC model and is from the client. Take on the debt to meet the demand.
Fields with @Autowired added to this class There are @Component and [@Bean](https: / Instruct Spring Framework to automatically inject parts defined in /docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/context/annotation/Bean.html) etc. Thing.
cmd () calls cmd () of SampleService in response to "GET http: //.../ cmd". It looks like a synchronous call, but the callee is [@Async](https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/scheduling/annotation/Async. Since html) is added, Spring Framework automatically changes it to an asynchronous call.
If the thread pool queue or thread is full and cannot accept asynchronous processing, a RejectedExecutionException will be thrown. This makes it possible to control the flow rate of asynchronous processing. Here, for the sake of simplicity, the string "Rejected" is returned to the client, but originally the HTTP "429 Too Many Requests" It would be a good idea to return a status code such as ".
In response to "GET http: // .../ status", status () scans the Future object returned by cmd () of SampleService and returns the execution status of the asynchronous call as a character string.
@Controller
public class SampleController {
private static final String[] COMMAND = new String[] {"cmd", "/c", "ping", "-n", "10", "localhost", ">", "nul"};
private AtomicInteger counter = new AtomicInteger(1);
private HashMap<String, Future<String>> futures = new HashMap<>();
@Autowired
private SampleService sampleService;
@RequestMapping(value="/cmd", method=RequestMethod.GET)
@ResponseBody
public String cmd() {
final String id = String.format("<noTenantId>#%03d", counter.getAndIncrement());
try{
final CompletableFuture<String> future = sampleService.cmd(COMMAND)
.exceptionally(ex -> id+": Exception: "+ex.getMessage()+"\n");
synchronized(this.futures) {
this.futures.put(id, future);
}
return "Accepted: id="+id+"\n";
}catch(RejectedExecutionException e) {
final CompletableFuture<String> future = new CompletableFuture<>();
future.complete("Rejected");
synchronized(this.futures) {
this.futures.put(id, future);
}
return "Rejected: id="+id+"\n";
}
}
@RequestMapping(value="/status", method=RequestMethod.GET)
@ResponseBody
public String status() {
final Map<String, Future<String>> map;
synchronized(this.futures) {
map = (Map<String, Future<String>>)this.futures.clone();
}
return map.entrySet().stream()
.map(entry->SampleController.futureGet(entry.getKey(), entry.getValue()))
.collect(StringBuilder::new, StringBuilder::append, StringBuilder::append)
.toString();
}
//Convert asynchronous task status to string
private static String futureGet(String tenantId, Future<String> future) {
if(future.isDone()==false && future.isCancelled()==false) {
return tenantId+": Running\n";
}else if(future.isCancelled()) {
return tenantId+": Canceled\n";
}
try {
return tenantId+": "+future.get()+"\n";
} catch (InterruptedException | ExecutionException | CancellationException e) {
return tenantId+": Exception\n";
}
}
}
** Figure 3.8 Contents of SampleController class **
You can start the Tomcat container locally and execute the command as follows to check the operation.
$ curl -X GET http://localhost:8080/cmd
Accepted: id=<noTenantId>#001
#After waiting a little over 10 seconds/GET status
$ curl -X GET http://localhost:8080/status
<noTenantId>#001: exitValue=0
#Normal termination of tomcat container
$ curl -X POST http://localhost:8080/shutdown
** Figure 3.9 Checking the operation of asynchronous tasks **
Continue to "Implementation of multi-tenant asynchronous processing in Tomcat" ...
Recommended Posts