[GO] Kann eine FIFO-Warteschlange durch "Festlegen der Reihenfolge der Nachrichten" in Cloud Pub / Sub realisiert werden?

Thema

Verwenden Sie Cloud Pub / Sub? Es wird verwendet, wenn Sie eine große Anzahl von Nachrichten verarbeiten möchten. Ist dies die Reihenfolge, in der Nachrichten gesendet werden, wenn Sie dies garantieren möchten oder nicht? Im Fall von AWS sollten Sie die [FIFO-Warteschlange] von SQS verwenden (https://docs.aws.amazon.com/ja_jp/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html). (Ich hatte in der Vergangenheit versucht, es zu verwenden, aber ich erinnere mich, dass ich es doch nicht verwendet habe, weil ich die Region Tokio noch nicht hatte und durch das Limit von 3000 Nachrichten pro Sekunde gequetscht wurde.) Ich denke, dass es Cloud Pub / Sub in GCP ist, wenn es SQS entspricht, aber leider gibt es keine FIFO-Warteschlange. Wenn du denkst (Beta-Version) Es scheint, dass die Funktion "Reihenfolge der Nachrichten festlegen" erstellt wurde. Ich fragte mich, ob ich die Nachrichten in der Reihenfolge abonnieren könnte, in der sie veröffentlicht wurden, also versuchte ich es.

Es handelt sich jedoch ziemlich um eine Testmethode. Wenn Sie diese Art von Test versuchen, können Sie vorerst ein gewisses Maß an Verständnis dafür erlangen, dass dies das Ergebnis ist.

Angenommener Leser

Annahme

--Go Entwicklungsumgebung wurde lokal erstellt.

Entwicklungsumgebung

OS - Linux(Ubuntu)

$ cat /etc/os-release 
NAME="Ubuntu"
VERSION="18.04.5 LTS (Bionic Beaver)"

#Backend

#Language --Golang

$ go version
go version go1.15.2 linux/amd64

IDE - Goland

GoLand 2020.2.3
Build #GO-202.7319.61, built on September 16, 2020

Alle Quellen diesmal

https://github.com/sky0621/go-publisher/tree/v0.1.0 https://github.com/sky0621/go-subscriber/tree/v0.1.0

Quellenauszug Kommentar

go-publisher

Bereiten Sie 5 Endpunkte vor. Die Teile von "topic.EnableMessageOrdering = true" und "OrderingKey: operationSequence" sind tatsächlich der Code, der zum "Bestellen von Nachrichten" erforderlich ist. Das Thema ("Mein normales Thema"), das Nachrichten mit dieser Quelle veröffentlicht, entspricht jedoch einem Abonnement, das nicht für "Nachrichtenbestellung" vorgesehen ist, sodass "Nachrichtenbestellung" nicht funktioniert.

main.go


package main

import (
	"fmt"
	"log"
	"net/http"
	"os"
	"time"

	"cloud.google.com/go/pubsub"
	"github.com/labstack/echo/v4"
	"github.com/labstack/echo/v4/middleware"
)

func main() {
	project := os.Getenv("PUB_PROJECT")

	e := echo.New()
	e.Use(middleware.Logger())
	e.Use(middleware.Recover())

	e.GET("/order01", handler(project, "order01"))
	e.GET("/order02", handler(project, "order02"))
	e.GET("/order03", handler(project, "order03"))
	e.GET("/order04", handler(project, "order04"))
	e.GET("/order05", handler(project, "order05"))

	e.Logger.Fatal(e.Start(":8080"))
}

func handler(project, path string) func(c echo.Context) error {
	return func(c echo.Context) error {
		ctx := c.Request().Context()
		operationSequence := createOperationSequence()

		client, err := pubsub.NewClient(ctx, project)
		if err != nil {
			log.Fatal(err)
		}

		topic := client.Topic("my-normal-topic")
		defer topic.Stop()

		topic.EnableMessageOrdering = true

		message := &pubsub.Message{
			OrderingKey: operationSequence,
			Data:        []byte(path + ":" + operationSequence),
		}
		r := topic.Publish(ctx, message)
		if r == nil {
			log.Fatal("failed to topic.Publish!")
		}
		log.Printf("%+v", r)

		return c.String(http.StatusOK, path+":"+operationSequence)
	}
}

func createOperationSequence() string {
	return fmt.Sprintf("%d", time.Now().UnixNano())
}

go-subscriber

main.go


package main

import (
	"encoding/json"
	"io"
	"io/ioutil"
	"log"
	"net/http"

	"github.com/labstack/echo/v4"
	"github.com/labstack/echo/v4/middleware"
)

func main() {
	e := echo.New()
	e.Use(middleware.Logger())
	e.Use(middleware.Recover())

	e.POST("/", func(c echo.Context) error {
		m, err := unmarshal(c.Request().Body)
		if err != nil {
			return c.String(http.StatusBadRequest, err.Error())
		}
		data := string(m.Message.Data)
		log.Printf("fs_Received__Data:%s", data)
		return c.String(http.StatusOK, "OK")
	})

	e.Logger.Fatal(e.Start(":8080"))
}

type PubSubMessage struct {
	Message struct {
		Data []byte `json:"data,omitempty"`
		ID   string `json:"id"`
	} `json:"message"`
	Subscription string `json:"subscription"`
}

func unmarshal(r io.ReadCloser) (*PubSubMessage, error) {
	var m PubSubMessage
	body, err := ioutil.ReadAll(r)
	if err != nil {
		log.Printf("ioutil.ReadAll: %v", err)
		return nil, err
	}
	if err := json.Unmarshal(body, &m); err != nil {
		log.Printf("json.Unmarshal: %v", err)
		return nil, err
	}
	return &m, nil
}

Eine Shell, die nacheinander 5 Endpunkte trifft

Es ist ziemlich rau.

#!/usr/bin/env bash
set -euox pipefail

for i in {0..15}
do
	for j in {1..5}
	do
		curl "https://go-publisher-xxxxxxxxxxxxxx/order0${j}"
		sleepenh 0.005
	done
	sleepenh 0.005
done

Trainieren

Bild

Ein Dienst mit einem REST-Mund wird auf Cloud Run gesetzt, und wenn der Zugriff erfolgt, wird er in Cloud Pub / Sub veröffentlicht, und das als Push-Typ vorbereitete Abonnement wird auch auf Cloud Run gesetzt. Überspringen Sie die Anfrage an den Dienst mit.

screenshot-app.cloudskew.com-2020.10.13-01_50_36.png

Für reguläres Thema und Abonnement

Themen- und Abonnementeinstellungen

screenshot-console.cloud.google.com-2020.10.12-23_04_10.png

screenshot-console.cloud.google.com-2020.10.12-23_06_00.png

Ergebnis der Nachrichteneingabe

Die Nachrichten treffen wiederholt auf die Endpunkte in der folgenden Reihenfolge, und obwohl sie im Publisher-Protokoll in dieser Reihenfolge vorliegen, befinden sie sich im Abonnentenprotokoll in keiner bestimmten Reihenfolge. /order01 /order02 /order03 /order04 /order05

screenshot-console.cloud.google.com-2020.10.13-00_48_38.png

Natürlich ist es in der Reihenfolge, in der das von der Shell ausgegebene Protokoll den Endpunkt erreicht.

Screenshot at 2020-10-13 00-52-17.png

Für Themen und Abonnements, die "Nachrichtenreihenfolge angeben" unterstützen

Themen- und Abonnementeinstellungen

Da es eine Bedingung gibt, dass "in derselben Region vorhanden ist", geben Sie die Themenregion an.

screenshot-console.cloud.google.com-2020.10.12-23_14_13.png screenshot-console.cloud.google.com-2020.10.12-23_24_06.png

Natürlich sollte "Nachrichtenreihenfolge angeben" "Aktiviert" sein.

screenshot-console.cloud.google.com-2020.10.12-23_22_08.png

Ergebnis der Nachrichteneingabe

Nach mehrmaligem Versuch gab es einige Fälle, in denen die Reihenfolge des Abonnements zu Beginn geändert wurde, aber danach war die Reihenfolge korrekt. Die Reihenfolge ist stabiler als die normale Kombination aus Thema und Abonnement, die eher außer Betrieb war.

Screenshot at 2020-10-13 01-30-45.png

Zusammenfassung

Selbst im Modus "Spezifikation der Nachrichtenreihenfolge" wurde die Reihenfolge geändert, sodass ich für den Betreff nicht sagen konnte, dass sie realisiert werden kann. .. .. Ob es darum geht, wie man es versucht, oder ob es sich um die Beta-Version handelt, wenn Sie Lust dazu haben, lassen Sie uns herausfinden. .. ..

Recommended Posts

Kann eine FIFO-Warteschlange durch "Festlegen der Reihenfolge der Nachrichten" in Cloud Pub / Sub realisiert werden?
Textanalyse, die in 5 Minuten durchgeführt werden kann [Word Cloud]