Learn Service Bus Queue with Azure SDK for Go (1)

When I contributed to Keda, I needed to know the behavior of Service Bus, which I hadn't touched much, so I did a quick survey. The library uses the following.

Queue Client

Authenticate so that the Queue can be operated. Service Bus and Queue are created in advance. Since ConnectionString of ServiceBus namespace is required for connection, it is acquired and passed from the environment variable. An instance of NameSpace is created by passing ConnectionString, and NameSpace creates a Queue client with the NewQueue method. It's very simple and nice.

func main() {
	fmt.Println("Azure ServiceBus Queue Sender")
	 connectionString := os.Getenv("ConnectionString")
	 queueName := os.Getenv("queueName")
	 if len(os.Args) != 2 {
	 	log.Fatalf("Specify the counter parameter. e.g. send 100 Parameter length: %d\n", len(os.Args))
	 }
	 count, err := strconv.Atoi(os.Args[1])
	 if err != nil {
	 	log.Fatalf("count should be integer : %s", os.Args[1])
	 }
	// Create a client to communicate with a Service Bus Namespace
	ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connectionString))
	if err != nil {
		log.Fatal("Cannot create a client for the Service Bus Namespace", err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	// Create a client to communicate with the queue

	q, err := ns.NewQueue(queueName)
	if err != nil {
		log.Fatal("Cannot create a client for the queue", err)
	}

Send Message

To send a message, use the Send method on the Queue object. If you want to send in bulk, func (q * Queue) SendBatch (ctx context.Context, iterator BatchIterator) error exists, so you can use that.

for i := 0; i < count; i++ {
	err = q.Send(ctx, servicebus.NewMessageFromString("Hello!"))
	fmt.Printf("Send Hello %d \n", i)
	if err != nil {
		log.Fatal(err)
	}
}

Receive Message

Receiving a message is also very simple, if you pass a handler for the callback, the handler will be executed when the message arrives. It looks asynchronous for a moment, but ReceiveOne behaves like blocking until control returns to the handler. The point is the last message.Complete () method. By default, the Service Bus Queue is PeekLock (https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-transfers-locks-settlement#peeklock). When you get a Message with Receive, the message is locked and invisible to other clients. And if the process is successful, the Message will be deleted when Complete is issued. If it doesn't work, issuing ʻAbandon` will assume that Message execution has failed. If it fails a certain number of times, the message is automatically forwarded to DeadQueue.

err = q.ReceiveOne(
	ctx,
	servicebus.HandlerFunc(func(ctx context.Context, message *servicebus.Message) error {
		fmt.Println(string(message.Data))
		return message.Complete(ctx)
	}))

DeadLetter

Sending a Message to DeadLetter is automatically forwarded to DeadLetter in the following cases: Moving messages to the DLQ ..

However, there may be other cases where you may want to send it to DeadLetter for the convenience of your application. In that case, use the Message's DeadLetter () to transfer to the DeadLetter. This behavior works only in PeekLock mode and cannot be used in ReceiveAndDelete mode, which will be explained later.

Personally, DeadLetterWithInfo () is better than DeadLetter (). The reason is that you can pass a custom property and have it as metadata, for example, why you entered DeadLetter. I've also passed properties like ʻerror and servicebus.ErrorInternalError`, but I'm still not sure where to look at this metadata. At least this would be better if you could add load information.

err = q.ReceiveOne(
	ctx,
	servicebus.HandlerFunc(func(ctx context.Context, message *servicebus.Message) error {

		fmt.Println(message.LockToken.String())  // It will be null when the ReceiveAndDelete mode
		fmt.Println(string(message.Data))
		fmt.Println("Waiting...")
		time.Sleep(time.Second * 10)
		fmt.Println("Done.")
		m := make(map[string]string)
		m["rootCause"] = "hagechabin"
		return message.DeadLetterWithInfo(ctx, errors.New("go to deadletter"), servicebus.ErrorInternalError, m) // Doesn't work for ReceiveAndDelete

	}))

If you look at it in Service Bus Explorer, you can read the metadata firmly. image.png

DeadLetterOnMessageExpiration

You can set it to automatically go to the Dead Letter when the message Expires, but that's not the default. You can set it for an instance called QueueManager. However, if you do this twice, you will get a Conflict error. As a matter of fact, it seems good to create a Queue with ARM Template when creating ServiceBus. This API seems to be an ARM API, and if you look at Create, it has BODY. Probably pass the ARM Template to Body. By the way, Put has no Body, but it will create it if it doesn't exist, so this seems to be better when writing tests. If you delete it and make it with Put, you shouldn't get the Conflict error.

	qm := ns.NewQueueManager()
	qe, err := qm.Put(ctx, "abc", servicebus.QueueEntityWithDeadLetteringOnMessageExpiration()) // Doesn't work
	if err != nil {
		fmt.Println("error :", err.Error())
	} else {
		fmt.Println("endity anme: ", qe.Name)
	}

Once set, you can see that the settings have been changed as follows.

image.png

ReceiveAndDelete Mode

PeekLock first puts the lock on the message and then deletes it with Complete, but ReceiveAndDelete deletes the Message from the Queue when it is received. So you can't forward it to DeadLetter yourself. DeadLette's Queue API is also available, but since it's read-only, I don't want ReceiveAndDelete to write the process of sending it to DeadLetter by myself. If you really want to do it, you should create a custom Queue and run it there. Changing the settings is easy, just specify the following options when creating a Queue client.

q, err := ns.NewQueue(queueName, servicebus.QueueWithReceiveAndDelete())

Active Message Count

In fact, all this survey was to investigate the message count. Using * entity.CountDetails.ActiveMessageCount will get the number of messages in the Queue, but the problem is that it also counts the number of messages currently locked. I'm writing a KEDA Scaler, so I need a number of messages that aren't currently being processed. What should i do? Unfortunately no method has been found so far. I used Peek and there was LockToken, so I tried to limit this to the number of things that nil does not appear, but apparently it was locked on the receiving side. The numbers are different, so it looks a little different.

If it is my custom app, when I get the message, I write the ID on the Map, delete it with Complete or Abandan, and refer to it when I get it with Peek. However, the structure of the application I am writing is abstracted, and the scale logic and scaler for individual resources are abstracted, and if it is StorageQueue etc., it can be counted properly. So it doesn't matter.

As a countermeasure, if you set ReceiveAndDelete on the side that receives the Message, the number of counts will be reasonable, but usually you want to operate with PeekLock, so it is delicate. I would like to continue researching and updating this blog when I find out.

	m := ns.NewQueueManager()
	ctx2 := context.Background()

	for {
		entity, _ := m.Get(ctx2, queueName)
		fmt.Println("ActiveMessageCount: ", *entity.CountDetails.ActiveMessageCount)
		ctx := context.Background()
		iterator, _ := q.Peek(ctx)
		var c int
		for {
			item, _ := iterator.Next(ctx)
			if item != nil && item.LockToken != nil {
				fmt.Printf("lockToken: %s \n", item.LockToken.String())
			} else {
				if item != nil {
					fmt.Println("lockToken: nil")
					c++
				} else {
					fmt.Println("item is nil")
				}
			}
			if item != nil {
				body, _ := json.Marshal(item)
				fmt.Println(string(body))
			}

			if item == nil {
				iterator.Done()
				break
			}
		}
		fmt.Println("Count: ", c)
		time.Sleep(time.Second * 2)
	}

Summary

I couldn't verify how to get the count other than the locked message, which was the purpose of the investigation, but I could confirm the peripheral knowledge of Service Bus and the behavior of Go SDK, so I summarized it in the blog. I've put the source code in the resource, so you can try it if you like.

Resource

Recommended Posts

Learn Service Bus Queue with Azure SDK for Go (1)
Azure SDK for Go setup (macOS)
Azure SDK for Go setup (macOS)
Introduction
Learn algorithms with Go @ recursive call
Tips for running Go with docker
[Azure] Hit Custom Vision Service with Python
Try using S3 object upload and download with AWS SDK for Go v2
Join Azure Using Go ~ For those who want to start and know Azure with Go ~
Upgrade the Azure Machine Learning SDK for Python
Learn algorithms with Go @ Full search_Linear search method
Use Python / Django with Windows Azure Cloud Service!
[Azure Functions / Python] Chain functions with Queue Storage binding
Get an Access Token for your service account with the Firebase Admin Python SDK
Check! How to use Azure Key Vault with Azure SDK for Python! (Measures around authentication)