Everything You Need to Know About Azure Service Bus Brokered Messaging (Part 2)

Azure Cloud Provide Distributed System Messaging

This is part 2 on Azure Service Bus Brokered Messaging and you can find part 1 here.  In the first part we concentrated strictly on how to utilize Service Bus Queues.  Continuing in part 2, due to the sheer breadth of information, our main focus will be on features and strengths of Service Bus Topics and Subscriptions which by now, you should know are different that Queues.

 

Topics

topic_message_full

Now truly it is hard to talk about Topics without Subscriptions because of their close relation and in almost all examples that is exactly what you will see.  But, despite their differences, Topics share a number of similarities to a Service Bus Queue.  So from that standpoint since you already familiar with Service Bus Queues, we’re going to talk about Topics and then introduce Subscription.

Topics just like Queue’s provide a destination to send messages.  But Topics in combination with Subscriptions provide a publish and subscribe pattern for distributing messages.  This is unlike the island view of Queue’s where a queue is both the receiver and retainer of messages it receives.

Simply put, Topics provide the publisher side of a publish/subscribe pattern.  From a 30,000 foot view they don’t seem all that different from Queue’s as they provide a destination to send messages.  However, topics, just like queues, must still be created, managed and sent messages.

Many of the management operations for Topics are utilized and executed in the same fashion as Queues and we can use the same NamespaceManager that we saw in part 1 to facilitate our CRUD operations.

Creating Topics

Creating topic can be as straight forward as giving it a name:

NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(AccountInfo.ConnectionString);
TopicDescription newTopicDescription = await _namespaceManager.CreateTopicAsync(topicName);

Or a topic can be created from a TopicDescription where we define characteristics of a topic.

TopicDescription description = new TopicDescription(topicPath)
{
    SupportOrdering = true,
    AutoDeleteOnIdle = TimeSpan.FromHours(8),
    DefaultMessageTimeToLive = new TimeSpan(0, 4, 0, 0)
};

TopicDescription newTopicDescription = null;

//If not exists, we'll create it
if (!await namespaceManager.TopicExistsAsync(description.Path))
{
    newTopicDescription = await namespaceManager.CreateTopicAsync(description);
}

//Or we'll retrieve it if it already exists
TopicDescription topicDescription = newTopicDescription ?? await namespaceManager.GetTopicAsync(description.Path);

With a more contrived example we can demonstrate some of the utility methods available for checking if a topic exists as well as retrieving details about an existing topic.

 

Updating Topics

As with Queues, we can also update properties of an existing Topic by providing a TopicDescription.  However, there are specific properties that can’t be changed after creation such as RequiresDuplicateDetection

TopicDescription description = new TopicDescription(topicPath)
{
    SupportOrdering = true,
    AutoDeleteOnIdle = TimeSpan.FromHours(8),
    DefaultMessageTimeToLive = new TimeSpan(0, 4, 0, 0)
};

await namespaceManager.UpdateTopicAsync(description);

 

Deleting Topics

When we delete a topic that it doesn’t exist, it will not throw an exception:

await namespaceManager.DeleteTopicAsync(topicName);

 

Sending Messages to Topics

Now, anyone remotely familiar with how Topics and Subscriptions work will be wondering why on earth I want to talk about sending messages to topics before I have even introduced Subscriptions.  Its simple, up to this point including sending messages to topics, not much has to change and it is easy to get a feel for Service Bus Topics without being bogged down with what makes them different.  Stick with me, we’ll be there momentarily.

You probably don’t recall that I stated back in part 1 that the MessageSender is an abstraction for the QueueClient and using the abstraction would have a payoff?  Well, it’s also an abstraction for a TopicClient as well.  A TopicClient is the direct client for sending messages to a Topic.  Therefore, if you can use a MessageSender instead of the relevant client, then your application doesn’t have to care whether it is sending messages to a Queue or a Topic and therein lies the payoff of using the MessageSender.

