Windows Azure AppFabric Service Bus Brokered Messaging

At Build 2011, Microsoft released the Windows Azure AppFabric Service Bus Brokered Messaging feature which has been previewed in the AppFabric Labs environment over the last few months. Service Bus Brokered Messaging provides a sophisticated publish/subscribe mechanism supporting disconnected communication between many producers and many consumers. This capability can be used to support load leveling among one or more producers and load balancing among one or more consumers, thereby supporting increased scalability of a service.

Clemens Vasters (@clemensv) announced the release on the Windows Azure Team blog. The MSDN documentation for Windows Azure AppFabric v1.5 release, including Brokered Messaging, is here. Valery Mizonov (@TheCATerminator), of the Windows Azure Customer Advisory Team, has a post on Best Practices for Leveraging Windows Azure Service Bus Brokered Messaging API. Rick Garibay (@rickggaribay) has a post describing the differences between the preview and release versions that includes links to additional resources. Alan Smith (@alansmith) has a post providing an AppFabric Walkthrough: Simple Brokered Messaging (1st in a series). He has recently posted an eBook on Brokered Messaging as well as an update on deadletter sub-queues. There is also a post comparing Windows Azure Storage Queues with AppFabric Service Bus Brokered Messaging.

This post does not cover the WCF bindings and the REST API for Brokered Messaging that are also contained in the release. The post is a replacement for an earlier post describing the preview release of Service Bus Brokered Messaging.

Intro to Brokered Messaging

The name brokered messaging is used to distinguish the functionality from direct messaging in which a producer communicates directly with a consumer, and relayed messaging in which a producer communicates with a consumer through a relay. Both direct messaging and relayed messaging are subject to backpressure from the consumer being limited in how fast it can consume messages, and this causes the producer to throttle message production thereby limiting service scalability. By contrast, brokered messaging uses a high-capacity intermediate store to consume and durably store messages which can then be pulled by the consumer. The benefit of this is that the producer and consumer can scale independently of each other since the intermediate message broker buffers any difference in the real-time ability of the producer to send messages and the consumer to receive them.

The Windows Azure AppFabric Service Bus supports two distinct forms of brokered messages

  • Queues
  • Topics & Subscriptions

Queues represent a persistent sequenced buffer into which one or more producers send messages to the tail and one or more consumers compete to receive messages from the head. A queue has a single cursor, pointing to the current message, that is shared among all consumers. This cursor is moved each time a consumer receives a message. Service Bus Queues provide various methods to modify that default handling – including the timed visibility of messages on the queue, the deferral of message processing, and the ability of messages to reappear on the queue.

Topics supports a persistent sequenced buffer into which one or more producers send messages to the tail of the buffer which has multiple heads referred to as subscriptions, each of which receives distinct copies of the messages. One or more consumers compete to receive messages from the head of a subscription. Each subscription has its own cursor that is moved each time a message is retrieved.  A subscription can be filtered so that only a subset of messages are available through it. This provides for functionality where geographically-focused subscriptions contain messages only for particular regions while an auditing subscription contains all the messages.

TokenProvider Classes

The AppFabric ServiceBus v1.5 release introduced the following classes, derived from TokenProvider, to support authentication with Service Bus Brokered Messaging.

The TokenProvider class creates factory methods to create instances of the various token providers. The factory methods for the SharedSecretTokenProvider supports the creation of a token from the issuer and shared secret for the service bus namespace.

These classes are all in the Microsoft.ServiceBus namespace in the Microsoft.ServiceBus.dll assembly.

NamespaceManager Class

The NamespaceManager and NamespaceManagerSettings classes are used to manage the Windows Azure AppFabric Service Bus namespace and, more specifically, the rendezvous endpoints used in Brokered Messaging.  It exposes methods supporting the creation and deletion of queues, topics and subscriptions. The NamespaceManagerSettings class exposes the OperationTimeout and the TokenProvider used with operations performed by an associated NamespaceManager.

The NamespaceManager class is declared (in truncated form):

public sealed class NamespaceManager {
    public NamespaceManager(String address, NamespaceManagerSettings settings);
    public NamespaceManager(Uri address, NamespaceManagerSettings settings);
    public NamespaceManager(String address, TokenProvider tokenProvider);
    public NamespaceManager(Uri address, TokenProvider tokenProvider);

    public Uri Address { get; }
    public NamespaceManagerSettings Settings { get; }

