Asynchronous processing with Shoryuken + SQS

This is the day 18 article of the CA Advance Advent Calendar 2020 on Qiita.

Background: why we adopted Shoryuken

I built a feature that registers tens of thousands of CSV records, but the processing took so long that the request timed out before it finished. We considered extending the timeout, but that would only have been a stopgap, so we needed asynchronous processing that could handle registrations of that size.

The processing flow we considered is as follows.

- When a CSV is uploaded from the SPA screen, save it to S3 and send a message to the SQS queue.
- A worker receives the message, downloads the CSV from S3 in the background, and registers the CSV data in the DB.
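The upload side of this flow can be sketched roughly as follows. This is a minimal sketch, not the code used in the article: the helper name `enqueue_csv_import`, the `csv_uploads/` key layout, and the message format (`bucket` and `key`) are assumptions for illustration. The `s3` and `sqs` arguments are expected to behave like `Aws::S3::Client` (`put_object`) and `Aws::SQS::Client` (`send_message`).

```ruby
require 'json'
require 'securerandom'

# Hypothetical helper: stores the uploaded CSV in S3, then tells the worker
# where to find it by sending a small JSON message to SQS.
# `s3` and `sqs` are injected so the flow can be exercised with stand-ins;
# in the application they would be Aws::S3::Client and Aws::SQS::Client.
def enqueue_csv_import(s3:, sqs:, bucket:, queue_url:, csv_body:)
  key = "csv_uploads/#{SecureRandom.uuid}.csv" # assumed key layout
  s3.put_object(bucket: bucket, key: key, body: csv_body)

  # Only the S3 location goes into the message, not the CSV itself,
  # because SQS limits the size of a message.
  sqs.send_message(
    queue_url: queue_url,
    message_body: JSON.generate(bucket: bucket, key: key)
  )
  key
end
```

The worker can then read `bucket` and `key` from the parsed message body and download the file from S3.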

What is Shoryuken?

Shoryuken is a job worker that runs on AWS SQS and supports Active Job. Sidekiq is probably the better-known option, but it uses Redis as its queue store, whereas Shoryuken uses SQS. At first I considered Sidekiq, but I wanted to use SQS for its high fault tolerance, so I looked for alternatives and found Shoryuken, which fit the purpose. I adopted it right away.

Creating a queue

Create a queue in SQS in advance.

(Screenshot: 201218-0001.png)

Installation

Gemfile


gem 'shoryuken'

bundle install

$ bundle install

The configuration file is as follows. Because each job processes a large amount of data, the number of threads is kept conservative.

config/shoryuken.yml


concurrency: 5
delay: 30
require: /usr/src/app/app/workers
pidfile: /usr/src/app/tmp/pids/shoryuken.pid
queues:
  - sqs_queue_sample

Create the server configuration used to receive messages. Here Shoryuken is used only to consume the queue, but if you also define configure_client, you can send messages to the queue through Shoryuken as well.

Because the worker runs as a container on ECS Fargate, logs are written to standard output so that they can be checked in CloudWatch Logs.

config/initializers/shoryuken.rb


Shoryuken.configure_server do |config|
  config.sqs_client = Aws::SQS::Client.new(
    region: "ap-northeast-1",
    access_key_id: ENV['AWS_ACCESS_KEY_ID'],
    secret_access_key: ENV['AWS_SECRET_ACCESS_KEY'],
    verify_checksums: false
  )
  Rails.logger = Shoryuken::Logging.logger
  Rails.logger.level = :info
end

Shoryuken::Logging.initialize_logger(STDOUT)

Implement the worker's main processing. When a message arrives on the queue, the perform method is called with that message. The CSV data registration logic goes inside it.

workers/do_worker.rb


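class DoWorker
  include Shoryuken::Worker

  # auto_delete removes the message from the queue once perform succeeds;
  # body_parser: :json parses the message body before passing it to perform.
  shoryuken_options queue: 'sqs_queue_sample', auto_delete: true, body_parser: :json

  def perform(sqs_msg, body)
    # Write worker processing here
  end
end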
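The article does not show the CSV registration logic that goes inside `perform`. As one possible shape for it, here is a minimal sketch that splits the downloaded CSV into fixed-size batches so that each batch can be written in one bulk insert. The helper name `each_csv_batch`, the default batch size, and the assumption that the CSV has a header row are all illustrative, not taken from the article.

```ruby
require 'csv'

# Hypothetical helper: parses CSV text that starts with a header row and
# yields the rows in batches, each row as a Hash keyed by column name.
def each_csv_batch(csv_text, batch_size: 1000)
  CSV.parse(csv_text, headers: true).each_slice(batch_size) do |rows|
    yield rows.map(&:to_h)
  end
end

# Example: three data rows in batches of two.
each_csv_batch("name,age\nA,1\nB,2\nC,3\n", batch_size: 2) do |rows|
  puts rows.length
end
```

Inside `perform`, each batch could then be passed to a bulk insert such as Active Record's `insert_all` (Rails 6 or later); the model and column mapping depend on your schema.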

After that, you can start the worker with the following command. When running in a container, set this command as the CMD in the Dockerfile so that the container starts as a worker.

$ bin/bundle exec shoryuken -R -C config/shoryuken.yml

After adoption

Moving the CSV registration into the background means it no longer times out, regardless of the number of CSV records. In addition, processing through SQS with Shoryuken's worker threads no longer puts excessive load on the system. It has been in operation for a little under a month, and no problems have occurred so far.

Using the gem let me implement this quickly. I wanted to use FakeSQS instead of SQS in the development environment, but gave up on that because of time constraints. For now I have created a separate SQS queue for development, but since that incurs charges, I plan to switch development to a FakeSQS container.
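One way to prepare for that switch is to build the SQS client options per environment. The following is only a sketch under assumptions: the helper name `sqs_client_options`, the `FAKE_SQS_ENDPOINT` variable, the local URL, and the dummy credentials are all hypothetical, and the port must match however the FakeSQS container is actually exposed. `endpoint` is a standard option of `Aws::SQS::Client.new`.

```ruby
# Hypothetical helper: returns the options to pass to Aws::SQS::Client.new.
# In development it points the client at a local FakeSQS container;
# otherwise it uses the real AWS credentials from the environment.
def sqs_client_options(rails_env, env = ENV)
  base = { region: 'ap-northeast-1', verify_checksums: false }

  if rails_env == 'development'
    base.merge(
      # Assumed local endpoint; adjust to the container's host and port.
      endpoint: env.fetch('FAKE_SQS_ENDPOINT', 'http://localhost:4568'),
      access_key_id: 'dummy',
      secret_access_key: 'dummy'
    )
  else
    base.merge(
      access_key_id: env['AWS_ACCESS_KEY_ID'],
      secret_access_key: env['AWS_SECRET_ACCESS_KEY']
    )
  end
end
```

In the initializer, the result would be passed as `Aws::SQS::Client.new(**sqs_client_options(Rails.env))`.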

References

- Batch processing platform created with Shoryuken - Toreta developer blog
- Easy to create Job Queue Worker using SQS in Rails with shoryuken gem | Developers.IO