You can refer back to our Queue example of creating a MessageSender to send messages, and sending one is as easy as it was for sending to a Queue.

await messageSender.SendAsync(new BrokeredMessage("SimpleMessage"));

However, if you’re testing this, you’re in for a big surprise, so I’ll save you the loss of hair (something I good at loosing!).  When a topic doesn’t have a registered subscription, a message is automatically discarded.   So, it appears this would be the opportune time to introduce Subscriptions.

 

Subscriptions

As a reminder, Topics and Subscriptions together represent a publish and subscribe pattern (see the previous topics/subscription diagram).  While the topic represents the publisher, a subscription represents a subscriber. When a subscription is created a topic is provide that it will be subscribe to in order to receive messages the subscription is interested in. In the simplest form, messages incoming to a topic are copied and forwarded onto each subscription who’s subscription policy (we call a rule) is satisfied.

I like to equate this relationship between topics and subscriptions to an airport arrival scenario.  Imagine if you will, a topic represented by an airport.  You, as the message arrive at the airport and make you way through baggage claim and arrive where the limousine drivers are standing holding signs with different people’s names.  These drivers represent the subscription policies (rules).  Assuming you find a limousine driver who is holding a sign with your name, s/he escorts you to their limousine (subscription) where you then reside in the limousine and escorted off to some great, all-inclusive vacation and no longer residing at the airport.  Well, what if you didn’t find a driver holding a sign with your name on it?

Have you ever seen the movie The Terminal with Tom Hanks?  Where Viktor Navorski (Tom Hanks) is trapped at the JFK International airport because he is denied entry to the U.S. and at the same time can’t return to his native country due to a revolution?  Well, unlike Viktor who has a place to stay at (The Terminal), when a message arrives at a topic that does not satisfy any associated subscription’s rule, the message is discarded.

So let’s take a closer look at what all these characteristics of subscriptions and how they tie into a Topic.

Creating a Subscription

When we create a subscription, at a bare minimum we associate it to a topic:

SubscriptionDescription switftSubscription = await namespaceManager.SubscriptionExistsAsync(“singingtelegrams”, “TaylorSwift”);

In addition, we can set some properties of the subscription by passing in a SubscriptionDescription