    public QueueDescription CreateQueue(String path);
    public QueueDescription CreateQueue(QueueDescription description);
    public SubscriptionDescription CreateSubscription(String topicPath, String name, Filter filter);
    public SubscriptionDescription CreateSubscription(String topicPath, String name,
       RuleDescription ruleDescription);
    public SubscriptionDescription CreateSubscription(SubscriptionDescription description,
       RuleDescription ruleDescription);
    public SubscriptionDescription CreateSubscription(String topicPath, String name);
    public SubscriptionDescription CreateSubscription(SubscriptionDescription description,
       Filter filter);
    public SubscriptionDescription CreateSubscription(SubscriptionDescription description);
    public TopicDescription CreateTopic(String path);
    public TopicDescription CreateTopic(TopicDescription description);
    public void DeleteQueue(String path);
    public void DeleteSubscription(String topicPath, String name);
    public void DeleteTopic(String path);
    public QueueDescription GetQueue(String path);
    public IEnumerable<QueueDescription> GetQueues();
    public IEnumerable<RuleDescription> GetRules(String topicPath, String subscriptionName);
    public SubscriptionDescription GetSubscription(String topicPath, String name);
    public IEnumerable<SubscriptionDescription> GetSubscriptions(String topicPath);
    public TopicDescription GetTopic(String path);
    public IEnumerable<TopicDescription> GetTopics();
    public Boolean QueueExists(String path);
    public Boolean SubscriptionExists(String topicPath, String name);
    public Boolean TopicExists(String path);
}

The constructors provide various ways of creating a NamespaceManager from a full namespace URI and a token provider, either directly or through a NamespaceManagerSettings instance. There are also asynchronous versions of all the methods.

The ServiceBusEnvironment class contains some helper methods to create the appropriate namespace URI. For example, the following code creates a shared secret token provider and a service URI – and uses them to create a NamespaceManager instance:

String serviceNamespace = “mynamespace”;
String issuer = “owner”;
String issuerKey = “base64-encoded key”;

TokenProvider tokenProvider = TokenProvider.CreateSharedSecretTokenProvider(
   issuer, issuerKey);
Uri serviceUri = ServiceBusEnvironment.CreateServiceUri(“sb”, serviceNamespace,
   String.Empty);
NamespaceManager namespaceManager = new NamespaceManager(serviceUri, tokenProvider);

The NamespaceManager class provides an extensive set of methods supporting the management of queues, topics and subscriptions – including the creation, deletion and existence checking of them. It also provides methods allowing for the retrieval of the properties of either a specific queue, topic and subscription, or all of each type. Each of these methods is available in both synchronous and asynchronous versions.

The following example uses a NamespaceManager instance to create a queue asynchronously, and then synchronously a topic and an associated subscription.

IAsyncResult iAsyncResult = namespaceManager.BeginCreateQueue(
    “myqueue”,
    (result) =>
    {
        namespaceManager.EndCreateQueue(result);
    },
    null);

TopicDescription topicDescription = namespaceManager.CreateTopic(“mytopic”);
SubscriptionDescription subscriptionDescription =
   namespaceManager.CreateSubscription(topicDescription.Path, “mysubscription”);

The delete and existence-checking methods are invoked similarly to the create methods.

Note that service scalability can be significantly improved by using asynchronous calls rather than blocking synchronous calls. This is particularly true when receiving messages from queues and subscriptions. For clarity, this post primarily uses synchronous calls because it is easier to understand what is going on. However, the Valery Mizonov post on Best Practices for Leveraging Windows Azure Service Bus Brokered Messaging API has an extensive discussion on the robust use of asynchronous calls.

Note that other than to avoid namespace collisions no knowledge of how the namespace is allocated to queues, topics and subscriptions is needed when using the Brokered Messaging API. However, for completeness with a service named myservice, a queue named myqueue, a topic named mytopic and a subscription named mysubscription the following show examples of the full address for a queue, a topic and a subscription respectively:

QueueDescription, TopicDescription and SubscriptionDescription

The QueueDescription, TopicDescription  and SubscriptionDescription classes are used with the NamespaceManager factory methods to parameterize the creation of queues, topics and subscriptions. It is interesting to look at the properties of these classes since they provide a direct way of understanding the functionality exposed through queues, topics and subscriptions. In particular, when displayed in tabular form it becomes immediately apparent which properties affect message sending and which affect message retrieval.

Property Queue Topic Subscription
DefaultMessageTimeToLive get/set get/set get/set
DuplicateDetectionHistoryTimeWindow get/set get/set  
EnableBatchedOperations get/set get/set get/set
EnableDeadLetteringOnFilterEvaluationExceptions     get/set
EnableDeadLetteringOnMessageExpiration get/set   get/set
LockDuration get/set   get/set
MaxDeliveryCount get/set   get/set
MaxSizeInMegabytes get/set get/set  
MessageCount get   get
Name     get/set
Path get/set get/set  
RequiresDuplicateDetection get/set get/set  
RequiresSession get/set   get/set
SizeInBytes get get  
TopicPath     get/set

