[GO] Can FIFO queues be realized with "message ordering" in Cloud Pub/Sub?

Theme

Do you use Cloud Pub/Sub? It is handy when you need to handle a large number of messages. But what about when you want to guarantee that messages are received in the order they were sent? On AWS, I think you would use an SQS FIFO queue. (I tried to use it in the past, but as I recall I gave up on it because it was not yet available in the Tokyo region and I was constrained by the limit of 3,000 messages per second.) The GCP counterpart of SQS would be Cloud Pub/Sub, but unfortunately it has no FIFO queue. Or so I thought, until I found that a feature called "message ordering" had been added (as a Beta version). I wondered whether I could subscribe to messages in the order they were published, so I tried it.

That said, my way of testing was fairly rough, so for now please read this as nothing more than "I tried it this way and this is what I got."

Intended readers

- You know about GCP.
- You have used Cloud Pub/Sub, or at least know what it is.
- You can write Go reasonably well.

Prerequisites

- A Go development environment has been set up locally.
- You already have a GCP account.
- Cloud SDK has been set up locally.
- The path to the key JSON file (of a service account with all the required permissions) has been set in the local environment variable GOOGLE_APPLICATION_CREDENTIALS.

Development environment

OS - Linux(Ubuntu)

$ cat /etc/os-release 
NAME="Ubuntu"
VERSION="18.04.5 LTS (Bionic Beaver)"

Backend

Language - Golang

$ go version
go version go1.15.2 linux/amd64

IDE - GoLand

GoLand 2020.2.3
Build #GO-202.7319.61, built on September 16, 2020

Full source code

https://github.com/sky0621/go-publisher/tree/v0.1.0
https://github.com/sky0621/go-subscriber/tree/v0.1.0

Source excerpts with commentary

go-publisher

The publisher exposes 5 endpoints. The lines topic.EnableMessageOrdering = true and OrderingKey: operationSequence are the code required for "message ordering". However, the Topic this source publishes to ("my-normal-topic") is paired with a Subscription that does not have "message ordering" enabled, so ordering does not take effect for it.

main.go


package main

import (
	"fmt"
	"log"
	"net/http"
	"os"
	"time"

	"cloud.google.com/go/pubsub"
	"github.com/labstack/echo/v4"
	"github.com/labstack/echo/v4/middleware"
)

func main() {
	project := os.Getenv("PUB_PROJECT")

	e := echo.New()
	e.Use(middleware.Logger())
	e.Use(middleware.Recover())

	e.GET("/order01", handler(project, "order01"))
	e.GET("/order02", handler(project, "order02"))
	e.GET("/order03", handler(project, "order03"))
	e.GET("/order04", handler(project, "order04"))
	e.GET("/order05", handler(project, "order05"))

	e.Logger.Fatal(e.Start(":8080"))
}

func handler(project, path string) func(c echo.Context) error {
	return func(c echo.Context) error {
		ctx := c.Request().Context()
		operationSequence := createOperationSequence()

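		client, err := pubsub.NewClient(ctx, project)
		if err != nil {
			// Return an error response; log.Fatal here would terminate the whole server.
			return c.String(http.StatusInternalServerError, err.Error())
		}
		defer client.Close()

		topic := client.Topic("my-normal-topic")
		defer topic.Stop()

		// Required on the publisher side for "message ordering".
		topic.EnableMessageOrdering = true

		message := &pubsub.Message{
			// Ordering applies among messages that share the same ordering key;
			// this value differs for every request.
			OrderingKey: operationSequence,
			Data:        []byte(path + ":" + operationSequence),
		}
		// Publish is asynchronous; Get blocks until the publish succeeds or fails.
		id, err := topic.Publish(ctx, message).Get(ctx)
		if err != nil {
			return c.String(http.StatusInternalServerError, err.Error())
		}
		log.Printf("published message ID: %s", id)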

		return c.String(http.StatusOK, path+":"+operationSequence)
	}
}

func createOperationSequence() string {
	return fmt.Sprintf("%d", time.Now().UnixNano())
}
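
As background for the OrderingKey field: Pub/Sub describes its ordering guarantee as applying to messages that share the same ordering key (and are in the same region). The following is only a minimal sketch of what that guarantee means, using the standard library alone. The msg type and the orderedPerKey function are hypothetical names for illustration and are not part of the repositories above.

```go
package main

import "fmt"

// msg is a hypothetical stand-in for a received message.
type msg struct {
	OrderingKey string
	Seq         int // publish sequence number within its ordering key
}

// orderedPerKey reports whether, for every ordering key, the messages
// were received in non-decreasing Seq order. Interleaving between
// different keys is allowed, because ordering is scoped to one key.
func orderedPerKey(received []msg) bool {
	last := map[string]int{}
	for _, m := range received {
		if prev, ok := last[m.OrderingKey]; ok && m.Seq < prev {
			return false
		}
		last[m.OrderingKey] = m.Seq
	}
	return true
}

func main() {
	// Keys "a" and "b" interleave, but each key is in order.
	interleaved := []msg{{"a", 1}, {"b", 1}, {"a", 2}, {"b", 2}}
	// Within key "a", the second message arrives first.
	swapped := []msg{{"a", 2}, {"a", 1}}

	fmt.Println(orderedPerKey(interleaved)) // true
	fmt.Println(orderedPerKey(swapped))     // false
}
```

Note that createOperationSequence in the publisher returns a nanosecond timestamp, so in practice each request gets its own ordering key and each message forms its own ordering group.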

go-subscriber

main.go


package main

import (
	"encoding/json"
	"io"
	"io/ioutil"
	"log"
	"net/http"

	"github.com/labstack/echo/v4"
	"github.com/labstack/echo/v4/middleware"
)

func main() {
	e := echo.New()
	e.Use(middleware.Logger())
	e.Use(middleware.Recover())

	e.POST("/", func(c echo.Context) error {
		m, err := unmarshal(c.Request().Body)
		if err != nil {
			return c.String(http.StatusBadRequest, err.Error())
		}
		data := string(m.Message.Data)
		log.Printf("fs_Received__Data:%s", data)
		return c.String(http.StatusOK, "OK")
	})

	e.Logger.Fatal(e.Start(":8080"))
}

type PubSubMessage struct {
	Message struct {
		Data []byte `json:"data,omitempty"`
		ID   string `json:"id"`
	} `json:"message"`
	Subscription string `json:"subscription"`
}

func unmarshal(r io.ReadCloser) (*PubSubMessage, error) {
	var m PubSubMessage
	body, err := ioutil.ReadAll(r)
	if err != nil {
		log.Printf("ioutil.ReadAll: %v", err)
		return nil, err
	}
	if err := json.Unmarshal(body, &m); err != nil {
		log.Printf("json.Unmarshal: %v", err)
		return nil, err
	}
	return &m, nil
}
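
The subscriber relies on the fact that a push Subscription delivers the message payload base64-encoded inside a JSON envelope, and that encoding/json decodes a base64 string into a []byte field automatically. Here is a minimal sketch of that behavior that runs without Pub/Sub. The pushEnvelope type mirrors PubSubMessage above, while decodePushData, sampleBody, and the sample values are hypothetical and exist only for this illustration.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

// pushEnvelope mirrors the PubSubMessage struct of the subscriber.
type pushEnvelope struct {
	Message struct {
		Data []byte `json:"data,omitempty"`
		ID   string `json:"id"`
	} `json:"message"`
	Subscription string `json:"subscription"`
}

// decodePushData extracts the payload from a push request body.
// encoding/json base64-decodes the "data" string into the []byte field.
func decodePushData(body []byte) (string, error) {
	var e pushEnvelope
	if err := json.Unmarshal(body, &e); err != nil {
		return "", err
	}
	return string(e.Message.Data), nil
}

// sampleBody builds a hypothetical push request body for a payload.
func sampleBody(payload string) []byte {
	encoded := base64.StdEncoding.EncodeToString([]byte(payload))
	return []byte(`{"message":{"data":"` + encoded +
		`","id":"1"},"subscription":"projects/p/subscriptions/s"}`)
}

func main() {
	data, err := decodePushData(sampleBody("order01:1602518400000000000"))
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	fmt.Println(data) // order01:1602518400000000000
}
```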

A shell script that hits the 5 endpoints in sequence

It's pretty rough.

#!/usr/bin/env bash
set -euox pipefail

for i in {0..15}
do
	for j in {1..5}
	do
		curl "https://go-publisher-xxxxxxxxxxxxxx/order0${j}"
		sleepenh 0.005
	done
	sleepenh 0.005
done

Trying it out

Overview

A service that exposes a REST endpoint runs on Cloud Run and publishes to Cloud Pub/Sub whenever it receives a request. A push-type Subscription then forwards each message as a request to another service, which also exposes a REST endpoint on Cloud Run.

screenshot-app.cloudskew.com-2020.10.13-01_50_36.png

With a regular Topic and Subscription

Topic and Subscription settings

screenshot-console.cloud.google.com-2020.10.12-23_04_10.png

screenshot-console.cloud.google.com-2020.10.12-23_06_00.png

Message input result

The script hits the endpoints repeatedly in the following order. The Publisher log shows the messages in that order, but the Subscriber log shows them in no particular order.

/order01 /order02 /order03 /order04 /order05

screenshot-console.cloud.google.com-2020.10.13-00_48_38.png

Naturally, the log output by the shell script that hits the endpoints is in order.

Screenshot at 2020-10-13 00-52-17.png

With a Topic and Subscription that support "message ordering"

Topic and Subscription settings

Ordering requires that the messages be in the same region, so specify a region for the Topic.

screenshot-console.cloud.google.com-2020.10.12-23_14_13.png screenshot-console.cloud.google.com-2020.10.12-23_24_06.png

Naturally, "message ordering" must be set to "Enabled".

screenshot-console.cloud.google.com-2020.10.12-23_22_08.png

Message input result

I tried it several times. In some runs the order in which messages were received was swapped near the start, but after that the order was correct. Compared with the regular Topic / Subscription combination, which was largely out of order, the order is much more stable.
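
To compare the two setups with a number rather than by eye, the received payloads could be checked mechanically. The following is only a sketch under assumptions: it expects payloads in the "path:nanoseconds" form that the publisher builds, and it treats the embedded timestamp as the publish order, which may not hold exactly if Cloud Run serves the requests from several instances whose clocks differ. The function names and the sample values are hypothetical.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// publishNanos extracts the timestamp from a payload such as "order01:100".
func publishNanos(data string) (int64, error) {
	i := strings.LastIndex(data, ":")
	if i < 0 {
		return 0, fmt.Errorf("no ':' in %q", data)
	}
	return strconv.ParseInt(data[i+1:], 10, 64)
}

// countOutOfOrder returns how many messages carry an earlier timestamp
// than the message received immediately before them.
func countOutOfOrder(received []string) (int, error) {
	count := 0
	var prev int64
	for i, d := range received {
		n, err := publishNanos(d)
		if err != nil {
			return 0, err
		}
		if i > 0 && n < prev {
			count++
		}
		prev = n
	}
	return count, nil
}

func main() {
	// Hypothetical subscriber log: order02 arrives after order03.
	received := []string{
		"order01:100", "order03:300", "order02:200", "order04:400", "order05:500",
	}
	n, err := countOutOfOrder(received)
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Printf("%d of %d messages arrived out of order\n", n, len(received))
}
```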

Screenshot at 2020-10-13 01-30-45.png

Summary

Even with "message ordering" enabled, the order was sometimes swapped, so I cannot answer the title question with "yes, it can be realized". Whether this comes from the way I tested it or from the feature still being in Beta, I will dig into it when I feel like it.
