Everything You Need to Know About Azure Service Bus Brokered Messaging (Part 1)

Distributed Systems through Azure Service Bus

The ability to share data between decoupled systems is not a problem that is easily tackled.  A distributed system is not a new concept but is a hot topic that continues to grow in popularity with the ever growing need to share data.  As information continues to be the nucleus of our applications, the need to share and the way in which that information must be dispersed will continue to pose challenges.

Microsoft Azure’s Service Bus is a cloud service that helps facility the ability to share data between decoupled systems.  In this article, we are going to learn how to leverage Azure’s Service Bus with Brokered messaging to distribute data between systems.  However, if you are at all familiar with Azure services that provide support for distributed systems, you’ll know that Service Bus is not the only service of its type.  Azure’s Queue Storage service also provides similar functionality and facilitates the ability to share data between distributed systems.  So what queue service is right for you?  These questions as well as the following topics are all areas we will be covering.

 

What is Service Bus

Simply put, Service Bus is the second message queuing platform build by Azure that provides Relay and Brokered messaging capabilities.  It is a feature rich and matured service that can provides a means for decoupled systems to exchange information independently.

Azure’s Service Bus is one of the many Platform as a Service (PaaS) services and can be as simple as a single queue or highly complex message workflow with a near infinite number of interrelated queues, topics and subscriptions.

Service Bus vs. Queue Storage

As I have already pointed out, Service Bus is not the only message queue service Azure offers nor was it the first. But there are significant differentiating features between Service bus and Queue Storage, Azure’s first message queuing service.  We are going to be doing a deep dive on the features Service Bus with brokered messaging offer, but it’s important that you are aware of when to use which service.

Microsoft has provided a compare and contrast document to help you make that decision.  However, with the limited reasons for using the Queue Storage service, we can quickly summarize when Queue Storage would be the best chose.  If you are in need of the least complex approach and your application meets the following needs, Queue Storage would be the best chose:

  • Needs to retain more than 80 GB of data in queue
  • Message time to live less than 7 days
  • The ability to track message processing within a queue*

 

As you can see, message storage size and time to live are the two key differentiating points of Azure Storage Queue service. Naturally, the data retention aspect is of the Storage Queue service is due to the underlying Storage service. So, unless one of these points is a critical requirement, the Service Bus service is going to be a more feature rich and versatile option.

However, within the Service Bus service, there are more than one messaging capability. This article is focused on Service Bus with brokered messaging. But, brokered messaging is not the only messaging capability that Service Bus offers. Relay is another option that you will read about when investigating Azure’s Service bus, so let’s take a quick moment to distinguish the two.

 

Brokered vs. Relay Messaging

So far we have only mentioned “brokered” messaging with Azure Service Bus.  But this is not the only messaging capability provided by Service bus. Instead of the pattern of queuing messages that we have been alluding to so far, Relay messaging provides the ability to “bounce” a message off of a service to an connected receiver.  It requires that the receiver expecting the message is online and available.  A strong point of Relay messaging is the ability to expose the service’s endpoint without the typical network firewall and infrastructure configuration hoop-jumping to make it available to external clients.

However, durability is not a strong point of Relay as it is with Brokered messaging.  Brokered messaging supports the scenario of truly temporal decoupled systems where either message producer or consumers availability is not guaranteed.  Therefore, messages that are not immediately delivered must live somewhere and that is where the “broker” comes into play.  With Brokered messaging, the queue is the broker that retains a message created by a producer and where the consumer can retrieve the message when ready.

So while the exposure of service endpoints is one of Relay messaging strong points and queues provides the durability of Brokered messaging, queues coming more than one flavor.  Therefore,  we need to look at the different queue options before we get to the implementation of Service Bus Brokered Messaging.

 

Queues vs. Topics and Subscriptions

This can be confusing for someone just getting introduced to Service Bus Brokered Messaging, so I want to try and make this as clear as possible.  First off, don’t lose sight of the fact that at the end of the day we are always talking about queues.   A Service Bus Queue provides the simplest message delivery option.  Messages in a Queue are organized by first in, first out (FIFO) and each message is expected to be processed by a single consumer.  I like to visualize Queues as a single tube where a message is feed into tube and is consumed by a single consumer on the other end.