DefaultMessageTimeToLive specifies the default time to live for a message. DuplicateDetectionHistoryTimeWindow specifies the default time window for duplicate message detection. EnableBatchOperations specifies whether server-side batch operations are enabled. EnableDeadLetteringOnFilterEvaluationExceptions specifies whether messages encountering filter-evaluation exceptions on a subscription are sent to the dead letter subqueue. EnableDeadLetteringOnMessageExpiration specifies whether messages which have reached their time-to-live are sent to the dead letter subqueue. LockDuration specifies the default value for the lock duration when a message is retrieved using the PeekLock receive mode.

MaxDeliveryCount specifies the maximum number of
delivery attempts before a message is sent to the dead letter subqueue. MaxSizeInMegabytes specifies the maximum size in megabytes of the queue. Note that the current quotas allow for queues up to 5 GB. MessageCount returns the number of messages in the queue or subscription. Name specifies the name of a subscription. Path specifies the name of a queue or topic. RequiresDuplicateDetection specifies whether the queue or topic implements duplicate detection. RequiresSession specifies whether a MessageSession receiver must be used to retrieve messages from the queue or subscription. SizeInBytes returns the size of the queue or topic. TopicPath specifies the topic path for a subscription.

BrokeredMessage

The BrokeredMessage class represents a brokered message to be sent to a queue or topic or retrieved from a queue or subscription. It is declared:

public sealed class BrokeredMessage : IXmlSerializable, IDisposable {
public BrokeredMessage(Object serializableObject, XmlObjectSerializer serializer);
public BrokeredMessage(Stream messageBodyStream, Boolean ownsStream);
public BrokeredMessage();
public BrokeredMessage(Object serializableObject);

public String ContentType { get; set; }
public String CorrelationId { get; set; }
public Int32 DeliveryCount { get; }
public DateTime EnqueuedTimeUtc { get; }
public DateTime ExpiresAtUtc { get; }
public String Label { get; set; }
public DateTime LockedUntilUtc { get; }
public Guid LockToken { get; }
public String MessageId { get; set; }
public IDictionary<String,Object> Properties { get; }
public String ReplyTo { get; set; }
public String ReplyToSessionId { get; set; }
public DateTime ScheduledEnqueueTimeUtc { get; set; }
public Int64 SequenceNumber { get; }
public String SessionId { get; set; }
public Int64 Size { get; }
public TimeSpan TimeToLive { get; set; }
public String To { get; set; }

public void Abandon();
public IAsyncResult BeginAbandon(AsyncCallback callback, Object state);
public IAsyncResult BeginComplete(AsyncCallback callback, Object state);
public IAsyncResult BeginDeadLetter(AsyncCallback callback, Object state);
public IAsyncResult BeginDeadLetter(String deadLetterReason,
String deadLetterErrorDescription, AsyncCallback callback, Object state);
public IAsyncResult BeginDefer(AsyncCallback callback, Object state);
public void Complete();
public void DeadLetter();
public void DeadLetter(String deadLetterReason, String deadLetterErrorDescription);
public void Defer();
public void EndAbandon(IAsyncResult result);
public void EndComplete(IAsyncResult result);
public void EndDeadLetter(IAsyncResult result);
public void EndDefer(IAsyncResult result);
public T GetBody<T>();
public T GetBody<T>(XmlObjectSerializer serializer);

public void Dispose();
public override String ToString();
XmlSchema IXmlSerializable.GetSchema();
void IXmlSerializable.ReadXml(XmlReader reader);
void IXmlSerializable.WriteXml(XmlWriter writer);
}

The BrokeredMessage class exposes several constructors as well as various methods to manage methods received using the PeekLock receive mode:

  • Abandon() to have the queue or subscription unlock the message and make it visible to consumers. 
  • Complete() to have the queue or subscription delete the message
  • DeadLetter() to have the queue or subscription transfer the message to the dead letter subqueue.
  • Defer() to have the queue or subscription defer the message for subsequent processing.

The dead letter subqueue is used as a store where messages can be stored for out-of-band processing. This is typically used when there is a problem with the message which needs to be investigated outside normal processing. Note that a deferred message can only be received by the explicit provision of the sequence number for the message. Consequently, it is essential to save the sequence number for later use before invoking Defer().

These methods exist in both synchronous and asynchronous form. The BrokeredMessage class also exposes various accessor methods to the message body. The BrokeredMessage class has several properties which help describe some of the features of brokered messaging.