SubscriptionDescription subscription = new SubscriptionDescription((“singingtelegrams”, “TaylorSwift”)
{
    EnableDeadLetteringOnFilterEvaluationExceptions = true,
    EnableDeadLetteringOnMessageExpiration = true,
    EnableBatchedOperations = true,
};

SubscriptionDescription bieberSubscription = await namespaceManager.CreateSubscriptionAsync(subscription);

In the above examples we providing the path of the existing topic as well as the name for the subscription we are creating.  In this fashion, these subscriptions have no explicit rule defining what messages they should receive.  Therefore, they have subscribed to receive all messages sent to the “singingtelegrams”.  When any message is sent to the “singingtelegram” Topic, both the TaylorSwift and JustinBieber subscriptions will receive a copy in their virtual queue.

[alert type=”info” icon=”icon-help-2″]Currently the limitation on number of subscriptions that are subscribed to a Topic is 2000.[/alert]

Deleting a Subscription

The only slight difference with the deletion of a Queue and a Subscription is that we also must provide the Topic Path that the subscription is associated with as well as the name of the subscription.

await namespaceManager.DeleteSubscriptionAsync(topicPath, subscriptionName);

 

Update a Subscription

Again, just as with Queues and Topics, there are certain properties that must be set at creation time such as RequireSession, but we can provide a SubscriptionDescription instance to update properties of our Subscription:

SubscriptionDescription subscription = new SubscriptionDescription(topicPath, subscriptionName)
{
    DefaultMessageTimeToLive = TimeSpan.FromHours(8),
    LockDuration = TimeSpan.FromSeconds(90)
};

SubscriptionDescription updatedSubscription = await namespaceManager.UpdateSubscriptionAsync(subscription);

Recall back in part 1, I had stated that in the end we really are still dealing with queues?  Well, Microsoft does define a Subscription as a virtual queue.  Copies of messages that are sent to the subscription reside there and are retrieved as they are with a Service Bus Queue.

 

Retrieving Messages on a Subscription

As in the case of using the abstract MessageSender, if we use the abstract MessageReceiver we don’t have to worry about whether it is a Service Bus Queue or a Subscription that we are pulling messages from.   Below is the exact code from receiving messages from queues in part 1 with the exception of a single parameter passed in for the CreateMessageReceiverAsync method:

MessageReceiver messageReceiver =  await _messagingFactory.CreateMessageReceiverAsync("singingtelegram/subscriptions/TaylorSwift");

try
{
    while (!cancellationToken.IsCancellationRequested)
    {
        var msg = await messageReceiver.ReceiveAsync(TimeSpan.FromSeconds(30)).ConfigureAwait(false);
        if (msg.LockedUntilUtc == DateTime.UtcNow) await msg.RenewLockAsync();
        await ProcessAndReleaseMessageAsync(msg);
        await Task.Delay(TimeSpan.FromSeconds(30), cancellationToken);
    }
}
catch (Exception ex)
{
    //log error;
    throw;
}

private async Task ProcessAndReleaseMessageAsync(BrokeredMessage message)
{
    MessageProcessingAction action = MessageProcessingAction.Abandon;

    try
    {
        //Process message
        action = MessageProcessingAction.Complete;
    }
    catch (Exception ex)
    {
        //log
    }
    finally
    {
        //if something fails update with abandon
        //C# 6.0 allows await calls in a finally blocks
        UpdateMessageState(message, action);
    }
}

private async Task UpdateMessageState(BrokeredMessage message, MessageProcessingAction action)
{
    switch (action)
    {
        case MessageProcessingAction.Complete:
            await message.CompleteAsync();
            break;
        case MessageProcessingAction.Abandon:
            await message.AbandonAsync();
            break;
        case MessageProcessingAction.Deadletter:
            await message.DeadLetterAsync();
            break;
        default:
            await message.AbandonAsync();
            break;
    }
}

What we do have to care about is the string we provide the MessagingFactory.CreateMessageReceiverAsync(entityPath) method.  In our Queue example, we just provide a string of the queue path.   But, in this case we need our subscription path in the form of <Topic Path>/subscriptions/<subscription name> as shown by “singingtelegram/subscriptions/TaylorSwift”.

Upon receiving any messages, we can follow the exact same process we did in our Queue example for updating a Message by calling Abondon, Complete or DeadLetter.   I’ll let you review the message receiving details back in part 1.

Until now, the only compelling reason to use Topics and Subscriptions over Queues has been the ability to provide copies of a single message to multiple interested parties (subscriptions).  But that is only one compelling feature of Topics and Subscriptions.  Another compelling feature is message routing and the ability to control what messages are routed to what subscriptions.

We can provide message routing through Rules.  Now, just because we didn’t define a rule for the subscriptions we already created, doesn’t mean a rule doesn’t exist.  Indeed all subscriptions at a minimum have one rule.

 

Rules, Filters and Actions Oh My!

It is here that we have reached the zenith of what makes Topics and Subscriptions so powerful.  A rule in world of subscriptions define what messages should be copied to a particular Subscription.  Each subscription has access to all messages received by a Topic and individually can filter out what messages belong to it.  This filtering is done through the Rule’s Filter which we’ll talk about in depth.

In addition to filtering, when a message is filtered out by a Subscription because the message met the rule’s condition, we have the opportunity to carry out actions against the message.  These actions include adding, removing and updating custom properties and changing system property values.  This is all defined by the rule’s action .  Therefore, it really is the filter and actions that make up a rule.

[alert type=”info” icon=”icon-help-2″]On a subjective side note, a rule resembles more of a policy than a rule.  Where a rule defines not only the condition(s) (filters) but also what should take place when a condition is met smells more of a policy.  I only state that in case someone has an easier time conceptualizing a rule as a policy.[/alert]

Another way to state the above verbose explanation is that a subscription filters out messages that meets a rule’s conditions and optionally applies actions against the properties of a message when the filtered message is copied to the subscription.  So let’s see examples of all these:

 

Rule Filter

In the .NET landscape, a subscription rule is in the form of a RuleDescription and defines the rule’s Filter and Action.  When we created our subscriptions, an available override to pass in a RuleDescription is available:

RuleDescription rule = new RuleDescription
{
    Name = "ForwardToTaylorSwift",
    Filter = new SqlFilter("RequestedSinger = 'Justin Bieber’ OR RequestedSinger = 'Taylor Swift’")
};

SubscriptionDescription subscription = new SubscriptionDescription(topicPath, subscriptionName);
SubscriptionDescription description = await namespaceManager.CreateOrRetrieveSubscriptionAsync(subscription, rule);

So here is our first introduction to a Rule and its filter.  As the filter’s type implies, it allows for defining a SQL92 standard expression in its constructor that will govern what messages are filtered out.   We can read this as, all messages received at the “singingtelegram’ topic that have a custom property “RequestedSinger” with the value “Justin Bieber”  or “Taylor Swift” will be filtered out to the “TaylorSwift” subscription.

So if we were to send a BrokeredMessage that matches the above rule to our ‘”singingtelegram” topic:

BrokeredMessage message = new BrokeredMessage();
message.Properties.Add("RequestedSinger", "Justin Bieber");

await messageSender.SendAsync(message);

But, this doesn’t exclude other subscriptions from also receiving this message as well.  Recall, that I mentioned earlier that all subscriptions have a rule.  When we don’t explicitly define a rule as the case was in our original subscription creation examples, a default rule is provided for us that has a default filter.  Since the rule condition is defined in the rule filter, let’s take a look at what out-of-the-box filters that are available:

· SQLFilter – The filter that a number of other filters derive from such TrueFilter and FalseFilter

· TrueFilter – This is the default filter provided through the default rule that is generated for us when a rule or filter is not explicitly provided when creating a subscription.  Ultimately, this generates the SQL92 expression 1=1 and subscribes to receive all messages of the associated topic.

· FalseFilter – The antithesis of a TrueFilter that generates a SQL92 expression of 1=0; a subscription with this filter will never subscript to any messages of the associated topic.

· CorrelationFilter – This filter subscribes the subscription to all messages of a particular CorrelationId property of the message.

[alert type=”info”]Be aware that the comparison values in your SQL expression are case sensitive, while the property names are not (e.g. “RequestedSinger = ‘Taylor Swift’” is not the same as “RequestedSinger = ‘taylor swift’”)[/alert]

Adding Rules

Subscriptions are not limited to one rule however.  We can add additional rules to an existing subscription.  However, this is a perfect example of where the MessageReceiver abstraction class can’t fulfill our needs and we’ll need to turn to the SubscriptionClient.

SubscriptionDescription subscription = await _operations.CreateOrRetrieveSubscriptionAsync(topicPath, subscriptionName);

//MessagingFactory as demo'd in part 1
SubscriptionClient subscriptionClient = messagingFactory.CreateSubscriptionClient(subscription.TopicPath, subscription.Name);

RuleDescription ruleDescription = new RuleDescription("NeverGrowUpSongRule", new SqlFilter("RequestedSong = 'Never Grow Up'"));

await subscriptionClient.AddRuleAsync(ruleDescription);

The first operation is simply a personal wrapper that returns a verified SubscriptionDescription.  With it verified, we can acquire a MessagingFactory (see Sending Messages in part 1) that will allow us to create a SubscriptionClient that will in turn allow us to add a rule to the existing subscription.

[alert type=”info”]Understand that every satisfied rule will generate a copy of a message.  That means if a subscription has 3 separate rules, a single message that satisfies all 3 rule conditions, will receive 3 separate copies of the message.  If you want a single message based on various conditions, those conditions need to be specified in a single rule filter (e.g. “RequestedSinger = ‘Justin Bieber’ OR RequestedSinger = ‘TaylorSwift’”).[/alert]

 

Quick Note on CoorelationFilter

The CoorelationFilter is a special filter for a specific use.  It is one of available ways to implement a coorelation pattern in which a subscription can retrieve all messages bound to the same CoorelationId of a message.  We’ll look at this in part 3 when we examine messaging patterns.

 

Rule Actions

Every rule has to have a filter; this is what defines the rule’s condition.  But, a rule does not have to have an action.  As mentioned earlier, a rule’s action allows us to manipulate the properties (both system and custom) on a message that meets the rule’s condition.

As we stated before, the rule is the container for both the filter and the action and we can define an action in our Action property of our RuleDescription:

RuleDescription ruleDescription = new RuleDescription("ChangeRequestedSinger")
{
    Filter = new SqlFilter("RequestedSinger = 'Justin Bieber'"),
    Action = new SqlRuleAction("SET RequestedSinger = 'Taylor Swift'")
};

Here we are creating a RuleDescription that will filter out any messages that has a RequestedSinger with the value of “Justin Bieber” and will change the value of that same property to “Taylor Swift” as defined by the “SET RequestedSinger = ‘Taylor Swift’” action.

If we were to add this rule, or as shown below, create a subscription with this rule:

SubscriptionDescription newSubscriptionDescription = await _namespaceManager.CreateSubscriptionAsync(topicPath, subscriptionName, ruleDescription );

Then send a message that satisfies the subscription’s condition (i.e. Filter = new SqlFilter(“RequestedSinger = ‘Justin Bieber'”)

BrokeredMessage message = new BrokeredMessage();
message.Properties.Add("RequestedSinger", "Justin Bieber");

await messageSender.SendAsync(message);

When we receive the message, we will find that the RequestedSinger custom property value will no longer be “Justin Bieber” but “TaylorSwift”

BrokeredMessage msg = await messageReceiver.ReceiveAsync(TimeSpan.FromSeconds(30)).ConfigureAwait(false);
message.Properties["RequestedSinger"] // ="Taylor Swift"

Manipulating custom properties is only one of the available features of the Action.  Adding custom properties or changing system properties is also possible.  Imagine you wanted a catch-all subscription that caught any messages without a defined RequestedSinger and created the RequestedSinger property and updated the system property ContentType.

var ruleDescription = new RuleDescription("ChangeRequestedSinger")
{
    Filter = new SqlFilter("RequestedSinger IS NULL"),
    Action = new SqlRuleAction("SET sys.ContentType = 'audio/mpeg'; SET RequestedSinger = 'Taylor Swift'")
};

With a Subscription with the above RuleDescription, any messages filtered will have the System property ContentTypeupdated to audio/mpeg and create a new custom property RequestedSinger. 

[alert type=”info”]Notice the prefix “sys” in the SQLRuleAction, is required to set the scope of the property to a System property.  By default, the scope is set to the User properties (custom properties) which is denoted by user.<property_name>[/alert]

So as you can see there is a lot of power in rule actions.  You can learn more about the syntax and available options for actions at Microsoft’s MSDN page.

Conclusion

Well as you can clearly see, Topics is a big “topic” and is inseparable from Subscriptions.  We’ve learned that two of the most powerful features in Topics and Subscriptions is the ability to distribute messages to multiple interested parties (subscriptions) and those parties are able to filter out what messages they are specifically interested in.  There is still a good bit to cover on the topic of Azure Service Bus.  So in the finally part, we’ll be diving into some of the more advanced features, patterns, security and best practices.

About the author

Max McCarty

Max McCarty is a software developer with a passion for breathing life into big ideas. He is the founder and owner of LockMeDown.com and host of the popular Lock Me Down podcast.