queue_message_full

 

However, Topics and Subscriptions constitute a publish/subscribe pattern allowing the same message to be processed by N number of consumers.  Subscription Rules and features like Auto-Forwarding allow for a tree like visualization of how this process works.

topic_message_full

We will be getting into the details of the above, but at a high level, a single message can be added to a topic and for every subscription rule that is satisfied, a copy of the message will be added to that subscription.  In this case, each subscription becomes the queue, where consumers can process the messages on a subscription individually.

 

Getting Started with the Building Blocks

The end result that we are aiming for is producing messages that consumers can consume.  But in order to get there, we have to start with the building blocks.  One of the main building blocks that you will use for a bulk of your direct and indirect interactions with Service Bus is the NamespaceManager object.

The NamespaceManager facilitates the ability to manage and create core entities such as queues, topics and subscriptions through factory methods just to name a few. However, NamespaceManager has a couple dependencies that we need to provide, so let’s discuss those first.

Those dependencies that we need to materialize are the following:

  • Service Bus Namespace (used by the Service Bus Service Uri)
  • Token Provider
  • Service Bus Service Uri

 

A service bus namespace is exactly what it sounds like and ultimately defines our own personal namespace within our Service Bus service.  This first dependency will require being created from within the Azure portal under “Service Bus”.

 

create-namespace

 

You’ll need to specify the namespace (make note of this) in which your service bus service endpoint will be known and also information such as the region where it will be hosted.  Finally, there is the important note of Basic vs. Standard Messaging Tier.  For an understanding of each you can check out this Azure documentation and can impact ability to host Topics and Subscriptions.

With the Namespace created, we can focus on the last two dependencies; Token Provider and Service Bus Service Uri.  Token Providers provide the authentication mechanism that the NamespaceManager will use.  There are a few out-of-the-box providers that can be used and will be looking at some of these in more details when we get to security.  For now we are going to use the Shared Access Signature Token Provider and specify the default Shared Access Signature (SAS) policy name and key. These two pieces of information can be found under the “Configure” section of your newly created Service Bus Namespace in the Azure Portal.  With the policy name and key we can compose a Token Provider:

TokenProvider tokenProvider =
TokenProvider.CreateSharedAccessSignatureTokenProvider(AccountInfo.PolicyName,
AccountInfo.Key);

NOTE: AccountInfo simply supplies the proper Shared Access Signature policy name and key.

The Service Bus Uri provides the NamespaceManager the service endpoint that operations will be operating against. This is where our earlier created Namespace will be utilized.  A service endpoint such as mynamespace.servicebus.windows.net can be created using one of the static methods from the ServiceBusEnvironment class and specifying our namespace and the protocol.

//”sb” defines the scheme of the service Uri
Uri serviceBusUri = ServiceBusEnvironment.CreateServiceUri("sb", "shopit", string.Empty); 

 

[alert type=”info” icon=”icon-help-2″]With the Visual Studio Azure Tools, many of the Azure Service Bus project templates through the use of the Azure Service Bus Configuration NuGet package.  But, understanding these underlying dependencies will help facilitate your own custom implementation when needed.[/alert]

 

Creating the Namespace Manager

Dependencies in hand, we can now generate our NamespaceManager:

NamespaceManager namespaceManger = new NamespaceManager(serviceBusUri, tokenProvider);<br>

Optionally, there is also an override that will allow us to specify a NamespaceManagerSettings object that allows us to further specify additional specifications such as operation timeout periods as well as a retry policy (i.e. exponential retry policy or a custom retry policy).

NamespaceManagerSettings settings = new NamespaceManagerSettings {OperationTimeout = new TimeSpan(0, 1, 0), TokenProvider = tokenProvider};
NamespaceManager namespaceManger = new NamespaceManager(serviceBusUri, settings);&nbsp;

 