ContentType is an application-specific value that can be used to specify the type of data in the message. The CorrelationId is used with the CorrelationFilter to correlate two-way messaging between the producer and consumer. DeliveryCount specifies the number of times the message has been delivered to a consumer. EnqueuedTimeUtc specifies when the message was sent to the queue or topic. ExpiresAtUtc specifies when the message expires on the queue or subscription. Label is an application-specific label for the message. LockedUntilUtc specifies the unlock time for a message received using the PeekLock receive mode. LockToken specifies the lock token of the message received using the PeekLock receive mode. The MessageId is an application-specific identifier for a message.

The Properties collection contains a set of application-specific properties which can be used, instead of the message body, to pass message content. The Properties collection is also accessible during the rule processing for message retrieval from a subscription. ReplyTo contains the name of a queue to which any reply should be sent. The ReplyToSessionId specifies a session the message should be replied to. ScheduledEnqueueTimeUtc specifies a future time when a new message will become visible on the queue or topic. By default, a message becomes visible immediately. The SequenceNumber is a Service Bus provided unique identifier for the message.
The SessionId identifies a session, if any, that the message belongs to.
Size specifies the size of the message in bytes. TimeToLive specifies the time-to-live of the message after which it is either deleted or moved to the dead letter subqueue. With regard to To, your guess is as good as mine.

ReceiveMode

A consumer can receive a message in one of two receive modes: PeekLock (the default) and ReceiveAndDelete. In PeekLock receive mode, a consumer receives the message which remains invisible on the queue until the consumer either abandons the message (physically or through a timeout) and the message once again becomes visible or deletes the message on successful completion of processing. This mode supports at least once processing, since each message is consumed one or more times. In ReceiveAndDelete mode, a consumer receives the message and it is immediately deleted from the queue. This mode provides for at most once processing since each message is consumed at most once. A message can be lost in ReceiveAndDelete mode if the consumer fails while processing a message.

MessagingFactory and MessagingClientEntity

Messages are sent and received using instances of classes in the MessagingClientEntity hierarchy:

MessagingClientEntity
   MessageReceiver
     
MessageSession
   MessageSender
   MessagingFactory
   QueueClient
   SubscriptionClient
   TopicClient

The MessagingFactory class contains factory methods that can be used to create a MessagingFactory instance which can then be used to create instances of the MessageReceiver, MessageSender, QueueClient, SubscriptionClient and TopicClient classes. The QueueClient and SubscriptionClient classes expose methods to create MessageSession intances.

The QueueClient class is used to send and receive messages to a queue. The TopicClient class is used to send messages to a topic while the SubscriptionClient class is used to receive messages from a subscription to a topic. The MessageSender class and MessageReceiver classes abstract the sending and receiving functionality so that they can be used against either queues or topics and subscriptions. The MessageSession class exposes functionality allowing queue and subscription receivers to receive related messages as a “session.” After some receiver receives a message in a session, all messages in that session are reserved for that receiver.

QueueClient

The QueueClient class is to some extent a superset of the TopicClient and SubscriptionClient classes. This is because it has to handle both the sending and receiving of messages. It lacks the functionality related to rules, filters and actions that the SubscriptionClient exposes but sending to a topic is pretty much the same as sending to a queue.

The following is a severely truncated class declaration for the QueueClient class:

public abstract class QueueClient : MessageClientEntity {
    public MessagingFactory MessagingFactory { get; }
    public String Path { get; }
    public Int32 PrefetchCount { get; set; }

    public static String FormatDeadLetterPath(String queuePath);

    public MessageSession AcceptMessageSession();
    public void Abandon(Guid lockToken);
    public void Complete(Guid lockToken);
    public void Defer(Guid lockToken);
    public void DeadLetter(Guid lockToken);
    public BrokeredMessage Receive();
    public void Send(BrokeredMessage message);
    public ReceiveMode Mode { get; }
}

The PrefetchCount property is a performance enhancement that specifies how many messages should be retrieved BEFORE the first call to Receive(). Note that this can lead to message loss if the consumer fails before completely processing the messages.

The instance methods of the class exist have several overloads, each of which has an associated asynchronous version. AcceptMessageSession() is used to request the next session (with an overload allowing a specific session to be requested). The returned MessageSession instance can then be used to retrieve sequentially the messages in that session.

The Abandon(), Complete(), DeadLetter()and Defer() methods have the same meaning as the equivalent methods in the BrokeredMessage class. The difference is that in this case the lock token contained in the message must be provided as a parameter.

