[GO] Une file d'attente FIFO peut-elle être créée en "spécifiant l'ordre des messages" dans Cloud Pub / Sub?

thème

Utilisez-vous Cloud Pub / Sub? Il est utilisé lorsque vous souhaitez gérer un grand nombre de messages. S'agit-il de l'ordre dans lequel les messages sont envoyés, lorsque vous souhaitez le garantir ou non? Dans le cas d'AWS, je pense que vous devriez utiliser la [FIFO Queue] de SQS (https://docs.aws.amazon.com/ja_jp/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html). (J'avais essayé de l'utiliser dans le passé, mais je me souviens que je ne l'ai pas utilisé après tout parce que je n'avais pas encore la région de Tokyo et j'étais pressé par la limite de 3000 messages par seconde.) Je pense que c'est Cloud Pub / Sub dans GCP si cela correspond à SQS, mais malheureusement il n'y a pas de file d'attente FIFO. Si vous pensez (Version bêta) Il semble que la fonction "Spécifier l'ordre des messages" a été créée. Je me suis demandé si je pouvais m'abonner aux messages dans l'ordre de leur publication, alors j'ai essayé.

Cependant, il s'agit plutôt d'une méthode d'essai, donc pour le moment, si vous essayez ce type d'essai, vous pouvez comprendre que c'est le résultat.

Lecteur supposé

--Je connais GCP.

supposition

Environnement de développement

OS - Linux(Ubuntu)

$ cat /etc/os-release 
NAME="Ubuntu"
VERSION="18.04.5 LTS (Bionic Beaver)"

#Backend

#Langue --Golang

$ go version
go version go1.15.2 linux/amd64

IDE - Goland

GoLand 2020.2.3
Build #GO-202.7319.61, built on September 16, 2020

Toutes les sources cette fois

https://github.com/sky0621/go-publisher/tree/v0.1.0 https://github.com/sky0621/go-subscriber/tree/v0.1.0

Commentaire d'extrait de source

go-publisher