[alert type=”info” icon=”icon-help-2″]I discussed implementing a custom retry policy to track when retries actually occur in this article that utilizes the Enterprise Transient Fault Application Block.  We’ll look at Retry policies later.[/alert]

 

Asynchronous Approach

One of the highly referenced best practices is to utilize an asynchronous development approach to your Service Bus operations.  This can easily be done by taking advantage of the asynchronous methods provided to you for a majority of your operations.  You will find that the library provides you 2 different asynchronous patterns to choose from which include the Asynchronous Programming Model or the Task-Based Asynchronous Pattern.   We’ll talk more about this when we get to Best Practices. To try and stay true to the best practice as well as the fact that most demonstrations are done synchronously, here most examples will be delivered asynchronously (pun intended)!

[alert type=”info” icon=”icon-help-2″]In order to intercept and handle thrown exceptions by asynchronous operations, you would at minimum, wrap your calls in a Try/Catch.  However, unless demonstrating error handling, we will exclude that code in order to focus in on the demonstrated operation.[/alert]

 

Service Bus Queues

With our NamespaceManager created we have laid the foundation for the bulk of operations we are going to look at regarding Service Bus Brokered Messaging.  I have already explained the two message delivery options that Service Bus offers and we are going to start off with the simplest route by looking at utilizing Service Bus Queues.

Though Queues don’t support message filtering that makes Topics and Subscriptions so unique, they do support, sessions, duplication detection, message deferral, dead lettering and message expiry just to name a few.  Even though allot of these features are also supported by Topic and Subscriptions; I have decided to review both Queues and Topics/Subscriptions separately.

 

Creating Queues

Creating a queue is the first step to allowing us to store and retrieve messages. Queue names are always created in lowercase despite what casing you provide for your queue name.

QueueDescription queueDescription = await namespaceManager.CreateQueueAsync(queueName);&nbsp;

To demonstrate a few other utility methods, we can first attempt to verify if the queue already exists and return the existing queue information if it does:

QueueDescription newQueueDescription = null;
if (!await _namespaceManager.QueueExistsAsync(queueName))
{
    newQueueDescription = await _namespaceManager.CreateQueueAsync(queueName);
}

QueueDescription queueDescription = newQueueDescription ?? await _namespaceManager.GetQueueAsync(queueName);

CreateQueueAsync override takes a QueueDescription object that allows you to specify values outside of the defaults that a queue is constructed with.  I was reluctant to discuss the many different queue properties this early on, but since a number of them can’t be updated after the creation of the queue, we might as well discuss them now.

 

Queue Properties

Queues contain a number of properties that directly affects the queue while others might pertain to the Brokered Messages it would host.  No all properties can be updated or changed after a queue is created, so if an update is required be mindful of these restrictions.  The below list is not an exhaustive list, but are some of the more common properties that you will find you are interested in:

1. DefaultMessageTimeToLive – This is an interesting one.  This dictates the time to live of a message in two scenarios 1) if the message does not have its direct TimeToLive property value set or 2) when the messages TimeToLive is greater than the queue’s DefaultMessageTimeToLive property.  However, if the message’s TimeToLive value is lower, the message’s TimeToLive will be the time at which the message will expire.

2. RequiresDuplicationDetection – Allows you to turn on message duplication detection.  This works in conjunction with DuplicationDetectionHistoryTimeWindow. This is one of those values that must be set at creation and cannot be updated afterwards.

3. DuplicateDetectionHistoryTimeWindow – You can specify a time period that the queue will retain message ID’s in order to carry out message duplication detection.  The time can be no greater than the maximum time a message can live on a queue which is 7 days.
[alert type=”info” icon=”icon-attention-2″]Be aware that there is an overhead with this feature.  A message ID that must be retained by the queue has an overhead of 64 bytes.  Therefore, a simple example would be a queue that processes 1000 messages would require at 64,000 bytes of overhead.  This might not sound like much, but crunch the numbers if you were sending 30 messages a second and attempted to retain the DuplicateDetectionHistoryTimeWindow for the max of 7 days. The overhead would be ~1GB.[/alert] 4. EnableDeadLetteringOnMessageExpirationAllow your messages to be moved to the deadletter queue when either the queue’s DefaultMessageTimeToLive or the messages TimeToLive is reached first and the message expires.