The FormatDeadLetterPath() method is invoked to get the path for the dead letter subqueue. This path can then be used to create a QueueClient to retrieve messages from the dead letter subqueue.

A produces invokes Send() to send a message to the queue while a consumer invokes Receive() to receive a message from the queue. Note that a message received using the ReceiveAndDelete receive mode is deleted immediately from the queue. None of the Abandon(), Complete(), Defer() and DeadLetter() functionality, just described, applies to such a message.

The following code fragment shows messages being sent to two separate queues:

MessagingFactory messagingFactory = MessagingFactory.Create(serviceUri, tokenProvider);

QueueClient bodySender = messagingFactory.CreateQueueClient(“BodyQueue”);
BrokeredMessage brokeredMessage1 = new BrokeredMessage(“serializable message body”);
bodySender.Send(brokeredMessage1);

QueueClient propertySender = messagingFactory.CreateQueueClient(“PropertyQueue”);
BrokeredMessage brokeredMessage2 = new BrokeredMessage()
{
    Properties = {
        {“Month”, “July”},
        {“NumberOfDays”, 31}
    }
};
propertySender.Send(brokeredMessage2);

messagingFactory.Close();
messagingFactory = null;

These show two different ways of inserting information in a message – either as serializable content in the message body or as message properties. In the first case, a message containing body text is sent to a queue. In the second case, a message containing a pair of properties is sent to a different queue. These show distinct ways of using messages. Other than being convenient when a message contains only a few properties, the latter becomes essential when using subscriptions since the Service Bus messaging broker can modify message handling based on the value of the message properties. Indeed, it can even modify the values of the properties. This functionality is not available when the actionable content is stored in the message body.

The fragment also shows the closing of the MessagingFactory instance. Each MessagingClientEntity (QueueClient, TopicClient, etc.) gets its own connection and the resources consumed by the connection should be released when no longer needed. This can be done either by invoking Close() on the individual MessagingClientEntity or by invoking close on the MessagingFactory instance, which releases all resources created through the factory instance including any MessagingClientEntity instances created by it.

The following code fragment shows messages being retrieved from two separate queues:

QueueClient bodyReceiver = messagingFactory.CreateQueueClient(“BodyQueue”,
   ReceiveMode.ReceiveAndDelete);
BrokeredMessage brokeredMessage3 = bodyReceiver.Receive();
String messageBody = brokeredMessage3.GetBody<String>();
bodyReceiver.Close();

QueueClient propertyReceiver = messagingFactory.CreateQueueClient(“PropertyQueue”);
propertyReceiver.BeginReceive(ReceiveDone, propertyReceiver);

public static void ReceiveDone(IAsyncResult result)
{
    QueueClient queueClient = result.AsyncState as QueueClient;
    BrokeredMessage brokeredMessage = queueClient.EndReceive(result);
    String messageId = brokeredMessage.MessageId;
    String month = brokeredMessage.Properties["Month"] as String;
    Int32 numberOfDays = (Int32)brokeredMessage.Properties["NumberOfDays"];

    switch (numberOfDays)
    {
        case 28:
            brokeredMessage.Defer();
            break;
        case 30:
            brokeredMessage.Complete();
            break;
        case 31:
            brokeredMessage.Abandon();
            break;
    }
    queueClient.Close();
}

This sample shows both the synchronous and asynchronous retrieval of messages. In the synchronous case, the actionable content is retrieved from the message body. In the asynchronous case, the actionable content is retrieved from message properties. The property values are then used to select the disposition of the message as deferred, completed or abandoned. Note that this depends on the message receive mode being the default PeekLock. The MessageId needs to be retained for a deferred message since such a message can be retrieved only by MessageId.

TopicClient

A TopicClient is the MessageClientEntity used to send messages to a topic. It exposes synchronous and asynchronous versions of the Send() method.

SubscriptionClient

The SubscriptionClient class us used to handle messages received from a subscription. SubscriptionClient is declared (in truncated form):

public abstract class SubscriptionClient : MessageClientEntity {
    public MessagingFactory MessagingFactory { get; }
    public String Name { get; }
    public Int32 PrefetchCount { get; set; }
    public String TopicPath { get; }

    public IAsyncResult BeginAddRule(String ruleName, Filter filter, AsyncCallback callback,
       Object state);
    public IAsyncResult BeginAddRule(RuleDescription description, AsyncCallback callback,
       Object state);
    public IAsyncResult BeginRemoveRule(String ruleName, AsyncCallback callback,
       Object state);
    public void EndAddRule(IAsyncResult result);
    public void EndRemoveRule(IAsyncResult result);
    public static String FormatDeadLetterPath(String topicPath, String subscriptionName);
    public static String FormatSubscriptionPath(String topicPath, String subscriptionName);
    public void RemoveRule(String ruleName);

