Graduate Program KB

Azure Event Grid

  • Highly scalable, fully managed publish-subscribe (pub/sub) message distribution service
  • Offers flexible message consumption patterns using HTTP and Message Queuing Telemetry Transport (MQTT)
  • Used to build data pipelines with device data, integrate applications and build event-driven serverless architectures
  • Can push events to subscribers (push delivery), or subscribers can connect to Event Grid to read events (pull delivery)

Concepts in Azure Event Grid

  • Publishers
    • The application that sends events to Event Grid. It can be the same application where the events originated (the event source)
    • A partner is a type of publisher that sends events from its own system to make them available to Azure customers
    • Partners can also receive events from Event Grid; both capabilities are enabled through the Partner Events feature
  • Events and CloudEvents
    • Events are the smallest amount of information that fully describes something that occurred in a system
    • Common information includes the source, the time, a unique ID and information specific to the type of event
    • Event Grid conforms to the Cloud Native Computing Foundation (CNCF) CloudEvents 1.0 open standard, using the HTTP protocol binding with the JSON event format
    • The maximum allowed event size is 1 MB; events over 64 KB are charged in 64-KB increments
    • Example of a custom order event in CloudEvents format
    {
        "specversion" : "1.0",
        "type" : "com.yourcompany.order.created",
        "source" : "https://yourcompany.com/orders/",
        "subject" : "O-28964",
        "id" : "A234-1234-1234",
        "time" : "2018-04-05T17:31:00Z",
        "comexampleextension1" : "value",
        "comexampleothervalue" : 5,
        "datacontenttype" : "application/json",
        "data" : {
            "orderId" : "O-28964",
            "URL" : "https://com.yourcompany/orders/O-28964"
        }
    }
    
  • Event sources
    • The place where the event happens, relating to one or more event types
    • Ex. Azure Storage is the event source for blob created events and IoT Hub is the event source for device created events
    • Your app is the event source for custom events that you define. Event sources are responsible for sending events to Event Grid
  • Topics
    • Topics hold events published to Event Grid
    • Typically use a topic resource for a collection of related events
    • Subscribers decide which topics to subscribe to
    • System topics: Built-in topics provided by Azure services. The publisher owns these topics, but you can subscribe to them
    • Custom topics: Application and third-party topics. When you create or are assigned access to a custom topic, you see it in your subscription
    • Partner topics: Used to subscribe to events published by a partner (enabled through Partner Events)
  • Event subscriptions
    • Subscriptions tell Event Grid which events on a topic you're interested in receiving
  • Event handlers
    • From Event Grid's perspective, an event handler is the place where the event is sent; the handler takes some further action to process the event
    • There are several handler types, and Event Grid uses a different mechanism to guarantee delivery of the event depending on the handler type
  • Security
    • Event Grid provides security for subscribing to topics and when publishing events to topics
    • Must have adequate permissions on the topic to subscribe
    • For push delivery to an event handler that is an Azure service (ex. Azure Functions), a managed identity is used to authenticate Event Grid; the identity needs an appropriate RBAC role on the handler
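
The following minimal in-memory sketch in Python shows how publishers, topics, event subscriptions and handlers relate. It is not the Azure SDK. The `Topic` and `EventSubscription` classes are hypothetical, delivery is a direct function call rather than an HTTP push, and the `com.yourcompany.order.shipped` event type is made up for the example.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional

# Hypothetical in-memory model of Event Grid concepts; this is NOT the Azure SDK.
Handler = Callable[[Dict], None]


@dataclass
class EventSubscription:
    name: str
    handler: Handler                                  # where matching events are sent
    included_event_types: Optional[List[str]] = None  # None means "all event types"

    def wants(self, event: Dict) -> bool:
        return (self.included_event_types is None
                or event["eventType"] in self.included_event_types)


@dataclass
class Topic:
    name: str
    subscriptions: List[EventSubscription] = field(default_factory=list)

    def subscribe(self, subscription: EventSubscription) -> None:
        self.subscriptions.append(subscription)

    def publish(self, events: List[Dict]) -> int:
        """Push each event to every interested subscription; return the delivery count."""
        deliveries = 0
        for event in events:
            for subscription in self.subscriptions:
                if subscription.wants(event):
                    subscription.handler(event)
                    deliveries += 1
        return deliveries


# Usage: one custom topic with a subscription that only wants "order created" events.
received: List[Dict] = []
orders = Topic("orders")
orders.subscribe(EventSubscription("created-only", received.append,
                                   ["com.yourcompany.order.created"]))
delivered = orders.publish([
    {"id": "1", "eventType": "com.yourcompany.order.created"},
    {"id": "2", "eventType": "com.yourcompany.order.shipped"},
])
```

The publisher only talks to the topic. The subscription's event-type list decides which events reach its handler.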

Event Schemas

  • Two types of event schemas: Event Grid event schema and CloudEvents schema
  • Events consist of a set of four required string properties
  • The data object has properties specific to each publisher
  • Event sources send events to Event Grid in an array, which can have a total size of up to 1 MB
    • If an event or the array exceeds the size limit, you receive the response 413 Payload Too Large
    • Operations are charged in 64-KB increments, so an event larger than 64 KB counts as multiple operations (ex. a 130-KB event counts as three)
  • Example of event schema showing properties used by all event publishers
    [
        {
            "topic": string,
            "subject": string,
            "id": string,
            "eventType": string,
            "eventTime": string,
            "data": {
                object-unique-to-each-publisher
            },
            "dataVersion": string,
            "metadataVersion": string
        }
    ]
    
  • Event properties:
    • topic (string): Full resource path to the event source. This field isn't writeable; Event Grid provides the value
    • subject (string, required): Publisher-defined path to the event subject
    • eventType (string, required): One of the registered event types for this event source
    • eventTime (string, required): Time the event was generated, based on the provider's UTC time
    • id (string, required): Unique identifier for the event
    • data (object): Event data specific to the resource provider
    • dataVersion (string): Schema version of the data object; the publisher defines the schema version
    • metadataVersion (string): Schema version of the event metadata. Event Grid defines the schema of the top-level properties and provides this value
  • CloudEvents is an open specification for describing event data
    • Simplifies interoperability by providing a common event schema for publishing and consuming cloud-based events
    • Example of an Azure Blob Storage event in CloudEvents format
    {
        "specversion": "1.0",
        "type": "Microsoft.Storage.BlobCreated",  
        "source": "/subscriptions/{subscription-id}/resourceGroups/{resource-group}/providers/Microsoft.Storage/storageAccounts/{storage-account}",
        "id": "9aeb0fdf-c01e-0131-0922-9eb54906e209",
        "time": "2019-11-18T15:13:39.4589254Z",
        "subject": "blobServices/default/containers/{storage-container}/blobs/{new-file}",
        "dataschema": "#",
        "data": {
            "api": "PutBlockList",
            "clientRequestId": "4c5dd7fb-2c48-4a27-bb30-5361b5de920a",
            "requestId": "9aeb0fdf-c01e-0131-0922-9eb549000000",
            "eTag": "0x8D76C39E4407333",
            "contentType": "image/png",
            "contentLength": 30699,
            "blobType": "BlockBlob",
            "url": "https://gridtesting.blob.core.windows.net/testcontainer/{new-file}",
            "sequencer": "000000000000000000000000000099240000000000c41c18",
            "storageDiagnostics": {
                "batchId": "681fe319-3006-00a8-0022-9e7cde000000"
            }
        }
    }
    
    • Header values for events delivered in the CloudEvents schema and the Event Grid schema are the same, except for content-type
      • CloudEvents: "content-type":"application/cloudevents+json; charset=utf-8"
      • Event Grid: "content-type":"application/json; charset=utf-8"
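
A receiver can tell the two schemas apart by the content-type header and then check the required Event Grid schema properties. The sketch below assumes each event is a plain dict, and all helper names are hypothetical. It approximates payload size by re-serialising the JSON. The billing arithmetic (each event charged in 64-KB increments) follows these notes rather than any SDK.

```python
import json
from typing import Dict, List

# Properties a publisher must supply in the Event Grid schema
# (topic and metadataVersion are provided by Event Grid).
REQUIRED_EVENT_GRID_PROPERTIES = ("id", "subject", "eventType", "eventTime")
MAX_ARRAY_BYTES = 1024 * 1024        # 1 MB limit for an event or an array of events
BILLING_INCREMENT_BYTES = 64 * 1024  # events are charged in 64-KB increments


def detect_schema(content_type: str) -> str:
    """Classify a delivery by its content-type header, ignoring parameters like charset."""
    media_type = content_type.split(";", 1)[0].strip().lower()
    if media_type.startswith("application/cloudevents"):
        return "cloudevents"
    if media_type == "application/json":
        return "eventgrid"
    raise ValueError(f"unexpected content-type: {content_type!r}")


def missing_properties(event: Dict) -> List[str]:
    """Return required Event Grid properties that are absent or not non-empty strings."""
    return [name for name in REQUIRED_EVENT_GRID_PROPERTIES
            if not isinstance(event.get(name), str) or not event[name]]


def billable_operations(events: List[Dict]) -> int:
    """Approximate operations for one publish call; reject arrays over 1 MB."""
    total = len(json.dumps(events).encode("utf-8"))  # approximates the request body size
    if total > MAX_ARRAY_BYTES:
        raise ValueError("413 Payload Too Large")
    # Each event counts as ceil(size / 64 KB) operations (ceiling division)
    return sum(-(-len(json.dumps(e).encode("utf-8")) // BILLING_INCREMENT_BYTES)
               for e in events)
```

The CloudEvents `type` field plays the role of the Event Grid `eventType` field, so a receiver usually branches on `detect_schema` before reading fields.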

Event Delivery Durability

  • Event Grid provides durable delivery: it tries to deliver each event at least once to each matching subscription, immediately
  • If a subscriber's endpoint doesn't acknowledge receipt of an event, or if there's a failure, Event Grid retries delivery based on a fixed retry schedule and retry policy
  • By default, Event Grid delivers one event at a time to a subscriber, and the payload is an array containing a single event
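
Because delivery is at-least-once, a handler can see the same event more than once. A common response is to make processing idempotent by remembering which events were already handled. In this sketch the `IdempotentReceiver` class is hypothetical, and events are keyed by source (or topic) plus `id`. The in-memory set stands in for the durable, shared store a real deployment would need.

```python
from typing import Callable, Dict, List, Set, Tuple


class IdempotentReceiver:
    """Processes each event once even if Event Grid delivers it more than once."""

    def __init__(self, process: Callable[[Dict], None]):
        self._process = process
        self._seen: Set[Tuple[str, str]] = set()  # illustration only: not durable or shared

    def receive(self, payload: List[Dict]) -> int:
        """Handle one delivery (an array of events); return how many were newly processed."""
        processed = 0
        for event in payload:
            key = (event.get("source") or event.get("topic", ""), event["id"])
            if key in self._seen:
                continue  # duplicate delivery: already handled
            self._process(event)
            self._seen.add(key)  # record only after processing succeeded
            processed += 1
        return processed


handled: List[Dict] = []
receiver = IdempotentReceiver(handled.append)
event = {"topic": "orders-topic", "id": "A234-1234-1234"}
```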

Retry Schedule

  • Based on the type of error, Event Grid decides whether to retry the delivery, dead-letter the event or drop the event
  • If the error returned by the subscribed endpoint is a configuration-related error that retries can't fix (ex. the endpoint was deleted), Event Grid dead-letters the event, or drops it if dead-lettering isn't configured
  • Endpoint types and the errors for which no retry occurs:
    • Azure Resources: 400 (Bad request), 413 (Request entity is too large)
    • Webhook: 400 (Bad request), 413 (Request entity is too large), 401 (Unauthorized)
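
The retry decision can be sketched as a small function. This is a simplification: the endpoint-type labels `azure_resource` and `webhook` are hypothetical names, timeouts and retry-policy limits are ignored, and only the status codes listed in these notes are encoded.

```python
# Status codes for which Event Grid doesn't retry, per endpoint type (from these notes).
NO_RETRY_STATUS = {
    "azure_resource": {400, 413},
    "webhook": {400, 401, 413},
}


def next_action(endpoint_kind: str, status_code: int, dead_letter_configured: bool) -> str:
    """Return 'delivered', 'retry', 'dead-letter' or 'drop' for one delivery attempt."""
    if 200 <= status_code < 300:
        return "delivered"
    if status_code in NO_RETRY_STATUS[endpoint_kind]:
        # Retrying can't fix these errors, so skip straight to the final outcome
        return "dead-letter" if dead_letter_configured else "drop"
    return "retry"
```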

Retry Policy

  • You can customise the retry policy when creating an event subscription by using two settings:
    • Maximum number of attempts: Must be an integer between 1 and 30 (default is 30)
    • Event time-to-live (TTL): Must be an integer between 1 and 1440 minutes (default is 1440 minutes)
    # Example: limit delivery to 18 attempts and set a 720-minute event TTL
    az eventgrid event-subscription create \
        -g gridResourceGroup \
        --topic-name <topic_name> \
        --name <event_subscription_name> \
        --endpoint <endpoint_URL> \
        --max-delivery-attempts 18 \
        --event-ttl 720
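
Once either retry-policy limit is reached, Event Grid dead-letters the event if a dead-letter location is configured; otherwise it drops the event. A minimal sketch of that check follows. The function names are hypothetical, and measuring the TTL from the time the event was published is an assumption of the sketch.

```python
from datetime import datetime, timedelta


def validate_retry_policy(max_attempts: int = 30, ttl_minutes: int = 1440) -> None:
    """Enforce the documented ranges: 1-30 attempts and 1-1440 minutes of TTL."""
    if not 1 <= max_attempts <= 30:
        raise ValueError("max delivery attempts must be between 1 and 30")
    if not 1 <= ttl_minutes <= 1440:
        raise ValueError("event TTL must be between 1 and 1440 minutes")


def policy_exhausted(attempts_made: int, published_at: datetime, now: datetime,
                     max_attempts: int = 30, ttl_minutes: int = 1440) -> bool:
    """True once either limit is reached (the event is then dead-lettered or dropped)."""
    validate_retry_policy(max_attempts, ttl_minutes)
    expired = now - published_at >= timedelta(minutes=ttl_minutes)
    return attempts_made >= max_attempts or expired
```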
    

Output Batching

  • You can configure Event Grid to batch events for delivery, improving HTTP performance in high-throughput scenarios
  • Batched delivery has two settings:
    • Max events per batch: Maximum number of events Event Grid delivers per batch
    • Preferred batch size in kilobytes: Target ceiling for the batch size in kilobytes
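
In this sketch the two batch settings interact as follows: a batch closes when it already holds the maximum number of events, or when the next event would push it past the preferred size. The sketch assumes, as Event Grid's documentation describes, that a single event larger than the preferred size is still delivered in a batch of its own. `plan_batches` is a hypothetical helper that works on event sizes in bytes.

```python
from typing import List


def plan_batches(event_sizes: List[int], max_events: int, preferred_kb: int) -> List[List[int]]:
    """Group event sizes (bytes) into delivery batches, preserving arrival order."""
    limit_bytes = preferred_kb * 1024
    batches: List[List[int]] = []
    current: List[int] = []
    current_bytes = 0
    for size in event_sizes:
        # Close the current batch if adding this event would break either setting
        if current and (len(current) == max_events or current_bytes + size > limit_bytes):
            batches.append(current)
            current, current_bytes = [], 0
        # An oversized event still starts a batch, which then holds only that event
        current.append(size)
        current_bytes += size
    if current:
        batches.append(current)
    return batches
```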

Delayed Delivery

  • As an endpoint experiences delivery failures, Event Grid begins to delay the delivery and retry of events to that endpoint
  • Delayed delivery protects unhealthy endpoints as well as the Event Grid system
    • Without back-off and delayed delivery, Event Grid's retry policy and volume capabilities could easily overwhelm a system
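
These notes don't give Event Grid's actual back-off thresholds, so the sketch below only illustrates the idea of delayed delivery. After a run of consecutive failures, deliveries to the endpoint are held back for a capped delay that grows exponentially. The class name, threshold and delay values are all hypothetical.

```python
class EndpointHealth:
    """Hypothetical back-off model; thresholds and delays are illustrative, not Event Grid's."""

    def __init__(self, failure_threshold: int = 10, base_delay_s: int = 10,
                 max_delay_s: int = 3600):
        self.failure_threshold = failure_threshold
        self.base_delay_s = base_delay_s
        self.max_delay_s = max_delay_s
        self.consecutive_failures = 0

    def record(self, success: bool) -> None:
        # Any successful delivery marks the endpoint healthy again
        self.consecutive_failures = 0 if success else self.consecutive_failures + 1

    def delay_seconds(self) -> int:
        """How long to hold back the next delivery or retry to this endpoint."""
        if self.consecutive_failures < self.failure_threshold:
            return 0
        excess = self.consecutive_failures - self.failure_threshold
        return min(self.base_delay_s * 2 ** excess, self.max_delay_s)
```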

Dead-letter Events

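  • If Event Grid can't deliver an event within the TTL or after the maximum number of attempts, it can send the undelivered event to a storage account; this is called dead-lettering
  • Dead-lettering is off by default; to enable it, specify a storage container as the dead-letter location when creating the event subscription
  • There's a five-minute delay between the last attempt to deliver an event and its delivery to the dead-letter location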

Custom Delivery Properties

  • Event subscriptions allow you to set up HTTP headers that are included in delivered events
  • This lets you set custom headers that a destination requires
  • You can set up to 10 headers when creating an event subscription (each header value can be at most 4,096 bytes)
  • You can set custom headers on events delivered to the following destinations: webhooks, Azure Service Bus topics and queues, Azure Event Hubs and Azure Relay Hybrid Connections
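
Checking custom delivery properties against the documented limits could look like the following sketch. The destination labels in `HEADER_CAPABLE_DESTINATIONS` are hypothetical names for the destination types in these notes. The 4,096-byte limit is applied to each header value encoded as UTF-8.

```python
from typing import Dict

MAX_HEADERS = 10
MAX_HEADER_VALUE_BYTES = 4096
# Hypothetical labels for the destination types that accept custom headers
HEADER_CAPABLE_DESTINATIONS = {
    "webhook", "service_bus_queue", "service_bus_topic", "event_hub", "relay_hybrid_connection",
}


def validate_delivery_headers(destination: str, headers: Dict[str, str]) -> None:
    """Raise ValueError if the custom headers break the documented limits."""
    if destination not in HEADER_CAPABLE_DESTINATIONS:
        raise ValueError(f"custom headers aren't supported for {destination!r}")
    if len(headers) > MAX_HEADERS:
        raise ValueError(f"at most {MAX_HEADERS} headers are allowed, got {len(headers)}")
    for name, value in headers.items():
        if len(value.encode("utf-8")) > MAX_HEADER_VALUE_BYTES:
            raise ValueError(f"header {name!r} exceeds {MAX_HEADER_VALUE_BYTES} bytes")
```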

Control Access to Events

  • Event Grid lets you control the level of access given to different users for management operations such as listing event subscriptions, creating new ones and generating keys
  • Event Grid uses Azure role-based access control (RBAC)
  • Built-in roles:
    • Event Grid Subscription Reader: Lets you read Event Grid event subscriptions
    • Event Grid Subscription Contributor: Lets you manage Event Grid event subscription operations
    • Event Grid Contributor: Lets you create and manage Event Grid resources
    • Event Grid Data Sender: Lets you send events to Event Grid topics
  • Permissions for event subscriptions
    • For an event handler that isn't a webhook (ex. an event hub or queue storage), you need write access to the handler resource; this permission check prevents unauthorised users from sending events to your resource
    • Must have Microsoft.EventGrid/EventSubscriptions/Write permission on the resource that is the event source
      • Need this permission because you're writing a new subscription at the scope of the resource
      • Required resource differs based on whether you're subscribing to a system topic or custom topic
        • System topics: Need permission to write a new event subscription at the scope of the resource publishing the event. The format of the resource is: /subscriptions/{subscription-id}/resourceGroups/{resource-group-name}/providers/{resource-provider}/{resource-type}/{resource-name}
        • Custom topics: Need permission to write a new event subscription at the scope of the Event Grid topic. The format of the resource is: /subscriptions/{subscription-id}/resourceGroups/{resource-group-name}/providers/Microsoft.EventGrid/topics/{topic-name}
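
The two scope formats can be built with small helpers. These make it easier to see where the `Microsoft.EventGrid/EventSubscriptions/Write` permission must be granted. The function names and all IDs in the tests are placeholders. The storage account example assumes the `Microsoft.Storage/storageAccounts` resource type.

```python
REQUIRED_PERMISSION = "Microsoft.EventGrid/EventSubscriptions/Write"


def system_topic_scope(subscription_id: str, resource_group: str, resource_provider: str,
                       resource_type: str, resource_name: str) -> str:
    """Scope of the Azure resource that publishes system-topic events."""
    return (f"/subscriptions/{subscription_id}/resourceGroups/{resource_group}"
            f"/providers/{resource_provider}/{resource_type}/{resource_name}")


def custom_topic_scope(subscription_id: str, resource_group: str, topic_name: str) -> str:
    """Scope of a custom Event Grid topic."""
    return system_topic_scope(subscription_id, resource_group,
                              "Microsoft.EventGrid", "topics", topic_name)
```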

Receive Events by Using WebHooks

  • Azure Event Grid uses webhooks to deliver events by sending an HTTP POST request to the configured endpoint
  • To prevent malicious use, Event Grid requires proof of ownership of the webhook endpoint before delivering events
  • Azure infrastructure automatically validates webhook endpoints for these services:
    • Azure Logic Apps (Event Grid Connector)
    • Azure Automation (via webhook)
    • Azure Functions (Event Grid Trigger)

Endpoint Validation with Event Grid events

  • Validation handshakes are required for non-automatic endpoints (ex. HTTP-triggered Azure Functions)
    • Synchronous handshake:
      • During subscription creation, Event Grid sends a validation event whose data contains a validationCode property
      • The endpoint must respond synchronously, echoing the code back in the response body as {"validationResponse": "<validationCode>"}
      • Supported in all Event Grid versions
    • Asynchronous handshake (manual):
      • Applicable for tools or third-party services (ex. Zapier, IFTTT) unable to respond synchronously
      • Validation event includes a validationUrl property.
      • Steps:
        • Perform an HTTP GET request to the validationUrl within 5 minutes
        • Provisioning state remains AwaitingManualAction until completed
        • Failure to complete within 5 minutes results in a Failed state, requiring a new subscription
      • Endpoint must initially return HTTP 200 to acknowledge the validation event
  • Key requirements:
    • Endpoint must correctly handle and respond to POST requests from Event Grid
    • The URL for manual validation is valid for 5 minutes only
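
The synchronous handshake can be sketched as a framework-independent function that takes the raw POST body and returns a status code and a response body. It assumes the Event Grid schema. In that schema the validation event has the type `Microsoft.EventGrid.SubscriptionValidationEvent` and carries `validationCode` and `validationUrl` in its data. The function name is hypothetical, and a real endpoint would wire it into a web framework.

```python
import json
from typing import Tuple

VALIDATION_EVENT_TYPE = "Microsoft.EventGrid.SubscriptionValidationEvent"


def handle_event_grid_post(body: str) -> Tuple[int, str]:
    """Return (HTTP status, response body) for a POST from Event Grid (Event Grid schema)."""
    events = json.loads(body)  # deliveries arrive as a JSON array
    for event in events:
        if event.get("eventType") == VALIDATION_EVENT_TYPE:
            data = event.get("data", {})
            # Synchronous handshake: echo the code back in the response body.
            # (Asynchronous handshake: return 200, then send a GET to
            # data["validationUrl"] within 5 minutes.)
            return 200, json.dumps({"validationResponse": data["validationCode"]})
    # Ordinary events: process them here, then acknowledge with a 2xx status
    return 200, ""
```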

Filter Events

  • Three options for filtering when creating an event subscription:
    • Event types
    • Subject begins with or ends with
    • Advanced fields and operators
  • Event type filtering
    • By default, all event types for the source are sent to the endpoint
    • Can decide to send only certain event types to your endpoint
    "filter": {
        "includedEventTypes": [
            "Microsoft.Resources.ResourceWriteFailure",
            "Microsoft.Resources.ResourceWriteSuccess"
        ]
    }
    
  • Subject filtering
    • For filtering by subject, specify a starting or ending value for the subject
    "filter": {
        "subjectBeginsWith": "/blobServices/default/containers/mycontainer/log",
        "subjectEndsWith": ".jpg"
    }
    
  • Advanced filtering
    • Use the advanced filtering option to filter on values in the event data by specifying a comparison operator, a key and a value or values
      • operatorType: Type of comparison
      • key: Field in the event data used for filtering; its value can be a number, boolean or string
      • value or values: The value or values to compare against the key
    "filter": {
        "advancedFilters": [
            {
                "operatorType": "NumberGreaterThanOrEquals",
                "key": "Data.Key1",
                "value": 5
            },
            {
                "operatorType": "StringContains",
                "key": "Subject",
                "values": ["container1", "container2"]
            }
        ]
    }
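
The three filter kinds can be combined into one matching function. This sketch covers only the two advanced operators shown in the examples. It treats separate filters as ANDed, with the `values` inside a single filter ORed. String comparisons are case-insensitive, which is assumed to match Event Grid's default behaviour. `resolve_key` and `matches` are hypothetical helpers.

```python
from typing import Any, Dict, Optional


def resolve_key(event: Dict[str, Any], key: str) -> Optional[Any]:
    """Follow a dotted key such as 'Data.Key1' or 'Subject', ignoring case."""
    current: Any = event
    for part in key.split("."):
        if not isinstance(current, dict):
            return None
        match = next((k for k in current if k.lower() == part.lower()), None)
        if match is None:
            return None
        current = current[match]
    return current


def matches(event: Dict[str, Any], flt: Dict[str, Any]) -> bool:
    """Apply event-type, subject and (a subset of) advanced filters; all must pass."""
    included = flt.get("includedEventTypes")
    if included is not None and event.get("eventType") not in included:
        return False
    subject = event.get("subject", "").lower()
    if not subject.startswith(flt.get("subjectBeginsWith", "").lower()):
        return False
    if not subject.endswith(flt.get("subjectEndsWith", "").lower()):
        return False
    for adv in flt.get("advancedFilters", []):  # separate filters are ANDed
        actual = resolve_key(event, adv["key"])
        op = adv["operatorType"]
        if op == "NumberGreaterThanOrEquals":
            if isinstance(actual, bool) or not isinstance(actual, (int, float)) \
                    or actual < adv["value"]:
                return False
        elif op == "StringContains":  # any one of the values may match (OR)
            if not isinstance(actual, str) or \
                    not any(v.lower() in actual.lower() for v in adv["values"]):
                return False
        else:
            raise NotImplementedError(f"operator {op!r} isn't covered by this sketch")
    return True
```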
    

Azure Event Hubs

  • Native data-streaming service in the cloud that can stream millions of events per second with low latency from any source to any destination
  • Compatible with Apache Kafka, so you can run existing Kafka workloads without code changes
  • Key capabilities:
    • Apache Kafka on Azure Event Hubs
      • Event Hubs is a multi-protocol event streaming engine supporting Advanced Message Queuing Protocol (AMQP), Apache Kafka and HTTPS protocols
      • Don't need to set up, configure or manage Kafka clusters or use a Kafka-as-a-service offering not native to Azure
    • Schema Registry in Event Hubs
      • Provides a centralised repository for managing schemas of event streaming applications
      • Comes free with every Event Hubs namespace, integrates with your Kafka apps or Event Hubs SDK-based apps
    • Real-time processing of streaming events with Stream Analytics
      • Integrates with Azure Stream Analytics to enable real-time stream processing
      • Built-in no-code editor for developing Stream Analytics jobs with drag-and-drop functionality
  • Key concepts:
    • Producer applications: These apps can ingest data to an event hub using the Event Hubs SDKs or any Kafka producer client
    • Namespace: Management container for one or more event hubs or Kafka topics
    • Event hub/Kafka topic: Events are organised in an event hub, which is the equivalent of a Kafka topic
    • Partitions: Used to scale an event hub
    • Consumer applications: These apps can consume data by seeking through the event log and maintaining a consumer offset
    • Consumer group: Logical group of consumer instances that reads data from an event hub or Kafka topic
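
A toy in-memory model can make the relationships between partitions, partition keys and consumer groups concrete. It is not the Event Hubs SDK. `InMemoryEventHub` is hypothetical, and `zlib.crc32` stands in for whatever hash Event Hubs uses to map a partition key to a partition.

```python
import zlib
from typing import Dict, List, Optional, Tuple


class InMemoryEventHub:
    """Toy model: partitions are append-only logs, and every consumer group keeps
    its own read offset per partition. Not the Event Hubs SDK."""

    def __init__(self, partition_count: int) -> None:
        self.partitions: List[List[str]] = [[] for _ in range(partition_count)]
        self.offsets: Dict[Tuple[str, int], int] = {}  # (consumer group, partition) -> next offset
        self._round_robin = 0

    def send(self, body: str, partition_key: Optional[str] = None) -> int:
        """Append an event and return the partition it landed in."""
        if partition_key is None:
            partition = self._round_robin % len(self.partitions)
            self._round_robin += 1
        else:
            # Same key -> same partition, so per-key ordering is kept.
            # crc32 is an illustrative hash, not the one Event Hubs uses.
            partition = zlib.crc32(partition_key.encode("utf-8")) % len(self.partitions)
        self.partitions[partition].append(body)
        return partition

    def receive(self, consumer_group: str, partition: int, max_count: int = 10) -> List[str]:
        """Read the next events for this consumer group and advance its offset."""
        start = self.offsets.get((consumer_group, partition), 0)
        batch = self.partitions[partition][start:start + max_count]
        self.offsets[(consumer_group, partition)] = start + len(batch)
        return batch
```

Each consumer group reads the same log independently. Reading in one group doesn't consume events for another.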

Event Hubs Capture

  • Enables you to automatically capture the streaming data in Event Hubs in an Azure Blob storage or Azure Data Lake Storage account
  • Fast to set up, with no administrative costs to run
  • Scales automatically with Event Hubs throughput units in standard tier or processing units in premium tier
  • Can process real-time and batch-based pipelines on the same stream, meaning you can build solutions that grow with your needs over time
  • How it works:
    • Event Hubs are a durable, partitioned buffer for telemetry with a time-based retention period
    • Scales using a partitioned consumer model; partitions are consumed independently
    • Captures data to Azure Blob Storage or Data Lake in the same or different regions
    • Data is stored in Apache Avro format (compact, schema-rich and widely compatible)
  • Capture windowing
    • Configure a window with a size limit and a time limit; whichever is reached first triggers the capture ("first wins")
    • Data is stored in blobs named by capture interval timestamp using a defined path structure: {Namespace}/{EventHub}/{PartitionId}/{Year}/{Month}/{Day}/{Hour}/{Minute}/{Second}
  • Scaling to throughput units
    • Traffic managed by throughput units: 1 MB/s or 1,000 events/s ingress per unit; double for egress
    • Standard: 1-20 units; more available with quota requests
    • Capture bypasses egress quotas, saving capacity for other readers
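
The capture path, the "first wins" window rule and the throughput-unit arithmetic can be sketched as three small functions. The helper names are hypothetical, and zero-padded date parts are an assumption about the default naming. The throughput estimate only considers ingress (1 MB/s or 1,000 events/s per unit).

```python
import math
from datetime import datetime, timezone


def capture_blob_path(namespace: str, event_hub: str, partition_id: str,
                      window_start: datetime) -> str:
    """{Namespace}/{EventHub}/{PartitionId}/{Year}/{Month}/{Day}/{Hour}/{Minute}/{Second}.
    window_start should be timezone-aware; it is converted to UTC."""
    t = window_start.astimezone(timezone.utc)
    return f"{namespace}/{event_hub}/{partition_id}/{t:%Y/%m/%d/%H/%M/%S}"


def window_closed(buffered_bytes: int, elapsed_s: float,
                  size_limit_bytes: int, time_limit_s: float) -> bool:
    """'First wins': whichever limit is reached first triggers the capture."""
    return buffered_bytes >= size_limit_bytes or elapsed_s >= time_limit_s


def throughput_units_needed(ingress_mb_per_s: float, ingress_events_per_s: float) -> int:
    """Each unit allows 1 MB/s or 1,000 events/s of ingress, whichever is hit first."""
    return max(1, math.ceil(ingress_mb_per_s), math.ceil(ingress_events_per_s / 1000))
```

For example, 2.5 MB/s at 1,500 events/s needs three units, because the byte rate is the binding limit.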

Scale Your Processing Application

  • Run multiple instances of your application to balance the load across them
  • Older versions used EventProcessorHost for load balancing and event checkpointing
  • Newer versions (5.0+) use:
    • EventProcessorClient (for .NET and Java)
    • EventHubConsumerClient (for Python and JavaScript)
  • Example scenario
    • Context: Monitors 100,000 homes using sensors
    • Event flow:
      • Sensors push data to an Event Hub (16 partitions)
      • Consumer reads, consolidates events and stores aggregates in a blob for web display
    • Key requirements:
      • Scaling: Use multiple consumers, each taking ownership of specific Event Hub partitions
      • Load balancing: Dynamically adjust consumers based on load, rebalance partitions among consumers when instances are added or removed
      • Fault tolerance: Handle failures by redistributing partitions to other consumers, ensure seamless resumption using checkpoints to avoid data loss
      • Event consumption: Process events to perform meaningful operations (ex. aggregate data), save aggregated results to Azure Blob Storage for use in a user-friendly web interface
  • Event processor or consumer client
    • SDKs provide functionality to build solutions meeting these requirements
    • The event processor client is recommended for reading and processing events
  • Partition ownership tracking
    • Event processor instance typically owns and processes events from one or more partitions
    • Ownership of partitions is distributed evenly among all active event processor instances associated with an event hub and consumer group combination
    • Each event processor has a unique ID and claims ownership of partitions by adding or updating an entry in a checkpoint store
  • Receive messages
    • When creating an event processor, specify functions that process events and errors
    • Keep the processing function relatively fast and do as little processing as possible
    • If you need to both write to storage and do some routing, it's better to use two consumer groups with two event processors
  • Checkpointing
    • Process by which an event processor marks or commits the position of the last successfully processed event within a partition
    • Typically done within the function that processes the events and occurs on a per-partition basis within a consumer group
  • Thread safety and processor instances
    • By default, the function processing the events is called sequentially for a given partition
    • Subsequent events and calls to this function from the same partition queue up as the event pump continues to run in the background on other threads
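
The ownership and checkpointing ideas can be illustrated with a simplified sketch. The real processor client balances load incrementally by claiming ownership entries in the checkpoint store. Here, `balance_partitions` simply recomputes an even round-robin split, and `InMemoryCheckpointStore` stands in for Blob Storage. Sequence numbers are assumed to start at 0, and all names are hypothetical.

```python
from typing import Dict, List, Tuple


def balance_partitions(partition_ids: List[str], processor_ids: List[str]) -> Dict[str, List[str]]:
    """Spread partitions as evenly as possible across the active processors."""
    owners = sorted(processor_ids)
    assignment: Dict[str, List[str]] = {owner: [] for owner in owners}
    for index, partition_id in enumerate(partition_ids):
        assignment[owners[index % len(owners)]].append(partition_id)
    return assignment


class InMemoryCheckpointStore:
    """Stand-in for a durable checkpoint store such as a Blob Storage container."""

    def __init__(self) -> None:
        self._checkpoints: Dict[Tuple[str, str], int] = {}

    def update_checkpoint(self, consumer_group: str, partition_id: str,
                          sequence_number: int) -> None:
        # Record the last successfully processed event for this partition
        self._checkpoints[(consumer_group, partition_id)] = sequence_number

    def resume_from(self, consumer_group: str, partition_id: str) -> int:
        """Sequence number to start reading from after a restart or rebalance."""
        return self._checkpoints.get((consumer_group, partition_id), -1) + 1
```

In the 16-partition home-monitoring scenario, three processors would own 6, 5 and 5 partitions. Adding a fourth processor rebalances ownership to 4 partitions each.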

Control Access to Events

  • Event Hubs supports both Entra ID and shared access signatures for authentication and authorisation
  • Built-in roles for authorising access to Event Hubs data using Entra ID and OAuth:
    • Azure Event Hubs Data Owner: Use this role to give complete access to Event Hubs resources
    • Azure Event Hubs Data Sender: Use this role to give send access to Event Hubs resources
    • Azure Event Hubs Data Receiver: Use this role to give receiving access to Event Hubs resources
  • Authorise access with managed identities
    • Assign an appropriate Event Hubs RBAC role to the managed identity
  • Authorise access with Microsoft identity platform
    • Entra authenticates the security principal running the app, returning an access token to authorise requests
  • Authorise access to Event Hubs publishers with SAS
    • A publisher can only send messages to an event hub; it can't receive messages
    • Each Event Hubs client is assigned a unique token that lets it send to only one publisher; if multiple clients share the same token, they share the same publisher
  • Authorise access to Event Hubs consumers with SAS
    • Event Hubs token authentication requires its clients to either have the manage rights or the listen privileges assigned to its Event Hubs namespace or event hub instance or topic
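
A SAS token is an HMAC-SHA256 signature over the URL-encoded resource URI and an expiry time, signed with a shared access policy key. The sketch below follows the documented token format, `SharedAccessSignature sr=...&sig=...&se=...&skn=...`. `generate_sas_token` is a hypothetical helper name. The namespace, event hub, policy name and key in the tests are placeholders.

```python
import base64
import hashlib
import hmac
import time
import urllib.parse
from typing import Optional


def generate_sas_token(resource_uri: str, key_name: str, key: str,
                       ttl_seconds: int = 3600, now: Optional[float] = None) -> str:
    """Build a SharedAccessSignature token for an Event Hubs resource URI."""
    expiry = str(int((time.time() if now is None else now) + ttl_seconds))
    encoded_uri = urllib.parse.quote_plus(resource_uri)
    # Sign "<url-encoded resource URI>\n<expiry>" with HMAC-SHA256 using the policy key
    string_to_sign = (encoded_uri + "\n" + expiry).encode("utf-8")
    digest = hmac.new(key.encode("utf-8"), string_to_sign, hashlib.sha256).digest()
    signature = urllib.parse.quote_plus(base64.b64encode(digest).decode("ascii"))
    return f"SharedAccessSignature sr={encoded_uri}&sig={signature}&se={expiry}&skn={key_name}"
```

A publisher would be given a token signed with a send-only policy, so it can send but not listen.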

Common Operations with the Event Hubs client library

  • Inspect an event hub (list its partition IDs)
    using Azure.Messaging.EventHubs.Producer;
    
    var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
    var eventHubName = "<< NAME OF THE EVENT HUB >>";
    
    await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
    {
        // Returns the identifiers of the event hub's partitions (ex. "0", "1", ...)
        string[] partitionIds = await producer.GetPartitionIdsAsync();
    }
    
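  • Publish events to an event hub
    using System;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Producer;
    
    var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
    var eventHubName = "<< NAME OF THE EVENT HUB >>";
    
    await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
    {
        using EventDataBatch eventBatch = await producer.CreateBatchAsync();
    
        // TryAdd returns false when an event doesn't fit within the batch size limit
        foreach (var body in new[] { "First", "Second" })
        {
            if (!eventBatch.TryAdd(new EventData(new BinaryData(body))))
            {
                throw new InvalidOperationException($"Event '{body}' is too large for the batch.");
            }
        }
    
        await producer.SendAsync(eventBatch);
    }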
    
  • Read events from an event hub (all partitions)
    using System;
    using System.Threading;
    using Azure.Messaging.EventHubs.Consumer;
    
    var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
    var eventHubName = "<< NAME OF THE EVENT HUB >>";
    
    string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
    
    await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
    {
        using var cancellationSource = new CancellationTokenSource();
        cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));
    
        try
        {
            await foreach (PartitionEvent receivedEvent in consumer.ReadEventsAsync(cancellationSource.Token))
            {
                // The loop waits until an event is available, then iterates with it. No maximum wait
                // time was specified, so the loop only ends when the token is cancelled (after 45 s).
                Console.WriteLine(receivedEvent.Data.EventBody.ToString());
            }
        }
        catch (OperationCanceledException)
        {
            // Expected when the cancellation token is signalled.
        }
    }
    
  • Read events from a single event hub partition
    using System;
    using System.Linq;
    using System.Threading;
    using Azure.Messaging.EventHubs.Consumer;
    
    var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
    var eventHubName = "<< NAME OF THE EVENT HUB >>";
    
    string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
    
    await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
    {
        // Start from the oldest event still retained in the first partition
        EventPosition startingPosition = EventPosition.Earliest;
        string partitionId = (await consumer.GetPartitionIdsAsync()).First();
    
        using var cancellationSource = new CancellationTokenSource();
        cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));
    
        try
        {
            await foreach (PartitionEvent receivedEvent in consumer.ReadEventsFromPartitionAsync(partitionId, startingPosition, cancellationSource.Token))
            {
                // The loop waits for events in this partition and only ends when the token is cancelled.
                Console.WriteLine(receivedEvent.Data.EventBody.ToString());
            }
        }
        catch (OperationCanceledException)
        {
            // Expected when the cancellation token is signalled.
        }
    }
    
  • Process events using an Event Processor client (checkpoints stored in Blob Storage)
    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Processor;
    using Azure.Storage.Blobs;
    
    using var cancellationSource = new CancellationTokenSource();
    cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));
    
    var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
    var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";
    
    var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
    var eventHubName = "<< NAME OF THE EVENT HUB >>";
    var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";
    
    async Task processEventHandler(ProcessEventArgs eventArgs)
    {
        // Keep this fast. Checkpointing after every event is the simplest approach, but it adds
        // a storage write per event; many apps checkpoint every N events instead.
        Console.WriteLine($"Partition {eventArgs.Partition.PartitionId}: {eventArgs.Data.EventBody}");
        await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
    }
    
    Task processErrorHandler(ProcessErrorEventArgs eventArgs)
    {
        Console.WriteLine($"Error in partition '{eventArgs.PartitionId}' during '{eventArgs.Operation}': {eventArgs.Exception.Message}");
        return Task.CompletedTask;
    }
    
    // Checkpoints and partition ownership are stored in this blob container
    var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
    var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);
    
    processor.ProcessEventAsync += processEventHandler;
    processor.ProcessErrorAsync += processErrorHandler;
    
    await processor.StartProcessingAsync();
    
    try
    {
        // The processor performs its work in the background; block until cancellation
        // to allow processing to take place.
    
        await Task.Delay(Timeout.Infinite, cancellationSource.Token);
    }
    catch (TaskCanceledException)
    {
        // This is expected when the delay is cancelled.
    }
    
    try
    {
        await processor.StopProcessingAsync();
    }
    finally
    {
        // To prevent leaks, the handlers should be removed when processing is complete.
    
        processor.ProcessEventAsync -= processEventHandler;
        processor.ProcessErrorAsync -= processErrorHandler;
    }