5. LockDuration – When we get to message handling, we’ll talk more about PeekLock, but this is the setting that sets how long a message is locked for handling before it is released back onto the queue.

6. MaxSizeInMegabytes – The total size of the queue.  The default is 1GB.  This is adversely affected by the overhead produced by the DuplicationDetectionHistoryTimeWindow.

7. AutoDeleteOnIdle – This is actually a timespan which denotes how long a queue can stay alive once it is idle before it is automatically deleted.  The minimum time is 5mins.

 

Update Queues

Certain queue properties can be updated after its creation, while others cannot.  Below we can see how we can utilize a QueueDescription object to update certain properties of an existing queue:

QueueDescription newQueueDescription = new QueueDescription("TestOperationQueue")
{
    DefaultMessageTimeToLive = TimeSpan.FromDays(3),
    AutoDeleteOnIdle = TimeSpan.FromHours(1),
    MaxSizeInMegabytes = 2048
};

QueueDescription&nbsp; queueDescription = await _namespaceManager.UpdateQueueAsync(newQueueDescription );&nbsp;

Here we have updated the queue properties to not allow message time-to-live to exceed 3 days, set the queue’s own time-to-live when idle to 1 hr. and the max queue size to 2GB.

[alert type=”info”]Deletion of a queue with AutoDeleteOnIdle does indeed delete even with messages in queue.  Read: a queue idle time starts on the last enqueuing or dequeuing of messages.[/alert]

 

Delete Queues

Delete does not throw and exception in the case that the queue does not exists.

_namespaceManager.DeleteQueueAsync(queuePath);

We can set the AutoDeleteOnIdle queue property to auto delete itself after it has went idle and the set time period has expired.

 

Sending Messages

Now that our queue is in place, we can exercise sending messages to the queue.  In order for that to be possible we need to generate a facilitator for sending messages.  When discussing strictly Service Bus Queue’s as we are, there are only 2 options (QueueClient and MessageSender).

It is important to understand that the MessageSender is an abstraction for the QueueClient and should be utilized unless it is absolutely necessary to get to functionality specific to a QueueClient.  We will see the benefits of utilizing the MessageSender over a QueueClient when we get to the subject of Topics.

We can create a MessageSender from one of the factory methods of a MessagingFactory. However, the queue author is not always the same entity that sends messages to the queue. Therefore, our MessagingFactory also requires a proper service bus endpoint and token provider just as our NamespaceManager did.

Earlier, when we generated our NamespaceManager we could have created it with a connection string instead of manually generating the two dependencies Service Bus Uri and Token Provider.  Therefore, for demonstration purposes, we’ll look at generating a MessageFactory using a connection string that we can acquire from our Azure portal.  You can acquire your service bus namespace’s Connection String by clicking on “Connection Information” at the bottom of the Azure portal after you have selected the “Service Bus” service icon.

conenctionStrings

 

The connection string is parsed to supply the Service Bus Uri (endpoint) and Token Provider (shared access signature) for the MessageFactory when we utilize the FromConnectionString method.

MessagingFactory factory = MessagingFactory.CreateFromConnectionString(AccountInfo.ConnectionString);
MessageSender messageSender = factory.CreateMessageSender(QueueName);&nbsp;

With a MessageSender we are prepared to send messages

//Some object
SingingTelegram singingTelegram = new SingingTelegram
{
    RecipientFirstName = "Elvis",
    RecipientLastName = "Presley",
    SongName = "Won't you come back again?"
};

BrokeredMessage message = new BrokeredMessage(singingTelegram);

await messageSender.SendAsync(message);<br>

But hold on!

 

What Kind of Message Are You Sending?