    public IAsyncResult BeginAcceptMessageSession(AsyncCallback callback,
       Object state);
    public IAsyncResult BeginAcceptMessageSession(String sessionId, AsyncCallback callback,
       Object state);
    public IAsyncResult BeginAcceptMessageSession(TimeSpan serverWaitTime,
       AsyncCallback callback, Object state);
    public IAsyncResult BeginAcceptMessageSession(String sessionId, TimeSpan serverWaitTime,
       AsyncCallback callback, Object state);
    public MessageSession EndAcceptMessageSession(IAsyncResult result);
    public IAsyncResult BeginAbandon(Guid lockToken, AsyncCallback callback, Object state);
    public void EndAbandon(IAsyncResult result);
    public IAsyncResult BeginComplete(Guid lockToken, AsyncCallback callback, Object state);
    public void EndComplete(IAsyncResult result);
    public IAsyncResult BeginDefer(Guid lockToken, AsyncCallback callback, Object state);
    public void EndDefer(IAsyncResult result);
    public IAsyncResult BeginDeadLetter(Guid lockToken, AsyncCallback callback, Object state);
    public IAsyncResult BeginDeadLetter(Guid lockToken, String deadLetterReason,
       String deadLetterErrorDescription, AsyncCallback callback, Object state);
    public void EndDeadLetter(IAsyncResult result);
    public IAsyncResult BeginReceive(AsyncCallback callback, Object state);
    public IAsyncResult BeginReceive(TimeSpan serverWaitTime, AsyncCallback callback,
       Object state);
    public IAsyncResult BeginReceive(Int64 sequenceNumber, AsyncCallback callback,
       Object state);
    public BrokeredMessage EndReceive(IAsyncResult result);
    public ReceiveMode Mode { get; }
}

The Begin/End asynchronous pairs of methods have equivalent synchronous methods. The Windows Azure AppFabric Service Bus team is serious about wanting us to do everything asynchronously. Much of the exposed functionality is familiar from the receive capability of a QueueClientAcceptMessageSession(), Abandon(), Complete(), DeadLetter(), Defer() and Receive().

The novel functionality exposed by a SubscriptionClient is that related to rules used to modify the behavior of a subscription. Note that this affects the subscription NOT the subscription receiver. Any modification to the rules using these methods affect only those messages sent to the associated topic after the change. I’m not convinced that this functionality should be in SubscriptionClient as well as the NamespaceManager since the methods affect the subscription not the SubscriptionClient. As it is care must be taken when there is more than one rule for a subscription since a message can be received more than once if it satisfies more than one rule on the subscription.

Rules, Filters and Actions

The RuleDescription class associates a rule name with a Filter and an optional RuleAction. The default Filter for a RuleDescription is the TrueFilter which always evaluates to true. A Filter is used to filter the messages, based on the values of the message properties, that a subscription handles for a topic. A message passes the filter when it evaluates to true. RuleAction is used to modify the properties of a message when it is received through the subscription. Rules significantly enhance the power of a subscription over a simple queue in that different filter and rule actions can be used for different subscriptions against the same topic. This allows different consumers to have very different views of the messages sent to the topic. Note that a rule does not need to have an associated RuleAction.

There are several Filter classes in the following hierarchy:

Filter
    CorrelationFilter
    FalseFilter
    SqlFilter
    TrueFilter

The CorrelationFilter provides a filter on the value of the CorrelationId of a message. This filter is intended to be used when correlating a reply message from a consumer to a producer. A reply message is put on a reply queue with the associated CorrelationId, and the original producer can use a CorrelationFilter on this queue to receive a response to a message it sent. The FalseFilter always fails so that no message is received through a subscription with a FalseFilter. A TrueFilter always succeeds so that all messages are received through a subscription with a TrueFilter. A SqlFilter supports a simple SQL-92 style syntax that can be used to filter messages depending on the property values.

The RuleAction classes have the following hierarchy:

RuleAction
    SqlRuleAction

SqlRuleAction supports a simple SQL-92 style syntax allowing message properties to be modified as they are received through the subscription.

The following code fragment shows the creation of a topic and two subscriptions and the sending of some messages to the topic. Note that a subscription handles only those messages submitted to the topic after the subscription is created. Similarly, a rule only affects messages submitted after the rule is added.

TokenProvider tokenProvider = TokenProvider.CreateSharedSecretTokenProvider(
   issuer, issuerKey);
