Graduate Program KB

  • Two types of queue mechanisms: Service Bus queues and Storage queues
  • Store large number of messages
  • Access messages from anywhere in the world via authenticated calls using HTTP or HTTPS

Choosing a Message Queue Solution

  • Use cases for Service Bus queues:
    • Solution needs to receive messages without having to poll the queue
    • Requires the queue to provide a guaranteed first-in-first-out (FIFO) ordered delivery
    • Needs to support automatic duplicate detection
    • Want app to process messages as parallel long-running streams (messages are associated with a stream using the session ID property on the message)
    • Requires transactional behaviour and atomicity when sending or receiving multiple messages from a queue
    • App handles messages exceeding 64 KB but likely won't approach 256 KB or 1-MB limit
    • Deal with a requirement to provide RBAC model to the queues and different rights/permissions for senders and receivers
  • Use cases for Storage queues:
    • App must store over 80 GBs of messages in a queue
    • App wants to track progress for processing a message in the queue (useful if worker processing a message crashes)
    • Require server side logs of all transactions executed against your queues

Azure Service Bus

  • Fully managed enterprise message broke with message queues and publish-subscribe topics
  • A message is a container with metadata and data (can be anything including structured data such as JSON, XML, Apache Avro and Plain Text)
  • Common scenarios:
    • Messaging
    • Decoupling applications
    • Topics and subscriptions
    • Message sessions
  • Tiers:
    • Basic, standard and premium (recommended for production)
    • Standard:
      • Variable throughput, variable latency
      • Pay as you go variable pricing
      • No scaling
      • Message size up to 256 KB
    • Premium:
      • High throughput and predictable performance
      • Fixed pricing
      • Ability to scale workload up and down
      • Message size up to 100 MB
  • Advanced features:
    • Message sessions
    • Autoforwarding
    • Dead-letter queue
    • Scheduled delivery
    • Message deferral
    • Transactions
    • Filtering and actions
    • Autodelete on idle
    • Duplicate detection
    • Security protocols
    • Geo-disaster recovery
    • Security
  • Compliance with standards and protocols:
    • Primary wire protocol is AMQP 1.0 which is an open ISO/IEC standard
    • With premium tier, it's fully compliant with the Java/Jakarta EE Java Message Service 2.0 API

Queues, Topics and Subscriptions

  • Messaging entities forming the core messaging capabilities
  • Queues:
    • Offer FIFO message delivery
    • Benefit of load-leveling, enabling producers and consumers to send and receive messages at different rates
    • Provide loose coupling between components by using queues to intermediate between message producers and consumers
    • Can create queues using Azure portal, PowerShell, CLI or Resource Manager templates. Then, send and receive messages using clients written with code
  • Receive modes
    • There are two modes for receiving messages
    • Receive and delete:
      • Marks receive request from consumer as consumed and returns it to the consumer app
    • Peek lock:
      • Finds next message to be consumed, locks it to prevent other consumers from receiving it and then returns it to the app
      • After the app processes the message, it requests Service Bus to complete the second stage of the receive process, marking the message as consumed
  • Topics and subscriptions
    • Subscriptions can use more filters to restrict the messages they want to receive
    • Message-sending functionality of a queue maps directly to a topic and its message-receiving functionality maps to a subscription
  • Rules and actions
    • Configure subscriptions to find messages with desired properties then perform certain modifications to those properties
    • Such filtering is accomplished using subscription filters called filter actions

Service Bus Payloads and Serialisation

  • Messages carry a payload and metadata
  • Broker properties are system defined, these either control message-level functionality inside the broker or map to common and standardised metadata items
    • User properties are a collection of key-value pairs defined and set by the application
  • Message routing and correlation
    • Broker properties specifically To, ReplyTo, ReplyToSessionId, MessageId, CorrelationId and SessionId help apps route messages to particular destinations
    • Routing patterns:
      • Simple request/reply: Publisher sends message into queue and expect reply from the message consumer
      • Multicast request/reply: Variation of prior pattern, publisher sends message into a topic and multiple subscribers become eligible to consume the message
      • Multiplexing: Session feature enables multiplexing of streams of related messages through a single queue or subscription such that each session or group of related messages identified by the matching SessionId values are routed to a specific receiver while the receiver holds the session under lock
      • Multiplexed request/reply: Session feature enables multiplexed replies, allowing several publishers to share a reply queue
  • Payload serialisation
    • When the payload is in transit or stored in of Service Bus, it's always an opaque, binary block
    • The ContentType property enables apps to describe the payload with the suggested format for property values being a MIME content-type description according to IETF RFC2045 (ex. application/json;charset=utf-8)