Before we can exercise the “distribute” in Distributed System and start flinging messages at our queue, we need to have an understanding of what constitutes a message.  In our case, its simply.  We’re sending a BrokeredMessage.

But there is allot to understand about a BrokeredMessage that can be broken down to characteristics and features.  Allot of the features will be discussed under specifics areas, but for now I wanted to stop and discuss some of the characteristics you need to be aware of when it comes to BrokeredMessages.

First off, a message at its heart is made up of a body and properties. The total max size of a message is 256kb.  The max size of all properties is 64kb.  The max size of the body is the remainder of the max size of the message and current size of the properties.   There are ways to increase the max size (i.e. sessions) but were not discussing that at the moment.

 

[alert type=”info”]Due to certain protocol limitations such as HTTP header sizes, it is recommended that you keep your custom properties between 2 – 4 kb.[/alert]

 

Body

The body is simply any serializable object and would constitute the payload of the message.  Whether you need to use the body of the message is up to you.  You will see that there are a number of different BrokeredMessage constructors.  These are generally all setting the body of the message as in:

BrokeredMessage msg1 = new BrokeredMessage("My Message Body");
BrokeredMessage msg2 = new BrokeredMessage(new SingingTelegram());&nbsp;

Properties

Properties are made up of two parts, system and custom properties.  The custom properties are simply a key/value pair collection that can be any string / object.  They allow us to transport specific custom properties at the header level of our message.  However, custom properties really come into play when we get to the discussion of subscription rules when we get to Topics and Subscriptions.

Another way to think about message properties in general is when discussing a message flowing between different protocols (HTTP/S, AMQP, SMB).  A message originating from HTTP/S request would end up mapping request headers to message properties.

When we want to promote data to the header level of a message we can utilize the custom Properties property as easily as:

BrokeredMessage message = new BrokeredMessage(singingTelegram);

foreach (object info in importantInformation)
{
    message.Properties.Add(info.Key, info.Value);
}

There are a considerable amount of important system properties on a BrokeredMessage that you can adjust.  Many of these we’ll touch on in specific features.  However, a few worth mentioning are:

1. ScheduledEnqueueTimeUtcIf you wanted to delay the visibility of a message on a queue, you can set a future UTC time in which the message will be unavailable until the specified UTC time.

2. ExpiresAtUtc – If set, will be a specific UTC time in which the message will expire.

3. TimeToLive – Similar to ExpiresAtUtc, this is an amount of time that must pass after the message has been enqueued that the message will expire once.  Both this property and the ExpiresAtUtc directly affect when a message is expired, one at a set time, while the other after an amount of time.

Again, this is not an exhaustive list and it would benefit you to be aware of some of the other influential properties such as how MessageId is used for message duplication, or optional properties such as Label and ContentType can be used.

 

Receiving Messages

Namespace established.  Check.  Queue created.  Check.  Messages sent.  Check.  The obviously last piece of this simple scenario is the other aspect of a distributed system doing something with the messages we have enqueued.  As with a MessageSender we also want to use the MessageReceiver abstraction for getting messages.

To generate a MessageReceiver, we would need a MessagingFactory and you can refer back to Sending Messages for reference.

MessageReceiver messageReceiver =&nbsp;
await _messagingFactory.CreateMessageReceiverAsync(queueName);<br>

From here we can start to receive messages.  We can utilize the Timespan parameter of the ReceiveAsync to specify how long to poll before giving up.  In addition, the following example will do a continuous loop until the cancelation token is canceled.

while (!cancellationToken.IsCancellationRequested)
{
    //Wait up to 1 minute for a message
    var msg = await messageReceiver.ReceiveAsync(TimeSpan.FromMinutes(1)).ConfigureAwait(false);
    await ProcessAndReleaseMessageAsync(msg);
    await Task.Delay(TimeSpan.FromMinutes(1), cancellationToken);
}

