Windows Azure AppFabric Service Bus Queues and Topics

UPDATE 9/21/2011 – This post described the preview of Service Bus Queues and Topics. This feature has now been released into production with a modified API. I have written another post on Windows Azure AppFabric Service Bus Brokered Messaging to replace this one. Although the general idea of queues, topics and subscriptions remains the same, the other post describes the release version of the API.

The Windows Azure AppFabric Service Bus (65 Scrabble points) has a Message Buffer feature providing a small in-memory buffer where messages from one system to another can be buffered. This feature supports disconnected communication between the two systems. At PDC 2010, the AppFabric team announced Durable Message Buffers which used a persistent store to allow much larger messages to be stored for a longer period of time.

The CTP release of the Windows Azure AppFabric v2.0 SDK (May 2011) significantly improves the messaging capability of the Service Bus by adding Queues and Topics to support sophisticated publish-subscribe scenarios. Queues and Topics allow multiple publishers to communicate in a disconnected fashion with multiple subscribers, which can use filters to customize their subscriptions.

There are several programming models for Queues and Topics. The easiest to understand is the .NET API which exposes operations mapping almost directly onto operations on queues, topics and messages. There is a new WCF binding for Service Bus Messaging implemented using Queues and Topics. Finally, there is a REST API for Queues and Topics. This post only addresses the .NET API.

Clemens Vasters (@clemensv) introduced Queues and Topics in a Tech Ed 11 presentation. Rick Garibay (@rickggaribay) has a couple of posts on Queues and Topics. Sam Vanhoutte (@SamVanhoutte) has posts on publish/subscribe and using message sessions. Zulfiqar Ahmed (@zahmed) has a sequence of posts using the WCF bindings for queues and pub/sub (part 1 and part 2). Will Perry, of the AppFabric team, has a post showing how to use the REST API. Finally, David Ingham has a couple of posts about Queues and Topics on the AppFabric Team Blog.

Queues

A queue is a durable message log with a single subscription tap feeding all subscribers. Each queue can have multiple receivers competing to receive messages. The maximum size of a queue during the CTP is 100MB, which is expected to rise to 1GB (or more) for production. A message can be up to 256KB.

A variety of system properties, such as MessageId, are associated with each message. A publisher can set some system properties, such as SessionId and CorrelationId, so that the subscriber can modify processing based on the value of the property. Arbitrary name-value pairs can also be added to a message as properties. Finally, a message can contain a message body that must be serializable.

A queue implements message delivery using either at-most-once semantics (ReceiveAndDelete) or at-least-once semantics (PeekLock). In the ReceiveAndDelete receive mode, a message is deleted from the queue as soon as it is pulled by a receiver. In the PeekLock receive mode, a message is hidden from other receivers until a timeout expires, by which time the receiver should have deleted the message. Queues are essentially best-effort FIFO, since message order is not guaranteed.
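The difference between the two receive modes can be illustrated with a toy in-memory queue. This is only a sketch of the semantics described above, not the Service Bus API: ToyQueue, ToyReceiveMode and their members are hypothetical names, and the lock timeout is left out, so a locked message only reappears when it is explicitly abandoned.

```csharp
using System;
using System.Collections.Generic;

// Toy in-memory queue (hypothetical, NOT the Service Bus API).
// It only models how the two receive modes treat a received message.
public enum ToyReceiveMode { ReceiveAndDelete, PeekLock }

public sealed class ToyQueue
{
    private readonly LinkedList<string> visible = new LinkedList<string>();
    private readonly Dictionary<Guid, string> locked = new Dictionary<Guid, string>();

    // Messages still held by the queue, whether visible or locked.
    public int Count { get { return visible.Count + locked.Count; } }

    public void Send(string body) { visible.AddLast(body); }

    // Returns false when no message is visible. In PeekLock mode the message
    // stays in the queue, hidden, until Complete or Abandon is called.
    public bool TryReceive(ToyReceiveMode mode, out string body, out Guid lockToken)
    {
        body = string.Empty;
        lockToken = Guid.Empty;
        if (visible.Count == 0) return false;

        body = visible.First.Value;
        visible.RemoveFirst();
        if (mode == ToyReceiveMode.PeekLock)
        {
            lockToken = Guid.NewGuid();
            locked[lockToken] = body;
        }
        return true;
    }

    // The receiver finished processing: the message is removed for good.
    public void Complete(Guid lockToken) { locked.Remove(lockToken); }

    // The receiver gave up: the message becomes visible again at the head.
    public void Abandon(Guid lockToken)
    {
        string body;
        if (locked.TryGetValue(lockToken, out body))
        {
            locked.Remove(lockToken);
            visible.AddFirst(body);
        }
    }
}
```

With ReceiveAndDelete, a receiver that crashes after TryReceive() has lost the message. With PeekLock, the message is still held by the queue and can be handed out again, which is why the same message may be processed more than once.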

A queue can be configured, on creation, to support message sessions which allow multiple messages to be grouped into a session identified by SessionId. All messages with the same SessionId are delivered to the same receiver. Sessions can be used to get around the 256KB limit for messages through the logical aggregation of multiple messages.
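The aggregation idea can be sketched without touching the Service Bus API at all: split a large payload into pieces that share a session identifier, then reassemble them on the receiving side. The Chunk type, the Index field and the SessionChunking helper are hypothetical names used only for this illustration. In a real sender each chunk would become the body of one message carrying the same SessionId.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for a message: only the fields this sketch needs.
public sealed class Chunk
{
    public Chunk(string sessionId, int index, byte[] body)
    {
        SessionId = sessionId;
        Index = index;
        Body = body;
    }

    public string SessionId { get; private set; }
    public int Index { get; private set; }
    public byte[] Body { get; private set; }
}

public static class SessionChunking
{
    // Split a payload into chunks of at most maxChunkBytes, all sharing one SessionId.
    public static List<Chunk> Split(byte[] payload, int maxChunkBytes, string sessionId)
    {
        if (maxChunkBytes <= 0) throw new ArgumentOutOfRangeException("maxChunkBytes");

        var chunks = new List<Chunk>();
        int index = 0;
        for (int offset = 0; offset < payload.Length; offset += maxChunkBytes)
        {
            int length = Math.Min(maxChunkBytes, payload.Length - offset);
            var body = new byte[length];
            Array.Copy(payload, offset, body, 0, length);
            chunks.Add(new Chunk(sessionId, index++, body));
        }
        return chunks;
    }

    // Reassemble the chunks of one session, ordering by Index in case they arrive out of order.
    public static byte[] Join(IEnumerable<Chunk> chunks, string sessionId)
    {
        return chunks.Where(c => c.SessionId == sessionId)
                     .OrderBy(c => c.Index)
                     .SelectMany(c => c.Body)
                     .ToArray();
    }
}
```

Since queue ordering is only best-effort, the sketch carries an explicit index in each chunk rather than relying on arrival order.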

Topics

A topic is a durable message log with multiple subscription taps separately feeding subscribers. A topic can have up to 2,000 subscriptions associated with it, each of which gets independent copies of all messages sent to the topic. One or more subscribers can independently subscribe to a subscription and compete for messages from it. Topics support all the capabilities of Queues.

Each subscription can have an associated rule comprising a filter and an action. The filter is used to filter the topic so that only messages matching the filter can be retrieved through the subscription. There are various types of filter, the most powerful of which is expressed in SQL 92 syntax to create a filter string using the name-value pairs added as properties to the message. The action can be used to modify the values of the name-value pairs. If no action is needed, the filter can be associated directly with the subscription without the need to wrap it into a rule. A particularly simple filter just uses the value of a CorrelationId the sender adds to the message. The intent of this is to support a call-response model with the CorrelationId being used to correlate, or match, an initial request message with a response message.

Filters provide much of the power of Topics since they allow different subscribers to view different message streams from the same topic. For example, an auditing service could tap into a subscription which passes through all the messages sent to the topic while a regional service could tap into a subscription which only passes through messages associated with that geographical region.
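The auditing and regional example can be modelled with a toy topic in which each subscription has its own queue and a predicate acting as its filter. This is a sketch of the concept only: ToyTopic, RegionIs and the Region property name are hypothetical, and real filters are expressed through the filter classes of the SDK rather than delegates.

```csharp
using System;
using System.Collections.Generic;

// Toy model of a topic (hypothetical, NOT the Service Bus API): every
// subscription gets its own copy of each message that passes its filter.
public sealed class ToyTopic
{
    private readonly Dictionary<string, Func<IDictionary<string, object>, bool>> filters =
        new Dictionary<string, Func<IDictionary<string, object>, bool>>();
    private readonly Dictionary<string, Queue<IDictionary<string, object>>> queues =
        new Dictionary<string, Queue<IDictionary<string, object>>>();

    public void AddSubscription(string name, Func<IDictionary<string, object>, bool> filter)
    {
        filters.Add(name, filter);
        queues.Add(name, new Queue<IDictionary<string, object>>());
    }

    // Copy the message properties into every subscription whose filter matches.
    public void Send(IDictionary<string, object> properties)
    {
        foreach (var entry in filters)
        {
            if (entry.Value(properties))
            {
                queues[entry.Key].Enqueue(new Dictionary<string, object>(properties));
            }
        }
    }

    public int PendingCount(string name) { return queues[name].Count; }

    // Filter passing only messages whose Region property equals the given region.
    public static Func<IDictionary<string, object>, bool> RegionIs(string region)
    {
        return properties =>
        {
            object value;
            return properties.TryGetValue("Region", out value) && region.Equals(value);
        };
    }
}
```

An audit subscription would be added with a filter that always returns true, and a regional one with ToyTopic.RegionIs("California"). Each then sees a different stream from the same topic.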

Addressing Topics and Queues

Queues and Topics are addressed using the Service Bus namespace, similar to the Service Bus Relay Service. For example, the service address for a namespace named cromore is:

sb://cromore.servicebus.appfabriclabs.com/

The namespace is then extended with the names of queues, topics and subscriptions. For example, the following is the full path to a subscription named California on a topic named WeatherForecast:

sb://cromore.servicebus.appfabriclabs.com/WeatherForecast/Subscriptions/California
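As a rough illustration of how these addresses are composed, the following sketch builds them with System.Uri alone. ToyAddress is a hypothetical helper, and the host suffix is simply the one from the CTP addresses above. In real code the ServiceBusEnvironment.CreateServiceUri() method used later in this post creates the namespace address.

```csharp
using System;

// Hypothetical helper showing how namespace, topic and subscription names
// combine into an address. Not part of the Service Bus SDK.
public static class ToyAddress
{
    // Host suffix taken from the CTP addresses shown in this post.
    private const string HostSuffix = "servicebus.appfabriclabs.com";

    public static Uri ForNamespace(string serviceNamespace)
    {
        return new Uri("sb://" + serviceNamespace + "." + HostSuffix + "/");
    }

    public static Uri ForSubscription(string serviceNamespace, string topic, string subscription)
    {
        // The subscription hangs off the topic under a fixed "Subscriptions" segment.
        string path = string.Join("/", topic, "Subscriptions", subscription);
        return new Uri(ForNamespace(serviceNamespace), path);
    }
}
```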

ServiceBusNamespaceClient

The Microsoft.ServiceBus.Messaging namespace contains most of the functionality for Queues and Topics. The ServiceBusNamespaceClient provides methods supporting the management of queues and topics. An abbreviated class declaration is:

public class ServiceBusNamespaceClient {
    // Constructors
    public ServiceBusNamespaceClient(
        String address, ServiceBusNamespaceClientSettings settings);
    public ServiceBusNamespaceClient(
        Uri address, ServiceBusNamespaceClientSettings settings);
    public ServiceBusNamespaceClient(
        String address, TransportClientCredentialBase credential);
    public ServiceBusNamespaceClient(
        Uri address, TransportClientCredentialBase credential);

    // Properties
    public Uri Address { get; }
    public ServiceBusNamespaceClientSettings Settings { get; }

    // Methods
    public Queue CreateQueue(String path);
    public Queue CreateQueue(String path, QueueDescription description);
    public Topic CreateTopic(String path);
    public Topic CreateTopic(String path, TopicDescription description);
    public void DeleteQueue(String path);
    public void DeleteTopic(String path);
    public Queue GetQueue(String path);
    public IEnumerable<Queue> GetQueues();
    public Topic GetTopic(String path);
    public IEnumerable<Topic> GetTopics();
}

These synchronous methods have matching sets of asynchronous methods.

The various constructors allow the specification of the service namespace and the provision of the authentication token, containing the Service Bus issuer and key, used to authenticate to the Service Bus Messaging Service. The methods support the creation, deletion and retrieval of queues and topics. Note that an exception is thrown on an attempt to create a queue or topic that already exists, or when GetTopic() is invoked for a non-existent topic. A queue or topic can be defined to support sessions by creating it with a QueueDescription or TopicDescription, respectively, for which RequiresSession = true.

The following example shows the creation of a SharedSecretCredential and its use in the creation of a ServiceBusNamespaceClient, which is then used to create a new Topic. Finally, the example enumerates all the topics currently created in the namespace. Note that the code creating a Queue is almost identical.

public static void CreateTopic(String topicName)
{
    String serviceNamespace = "cromore";
    String issuer = "owner";
    String key = "Base64 encoded key";

    // Build the credential and the sb:// address of the namespace.
    SharedSecretCredential credential =
        TransportClientCredentialBase.CreateSharedSecretCredential(issuer, key);
    Uri serviceBusUri =
        ServiceBusEnvironment.CreateServiceUri("sb", serviceNamespace, String.Empty);
    ServiceBusNamespaceClient namespaceClient =
        new ServiceBusNamespaceClient(serviceBusUri, credential);

    // Throws if a topic with this name already exists.
    Topic newTopic = namespaceClient.CreateTopic(topicName);

    // Enumerate every topic in the namespace.
    IEnumerable<Topic> topics = namespaceClient.GetTopics();
    foreach (Topic topic in topics)
    {
        String path = topic.Path;
    }
}

Note that a queue or topic is durable, and exists until it is specifically deleted.

QueueClient, SubscriptionClient and TopicClient

A MessagingFactory is used to create the various clients used to send and receive messages to queues and topics. The MessagingFactory class has several static factory methods to create a MessagingFactory instance which can be used to create any of the following client objects: QueueClient, TopicClient and SubscriptionClient.

The QueueClient class exposes synchronous and asynchronous methods for the creation of the MessageSender and MessageReceiver objects used to send messages to and receive messages from a queue. It also exposes an AcceptSessionReceiver() method to create a SessionReceiver instance used to receive messages from a queue implementing message sessions.

The TopicClient and SubscriptionClient classes support topics. The TopicClient class is used to create the MessageSender used to send messages to a topic. The SubscriptionClient class exposes methods to create the MessageReceiver and SessionReceiver used to retrieve messages or session messages from a specified subscription. The SubscriptionClient also exposes various AddRule() methods allowing either a RuleDescription, containing a FilterExpression/FilterAction pair, or merely a FilterExpression to be specified for the subscription.

The following example shows the creation of a MessageSender via a QueueClient and MessagingFactory. The MessageSender is used to send 10 messages to the queue. Each message contains a simple body and has a single property named SomeKey.

public static void AddMessagesToQueue(string queueName)
{
    String serviceNamespace = "cromore";
    String issuer = "owner";
    String key = "Base64 encoded key";

    SharedSecretCredential credential =
        TransportClientCredentialBase.CreateSharedSecretCredential(issuer, key);
    Uri serviceBusUri =
        ServiceBusEnvironment.CreateServiceUri("sb", serviceNamespace, String.Empty);

    MessagingFactory messagingFactory =
        MessagingFactory.Create(serviceBusUri, credential);

    QueueClient queueClient = messagingFactory.CreateQueueClient(queueName);

    // Send 10 messages, each with a String body and one custom property.
    MessageSender messageSender = queueClient.CreateSender();
    for (Int32 i = 0; i < 10; i++)
    {
        String messageContent = String.Format("Message{0}", i);
        BrokeredMessage message = BrokeredMessage.CreateMessage(messageContent);
        message.Properties.Add("SomeKey", "SomeValue");
        messageSender.Send(message);
    }
}

The following example shows the creation of a MessageReceiver via a QueueClient and MessagingFactory. The MessageReceiver is used to receive 10 messages from the queue. The GetBody<T>() method is used to get the content of each message. Since the MessageReceiver is configured to use the default PeekLock receive mode, the Complete() method is used to delete the message after it is used.

public static void GetMessages(string queueName)
{
    String serviceNamespace = "cromore";
    String issuer = "owner";
    String key = "Base64 encoded key";

    SharedSecretCredential credential =
        TransportClientCredentialBase.CreateSharedSecretCredential(issuer, key);
    Uri serviceBusUri =
        ServiceBusEnvironment.CreateServiceUri("sb", serviceNamespace, String.Empty);

    MessagingFactory messagingFactory =
        MessagingFactory.Create(serviceBusUri, credential);

    QueueClient queueClient = messagingFactory.CreateQueueClient(queueName);

    // The receiver uses the default PeekLock receive mode.
    MessageReceiver messageReceiver = queueClient.CreateReceiver();
    for (Int32 i = 0; i < 10; i++)
    {
        BrokeredMessage message = messageReceiver.Receive();
        String messageContent = message.GetBody<String>();
        // Delete the locked message now that it has been processed.
        message.Complete();
    }
}

BrokeredMessage

The MessageSender, MessageReceiver and SessionReceiver all use the BrokeredMessage class to represent a message. BrokeredMessage is declared as follows:

public sealed class BrokeredMessage : IXmlSerializable, IDisposable {
    // Properties
    public String ContentType { get; set; }
    public String CorrelationId { get; set; }
    public Int32 DeliveryCount { get; }
    public DateTime EnqueuedTimeUtc { get; set; }
    public DateTime ExpiresAtUtc { get; }
    public String Label { get; set; }
    public DateTime LockedUntilUtc { get; }
    public Guid LockToken { get; }
    public String MessageId { get; set; }
    public MessageReceipt MessageReceipt { get; }
    public IDictionary<String,Object> Properties { get; }
    public String ReplyTo { get; set; }
    public String ReplyToSessionId { get; set; }
    public DateTime ScheduledEnqueueTimeUtc { get; set; }
    public Int64 SequenceNumber { get; }
    public String SessionId { get; set; }
    public Int64 Size { get; }
    public TimeSpan TimeToLive { get; set; }
    public String To { get; set; }

    // Methods
    public void Abandon();
    public void Complete();
    public static BrokeredMessage CreateMessage(
        Stream messageBodyStream, Boolean ownsStream);
    public static BrokeredMessage CreateMessage();
    public static BrokeredMessage CreateMessage(
        Object serializableObject, XmlObjectSerializer serializer);
    public static BrokeredMessage CreateMessage(Object serializableObject);
    public void DeadLetter();
    public void Defer();
    public T GetBody<T>(XmlObjectSerializer serializer);
    public T GetBody<T>();

    // Implemented Interfaces and Overridden Members
    public void Dispose();
    public override String ToString();
    XmlSchema IXmlSerializable.GetSchema();
    void IXmlSerializable.ReadXml(XmlReader reader);
    void IXmlSerializable.WriteXml(XmlWriter writer);
}

Note that there are also asynchronous versions of Abandon(), Complete(), DeadLetter() and Defer(). These methods are used to modify the current status of the message on the queue when the PeekLock receive mode is being used, and are not used with the ReceiveAndDelete receive mode. Abandon() gives up the message lock, allowing another subscriber to retrieve the message. Complete() indicates that message processing is complete and that the message can be deleted. DeadLetter() moves the message to the dead letter subqueue. Defer() indicates that the queue should set the message aside for later processing. The MessageReceipt for the message can be passed into Receive() or TryReceive() to retrieve a deferred message.

The dead letter subqueue is used to store messages for which normal processing was not possible for some reason. A MessageReceiver for the dead letter subqueue can be created by passing the special subqueue name, $DeadLetterQueue, into the CreateReceiver() method. Message processing can be deferred for various reasons, e.g. low priority.

The properties of the BrokeredMessage class are pretty much self-explanatory. The ReplyTo and ReplyToSessionId properties are set by a message sender so that the message receiver can respond by sending a response message, with a specific SessionId, to a specific queue.

Subscriptions

Once a topic has been created it is necessary to add one or more subscriptions to it since, although senders send messages to the topic, subscribers receive messages from a subscription, not from the topic. The Topic.AddSubscription() method is used to add a subscription to a topic. Each subscription is identified by a unique name. When the subscription is added, a RuleDescription can be used to specify a filter and action for the subscription.

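The Microsoft.ServiceBus.Messaging.Filters namespace contains the following classes, derived from FilterExpression, which can be used to implement the filter for a subscription: CorrelationFilterExpression, MatchAllFilterExpression, MatchNoneFilterExpression and SqlFilterExpression.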

When a FilterExpression is configured for a subscription the only messages that can be retrieved are those satisfying the filter. The CorrelationFilterExpression provides a simple filter on a specified value of the CorrelationId. The MatchAllFilterExpression allows the retrieval of all messages. The MatchNoneFilterExpression prevents the retrieval of any messages. The SqlFilterExpression allows the values of the properties in BrokeredMessage.Properties to be compared using some SQL 92 primitives such as =, < and LIKE. Multiple comparisons may be joined with AND and OR.

The namespace also contains the FilterAction and SqlFilterAction classes which are used to specify an action that can be applied to the BrokeredMessage when it is retrieved. The SqlFilterAction supports the use of SQL 92 expressions to modify the values of the properties in BrokeredMessage.Properties.
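To make the filter and action pairing concrete, here is a toy model in which the filter is a predicate over the message properties and the action is a delegate that modifies a copy of them. ToyRule and its members are hypothetical names, and real rules use SQL 92 strings rather than delegates. The LowPriority() rule only mimics a filter of priority < 3 with an action of set defer = 'yes'.

```csharp
using System;
using System.Collections.Generic;

// Toy model of a rule (hypothetical, NOT the Service Bus API): a filter
// decides whether a message matches, an action then modifies its properties.
public sealed class ToyRule
{
    private readonly Func<IDictionary<string, object>, bool> filter;
    private readonly Action<IDictionary<string, object>> action;

    public ToyRule(
        Func<IDictionary<string, object>, bool> filter,
        Action<IDictionary<string, object>> action)
    {
        this.filter = filter;
        this.action = action;
    }

    // Returns true when the filter matches; result is then a copy of the
    // properties with the action applied. The input is never modified.
    public bool TryApply(
        IDictionary<string, object> properties,
        out IDictionary<string, object> result)
    {
        result = new Dictionary<string, object>(properties);
        if (!filter(properties)) return false;
        action(result);
        return true;
    }

    // Delegate equivalent of: filter "priority < 3", action "set defer = 'yes'".
    public static ToyRule LowPriority()
    {
        return new ToyRule(
            p =>
            {
                object value;
                return p.TryGetValue("priority", out value)
                    && value is int
                    && (int)value < 3;
            },
            p => p["defer"] = "yes");
    }
}
```

A message with priority 1 matches and comes back with defer set to yes, while a message with priority 5 does not match and would not be retrievable through that subscription.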

The following example shows the addition of various subscriptions to a topic:

public static void AddSubscriptionToTopic(String topicName)
{
    String serviceNamespace = "cromore";
    String issuer = "owner";
    String key = "Base64 encoded key";

    SharedSecretCredential credential =
        TransportClientCredentialBase.CreateSharedSecretCredential(issuer, key);
    Uri serviceBusUri =
        ServiceBusEnvironment.CreateServiceUri("sb", serviceNamespace, String.Empty);

    ServiceBusNamespaceClient namespaceClient =
        new ServiceBusNamespaceClient(serviceBusUri, credential);
    Topic topic = namespaceClient.GetTopic(topicName);

    // SQL filter on the priority property.
    RuleDescription sqlFilterRule = new RuleDescription()
    {
        FilterAction = new SqlFilterAction("set defer = 'yes';"),
        FilterExpression = new SqlFilterExpression("priority < 3")
    };
    Subscription sqlFilterSubscription =
        topic.AddSubscription("SqlFilterSubscription", sqlFilterRule);

    // Filter on the CorrelationId of the message.
    RuleDescription correlationFilterRule = new RuleDescription()
    {
        FilterAction = new SqlFilterAction("set defer = 'no';"),
        FilterExpression = new CorrelationFilterExpression("odd")
    };
    Subscription correlationFilterSubscription =
        topic.AddSubscription("correlationFilterSubscription", correlationFilterRule);

    // Matches every message.
    RuleDescription matchAllRule = new RuleDescription()
    {
        FilterAction = new SqlFilterAction("set defer = 'no';"),
        FilterExpression = new MatchAllFilterExpression()
    };
    Subscription matchAllFilterSubscription =
        topic.AddSubscription("matchAllFilterSubscription", matchAllRule);

    // Matches no message.
    RuleDescription matchNoneRule = new RuleDescription()
    {
        FilterAction = new SqlFilterAction("set defer = 'yes';"),
        FilterExpression = new MatchNoneFilterExpression()
    };
    Subscription matchNoneFilterSubscription =
        topic.AddSubscription("matchNoneFilterSubscription", matchNoneRule);

    // No rule: all messages can be retrieved.
    Subscription defaultSubscription = topic.AddSubscription("defaultSubscription");
}

The sqlFilterRule has a filter on the value of the priority property, and has an action setting the defer property to yes. The correlationFilterRule has a filter on the value of the CorrelationId for the message, and has an action setting the defer property to no. The matchAllRule and the matchNoneRule match all and no messages respectively. Finally, the defaultSubscription allows all messages to be retrieved, essentially replicating the functionality of a normal queue subscription.

Retrieving Messages from a Subscription

The following example shows how to retrieve messages from a subscription:

public static void GetMessagesFromSubscription(
    String topicName, String subscriptionName)
{
    String serviceNamespace = ConfigurationManager.AppSettings["ServiceNamespace"];
    String issuer = ConfigurationManager.AppSettings["DefaultIssuer"];
    String key = ConfigurationManager.AppSettings["DefaultKey"];

    SharedSecretCredential credential =
        TransportClientCredentialBase.CreateSharedSecretCredential(issuer, key);
    Uri serviceBusUri =
        ServiceBusEnvironment.CreateServiceUri("sb", serviceNamespace, String.Empty);

    MessagingFactory messagingFactory =
        MessagingFactory.Create(serviceBusUri, credential);

    SubscriptionClient subscriptionClient =
        messagingFactory.CreateSubscriptionClient(topicName, subscriptionName);

    // ReceiveAndDelete removes each message as soon as it is retrieved.
    MessageReceiver messageReceiver =
        subscriptionClient.CreateReceiver(ReceiveMode.ReceiveAndDelete);

    for (Int32 i = 0; i < 5; i++)
    {
        BrokeredMessage message;
        messageReceiver.TryReceive(out message);
        if (message != null)
        {
            // Read the body as the type the sender serialized;
            // the earlier examples in this post send a String.
            String messageContent = message.GetBody<String>();
        }
    }
}

In this example, we create a SubscriptionClient for a specified topic and subscription. We then create a MessageReceiver using the ReceiveAndDelete receive mode. This mode automatically deletes a message from the subscription as soon as it is retrieved. This can lead to message loss if the subscriber fails while processing the message.

Conclusion

Windows Azure AppFabric Service Bus Messaging seems to be a very powerful addition to Windows Azure. It will be interesting to see both how the technology develops and how it gets used in the field.

About Neil Mackenzie

Cloud Solutions Architect. Microsoft