Azure Queue Storage

  • Service for storing large number of messages
  • Access messages anywhere via authenticated calls using HTTP or HTTPS
  • Queue can be 64 KB in size
  • Components:
    • Contains a storage account and queue
    • URL format: Queues addressable using the URL format https://<storageaccount>.queue.core.windows.net/images-to-download
    • Storage account: All access to Azure Storage is done through a storage account
    • Queue: Queue contains a set of messages, all messages must be in queue (queue name must be lowercase)
    • Message: A message in any format up to 64 KB. Default TTL is 7 days (configurable)

Create and Manage Azure Queue Storage and messages by using .NET

  • Create Queue service client
    QueueClient queueClient = new QueueClient(connectionString, queueName);
    
  • Create a queue
    // Get the connection string from app settings
    string connectionString = ConfigurationManager.AppSettings["StorageConnectionString"];
    
    // Instantiate a QueueClient which will be used to create and manipulate the queue
    QueueClient queueClient = new QueueClient(connectionString, queueName);
    
    // Create the queue
    queueClient.CreateIfNotExists();
    
  • Insert a message into a queue
    // Get the connection string from app settings
    string connectionString = ConfigurationManager.AppSettings["StorageConnectionString"];
    
    // Instantiate a QueueClient which will be used to create and manipulate the queue
    QueueClient queueClient = new QueueClient(connectionString, queueName);
    
    // Create the queue if it doesn't already exist
    queueClient.CreateIfNotExists();
    
    if (queueClient.Exists())
    {
        // Send a message to the queue
        queueClient.SendMessage(message);
    }
    
  • Peek at the next message
    // Get the connection string from app settings
    string connectionString = ConfigurationManager.AppSettings["StorageConnectionString"];
    
    // Instantiate a QueueClient which will be used to manipulate the queue
    QueueClient queueClient = new QueueClient(connectionString, queueName);
    
    if (queueClient.Exists())
    { 
        // Peek at the next message
        PeekedMessage[] peekedMessage = queueClient.PeekMessages();
    }
    
  • Change the contents of a queued message
    // Get the connection string from app settings
    string connectionString = ConfigurationManager.AppSettings["StorageConnectionString"];
    
    // Instantiate a QueueClient which will be used to manipulate the queue
    QueueClient queueClient = new QueueClient(connectionString, queueName);
    
    if (queueClient.Exists())
    {
        // Get the message from the queue
        QueueMessage[] message = queueClient.ReceiveMessages();
    
        // Update the message contents
        queueClient.UpdateMessage(message[0].MessageId, 
                message[0].PopReceipt, 
                "Updated contents",
                TimeSpan.FromSeconds(60.0)  // Make it invisible for another 60 seconds
            );
    }
    
  • Dequeue the next message
    // Get the connection string from app settings
    string connectionString = ConfigurationManager.AppSettings["StorageConnectionString"];
    
    // Instantiate a QueueClient which will be used to manipulate the queue
    QueueClient queueClient = new QueueClient(connectionString, queueName);
    
    if (queueClient.Exists())
    {
        // Get the next message
        QueueMessage[] retrievedMessage = queueClient.ReceiveMessages();
    
        // Process (i.e. print) the message in less than 30 seconds
        Console.WriteLine($"Dequeued message: '{retrievedMessage[0].Body}'");
    
        // Delete the message
        queueClient.DeleteMessage(retrievedMessage[0].MessageId, retrievedMessage[0].PopReceipt);
    }
    
  • Get the queue length
    /// Instantiate a QueueClient which will be used to manipulate the queue
    QueueClient queueClient = new QueueClient(connectionString, queueName);
    
    if (queueClient.Exists())
    {
        QueueProperties properties = queueClient.GetProperties();
    
        // Retrieve the cached approximate message count.
        int cachedMessagesCount = properties.ApproximateMessagesCount;
    
        // Display number of messages.
        Console.WriteLine($"Number of messages in queue: {cachedMessagesCount}");
    }
    
  • Delete a queue
    /// Get the connection string from app settings
    string connectionString = ConfigurationManager.AppSettings["StorageConnectionString"];
    
    // Instantiate a QueueClient which will be used to manipulate the queue
    QueueClient queueClient = new QueueClient(connectionString, queueName);
    
    if (queueClient.Exists())
    {
        // Delete the queue
        queueClient.Delete();
    }