Creating a custom ExecutionContext in Scala

We will create a custom ExecutionContext.

Background

When I use Future, I usually just inject an ExecutionContext through DI and pass it to Future implicitly, without thinking much about it.

Until now, a single default ExecutionContext was passed to Future for asynchronous processing against Elasticsearch. For various reasons, the application now has to read and save documents not only in Elasticsearch but also in DocumentDB (AWS's managed MongoDB-compatible service), so the same ExecutionContext ended up being used for the DocumentDB connections as well.

I would have preferred to share one common ExecutionContext, but with DocumentDB, when connections are requested carelessly, too many threads end up waiting for a connection and processing fails. (By default, up to 500 threads can wait.)

I actually got the following error.

Error contents

com.mongodb.MongoWaitQueueFullException: Too many threads are already waiting for a connection. Max number of threads (maxWaitQueueSize) of 500 has been exceeded.

Therefore, two things need to be tuned: the maximum number of connections allowed for DocumentDB (the connection pool size) and the number of threads that connect from the application side.

If the number of threads running in parallel on the application side does not exceed the connection pool size configured for DocumentDB, processing should complete normally.
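The relationship between thread count and parallelism can be checked with a small experiment. The following is a minimal sketch, not the setup used in this article: `poolSize` is a hypothetical stand-in for the connection pool size, and `Thread.sleep` stands in for a DocumentDB call. It records how many tasks are running at the same moment on a fixed thread pool.

```scala
import java.util.concurrent.Executors

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}

object BoundedConcurrencyDemo {
  // Hypothetical value standing in for the connection pool size
  val poolSize = 4

  // Runs 20 tasks on a pool of `poolSize` threads and returns the
  // highest number of tasks observed running at the same time.
  def run(): Int = {
    implicit val ec: ExecutionContextExecutorService =
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(poolSize))

    val lock = new Object
    var running = 0
    var maxObserved = 0

    try {
      val tasks = (1 to 20).map { _ =>
        Future {
          lock.synchronized {
            running += 1
            if (running > maxObserved) maxObserved = running
          }
          Thread.sleep(20) // stands in for a database call
          lock.synchronized { running -= 1 }
        }
      }
      Await.result(Future.sequence(tasks), 10.seconds)
      lock.synchronized(maxObserved)
    } finally ec.shutdown()
  }
}
```

Because the pool never has more than `poolSize` threads, `BoundedConcurrencyDemo.run()` cannot report more than `poolSize` concurrent tasks, no matter how many Futures are submitted.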

Reference: DocumentDB connection count setting

So I needed to customize the ExecutionContext that I had been using without much thought.

What is ExecutionContext?

Part of the Scala standard library that executes program logic asynchronously, typically but not necessarily on a thread pool.

A lengthy overview is given in the doc comment of the ExecutionContext source.

ExecutionContext.scala


/**
 * An `ExecutionContext` can execute program logic asynchronously,
 * typically but not necessarily on a thread pool.
 *
 * A general purpose `ExecutionContext` must be asynchronous in executing
 * any `Runnable` that is passed into its `execute`-method. A special purpose
 * `ExecutionContext` may be synchronous but must only be passed
 * ............Continue

Create your own ExecutionContext

The same doc comment briefly mentions good ways to customize it.

 * A custom `ExecutionContext` may be appropriate to execute code
 * which blocks on IO or performs long-running computations.
 * `ExecutionContext.fromExecutorService` and `ExecutionContext.fromExecutor`
 * are good ways to create a custom `ExecutionContext`.

So there are factory methods for creating your own ExecutionContext, such as ExecutionContext.fromExecutor.
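The two factory methods named in the doc comment differ mainly in what they return. Below is a minimal sketch (the object name and pool sizes are made up for illustration): `fromExecutor` wraps any `java.util.concurrent.Executor` and returns an `ExecutionContextExecutor`, while `fromExecutorService` returns an `ExecutionContextExecutorService`, which can itself be shut down.

```scala
import java.util.concurrent.Executors

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, ExecutionContextExecutorService, Future}

object FactoryMethodsDemo {
  def run(): (Int, Boolean) = {
    val pool = Executors.newFixedThreadPool(2)

    // fromExecutor: the lifecycle is still managed through `pool`
    val viaExecutor: ExecutionContextExecutor =
      ExecutionContext.fromExecutor(pool)

    // fromExecutorService: the result is also an ExecutorService
    val viaService: ExecutionContextExecutorService =
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))

    try {
      // The ExecutionContext is passed explicitly here to show which one is used
      val a = Future(1 + 1)(viaExecutor)
      val b = Future(20 * 2)(viaService)
      val sum = Await.result(a, 5.seconds) + Await.result(b, 5.seconds)
      viaService.shutdown()
      (sum, viaService.isShutdown)
    } finally {
      pool.shutdown()
      viaService.shutdown()
    }
  }
}
```

If the application needs to stop the pool on shutdown, `fromExecutorService` is probably the more convenient choice, since no separate reference to the underlying pool has to be kept.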

Let's actually build one

I will customize it in two ways.

1. First

ExecutionContextTest1.scala



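import java.util.concurrent.Executors

import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future}

class MongoRepository1 {

  //A fixed pool of one thread; every Future in this class runs on it
  implicit val service: ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1))

  def find: Future[Unit] = {
    Future {
      for (i <- 1 to 50) {
        println(i * 2)
      }
    }
  }
}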

Here an ExecutorService created by Executors is passed to fromExecutorService. With newFixedThreadPool(), you can specify a fixed number of threads to pool.

By passing it implicitly to Future, you can execute asynchronous processing with a customized ExecutionContext.
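One way to confirm that a Future really runs on the customized pool is to look at the name of the thread that executes it. The following is a small sketch under assumptions: the thread name `mongo-pool-1` and the object name are invented for this example and do not appear in the code above.

```scala
import java.util.concurrent.{Executors, ThreadFactory}

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}

object CustomPoolThreadDemo {
  def run(): String = {
    // Names the single pool thread so it can be recognized (hypothetical name)
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "mongo-pool-1")
        t.setDaemon(true)
        t
      }
    }

    implicit val ec: ExecutionContextExecutorService =
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1, factory))

    // Future picks up `ec` implicitly, so the body runs on the named pool thread
    try Await.result(Future(Thread.currentThread().getName), 5.seconds)
    finally ec.shutdown()
  }
}
```

`CustomPoolThreadDemo.run()` returns the name of the pool thread rather than the caller's thread, which shows that the implicit ExecutionContext in scope decides where the block is executed.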

2. Second

This is how to customize an ExecutionContext on Play Framework. It lets you wire everything up neatly with DI.

ExecutionContextTest.scala



package study.executionContextTest

import java.util.concurrent.{ExecutorService, Executors}

import com.google.inject.name.{Named, Names}
import com.google.inject.{AbstractModule, Inject, Singleton}

import scala.concurrent.{ExecutionContext, Future}

//Entry point: MongoRepository (and with it the custom ExecutionContext) is injected here
class Execution @Inject()(mongoRepository: MongoRepository) {
  mongoRepository.find
}

//Create your own ExecutionContext
//Since you create it yourself, you can define whatever settings you need.
class MongoExecutionContext(threadCount: Int) extends ExecutionContext {

  //You can create a thread pool by using Java's ExecutorService
  //The number of threads to be pooled is defined by a fixed value in newFixedThreadPool.
  private val executorService: ExecutorService =
    Executors.newFixedThreadPool(threadCount)

  //Future internally wraps the argument block in a Runnable and passes it to the execute method of ExecutionContext.
  override def execute(runnable: Runnable): Unit =
    executorService.execute(runnable)

  //reportFailure should report the error rather than rethrow it on the pool thread
  override def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
}

//Create your own module (Google Guice)
//Registering this module in application.conf (play.modules.enabled) lets Guice inject the binding
class MongoExecutionModule extends AbstractModule {
  override def configure(): Unit = {
    bind(classOf[ExecutionContext])
      .annotatedWith(Names.named("MongoExecutionContext"))
      .toInstance(new MongoExecutionContext(50))
  }
}

//Only one instance should be created.
//Creating multiple ExecutionContexts would break the thread-count tuning
@Singleton
class MongoRepository @Inject()(
  implicit @Named("MongoExecutionContext") ec: ExecutionContext
) {

  //find and the other methods use the implicit ExecutionContext.
  //Instead of the default ExecutionContext, an ExecutionContext suited to connecting to Mongo is used here.
  //This way, the ExecutionContext can be changed for each middleware.
  //In this example, the block passed to Future.apply is processed asynchronously.
  //Since the ExecutionContext injected above is used, it runs in parallel on at most 50 threads.
  def find = {
    Future {
      for (i <- 1 to 50) {
        println(i * 2)
      }
    }
  }
}

Details are in the code comments, but the general flow is as follows.

The end goal is to inject a custom ExecutionContext through DI.

1. Create a MongoExecutionContext that extends ExecutionContext.
1-1. Inside it, create an ExecutorService that specifies the thread pool.
2. Create a MongoExecutionModule that extends AbstractModule.
2-1. Bind the ExecutionContext under a name so that it can be injected with @Named.
3. Inject the ExecutionContext bound in 2 into MongoRepository.
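The same flow can be tried without a DI container by passing the ExecutionContext by hand. This is only a sketch under assumptions: `FixedPoolExecutionContext`, `DemoRepository`, and `ManualWiringDemo` are hypothetical names, the `shutdown` method is added just so the demo can stop its threads, and manual constructor passing stands in for what Guice does with @Named.

```scala
import java.util.concurrent.{ExecutorService, Executors}

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

// Step 1: an ExecutionContext backed by a fixed thread pool
class FixedPoolExecutionContext(threadCount: Int) extends ExecutionContext {
  private val executorService: ExecutorService =
    Executors.newFixedThreadPool(threadCount)

  override def execute(runnable: Runnable): Unit =
    executorService.execute(runnable)

  override def reportFailure(cause: Throwable): Unit =
    cause.printStackTrace()

  // Not part of ExecutionContext; added so the demo can release its threads
  def shutdown(): Unit = executorService.shutdown()
}

// Step 3: the repository receives the ExecutionContext instead of creating one
class DemoRepository()(implicit ec: ExecutionContext) {
  def find: Future[Seq[Int]] = Future((1 to 5).map(_ * 2))
}

object ManualWiringDemo {
  def run(): Seq[Int] = {
    // Step 2 (the module) is replaced here by plain constructor passing
    val ec = new FixedPoolExecutionContext(2)
    try {
      val repository = new DemoRepository()(ec)
      Await.result(repository.find, 5.seconds)
    } finally ec.shutdown()
  }
}
```

Keeping the repository unaware of how its ExecutionContext is built is what makes it possible to swap in a differently sized pool per middleware, whether the wiring is done by hand or by a DI module.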

Summary

As I mentioned at the beginning, I had been using ExecutionContext as if it were magic, but building one myself deepened my understanding.

Reference

It helped me a lot in implementing it.
