When I contributed to Keda, I needed to know the behavior of Service Bus, which I hadn't touched much, so I did a quick survey. The library uses the following.
Queue Client
Authenticate so that the Queue can be operated. Service Bus and Queue are created in advance. Since ConnectionString
of ServiceBus namespace is required for connection, it is acquired and passed from the environment variable. An instance of NameSpace
is created by passing ConnectionString
, and NameSpace
creates a Queue client with the NewQueue
method. It's very simple and nice.
func main() {
fmt.Println("Azure ServiceBus Queue Sender")
connectionString := os.Getenv("ConnectionString")
queueName := os.Getenv("queueName")
if len(os.Args) != 2 {
log.Fatalf("Specify the counter parameter. e.g. send 100 Parameter length: %d\n", len(os.Args))
}
count, err := strconv.Atoi(os.Args[1])
if err != nil {
log.Fatalf("count should be integer : %s", os.Args[1])
}
// Create a client to communicate with a Service Bus Namespace
ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connectionString))
if err != nil {
log.Fatal("Cannot create a client for the Service Bus Namespace", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// Create a client to communicate with the queue
q, err := ns.NewQueue(queueName)
if err != nil {
log.Fatal("Cannot create a client for the queue", err)
}
Send Message
To send a message, use the Send
method on the Queue object. If you want to send in bulk, func (q * Queue) SendBatch (ctx context.Context, iterator BatchIterator) error
exists, so you can use that.
for i := 0; i < count; i++ {
err = q.Send(ctx, servicebus.NewMessageFromString("Hello!"))
fmt.Printf("Send Hello %d \n", i)
if err != nil {
log.Fatal(err)
}
}
Receive Message
Receiving a message is also very simple, if you pass a handler for the callback, the handler will be executed when the message arrives. It looks asynchronous for a moment, but ReceiveOne
behaves like blocking until control returns to the handler. The point is the last message.Complete ()
method. By default, the Service Bus Queue is PeekLock (https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-transfers-locks-settlement#peeklock). When you get a Message with Receive
, the message is locked and invisible to other clients. And if the process is successful, the Message will be deleted when Complete
is issued. If it doesn't work, issuing ʻAbandon` will assume that Message execution has failed. If it fails a certain number of times, the message is automatically forwarded to DeadQueue.
err = q.ReceiveOne(
ctx,
servicebus.HandlerFunc(func(ctx context.Context, message *servicebus.Message) error {
fmt.Println(string(message.Data))
return message.Complete(ctx)
}))
DeadLetter
Sending a Message to DeadLetter is automatically forwarded to DeadLetter in the following cases: Moving messages to the DLQ ..
However, there may be other cases where you may want to send it to DeadLetter for the convenience of your application. In that case, use the Message's DeadLetter ()
to transfer to the DeadLetter. This behavior works only in PeekLock
mode and cannot be used in ReceiveAndDelete
mode, which will be explained later.
Personally, DeadLetterWithInfo ()
is better than DeadLetter ()
. The reason is that you can pass a custom property and have it as metadata, for example, why you entered DeadLetter. I've also passed properties like ʻerror and
servicebus.ErrorInternalError`, but I'm still not sure where to look at this metadata. At least this would be better if you could add load information.
err = q.ReceiveOne(
ctx,
servicebus.HandlerFunc(func(ctx context.Context, message *servicebus.Message) error {
fmt.Println(message.LockToken.String()) // It will be null when the ReceiveAndDelete mode
fmt.Println(string(message.Data))
fmt.Println("Waiting...")
time.Sleep(time.Second * 10)
fmt.Println("Done.")
m := make(map[string]string)
m["rootCause"] = "hagechabin"
return message.DeadLetterWithInfo(ctx, errors.New("go to deadletter"), servicebus.ErrorInternalError, m) // Doesn't work for ReceiveAndDelete
}))
If you look at it in Service Bus Explorer, you can read the metadata firmly.
DeadLetterOnMessageExpiration
You can set it to automatically go to the Dead Letter when the message Expires, but that's not the default. You can set it for an instance called QueueManager
. However, if you do this twice, you will get a Conflict
error. As a matter of fact, it seems good to create a Queue with ARM Template when creating ServiceBus. This API seems to be an ARM API, and if you look at Create
, it has BODY. Probably pass the ARM Template to Body. By the way, Put has no Body, but it will create it if it doesn't exist, so this seems to be better when writing tests. If you delete it and make it with Put, you shouldn't get the Conflict
error.
qm := ns.NewQueueManager()
qe, err := qm.Put(ctx, "abc", servicebus.QueueEntityWithDeadLetteringOnMessageExpiration()) // Doesn't work
if err != nil {
fmt.Println("error :", err.Error())
} else {
fmt.Println("endity anme: ", qe.Name)
}
Once set, you can see that the settings have been changed as follows.
ReceiveAndDelete Mode
PeekLock
first puts the lock on the message and then deletes it with Complete
, but ReceiveAndDelete
deletes the Message from the Queue when it is received. So you can't forward it to DeadLetter yourself. DeadLette's Queue API is also available, but since it's read-only, I don't want ReceiveAndDelete
to write the process of sending it to DeadLetter by myself. If you really want to do it, you should create a custom Queue and run it there. Changing the settings is easy, just specify the following options when creating a Queue client.
q, err := ns.NewQueue(queueName, servicebus.QueueWithReceiveAndDelete())
Active Message Count
In fact, all this survey was to investigate the message count. Using * entity.CountDetails.ActiveMessageCount
will get the number of messages in the Queue, but the problem is that it also counts the number of messages currently locked.
I'm writing a KEDA Scaler, so I need a number of messages that aren't currently being processed. What should i do? Unfortunately no method has been found so far. I used Peek
and there was LockToken
, so I tried to limit this to the number of things that nil
does not appear, but apparently it was locked on the receiving side. The numbers are different, so it looks a little different.
If it is my custom app, when I get the message, I write the ID on the Map, delete it with Complete or Abandan, and refer to it when I get it with Peek. However, the structure of the application I am writing is abstracted, and the scale logic and scaler for individual resources are abstracted, and if it is StorageQueue etc., it can be counted properly. So it doesn't matter.
As a countermeasure, if you set ReceiveAndDelete
on the side that receives the Message, the number of counts will be reasonable, but usually you want to operate with PeekLock
, so it is delicate. I would like to continue researching and updating this blog when I find out.
m := ns.NewQueueManager()
ctx2 := context.Background()
for {
entity, _ := m.Get(ctx2, queueName)
fmt.Println("ActiveMessageCount: ", *entity.CountDetails.ActiveMessageCount)
ctx := context.Background()
iterator, _ := q.Peek(ctx)
var c int
for {
item, _ := iterator.Next(ctx)
if item != nil && item.LockToken != nil {
fmt.Printf("lockToken: %s \n", item.LockToken.String())
} else {
if item != nil {
fmt.Println("lockToken: nil")
c++
} else {
fmt.Println("item is nil")
}
}
if item != nil {
body, _ := json.Marshal(item)
fmt.Println(string(body))
}
if item == nil {
iterator.Done()
break
}
}
fmt.Println("Count: ", c)
time.Sleep(time.Second * 2)
}
I couldn't verify how to get the count other than the locked message, which was the purpose of the investigation, but I could confirm the peripheral knowledge of Service Bus and the behavior of Go SDK, so I summarized it in the blog. I've put the source code in the resource, so you can try it if you like.
Resource
Recommended Posts