- Two types of queue mechanisms: Service Bus queues and Storage queues
- Store large numbers of messages
- Access messages from anywhere in the world via authenticated calls using HTTP or HTTPS
Choosing a Message Queue Solution
- Use cases for Service Bus queues:
- Solution needs to receive messages without having to poll the queue
- Requires the queue to provide a guaranteed first-in-first-out (FIFO) ordered delivery
- Needs to support automatic duplicate detection
- Want app to process messages as parallel long-running streams (messages are associated with a stream using the session ID property on the message)
- Requires transactional behaviour and atomicity when sending or receiving multiple messages from a queue
- App handles messages that can exceed 64 KB but are unlikely to approach the 256 KB or 1 MB limit
- Needs a role-based access control (RBAC) model for the queues, with different rights/permissions for senders and receivers
- Use cases for Storage queues:
- App must store more than 80 GB of messages in a queue
- App wants to track progress for processing a message in the queue (useful if worker processing a message crashes)
- Requires server-side logs of all transactions executed against the queues
Azure Service Bus
- Fully managed enterprise message broker with message queues and publish-subscribe topics
- A message is a container with metadata and data (the data can be anything, including structured data such as JSON, XML, Apache Avro and plain text)
- Common scenarios:
- Messaging
- Decoupling applications
- Topics and subscriptions
- Message sessions
- Tiers:
- Basic, standard and premium (recommended for production)
- Standard:
- Variable throughput, variable latency
- Pay as you go variable pricing
- No scaling
- Message size up to 256 KB
- Premium:
- High throughput and predictable performance
- Fixed pricing
- Ability to scale workload up and down
- Message size up to 100 MB
- Advanced features:
- Message sessions
- Autoforwarding
- Dead-letter queue
- Scheduled delivery
- Message deferral
- Transactions
- Filtering and actions
- Autodelete on idle
- Duplicate detection
- Security protocols
- Geo-disaster recovery
- Security
- Compliance with standards and protocols:
- Primary wire protocol is AMQP 1.0, an open ISO/IEC standard
- With premium tier, it's fully compliant with the Java/Jakarta EE Java Message Service 2.0 API
Queues, Topics and Subscriptions
- Messaging entities forming the core messaging capabilities
- Queues:
- Offer FIFO message delivery
- Benefit of load-leveling, enabling producers and consumers to send and receive messages at different rates
- Provide loose coupling between components by using queues to intermediate between message producers and consumers
- Queues can be created using the Azure portal, PowerShell, CLI or Resource Manager templates; messages are then sent and received from client code
- Receive modes
- There are two modes for receiving messages
- Receive and delete:
- When Service Bus receives the request from the consumer, it marks the message as consumed and returns it to the consumer app
- Peek lock:
- Finds next message to be consumed, locks it to prevent other consumers from receiving it and then returns it to the app
- After the app processes the message, it requests Service Bus to complete the second stage of the receive process, marking the message as consumed
- Topics and subscriptions
- Subscriptions can use filters to restrict the messages they want to receive
- Message-sending functionality of a queue maps directly to a topic and its message-receiving functionality maps to a subscription
- Rules and actions
- Configure subscriptions to find messages with desired properties and then perform certain modifications to those properties
- The filtering is done using subscription filters; the modifications are called filter actions
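The two receive modes described above can be illustrated with a small in-memory model. This is only a sketch: `ToyQueue` and all of its members are hypothetical names invented for this example and are not part of the Service Bus SDK. The `Abandon` method is an added assumption to show what the lock is for, and lock expiry is left out entirely.

```csharp
using System;
using System.Collections.Generic;

// Toy in-memory queue that models the two receive modes.
// Hypothetical type for illustration only; not the Service Bus SDK.
public sealed class ToyQueue
{
    private readonly LinkedList<string> _messages = new LinkedList<string>();
    private readonly Dictionary<Guid, string> _locked = new Dictionary<Guid, string>();

    public int Count => _messages.Count;      // messages visible to consumers
    public int LockedCount => _locked.Count;  // messages locked but not yet settled

    public void Send(string body) => _messages.AddLast(body);

    // Receive and delete: the message is consumed as soon as it is handed out.
    public string ReceiveAndDelete()
    {
        if (_messages.Count == 0) return null;
        string body = _messages.First.Value;
        _messages.RemoveFirst();
        return body;
    }

    // Peek lock, stage 1: lock the next message and return it with a lock token.
    public (Guid LockToken, string Body)? PeekLock()
    {
        if (_messages.Count == 0) return null;
        string body = _messages.First.Value;
        _messages.RemoveFirst();
        Guid token = Guid.NewGuid();
        _locked[token] = body;
        return (token, body);
    }

    // Peek lock, stage 2: the consumer confirms processing, so the message is consumed.
    public bool Complete(Guid lockToken) => _locked.Remove(lockToken);

    // Assumption for this sketch: a consumer that cannot finish releases the lock
    // and the message returns to the front of the queue.
    public bool Abandon(Guid lockToken)
    {
        if (!_locked.TryGetValue(lockToken, out string body)) return false;
        _locked.Remove(lockToken);
        _messages.AddFirst(body);
        return true;
    }
}
```

In this model a message taken with `ReceiveAndDelete` is gone even if the consumer fails afterwards, whereas a message taken with `PeekLock` only disappears once `Complete` is called with its lock token.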
Service Bus Payloads and Serialisation
- Messages carry a payload and metadata
- Broker properties are system defined; they either control message-level functionality inside the broker or map to common, standardised metadata items
- User properties are a collection of key-value pairs defined and set by the application
- Message routing and correlation
- Broker properties specifically To, ReplyTo, ReplyToSessionId, MessageId, CorrelationId and SessionId help apps route messages to particular destinations
- Routing patterns:
- Simple request/reply: Publisher sends a message into a queue and expects a reply from the message consumer
- Multicast request/reply: Variation of prior pattern, publisher sends message into a topic and multiple subscribers become eligible to consume the message
- Multiplexing: The session feature multiplexes streams of related messages through a single queue or subscription. Each session (a group of related messages with the same SessionId value) is routed to a specific receiver for as long as that receiver holds the session under lock
- Multiplexed request/reply: Session feature enables multiplexed replies, allowing several publishers to share a reply queue
- Payload serialisation
- When the payload is in transit or stored inside Service Bus, it's always an opaque, binary block
- The ContentType property enables apps to describe the payload; the suggested format for property values is a MIME content-type description according to IETF RFC 2045 (e.g. application/json;charset=utf-8)
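Because the broker treats the payload as opaque bytes, sender and receiver have to agree on the encoding through the content type. The following is a minimal sketch of that idea using only the .NET base library; `ToyMessage` and `PayloadCodec` are hypothetical names made up for this example and are not the Service Bus SDK types.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical envelope mirroring the payload/metadata split; not an SDK type.
public sealed class ToyMessage
{
    public byte[] Body { get; set; } = Array.Empty<byte>();  // opaque to the broker
    public string ContentType { get; set; } = "";            // MIME description of Body
    public string CorrelationId { get; set; } = "";          // broker-style routing property
    public Dictionary<string, object> UserProperties { get; } =
        new Dictionary<string, object>();                    // application-defined key-value pairs
}

public static class PayloadCodec
{
    public const string JsonUtf8 = "application/json;charset=utf-8";

    // Sender side: serialise the value to UTF-8 JSON and label it.
    public static ToyMessage FromJson<T>(T value, string correlationId)
    {
        return new ToyMessage
        {
            Body = JsonSerializer.SerializeToUtf8Bytes(value),
            ContentType = JsonUtf8,
            CorrelationId = correlationId
        };
    }

    // Receiver side: decode only when the content type says the body is JSON.
    public static T ToObject<T>(ToyMessage message)
    {
        if (!message.ContentType.StartsWith("application/json", StringComparison.OrdinalIgnoreCase))
            throw new NotSupportedException($"Unsupported content type: {message.ContentType}");
        return JsonSerializer.Deserialize<T>(message.Body);
    }
}
```

A receiver would call `PayloadCodec.ToObject<T>` on each incoming message and reject or dead-letter anything whose content type it does not understand; that handling policy is a design choice rather than something the notes prescribe.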
Azure Queue Storage
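- Service for storing large numbers of messages
- Access messages anywhere via authenticated calls using HTTP or HTTPS
- A queue message can be up to 64 KB in size; a queue can hold millions of messages, up to the total capacity limit of the storage account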
- Components:
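- The Queue service is made up of the URL format, storage account, queue and message
- URL format: Queues are addressable using the URL format https://<storageaccount>.queue.core.windows.net/<queue>; for example, a queue named images-to-download is addressed as https://<storageaccount>.queue.core.windows.net/images-to-download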
- Storage account: All access to Azure Storage is done through a storage account
- Queue: A queue contains a set of messages; every message must be in a queue, and the queue name must be all lowercase
- Message: A message in any format up to 64 KB. Default TTL is 7 days (configurable)
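The URL format and naming constraint above can be checked in code before any call is made. The sketch below is illustrative only: `QueueAddress` is a hypothetical helper, not part of the Azure SDK. Beyond the lowercase rule in these notes, it also applies the other Azure queue naming rules (3 to 63 characters, only letters, digits and hyphens, alphanumeric first and last character, no consecutive hyphens).

```csharp
using System;
using System.Text.RegularExpressions;

// Hypothetical helper for illustration; not an Azure SDK type.
public static class QueueAddress
{
    // Lowercase letters, digits and single hyphens; starts and ends alphanumeric.
    private static readonly Regex NamePattern =
        new Regex("^[a-z0-9]+(-[a-z0-9]+)*\\z", RegexOptions.CultureInvariant);

    public static bool IsValidQueueName(string name) =>
        name != null && name.Length >= 3 && name.Length <= 63 && NamePattern.IsMatch(name);

    // Builds https://<storageaccount>.queue.core.windows.net/<queue>.
    public static Uri Build(string storageAccount, string queueName)
    {
        if (!IsValidQueueName(queueName))
            throw new ArgumentException($"Invalid queue name: '{queueName}'", nameof(queueName));
        return new Uri($"https://{storageAccount}.queue.core.windows.net/{queueName}");
    }
}
```

For example, `QueueAddress.Build("myaccount", "images-to-download")` produces the address of the `images-to-download` queue in a storage account called `myaccount` (an assumed account name), while a name such as `Images` is rejected because it contains an uppercase letter.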
Create and Manage Azure Queue Storage and messages by using .NET
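- Create the Queue service client (the snippets below assume the Azure.Storage.Queues package, using directives for Azure.Storage.Queues, Azure.Storage.Queues.Models and System.Configuration, and a string variable queueName holding the queue's name)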
    QueueClient queueClient = new QueueClient(connectionString, queueName);
- Create a queue
    // Get the connection string from app settings
    string connectionString = ConfigurationManager.AppSettings["StorageConnectionString"];
    // Instantiate a QueueClient which will be used to create and manipulate the queue
    QueueClient queueClient = new QueueClient(connectionString, queueName);
    // Create the queue
    queueClient.CreateIfNotExists();
- Insert a message into a queue
    // Get the connection string from app settings
    string connectionString = ConfigurationManager.AppSettings["StorageConnectionString"];
    // Instantiate a QueueClient which will be used to create and manipulate the queue
    QueueClient queueClient = new QueueClient(connectionString, queueName);
    // Create the queue if it doesn't already exist
    queueClient.CreateIfNotExists();
    if (queueClient.Exists())
    {
        // Send a message to the queue
        queueClient.SendMessage(message);
    }
- Peek at the next message
    // Get the connection string from app settings
    string connectionString = ConfigurationManager.AppSettings["StorageConnectionString"];
    // Instantiate a QueueClient which will be used to manipulate the queue
    QueueClient queueClient = new QueueClient(connectionString, queueName);
    if (queueClient.Exists())
    {
        // Peek at the next message
        PeekedMessage[] peekedMessage = queueClient.PeekMessages();
    }
- Change the contents of a queued message
    // Get the connection string from app settings
    string connectionString = ConfigurationManager.AppSettings["StorageConnectionString"];
    // Instantiate a QueueClient which will be used to manipulate the queue
    QueueClient queueClient = new QueueClient(connectionString, queueName);
    if (queueClient.Exists())
    {
        // Get the message from the queue
        QueueMessage[] message = queueClient.ReceiveMessages();
        // Guard against an empty queue before indexing into the array
        if (message.Length > 0)
        {
            // Update the message contents
            queueClient.UpdateMessage(
                message[0].MessageId,
                message[0].PopReceipt,
                "Updated contents",
                TimeSpan.FromSeconds(60.0)  // Make it invisible for another 60 seconds
            );
        }
    }
- Dequeue the next message
    // Get the connection string from app settings
    string connectionString = ConfigurationManager.AppSettings["StorageConnectionString"];
    // Instantiate a QueueClient which will be used to manipulate the queue
    QueueClient queueClient = new QueueClient(connectionString, queueName);
    if (queueClient.Exists())
    {
        // Get the next message
        QueueMessage[] retrievedMessage = queueClient.ReceiveMessages();
        // Guard against an empty queue before indexing into the array
        if (retrievedMessage.Length > 0)
        {
            // Process (i.e. print) the message in less than 30 seconds
            Console.WriteLine($"Dequeued message: '{retrievedMessage[0].Body}'");
            // Delete the message
            queueClient.DeleteMessage(retrievedMessage[0].MessageId, retrievedMessage[0].PopReceipt);
        }
    }
- Get the queue length
    // Get the connection string from app settings
    string connectionString = ConfigurationManager.AppSettings["StorageConnectionString"];
    // Instantiate a QueueClient which will be used to manipulate the queue
    QueueClient queueClient = new QueueClient(connectionString, queueName);
    if (queueClient.Exists())
    {
        QueueProperties properties = queueClient.GetProperties();
        // Retrieve the cached approximate message count
        int cachedMessagesCount = properties.ApproximateMessagesCount;
        // Display number of messages
        Console.WriteLine($"Number of messages in queue: {cachedMessagesCount}");
    }
- Delete a queue
    // Get the connection string from app settings
    string connectionString = ConfigurationManager.AppSettings["StorageConnectionString"];
    // Instantiate a QueueClient which will be used to manipulate the queue
    QueueClient queueClient = new QueueClient(connectionString, queueName);
    if (queueClient.Exists())
    {
        // Delete the queue
        queueClient.Delete();
    }