Uri serviceUri = ServiceBusEnvironment.CreateServiceUri(“sb”, serviceNamespace,
   String.Empty);
NamespaceManager namespaceManager = new NamespaceManager(serviceUri, tokenProvider);

RuleDescription summerRule = new RuleDescription()
{
    Action = new SqlRuleAction(“SET HavingFun = ‘Yes'”),
    Filter = new SqlFilter(“Month = ‘June’ OR Month = ‘July’ OR [Month] = ‘August'”),
    Name = “SummerRule”
};

TopicDescription topicDescription = namespaceManager.CreateTopic(“WeatherTopic”);
SubscriptionDescription summerSubscriptionDescription =
   namespaceManager.CreateSubscription(“WeatherTopic”, “WeatherSubscription”, summerRule);
SubscriptionDescription allSubscriptionDescription =
   namespaceManager.CreateSubscription(“WeatherTopic”, “AllSubscription”);

IEnumerable<RuleDescription> rules = namespaceManager.GetRules(
   “WeatherTopic”, “WeatherSubscription”);
List<RuleDescription> listRules = rules.ToList();

MessagingFactory messagingFactory = MessagingFactory.Create(serviceUri, tokenProvider);
TopicClient weatherSender = messagingFactory.CreateTopicClient(“WeatherTopic”);

BrokeredMessage augustMessage = new BrokeredMessage()
{
    Properties = {
        {“HavingFun”, “No”},
        {“Month”, “August”},
        {“NumberOfDays”, 31}
    }
};
weatherSender.Send(augustMessage);

BrokeredMessage aprilMessage = new BrokeredMessage()
{
    Properties = {
        {“HavingFun”, “No”},
        {“Month”, “April”},
        {“NumberOfDays”, 30}
    }
};
weatherSender.Send(aprilMessage);

BrokeredMessage septemberMessage = new BrokeredMessage()
{
    Properties = {
        {“HavingFun”, “No”},
        {“Month”, “September”},
        {“NumberOfDays”, 30}
    }
};
weatherSender.Send(septemberMessage);

weatherSender.Close();
weatherSender = null;

This fragment creates a RuleDescription with an action that sets the HavingFun property of a message to Yes if the Month property is June, July or August. A topic and two associated subscriptions are created, one of which is assigned this rule while the other has no filter (and consequently passes all messages). A MessagingFactory is then created which, in turn, creates a TopicClient used to send to the topic three messages with different property values.

The following code fragment shows the retrieval of messages from two different subscriptions to the same topic. The first subscription has a rule that filters the messages and transforms the message properties while the second has rule that passes through all the message and performs no transformations.

SubscriptionClient summerReceiver = messagingFactory.CreateSubscriptionClient(“WeatherTopic”, “WeatherSubscription”, ReceiveMode.ReceiveAndDelete);

while (true)
{
    BrokeredMessage brokeredMessage = summerReceiver.Receive(TimeSpan.FromSeconds(1));
    if (brokeredMessage != null)
    {
        String havingFun = brokeredMessage.Properties["HavingFun"] as String;
        String month = brokeredMessage.Properties["Month"] as String;
        brokeredMessage.Dispose();
    }
    else
    {
        break;
    }
}

SubscriptionClient allReceiver = messagingFactory.CreateSubscriptionClient(“WeatherTopic”, “AllSubscription”, ReceiveMode.ReceiveAndDelete);

while (true)
{
    BrokeredMessage brokeredMessage = allReceiver.Receive(TimeSpan.FromSeconds(1));
    if (brokeredMessage != null)
    {
        String havingFun = brokeredMessage.Properties["HavingFun"] as String;
        String month = brokeredMessage.Properties["Month"] as String;
        brokeredMessage.Dispose();
    }
    else
    {
        break;
    }
}

messagingFactory.Close();
messagingFactory = null;

MessageSender and MessageReceiver

The MessageSender and MessageReceiver classes abstract out the sending and receiving functionality of queues, topics and subscriptions, and can be used instead of the QueueClient, TopicClient and SubscriptionClient classes. The MessageSender class exposes synchronous and asynchronous Send() methods while the MessageReceiver class exposes the usual synchronous and asynchronous Abandon(), Complete(), DeadLetter() and Defer() methods.

The earlier example using a SubscriptionClient to receive messages can be converted to use a MessageReceiver by replacing the following line:

SubscriptionClient summerReceiver = messagingFactory.CreateSubscriptionClient(
   “WeatherTopic”, “WeatherSubscription”, ReceiveMode.ReceiveAndDelete);

with

MessageReceiver summerReceiver = messagingFactory.CreateMessageReceiver(
   “WeatherTopic/subscriptions/WeatherSubscription”, ReceiveMode.ReceiveAndDelete);

with WeatherTopic/subscriptions/WeatherSubscription being the full entity path to the WeatherSubscription on WeatherTopic.

UPDATE 10.11.2011

A MessageReceiver can be used to receive messages from a dead letter subqueue for a subscription. For example, the following code fragment changes the earlier example to one that accesses the dead letter subqueue:

SubscriptionClient allReceiver =
   messagingFactory.CreateSubscriptionClient(“WeatherTopic”, “AllSubscription”,
   ReceiveMode.PeekLock);
String deadLetterPath =
   SubscriptionClient.FormatDeadLetterPath(“WeatherTopic”, allReceiver.Name);
MessageReceiver deadLetterReceiver =
   messagingFactory.CreateMessageReceiver(deadLetterPath);

while (true)
{
    BrokeredMessage brokeredMessage =
       deadLetterReceiver.Receive(TimeSpan.FromSeconds(1));
    if (brokeredMessage != null)
    {
        String havingFun = brokeredMessage.Properties["HavingFun"] as String;
        String month = brokeredMessage.Properties["Month"] as String;
        brokeredMessage.Dispose();
    }
    else
    {
        break;
    }
}

In the example, the value of deadLetterPath is:

WeatherTopic/Subscriptions/AllSubscription/$DeadLetterQueue

MessageSession

Service Bus Brokered Messaging supports sessions and session state. A queue or subscription must be set up to support sessions through the use of the RequiresSession property in the QueueDescription or SubscriptionDescription. Subsequently, each message may have a SessionId associated with it. The intent is that related messages share a SessionId and comprise a session. A MessageSession receiver can then be created on the queue or subscription to receive messages with the same SessionId. A persistent session state can be associated with the session. This can be used along with the PeekLock receive mode to implement exactly once processing of messages since the session state can be used to store the current state of processing regardless of any failure in the session consumer.

The MessageSession class is derived from the MessageReceiver class and adds synchronous and asynchronous versions of the following methods to maintain session state:

public Stream GetState();
public void SetState(Stream stream);

The following code fragment shows the creation of a session-aware queue and the sending of some messages to it:

TokenProvider tokenProvider = TokenProvider.CreateSharedSecretTokenProvider(
   issuer, issuerKey);
Uri serviceUri = ServiceBusEnvironment.CreateServiceUri(
   “sb”, serviceNamespace, String.Empty);
NamespaceManager namespaceManager = new NamespaceManager(serviceUri, tokenProvider);

QueueDescription queueDescription = new QueueDescription(“SessionQueue”)
{
    RequiresSession = true
};
namespaceManager.CreateQueue(queueDescription);

MessagingFactory messagingFactory = MessagingFactory.Create(serviceUri, tokenProvider);

QueueClient sender = messagingFactory.CreateQueueClient(“SessionQueue”);

for (Int32 i = 1; i < 10; i++)
{
    BrokeredMessage brokeredMessage = new BrokeredMessage()
    {
        MessageId = i.ToString(),
        SessionId = (i % 3).ToString(),
    };
    sender.Send(brokeredMessage);
}
sender.Close();

The novelty in this fragment is in the RequiresSession property for the QueueDescription and the setting of the SessionId on each message.

The following code fragment shows the use of a MessageSession to receive messages in a single session:

QueueClient receiver = messagingFactory.CreateQueueClient(
   “SessionQueue”, ReceiveMode.ReceiveAndDelete);
MessageSession sessionReceiver = receiver.AcceptMessageSession();
while ( true )
{
    BrokeredMessage message = sessionReceiver.Receive(TimeSpan.FromSeconds(1));
    if (message == null)
    {
        break;
    }
    String messageId = message.MessageId;
    String sessionId = message.SessionId;
}
messagingFactory.Close();

The MessageSession receiver is created by invoking AcceptMessageSession() on the QueueClient receiver. The session receiver receives the first message and then all subsequent messages with the same SessionId as the first one.

About Neil Mackenzie

Azure Architect at Satory Global.
This entry was posted in Azure AppFabric, Brokered Messaging, Service Bus and tagged , , . Bookmark the permalink.

4 Responses to Windows Azure AppFabric Service Bus Brokered Messaging

  1. Pingback: Windows Azure AppFabric Service Bus Queues and Topics | Convective

  2. Pingback: Distributed Weekly 121 — Scott Banwart's Blog

  3. Pingback: Windows Azure AppFabric Service Bus Brokered Messaging « Ricardo Villalobos' Blog

  4. Pingback: Service Bus Links | brianwhiteddev

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s