Apprenez la file d'attente Service Bus avec Azure SDK pour Go (1)

Lorsque j'ai contribué à Keda, j'avais besoin de connaître le comportement de Service Bus, ce que je n'avais pas beaucoup touché, alors j'ai fait une rapide enquête. La bibliothèque utilise les éléments suivants.

Queue Client

Authentifiez-vous pour que la file d'attente puisse être utilisée. ServiceBus et Queue sont créés à l'avance. Puisque ConnectionString de l'espace de noms ServiceBus est requis pour la connexion, il est acquis et transmis à partir de la variable d'environnement. Une instance de NameSpace est créée en passant ConnectionString, et NameSpace crée un client Queue avec la méthode NewQueue. C'est très simple et agréable.

func main() {
	fmt.Println("Azure ServiceBus Queue Sender")
	 connectionString := os.Getenv("ConnectionString")
	 queueName := os.Getenv("queueName")
	 if len(os.Args) != 2 {
	 	log.Fatalf("Specify the counter parameter. e.g. send 100 Parameter length: %d\n", len(os.Args))
	 }
	 count, err := strconv.Atoi(os.Args[1])
	 if err != nil {
	 	log.Fatalf("count should be integer : %s", os.Args[1])
	 }
	// Create a client to communicate with a Service Bus Namespace
	ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connectionString))
	if err != nil {
		log.Fatal("Cannot create a client for the Service Bus Namespace", err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	// Create a client to communicate with the queue

	q, err := ns.NewQueue(queueName)
	if err != nil {
		log.Fatal("Cannot create a client for the queue", err)
	}

Send Message

Pour envoyer un message, utilisez la méthode Send sur l'objet Queue. Si vous voulez envoyer en masse, une erreur `func (q * Queue) SendBatch (ctx context.Context, iterator BatchIterator) 'existe, vous pouvez donc l'utiliser.

for i := 0; i < count; i++ {
	err = q.Send(ctx, servicebus.NewMessageFromString("Hello!"))
	fmt.Printf("Send Hello %d \n", i)
	if err != nil {
		log.Fatal(err)
	}
}

Receive Message

Recevoir un message est également très simple, si vous passez un gestionnaire pour le rappel, le gestionnaire sera exécuté lorsque le message arrivera. Bien qu'il semble asynchrone pendant un moment, ReceiveOne se comporte comme un blocage jusqu'à ce que le contrôle revienne au gestionnaire. Le point est la dernière méthode message.Complete (). Par défaut, la file d'attente Service Bus est PeekLock (https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-transfers-locks-settlement#peeklock). Lorsque vous recevez un message avec «Recevoir», le message est verrouillé et invisible pour les autres clients. Et si le processus réussit, le message sera supprimé lorsque «Complete» sera émis. Si cela ne fonctionne pas, émettre ʻAbandon` supposera que l'exécution du message a échoué. S'il échoue un certain nombre de fois, le message est automatiquement transmis à DeadQueue.

err = q.ReceiveOne(
	ctx,
	servicebus.HandlerFunc(func(ctx context.Context, message *servicebus.Message) error {
		fmt.Println(string(message.Data))
		return message.Complete(ctx)
	}))

DeadLetter

L'envoi d'un message à DeadLetter est automatiquement transmis à DeadLetter dans les cas suivants: Déplacement des messages vers le DLQ ..

Cependant, il peut y avoir d'autres cas où vous voudrez peut-être l'envoyer à DeadLetter pour la commodité de votre application. Dans ce cas, utilisez le DeadLetter () du Message pour transférer vers DeadLetter. Ce comportement ne fonctionne qu'en mode PeekLock et ne peut pas être utilisé en mode ReceiveAndDelete, ce qui sera expliqué plus loin.

Personnellement, DeadLetterWithInfo () est meilleur que DeadLetter (). La raison est que vous pouvez transmettre une propriété personnalisée et l'avoir comme métadonnées, par exemple, pourquoi vous avez entré DeadLetter. J'ai également passé des propriétés telles que ʻerror et servicebus.ErrorInternalError`, mais je ne suis toujours pas sûr de savoir où regarder ces métadonnées. Au moins, ce serait mieux si vous pouviez ajouter des informations de chargement.

err = q.ReceiveOne(
	ctx,
	servicebus.HandlerFunc(func(ctx context.Context, message *servicebus.Message) error {

		fmt.Println(message.LockToken.String())  // It will be null when the ReceiveAndDelete mode
		fmt.Println(string(message.Data))
		fmt.Println("Waiting...")
		time.Sleep(time.Second * 10)
		fmt.Println("Done.")
		m := make(map[string]string)
		m["rootCause"] = "hagechabin"
		return message.DeadLetterWithInfo(ctx, errors.New("go to deadletter"), servicebus.ErrorInternalError, m) // Doesn't work for ReceiveAndDelete

	}))

Si vous le regardez dans Service Bus Explorer, vous pouvez lire les métadonnées fermement. image.png

DeadLetterOnMessageExpiration

Vous pouvez le configurer pour qu'il accède automatiquement à la lettre morte lorsque le message expire, mais ce n'est pas la valeur par défaut. Il est bon de le définir pour l'instance appelée QueueManager. Cependant, si vous faites cela deux fois, vous obtiendrez une erreur «Conflit». En fait, il semble bon de créer une file d'attente avec un modèle ARM lors de la création d'un ServiceBus. Cette API semble être une API ARM, et si vous regardez Create, elle a BODY. Passez probablement le modèle ARM à Body. Au fait, Put n'a pas de corps, mais il le créera s'il n'existe pas, donc cela semble être mieux lors de l'écriture d'un test. Si vous le supprimez et le faites avec Put, je ne pense pas que vous obtiendrez l'erreur Conflict.

	qm := ns.NewQueueManager()
	qe, err := qm.Put(ctx, "abc", servicebus.QueueEntityWithDeadLetteringOnMessageExpiration()) // Doesn't work
	if err != nil {
		fmt.Println("error :", err.Error())
	} else {
		fmt.Println("endity anme: ", qe.Name)
	}

Une fois définis, vous pouvez voir que les paramètres ont été modifiés comme suit.

image.png

ReceiveAndDelete Mode

PeekLock met d'abord le verrou sur le message, puis le supprime avec Complete, mais ReceiveAndDelete supprime le message de la file d'attente lorsqu'il est reçu. Vous ne pouvez donc pas le transférer vous-même vers DeadLetter. L'API Queue de DeadLette est également disponible, mais comme elle est en lecture seule, je ne veux pas que vous écriviez vous-même le processus d'envoi à DeadLetter avec ReceiveAndDelete. Si vous voulez vraiment le faire, vous devez créer une file d'attente personnalisée et l'exécuter là-bas. La modification des paramètres est facile, spécifiez simplement les options suivantes lors de la création d'un client de file d'attente.

q, err := ns.NewQueue(queueName, servicebus.QueueWithReceiveAndDelete())

Active Message Count

En fait, toute cette enquête visait à enquêter sur le nombre de messages. Utiliser * entity.CountDetails.ActiveMessageCount obtiendra le nombre de messages dans la file d'attente, mais le problème est qu'il compte également le nombre de messages actuellement verrouillés. J'écris un KEDA Scaler, j'ai donc besoin d'un certain nombre de messages qui ne sont pas actuellement traités. Que devrais-je faire? Malheureusement, aucune méthode n'a été trouvée jusqu'à présent. J'ai utilisé Peek et il y avait LockToken, alors j'ai essayé de limiter cela au nombre qui n'apparaît pas dans nil, mais il semble que le côté récepteur était verrouillé. Les chiffres sont différents, donc cela semble un peu différent.

S'il s'agit de votre propre application personnalisée, lorsque vous recevez le message, écrivez l'ID sur la carte, supprimez-la avec Complete ou Abandan, et reportez-vous à celle-ci lorsque vous l'obtenez avec Peek. Cependant, la structure de l'application que j'écris est abstraite, et la logique d'échelle et le scaler pour les ressources individuelles sont abstraits, et s'il s'agit de StorageQueue, etc., il peut être compté correctement. Donc ça n'a pas d'importance.

En guise de contre-mesure, si vous définissez ReceiveAndDelete du côté qui reçoit le message, le nombre de décomptes sera raisonnable, mais généralement vous voulez utiliser PeekLock, donc c'est délicat. J'aimerais continuer à enquêter et mettre à jour ce blog lorsque je le découvrirai.

	m := ns.NewQueueManager()
	ctx2 := context.Background()

	for {
		entity, _ := m.Get(ctx2, queueName)
		fmt.Println("ActiveMessageCount: ", *entity.CountDetails.ActiveMessageCount)
		ctx := context.Background()
		iterator, _ := q.Peek(ctx)
		var c int
		for {
			item, _ := iterator.Next(ctx)
			if item != nil && item.LockToken != nil {
				fmt.Printf("lockToken: %s \n", item.LockToken.String())
			} else {
				if item != nil {
					fmt.Println("lockToken: nil")
					c++
				} else {
					fmt.Println("item is nil")
				}
			}
			if item != nil {
				body, _ := json.Marshal(item)
				fmt.Println(string(body))
			}

			if item == nil {
				iterator.Done()
				break
			}
		}
		fmt.Println("Count: ", c)
		time.Sleep(time.Second * 2)
	}

Sommaire

Je n'ai pas pu vérifier comment obtenir le décompte autre que le message verrouillé, ce qui était le but de l'enquête, mais je pouvais confirmer la connaissance périphérique de ServiceBus et le comportement de Go SDK, alors je l'ai résumé dans le blog. J'ai mis le code source dans la ressource, vous pouvez donc l'essayer si vous le souhaitez.

Resource

Recommended Posts

Apprenez la file d'attente Service Bus avec Azure SDK pour Go (1)
introduction
Conseils pour exécuter Go avec Docker
[Azure] Hit Custom Vision Service avec Python
Rejoignez Azure avec Go ~ Pour ceux qui veulent démarrer et connaître Azure avec Go ~
Utilisez Python / Django avec Windows Azure Cloud Service!
[Azure Functions / Python] Fonctions de chaîne avec liaison de stockage de file d'attente
Obtenez AccessToken pour le compte de service avec le SDK Firebase Admin Python
Vérifiez! Comment utiliser Azure Key Vault avec Azure SDK pour Python! (Mesures autour de la certification)