Lernen Sie die Service Bus-Warteschlange mit Azure SDK for Go kennen (1)

Als ich zu Keda beitrug, musste ich das Verhalten von Service Bus kennen, das ich nicht sehr berührt hatte, also machte ich eine kurze Umfrage. Die Bibliothek verwendet Folgendes.

Queue Client

Authentifizieren Sie sich, damit die Warteschlange betrieben werden kann. ServiceBus und Queue werden im Voraus erstellt. Da für die Verbindung "ConnectionString" des ServiceBus-Namespace erforderlich ist, wird dieser von der Umgebungsvariablen erfasst und übergeben. Eine Instanz von "NameSpace" wird durch Übergeben von "ConnectionString" erstellt, und "NameSpace" erstellt einen Queue-Client mit der Methode "NewQueue". Es ist sehr einfach und schön.

func main() {
	fmt.Println("Azure ServiceBus Queue Sender")
	 connectionString := os.Getenv("ConnectionString")
	 queueName := os.Getenv("queueName")
	 if len(os.Args) != 2 {
	 	log.Fatalf("Specify the counter parameter. e.g. send 100 Parameter length: %d\n", len(os.Args))
	 }
	 count, err := strconv.Atoi(os.Args[1])
	 if err != nil {
	 	log.Fatalf("count should be integer : %s", os.Args[1])
	 }
	// Create a client to communicate with a Service Bus Namespace
	ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connectionString))
	if err != nil {
		log.Fatal("Cannot create a client for the Service Bus Namespace", err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	// Create a client to communicate with the queue

	q, err := ns.NewQueue(queueName)
	if err != nil {
		log.Fatal("Cannot create a client for the queue", err)
	}

Send Message

Verwenden Sie zum Senden einer Nachricht die Methode "Senden" für das Warteschlangenobjekt. Wenn Sie in großen Mengen senden möchten, ist der Fehler "func (q * Queue) SendBatch (ctx context.Context, iterator BatchIterator)" vorhanden, sodass Sie diesen verwenden können.

for i := 0; i < count; i++ {
	err = q.Send(ctx, servicebus.NewMessageFromString("Hello!"))
	fmt.Printf("Send Hello %d \n", i)
	if err != nil {
		log.Fatal(err)
	}
}

Receive Message

Das Empfangen einer Nachricht ist auch sehr einfach. Wenn Sie einen Handler für den Rückruf übergeben, wird der Handler ausgeführt, wenn die Nachricht eintrifft. Obwohl es für einen Moment asynchron aussieht, verhält sich "ReceiveOne" wie Blockieren, bis die Steuerung zum Handler zurückkehrt. Der Punkt ist die letzte message.Complete () Methode. Standardmäßig lautet die Service Bus-Warteschlange PeekLock (https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-transfers-locks-settlement#peeklock). Wenn Sie eine Nachricht mit "Empfangen" erhalten, ist die Nachricht gesperrt und für andere Clients unsichtbar. Und wenn der Prozess erfolgreich ist, wird die Nachricht gelöscht, wenn "Vollständig" ausgegeben wird. Wenn dies nicht funktioniert, wird bei der Ausgabe von "Abandon" davon ausgegangen, dass die Nachrichtenausführung fehlgeschlagen ist. Wenn es eine bestimmte Anzahl von Malen fehlschlägt, wird die Nachricht automatisch an DeadQueue weitergeleitet.

err = q.ReceiveOne(
	ctx,
	servicebus.HandlerFunc(func(ctx context.Context, message *servicebus.Message) error {
		fmt.Println(string(message.Data))
		return message.Complete(ctx)
	}))

DeadLetter

Das Senden einer Nachricht an DeadLetter wird in den folgenden Fällen automatisch an DeadLetter weitergeleitet: Verschieben von Nachrichten in den DLQ ..

Es kann jedoch auch andere Fälle geben, in denen Sie es zur Vereinfachung Ihrer Anwendung an DeadLetter senden möchten. Verwenden Sie in diesem Fall den DeadLetter () der Nachricht, um zum DeadLetter zu übertragen. Dieses Verhalten funktioniert nur im PeekLock-Modus und kann nicht im ReceiveAndDelete-Modus verwendet werden, der später erläutert wird.

Persönlich ist "DeadLetterWithInfo ()" besser als "DeadLetter ()". Der Grund dafür ist, dass Sie eine benutzerdefinierte Eigenschaft übergeben und als Metadaten verwenden können, z. B. warum Sie DeadLetter eingegeben haben. Ich habe auch andere Eigenschaften wie "error" und "servicebus.ErrorInternalError" übergeben, bin mir aber immer noch nicht sicher, wo ich diese Metadaten anzeigen soll. Zumindest wäre dies besser, wenn Sie Ladeinformationen hinzufügen könnten.

err = q.ReceiveOne(
	ctx,
	servicebus.HandlerFunc(func(ctx context.Context, message *servicebus.Message) error {

		fmt.Println(message.LockToken.String())  // It will be null when the ReceiveAndDelete mode
		fmt.Println(string(message.Data))
		fmt.Println("Waiting...")
		time.Sleep(time.Second * 10)
		fmt.Println("Done.")
		m := make(map[string]string)
		m["rootCause"] = "hagechabin"
		return message.DeadLetterWithInfo(ctx, errors.New("go to deadletter"), servicebus.ErrorInternalError, m) // Doesn't work for ReceiveAndDelete

	}))

Wenn Sie es sich im Service Bus Explorer ansehen, können Sie die Metadaten genau lesen. image.png

DeadLetterOnMessageExpiration

Sie können festlegen, dass beim Ablauf der Nachricht automatisch der tote Brief angezeigt wird. Dies ist jedoch nicht die Standardeinstellung. Sie können es für eine Instanz namens "QueueManager" festlegen. Wenn Sie dies jedoch zweimal tun, wird ein Konfliktfehler angezeigt. Tatsächlich scheint es gut, beim Erstellen eines ServiceBus eine Warteschlange mit ARM-Vorlage zu erstellen. Diese API scheint eine ARM-API zu sein, und wenn Sie sich "Erstellen" ansehen, hat sie BODY. Übergeben Sie wahrscheinlich die ARM-Vorlage an Body. Übrigens hat Put keinen Körper, aber er wird ihn erstellen, wenn er nicht existiert. Dies scheint also besser zu sein, wenn Sie einen Test schreiben. Wenn Sie es löschen und mit Put erstellen, sollte der Fehler "Konflikt" nicht angezeigt werden.

	qm := ns.NewQueueManager()
	qe, err := qm.Put(ctx, "abc", servicebus.QueueEntityWithDeadLetteringOnMessageExpiration()) // Doesn't work
	if err != nil {
		fmt.Println("error :", err.Error())
	} else {
		fmt.Println("endity anme: ", qe.Name)
	}

Nach dem Einstellen können Sie sehen, dass die Einstellungen wie folgt geändert wurden.

image.png

ReceiveAndDelete Mode

PeekLock setzt zuerst die Sperre für die Nachricht und löscht sie dann mit Complete, aber ReceiveAndDelete löscht die Nachricht aus der Warteschlange, wenn sie empfangen wird. Sie können es also nicht selbst auf DeadLetter übertragen. Die Warteschlangen-API von DeadLette ist ebenfalls verfügbar, aber da sie schreibgeschützt ist, möchte ich nicht, dass Sie den Prozess des Sendens an DeadLetter selbst mit "ReceiveAndDelete" schreiben. Wenn Sie dies wirklich tun möchten, sollten Sie eine benutzerdefinierte Warteschlange erstellen und dort ausführen. Das Ändern der Einstellungen ist einfach. Geben Sie beim Erstellen eines Warteschlangenclients einfach die folgenden Optionen an.

q, err := ns.NewQueue(queueName, servicebus.QueueWithReceiveAndDelete())

Active Message Count

Tatsächlich bestand diese ganze Umfrage darin, die Anzahl der Nachrichten zu untersuchen. Wenn Sie * entity.CountDetails.ActiveMessageCount verwenden, wird die Anzahl der Nachrichten in der Warteschlange abgerufen. Das Problem besteht jedoch darin, dass auch die Anzahl der aktuell gesperrten Nachrichten gezählt wird. Ich schreibe einen KEDA-Scaler, daher benötige ich eine Reihe von Nachrichten, die derzeit nicht verarbeitet werden. Was soll ich machen? Leider wurde bisher keine Methode gefunden. Ich habe "Peek" verwendet und es gab "LockToken", also habe ich versucht, dies auf die Zahl zu beschränken, die nicht in "nil" erscheint, aber anscheinend war sie auf der Empfangsseite gesperrt. Die Zahlen sind unterschiedlich, daher sieht es etwas anders aus.

Wenn es sich um Ihre eigene benutzerdefinierte App handelt, schreiben Sie beim Erhalt der Nachricht die ID auf die Karte, löschen Sie sie mit Complete oder Abandan und beziehen Sie sich darauf, wenn Sie sie mit Peek erhalten. Die Struktur der Anwendung, die ich schreibe, wird jedoch abstrahiert, und die Skalierungslogik und der Skalierer für einzelne Ressourcen werden abstrahiert. Wenn es sich um StorageQueue usw. handelt, kann sie ordnungsgemäß gezählt werden. Es spielt also keine Rolle.

Wenn Sie als Gegenmaßnahme "ReceiveAndDelete" auf der Seite festlegen, auf der die Nachricht empfangen wird, ist die Anzahl der Zählungen angemessen. In der Regel möchten Sie jedoch mit "PeekLock" arbeiten, sodass dies schwierig ist. Ich möchte diesen Blog weiter untersuchen und aktualisieren, wenn ich es herausfinde.

	m := ns.NewQueueManager()
	ctx2 := context.Background()

	for {
		entity, _ := m.Get(ctx2, queueName)
		fmt.Println("ActiveMessageCount: ", *entity.CountDetails.ActiveMessageCount)
		ctx := context.Background()
		iterator, _ := q.Peek(ctx)
		var c int
		for {
			item, _ := iterator.Next(ctx)
			if item != nil && item.LockToken != nil {
				fmt.Printf("lockToken: %s \n", item.LockToken.String())
			} else {
				if item != nil {
					fmt.Println("lockToken: nil")
					c++
				} else {
					fmt.Println("item is nil")
				}
			}
			if item != nil {
				body, _ := json.Marshal(item)
				fmt.Println(string(body))
			}

			if item == nil {
				iterator.Done()
				break
			}
		}
		fmt.Println("Count: ", c)
		time.Sleep(time.Second * 2)
	}

Zusammenfassung

Ich konnte nicht überprüfen, wie ich die Anzahl außer der gesperrten Nachricht erhalten konnte, die der Zweck der Untersuchung war, aber ich konnte das periphere Wissen von ServiceBus und das Verhalten von Go SDK bestätigen, also habe ich es im Blog zusammengefasst. Ich habe den Quellcode in die Ressource eingefügt, damit Sie ihn ausprobieren können, wenn Sie möchten.

Resource

Recommended Posts

Lernen Sie die Service Bus-Warteschlange mit Azure SDK for Go kennen (1)
Einführung
Tipps zum Ausführen Gehen Sie mit Docker
[Azure] Klicken Sie mit Python auf Custom Vision Service
Treten Sie Azure mit Go ~ bei Für diejenigen, die Azure mit Go ~ starten und kennenlernen möchten
Verwenden Sie Python / Django mit dem Windows Azure Cloud Service!
[Azure-Funktionen / Python] Kettenfunktionen mit Warteschlangenspeicherbindung
Holen Sie sich AccessToken für das Dienstkonto mit dem Firebase Admin Python SDK
Überprüfen Sie, wie Sie Azure Key Vault mit Azure SDK für Python verwenden! (Maßnahmen rund um die Zertifizierung)