private async Task ProcessAndReleaseMessageAsync(BrokeredMessage message)
{
    MessageProcessingAction action = MessageProcessingAction.Abandon;
    try
    {
        //Process message
        action = MessageProcessingAction.Complete;
        await UpdateMessageState(message, action);
    }
    catch (Exception ex)
    {
        //log
    }
    finally
    {
        //C# 6.0 allows await calls in a finally blocks
        UpdateMessageState(message, action);
    }
}

private async Task UpdateMessageState(BrokeredMessage message, MessageProcessingAction action)
{
    switch (action)
    {
        case MessageProcessingAction.Complete:
            await message.CompleteAsync();
            break;
        case MessageProcessingAction.Abandon:
            await message.AbandonAsync();
            break;
        case MessageProcessingAction.Deadletter:
            await message.DeadLetterAsync();
            break;
        default:
            await message.AbandonAsync();
            break;
    }
}

Simply put, we wait up to 1 min for a message to be pulled from the queue.  Upon successfully pulling a message, the message is processed and then we explicitly notifying Azure Service Bus with the state of the message by calling CompleteAsync or AbandonAsync.  This is because of a default mode of the Queue where messages received are received in a PeekLock mode.

PeekLock specifies that a message is only temporarily checked out for some duration of time to be processed.  Unless the message is checked in as complete will it be removed from the queue.  There are other consequences that would directly affect a message being removed from the queue such as time to live setting of the queue or message.  But as far as message handling by our receiver, we will need to complete, abandon or dead-letter the message.   We’ll talk more about dead-letter queue and moving messages to there later.

As far as the example above, a couple of points; obviously, a switch statement could be more elegantly refactored, but would also obscure what I wanted to demonstrate about updating the message state so that the queue can properly handle it.  Whether that means leaving it on the queue in the case where our attempt to process it failed, or setting it to be complete so the queue removes it.

PeekLock is not the only mode that messages can be received.  Let’s look at message receive modes as well as other Queue behaviors and features I have withheld up until now.

 

Dealing with Queues, Again!

I have purposely tried not to over complicate queues with all the different features.  But before closing out Part 1 on Service bus, I did want to return to talk about a few important points that are worth noting.

As I already mentioned, there are 2 modes of message retrieval, PeekLock and ReceiveAndDelete (technically there is a 3rd Session + Peek, but not in the scope of this discussion).  We have already discussed PeekLock, so RecieveAndDelete is a less durable option as it removes the message from the queue when it is retrieved.  Therefore, as you might expect, any exception in your message processing that you have not accounted for can cause a loss of that message.  We can change this when creating our MessageReceiver:

MessageReceiver messageReceiver =&nbsp;
await _messagingFactory.CreateMessageReceiverAsync(queueName, ReceiveMode.ReceiveAndDelete);

As mentioned under the queue properties that we can set the LockDuration of a message when received under the PeekLock mode which dictates how long a message can be retrained by a receiver before being released back on the queue to be processed.  This is at a default of 1 min., but can be set to a max 5 min.   In the case that a message’s LockUntilUtc property shows it is about to expire, we can request a renewal of the message lock:

await msg.RenewLockAsync();<br>

Queues have two modes of delivery, Pull and Forward.  So far we have seen pull when our MessageReceiver pulls on request to the queue to receive a message.  But we can forward messages by setting the ForwardTo property of our queue so that any messages delivered will automatically be forwarded onto another queue.

QueueDescription queueDescription = await namespaceManager.CreateQueueAsync(new QueueDescription(QueueName)
{
    ForwardTo = destinationQueueName
});

 

Part 1 Conclusion

To recap, we learned allot of the differences that make up the Azure Service Bus, how to manage all aspects of Queues, as well as how to send and receive messages through Queues.  Now, we have already covered allot with the Service Bus, but there is allot more to cover.  Don’t worry though, I’ll let you take a breather and return in Part II to cover topics such as Topics and Subscriptions, Dead Lettering, Auto Forwarding, Transactions, Batch processing, Security and Service Bus best practices.

 

About the author

Max McCarty

Max McCarty is a software developer with a passion for breathing life into big ideas. He is the founder and owner of LockMeDown.com and host of the popular Lock Me Down podcast.