Azure Functions dynamic queue message routing for Storage Queue and Service Bus samples

Azure Functions bindings are a super cool feature. However, I wanted to route queue messages dynamically at run time.

I want to send messages to the agent_1 queue for Agent One and to the agent_2 queue for Agent Two. Azure Functions already has this capability; it is called Binder.
This feature comes from WebJobs. Until recently, the only documentation was on the WebJobs GitHub. Now, however, it is covered in the Azure Functions documentation!

Let's try it. We can use the IBinder interface and its BindAsync<T>() method. The problem is that we might not know which class to choose for type T, so I'd like to share which classes work for this purpose. One of the biggest pains here is that there is no error if you use the wrong type for T: the call appears to succeed, but no queue message is actually sent.

Here are some successful scenarios. This function is triggered by the "acceptprint" queue and then sends a queue message to the "agent_1" or "agent_2" queue according to the AgentId value.

Service Bus Queue

 [FunctionName("RequestHandler")]
 public static async Task RunAsync(
     [ServiceBusTrigger("acceptprint", AccessRights.Manage, Connection = "ServiceBus:Connection")] string myQueueItem,
     TraceWriter log,
     IBinder binder)
 {
     // Read the AgentId from the incoming message to pick the target queue.
     var myQueueItemObject = JsonConvert.DeserializeObject<PrintSpool>(myQueueItem);
     var serviceBusQueueAttribute = new ServiceBusAttribute($"agent_{myQueueItemObject.AgentId}", AccessRights.Manage);
     serviceBusQueueAttribute.Connection = "ServiceBus:Connection";
     var queueMessageJson = JsonConvert.SerializeObject(myQueueItemObject);
     // Bind the output queue at run time and send the message through the collector.
     var outputMessages = await binder.BindAsync<IAsyncCollector<string>>(serviceBusQueueAttribute);
     await outputMessages.AddAsync(queueMessageJson);
 }

According to the Service Bus output bindings page, we can use these classes as type T:

  • T (some class which can JSON serialize)
  • string
  • byte[]
  • BrokeredMessage
  • ICollector<T> or IAsyncCollector<T>

However, we can't use T, string, byte[], or BrokeredMessage as type T in this case. BrokeredMessage doesn't have a setter for the message body. I also saw a blob storage sample that uses TextWriter or Stream; with Service Bus, binding to those types runs without error, but no message is actually sent. I recommend using ICollector<T> or IAsyncCollector<T> in this case.

Storage Queue

I also tried Storage Queue. It was easy. I found a sample on this page, although it is written for WebJobs.

 var queueAttribute = new QueueAttribute($"agent{myQueueItemObject.AgentId}");
 queueAttribute.Connection = "BlobStorage:Connection";
 var outputQueue = await binder.BindAsync<CloudQueue>(queueAttribute);
 await outputQueue.AddMessageAsync(new CloudQueueMessage(queueMessageJson));

NOTE: Storage Queue can't use '_' in a queue name, so this sample uses "agent1" rather than "agent_1". Service Bus queue names can contain '_'.
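
Storage Queue names may contain only lowercase letters, digits, and hyphens, and must be 3 to 63 characters long. Since the queue name is built at run time, a small guard can catch an invalid name before binding. The following is only a sketch: AgentQueueNames and ForStorageQueue are hypothetical names that are not part of the WebJobs SDK, and it assumes the agent id is available as a string.

```csharp
using System;
using System.Text.RegularExpressions;

// Hypothetical helper (not part of the WebJobs SDK): builds the Storage Queue
// name for an agent and checks it against the Storage Queue naming rules.
public static class AgentQueueNames
{
    // Lowercase letters, digits, and single hyphens; the name must start
    // and end with a letter or digit.
    private static readonly Regex ValidName =
        new Regex("^[a-z0-9]+(-[a-z0-9]+)*$", RegexOptions.CultureInvariant);

    public static string ForStorageQueue(string agentId)
    {
        if (string.IsNullOrWhiteSpace(agentId))
            throw new ArgumentException("Agent id is required.", nameof(agentId));

        // Same "agent{id}" pattern as the sample above, normalized to lowercase.
        var name = "agent" + agentId.Trim().ToLowerInvariant();

        // At most 63 characters; the 3-character minimum is already met
        // by the "agent" prefix.
        if (name.Length > 63 || !ValidName.IsMatch(name))
            throw new ArgumentException($"'{name}' is not a valid Storage Queue name.", nameof(agentId));

        return name;
    }
}
```

With this in place, the attribute in the sample could be created as new QueueAttribute(AgentQueueNames.ForStorageQueue(myQueueItemObject.AgentId.ToString())).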

According to the Storage Queue output bindings page, we can use these classes as type T:

  • JSON serializable POCO
  • string
  • byte[]
  • CloudQueueMessage
  • ICollector<T> or IAsyncCollector<T>
  • CloudQueue

In this case, we have several choices: CloudQueueMessage, ICollector<T>, or IAsyncCollector<T>, in addition to the CloudQueue used above.
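
For comparison, here is how the IAsyncCollector<T> option might look for Storage Queue, mirroring the Service Bus sample. This is a minimal sketch rather than tested production code: StorageQueueRouting and SendToAgentQueueAsync are hypothetical names, and the agent id is assumed to be passed in as a string.

```csharp
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;

public static class StorageQueueRouting
{
    // Hypothetical helper: sends one JSON message to the agent's Storage Queue.
    public static async Task SendToAgentQueueAsync(IBinder binder, string agentId, string queueMessageJson)
    {
        var queueAttribute = new QueueAttribute($"agent{agentId}")
        {
            // Same connection setting name as the CloudQueue sample.
            Connection = "BlobStorage:Connection"
        };

        // Bind a collector at run time instead of a CloudQueue.
        var outputMessages = await binder.BindAsync<IAsyncCollector<string>>(queueAttribute);
        await outputMessages.AddAsync(queueMessageJson);
    }
}
```

The collector takes the message as a plain string, so no CloudQueueMessage needs to be created by hand.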

Blob Storage

I also change the blob name dynamically. I need to store a PDF file, but I want to decide its file name at run time. We can use Binder in this case as well.

 // Upload to the Blob Storage
 var guid = Guid.NewGuid();
 var path = $"doc/{guid.ToString()}.pdf";
 var blobAttribute = new BlobAttribute(path, FileAccess.Write);
 blobAttribute.Connection = "BlobStorage:Connection";
 
 using (var writer = await binder.BindAsync<Stream>(blobAttribute))
 {
  await writer.WriteAsync(pdfBytes, 0, pdfBytes.Length);
 }

According to the Blob Storage input-output bindings, we can use these types.

  • out string
  • TextWriter
  • Stream
  • ICloudBlob
  • CloudBlockBlob
  • CloudPageBlob
  • CloudAppendBlob

I used Stream this time. In this case, Stream or TextWriter might be handy. :)

2017/12/18 added

The Azure documentation team updated this section. It is very useful for dynamic binding. :)