Préparez 5 points de terminaison. Les parties de topic.EnableMessageOrdering = true et ʻOrderingKey: operationSequence,` sont en fait les codes nécessaires pour "commander des messages". Cependant, le sujet («mon-sujet-normal») qui publie des messages avec cette source correspond à un abonnement qui n'est pas pour «l'ordre des messages», donc «l'ordre des messages» ne fonctionne pas.

main.go


package main

import (
	"fmt"
	"log"
	"net/http"
	"os"
	"time"

	"cloud.google.com/go/pubsub"
	"github.com/labstack/echo/v4"
	"github.com/labstack/echo/v4/middleware"
)

func main() {
	project := os.Getenv("PUB_PROJECT")

	e := echo.New()
	e.Use(middleware.Logger())
	e.Use(middleware.Recover())

	e.GET("/order01", handler(project, "order01"))
	e.GET("/order02", handler(project, "order02"))
	e.GET("/order03", handler(project, "order03"))
	e.GET("/order04", handler(project, "order04"))
	e.GET("/order05", handler(project, "order05"))

	e.Logger.Fatal(e.Start(":8080"))
}

func handler(project, path string) func(c echo.Context) error {
	return func(c echo.Context) error {
		ctx := c.Request().Context()
		operationSequence := createOperationSequence()

		client, err := pubsub.NewClient(ctx, project)
		if err != nil {
			log.Fatal(err)
		}

		topic := client.Topic("my-normal-topic")
		defer topic.Stop()

		topic.EnableMessageOrdering = true

		message := &pubsub.Message{
			OrderingKey: operationSequence,
			Data:        []byte(path + ":" + operationSequence),
		}
		r := topic.Publish(ctx, message)
		if r == nil {
			log.Fatal("failed to topic.Publish!")
		}
		log.Printf("%+v", r)

		return c.String(http.StatusOK, path+":"+operationSequence)
	}
}

func createOperationSequence() string {
	return fmt.Sprintf("%d", time.Now().UnixNano())
}

go-subscriber

main.go


package main

import (
	"encoding/json"
	"io"
	"io/ioutil"
	"log"
	"net/http"

	"github.com/labstack/echo/v4"
	"github.com/labstack/echo/v4/middleware"
)

func main() {
	e := echo.New()
	e.Use(middleware.Logger())
	e.Use(middleware.Recover())

	e.POST("/", func(c echo.Context) error {
		m, err := unmarshal(c.Request().Body)
		if err != nil {
			return c.String(http.StatusBadRequest, err.Error())
		}
		data := string(m.Message.Data)
		log.Printf("fs_Received__Data:%s", data)
		return c.String(http.StatusOK, "OK")
	})

	e.Logger.Fatal(e.Start(":8080"))
}

type PubSubMessage struct {
	Message struct {
		Data []byte `json:"data,omitempty"`
		ID   string `json:"id"`
	} `json:"message"`
	Subscription string `json:"subscription"`
}

func unmarshal(r io.ReadCloser) (*PubSubMessage, error) {
	var m PubSubMessage
	body, err := ioutil.ReadAll(r)
	if err != nil {
		log.Printf("ioutil.ReadAll: %v", err)
		return nil, err
	}
	if err := json.Unmarshal(body, &m); err != nil {
		log.Printf("json.Unmarshal: %v", err)
		return nil, err
	}
	return &m, nil
}

Un shell qui atteint 5 extrémités en séquence

C'est assez dur.

#!/usr/bin/env bash
set -euox pipefail

for i in {0..15}
do
	for j in {1..5}
	do
		curl "https://go-publisher-xxxxxxxxxxxxxx/order0${j}"
		sleepenh 0.005
	done
	sleepenh 0.005
done

Entraine toi

image

Un service avec une bouche REST est placé sur Cloud Run, et lorsque l'accès arrive, il est publié sur Cloud Pub / Sub, et l'abonnement préparé en tant que type push est également placé sur Cloud Run. Passer la demande au service avec.

screenshot-app.cloudskew.com-2020.10.13-01_50_36.png

Pour le sujet régulier et l'abonnement

Paramètres de sujet et d'abonnement

screenshot-console.cloud.google.com-2020.10.12-23_04_10.png

screenshot-console.cloud.google.com-2020.10.12-23_06_00.png

Résultat d'entrée de message

Les messages atteignent les points de terminaison à plusieurs reprises dans l'ordre suivant, et bien qu'ils soient dans cet ordre dans le journal de l'éditeur, ils ne sont pas dans un ordre particulier dans le journal des abonnés. /order01 /order02 /order03 /order04 /order05

screenshot-console.cloud.google.com-2020.10.13-00_48_38.png

Bien sûr, c'est dans l'ordre sur le journal émis par le shell qui atteint le point final.

Screenshot at 2020-10-13 00-52-17.png

Pour le sujet et l'abonnement prenant en charge "Spécifier l'ordre des messages"

Paramètres de sujet et d'abonnement

Puisqu'il y a une condition qui «existe dans la même région», spécifiez la région du sujet.

screenshot-console.cloud.google.com-2020.10.12-23_14_13.png screenshot-console.cloud.google.com-2020.10.12-23_24_06.png

Bien entendu, "Spécifier l'ordre des messages" doit être "Activé".

screenshot-console.cloud.google.com-2020.10.12-23_22_08.png

Résultat d'entrée de message

Après l'avoir essayé plusieurs fois, il y a eu des cas où l'ordre d'abonnement a été modifié au début, mais après cela, l'ordre était correct. L'ordre est plus stable que la combinaison Sujet / Abonnement normale, qui était plutôt dans le désordre.

Screenshot at 2020-10-13 01-30-45.png

Sommaire

Même dans le mode "spécification de l'ordre des messages", l'ordre a été changé, donc je ne pouvais pas dire "cela peut être réalisé" pour le sujet. .. .. Que ce soit une question de savoir comment l'essayer, ou peut-être la version bêta, si vous en avez envie, découvrons-le. .. ..

Recommended Posts

Une file d'attente FIFO peut-elle être créée en "spécifiant l'ordre des messages" dans Cloud Pub / Sub?
Analyse de texte pouvant être effectuée en 5 minutes [Word Cloud]