This is part 2 on Azure Service Bus Brokered Messaging and you can find part 1 here. In the first part we concentrated strictly on how to utilize Service Bus Queues. Continuing in part 2, due to the sheer breadth of information, our main focus will be on features and strengths of Service Bus Topics and Subscriptions which by now, you should know are different that Queues.
Topics
Now truly it is hard to talk about Topics without Subscriptions because of their close relation and in almost all examples that is exactly what you will see. But, despite their differences, Topics share a number of similarities to a Service Bus Queue. So from that standpoint since you already familiar with Service Bus Queues, we’re going to talk about Topics and then introduce Subscription.
Topics just like Queue’s provide a destination to send messages. But Topics in combination with Subscriptions provide a publish and subscribe pattern for distributing messages. This is unlike the island view of Queue’s where a queue is both the receiver and retainer of messages it receives.
Simply put, Topics provide the publisher side of a publish/subscribe pattern. From a 30,000 foot view they don’t seem all that different from Queue’s as they provide a destination to send messages. However, topics, just like queues, must still be created, managed and sent messages.
Many of the management operations for Topics are utilized and executed in the same fashion as Queues and we can use the same NamespaceManager
that we saw in part 1 to facilitate our CRUD operations.
Creating Topics
Creating topic can be as straight forward as giving it a name:
NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(AccountInfo.ConnectionString); TopicDescription newTopicDescription = await _namespaceManager.CreateTopicAsync(topicName);
Or a topic can be created from a TopicDescription
where we define characteristics of a topic.
TopicDescription description = new TopicDescription(topicPath) { SupportOrdering = true, AutoDeleteOnIdle = TimeSpan.FromHours(8), DefaultMessageTimeToLive = new TimeSpan(0, 4, 0, 0) }; TopicDescription newTopicDescription = null; //If not exists, we'll create it if (!await namespaceManager.TopicExistsAsync(description.Path)) { newTopicDescription = await namespaceManager.CreateTopicAsync(description); } //Or we'll retrieve it if it already exists TopicDescription topicDescription = newTopicDescription ?? await namespaceManager.GetTopicAsync(description.Path);
With a more contrived example we can demonstrate some of the utility methods available for checking if a topic exists as well as retrieving details about an existing topic.
Updating Topics
As with Queues, we can also update properties of an existing Topic by providing a TopicDescription
. However, there are specific properties that can’t be changed after creation such as RequiresDuplicateDetection
TopicDescription description = new TopicDescription(topicPath) { SupportOrdering = true, AutoDeleteOnIdle = TimeSpan.FromHours(8), DefaultMessageTimeToLive = new TimeSpan(0, 4, 0, 0) }; await namespaceManager.UpdateTopicAsync(description);
Deleting Topics
When we delete a topic that it doesn’t exist, it will not throw an exception:
await namespaceManager.DeleteTopicAsync(topicName);
Sending Messages to Topics
Now, anyone remotely familiar with how Topics and Subscriptions work will be wondering why on earth I want to talk about sending messages to topics before I have even introduced Subscriptions. Its simple, up to this point including sending messages to topics, not much has to change and it is easy to get a feel for Service Bus Topics without being bogged down with what makes them different. Stick with me, we’ll be there momentarily.
You probably don’t recall that I stated back in part 1 that the MessageSender
is an abstraction for the QueueClient
and using the abstraction would have a payoff? Well, it’s also an abstraction for a TopicClient
as well. A TopicClient
is the direct client for sending messages to a Topic. Therefore, if you can use a MessageSender
instead of the relevant client, then your application doesn’t have to care whether it is sending messages to a Queue or a Topic and therein lies the payoff of using the MessageSender
.
You can refer back to our Queue example of creating a MessageSender to send messages, and sending one is as easy as it was for sending to a Queue.
await messageSender.SendAsync(new BrokeredMessage("SimpleMessage"));
However, if you’re testing this, you’re in for a big surprise, so I’ll save you the loss of hair (something I good at loosing!). When a topic doesn’t have a registered subscription, a message is automatically discarded. So, it appears this would be the opportune time to introduce Subscriptions.
Subscriptions
As a reminder, Topics and Subscriptions together represent a publish and subscribe pattern (see the previous topics/subscription diagram). While the topic represents the publisher, a subscription represents a subscriber. When a subscription is created a topic is provide that it will be subscribe to in order to receive messages the subscription is interested in. In the simplest form, messages incoming to a topic are copied and forwarded onto each subscription who’s subscription policy (we call a rule) is satisfied.
I like to equate this relationship between topics and subscriptions to an airport arrival scenario. Imagine if you will, a topic represented by an airport. You, as the message arrive at the airport and make you way through baggage claim and arrive where the limousine drivers are standing holding signs with different people’s names. These drivers represent the subscription policies (rules). Assuming you find a limousine driver who is holding a sign with your name, s/he escorts you to their limousine (subscription) where you then reside in the limousine and escorted off to some great, all-inclusive vacation and no longer residing at the airport. Well, what if you didn’t find a driver holding a sign with your name on it?
Have you ever seen the movie The Terminal with Tom Hanks? Where Viktor Navorski (Tom Hanks) is trapped at the JFK International airport because he is denied entry to the U.S. and at the same time can’t return to his native country due to a revolution? Well, unlike Viktor who has a place to stay at (The Terminal), when a message arrives at a topic that does not satisfy any associated subscription’s rule, the message is discarded.
So let’s take a closer look at what all these characteristics of subscriptions and how they tie into a Topic.
Creating a Subscription
When we create a subscription, at a bare minimum we associate it to a topic:
SubscriptionDescription switftSubscription = await namespaceManager.SubscriptionExistsAsync(“singingtelegrams”, “TaylorSwift”);
In addition, we can set some properties of the subscription by passing in a SubscriptionDescription
SubscriptionDescription subscription = new SubscriptionDescription((“singingtelegrams”, “TaylorSwift”) { EnableDeadLetteringOnFilterEvaluationExceptions = true, EnableDeadLetteringOnMessageExpiration = true, EnableBatchedOperations = true, }; SubscriptionDescription bieberSubscription = await namespaceManager.CreateSubscriptionAsync(subscription);
In the above examples we providing the path of the existing topic as well as the name for the subscription we are creating. In this fashion, these subscriptions have no explicit rule defining what messages they should receive. Therefore, they have subscribed to receive all messages sent to the “singingtelegrams”. When any message is sent to the “singingtelegram” Topic, both the TaylorSwift and JustinBieber subscriptions will receive a copy in their virtual queue.
[alert type=”info” icon=”icon-help-2″]Currently the limitation on number of subscriptions that are subscribed to a Topic is 2000.[/alert]Deleting a Subscription
The only slight difference with the deletion of a Queue and a Subscription is that we also must provide the Topic Path that the subscription is associated with as well as the name of the subscription.
await namespaceManager.DeleteSubscriptionAsync(topicPath, subscriptionName);
Update a Subscription
Again, just as with Queues and Topics, there are certain properties that must be set at creation time such as RequireSession
, but we can provide a SubscriptionDescription
instance to update properties of our Subscription:
SubscriptionDescription subscription = new SubscriptionDescription(topicPath, subscriptionName) { DefaultMessageTimeToLive = TimeSpan.FromHours(8), LockDuration = TimeSpan.FromSeconds(90) }; SubscriptionDescription updatedSubscription = await namespaceManager.UpdateSubscriptionAsync(subscription);
Recall back in part 1, I had stated that in the end we really are still dealing with queues? Well, Microsoft does define a Subscription as a virtual queue. Copies of messages that are sent to the subscription reside there and are retrieved as they are with a Service Bus Queue.
Retrieving Messages on a Subscription
As in the case of using the abstract MessageSender
, if we use the abstract MessageReceiver
we don’t have to worry about whether it is a Service Bus Queue or a Subscription that we are pulling messages from. Below is the exact code from receiving messages from queues in part 1 with the exception of a single parameter passed in for the CreateMessageReceiverAsync
method:
MessageReceiver messageReceiver = await _messagingFactory.CreateMessageReceiverAsync("singingtelegram/subscriptions/TaylorSwift"); try { while (!cancellationToken.IsCancellationRequested) { var msg = await messageReceiver.ReceiveAsync(TimeSpan.FromSeconds(30)).ConfigureAwait(false); if (msg.LockedUntilUtc == DateTime.UtcNow) await msg.RenewLockAsync(); await ProcessAndReleaseMessageAsync(msg); await Task.Delay(TimeSpan.FromSeconds(30), cancellationToken); } } catch (Exception ex) { //log error; throw; } private async Task ProcessAndReleaseMessageAsync(BrokeredMessage message) { MessageProcessingAction action = MessageProcessingAction.Abandon; try { //Process message action = MessageProcessingAction.Complete; } catch (Exception ex) { //log } finally { //if something fails update with abandon //C# 6.0 allows await calls in a finally blocks UpdateMessageState(message, action); } } private async Task UpdateMessageState(BrokeredMessage message, MessageProcessingAction action) { switch (action) { case MessageProcessingAction.Complete: await message.CompleteAsync(); break; case MessageProcessingAction.Abandon: await message.AbandonAsync(); break; case MessageProcessingAction.Deadletter: await message.DeadLetterAsync(); break; default: await message.AbandonAsync(); break; } }
What we do have to care about is the string we provide the MessagingFactory.CreateMessageReceiverAsync(entityPath)
method. In our Queue example, we just provide a string of the queue path. But, in this case we need our subscription path in the form of <Topic Path>/subscriptions/<subscription name> as shown by “singingtelegram/subscriptions/TaylorSwift”.
Upon receiving any messages, we can follow the exact same process we did in our Queue example for updating a Message by calling Abondon, Complete or DeadLetter. I’ll let you review the message receiving details back in part 1.
Until now, the only compelling reason to use Topics and Subscriptions over Queues has been the ability to provide copies of a single message to multiple interested parties (subscriptions). But that is only one compelling feature of Topics and Subscriptions. Another compelling feature is message routing and the ability to control what messages are routed to what subscriptions.
We can provide message routing through Rules. Now, just because we didn’t define a rule for the subscriptions we already created, doesn’t mean a rule doesn’t exist. Indeed all subscriptions at a minimum have one rule.
Rules, Filters and Actions Oh My!
It is here that we have reached the zenith of what makes Topics and Subscriptions so powerful. A rule in world of subscriptions define what messages should be copied to a particular Subscription. Each subscription has access to all messages received by a Topic and individually can filter out what messages belong to it. This filtering is done through the Rule’s Filter which we’ll talk about in depth.
In addition to filtering, when a message is filtered out by a Subscription because the message met the rule’s condition, we have the opportunity to carry out actions against the message. These actions include adding, removing and updating custom properties and changing system property values. This is all defined by the rule’s action . Therefore, it really is the filter and actions that make up a rule.
[alert type=”info” icon=”icon-help-2″]On a subjective side note, a rule resembles more of a policy than a rule. Where a rule defines not only the condition(s) (filters) but also what should take place when a condition is met smells more of a policy. I only state that in case someone has an easier time conceptualizing a rule as a policy.[/alert]Another way to state the above verbose explanation is that a subscription filters out messages that meets a rule’s conditions and optionally applies actions against the properties of a message when the filtered message is copied to the subscription. So let’s see examples of all these:
Rule Filter
In the .NET landscape, a subscription rule is in the form of a RuleDescription
and defines the rule’s Filter
and Action
. When we created our subscriptions, an available override to pass in a RuleDescription
is available:
RuleDescription rule = new RuleDescription { Name = "ForwardToTaylorSwift", Filter = new SqlFilter("RequestedSinger = 'Justin Bieber’ OR RequestedSinger = 'Taylor Swift’") }; SubscriptionDescription subscription = new SubscriptionDescription(topicPath, subscriptionName); SubscriptionDescription description = await namespaceManager.CreateOrRetrieveSubscriptionAsync(subscription, rule);
So here is our first introduction to a Rule and its filter. As the filter’s type implies, it allows for defining a SQL92 standard expression in its constructor that will govern what messages are filtered out. We can read this as, all messages received at the “singingtelegram’ topic that have a custom property “RequestedSinger” with the value “Justin Bieber” or “Taylor Swift” will be filtered out to the “TaylorSwift” subscription.
So if we were to send a BrokeredMessage
that matches the above rule to our ‘”singingtelegram” topic:
BrokeredMessage message = new BrokeredMessage(); message.Properties.Add("RequestedSinger", "Justin Bieber"); await messageSender.SendAsync(message);
But, this doesn’t exclude other subscriptions from also receiving this message as well. Recall, that I mentioned earlier that all subscriptions have a rule. When we don’t explicitly define a rule as the case was in our original subscription creation examples, a default rule is provided for us that has a default filter. Since the rule condition is defined in the rule filter, let’s take a look at what out-of-the-box filters that are available:
· SQLFilter – The filter that a number of other filters derive from such TrueFilter and FalseFilter
· TrueFilter – This is the default filter provided through the default rule that is generated for us when a rule or filter is not explicitly provided when creating a subscription. Ultimately, this generates the SQL92 expression 1=1 and subscribes to receive all messages of the associated topic.
· FalseFilter – The antithesis of a TrueFilter that generates a SQL92 expression of 1=0; a subscription with this filter will never subscript to any messages of the associated topic.
· CorrelationFilter – This filter subscribes the subscription to all messages of a particular CorrelationId property of the message.
[alert type=”info”]Be aware that the comparison values in your SQL expression are case sensitive, while the property names are not (e.g. “RequestedSinger = ‘Taylor Swift’” is not the same as “RequestedSinger = ‘taylor swift’”)[/alert]Adding Rules
Subscriptions are not limited to one rule however. We can add additional rules to an existing subscription. However, this is a perfect example of where the MessageReceiver
abstraction class can’t fulfill our needs and we’ll need to turn to the SubscriptionClient
.
SubscriptionDescription subscription = await _operations.CreateOrRetrieveSubscriptionAsync(topicPath, subscriptionName); //MessagingFactory as demo'd in part 1 SubscriptionClient subscriptionClient = messagingFactory.CreateSubscriptionClient(subscription.TopicPath, subscription.Name); RuleDescription ruleDescription = new RuleDescription("NeverGrowUpSongRule", new SqlFilter("RequestedSong = 'Never Grow Up'")); await subscriptionClient.AddRuleAsync(ruleDescription);
The first operation is simply a personal wrapper that returns a verified SubscriptionDescription
. With it verified, we can acquire a MessagingFactory
(see Sending Messages in part 1) that will allow us to create a SubscriptionClient
that will in turn allow us to add a rule to the existing subscription.
Quick Note on CoorelationFilter
The CoorelationFilter
is a special filter for a specific use. It is one of available ways to implement a coorelation pattern in which a subscription can retrieve all messages bound to the same CoorelationId of a message. We’ll look at this in part 3 when we examine messaging patterns.
Rule Actions
Every rule has to have a filter; this is what defines the rule’s condition. But, a rule does not have to have an action. As mentioned earlier, a rule’s action allows us to manipulate the properties (both system and custom) on a message that meets the rule’s condition.
As we stated before, the rule is the container for both the filter and the action and we can define an action in our Action
property of our RuleDescription
:
RuleDescription ruleDescription = new RuleDescription("ChangeRequestedSinger") { Filter = new SqlFilter("RequestedSinger = 'Justin Bieber'"), Action = new SqlRuleAction("SET RequestedSinger = 'Taylor Swift'") };
Here we are creating a RuleDescription
that will filter out any messages that has a RequestedSinger with the value of “Justin Bieber” and will change the value of that same property to “Taylor Swift” as defined by the “SET RequestedSinger = ‘Taylor Swift’” action.
If we were to add this rule, or as shown below, create a subscription with this rule:
SubscriptionDescription newSubscriptionDescription = await _namespaceManager.CreateSubscriptionAsync(topicPath, subscriptionName, ruleDescription );
Then send a message that satisfies the subscription’s condition (i.e. Filter = new SqlFilter(“RequestedSinger = ‘Justin Bieber'”)
BrokeredMessage message = new BrokeredMessage(); message.Properties.Add("RequestedSinger", "Justin Bieber"); await messageSender.SendAsync(message);
When we receive the message, we will find that the RequestedSinger custom property value will no longer be “Justin Bieber” but “TaylorSwift”
BrokeredMessage msg = await messageReceiver.ReceiveAsync(TimeSpan.FromSeconds(30)).ConfigureAwait(false); message.Properties["RequestedSinger"] // ="Taylor Swift"
Manipulating custom properties is only one of the available features of the Action. Adding custom properties or changing system properties is also possible. Imagine you wanted a catch-all subscription that caught any messages without a defined RequestedSinger and created the RequestedSinger property and updated the system property ContentType
.
var ruleDescription = new RuleDescription("ChangeRequestedSinger") { Filter = new SqlFilter("RequestedSinger IS NULL"), Action = new SqlRuleAction("SET sys.ContentType = 'audio/mpeg'; SET RequestedSinger = 'Taylor Swift'") };
With a Subscription with the above RuleDescription
, any messages filtered will have the System property ContentType
updated to audio/mpeg and create a new custom property RequestedSinger.
So as you can see there is a lot of power in rule actions. You can learn more about the syntax and available options for actions at Microsoft’s MSDN page.
Conclusion
Well as you can clearly see, Topics is a big “topic” and is inseparable from Subscriptions. We’ve learned that two of the most powerful features in Topics and Subscriptions is the ability to distribute messages to multiple interested parties (subscriptions) and those parties are able to filter out what messages they are specifically interested in. There is still a good bit to cover on the topic of Azure Service Bus. So in the finally part, we’ll be diving into some of the more advanced features, patterns, security and best practices.