Azure Cosmos DB
- Azure Cosmos DB account is the fundamental unit of global distribution and high availability
- Contains a unique DNS name
- Manageable via portal, CLI or different SDKs
- Globally distribute data across multiple regions
- Azure Cosmos DB partitions your container using a provided logical partition key to elastically scale throughput and storage
- Create a maximum of 50 accounts under a subscription (increase via support request)
- Data can be managed by creating databases, containers and items
- Databases:
- Create one or more Azure Cosmos DB databases under an account
- Analogous to a namespace, a unit of management for a set of Azure Cosmos DB containers
- Containers:
- Data is stored here, scaling out as required
- Configurable throughput (see the provisioning sketch after this list):
- Dedicated: Throughput on a container is exclusively reserved for that container. Two types of dedicated throughput (standard and autoscale)
- Shared: Throughput is specified at the database level and shared with up to 25 containers within the database (excludes containers configured with their own dedicated throughput)
- Items:
- Depending on API, data entities can be represented in different ways
- NoSQL: Item
- Cassandra: Row
- MongoDB: Document
- Gremlin: Node or edge
- Table: Item
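- A minimal sketch of provisioning shared and dedicated throughput with the .NET SDK (the database/container names, partition key paths and the client variable are assumptions):
// Shared throughput: provisioned on the database and shared by its containers
Database sharedDb = await client.CreateDatabaseIfNotExistsAsync("shared-db", throughput: 400);

// Dedicated standard throughput: reserved exclusively for this container
Container ordersContainer = await sharedDb.CreateContainerIfNotExistsAsync(
    new ContainerProperties(id: "orders", partitionKeyPath: "/accountNumber"),
    throughput: 400);

// Dedicated autoscale throughput: scales between 10% and 100% of the configured maximum
Container eventsContainer = await sharedDb.CreateContainerIfNotExistsAsync(
    new ContainerProperties(id: "events", partitionKeyPath: "/deviceId"),
    ThroughputProperties.CreateAutoscaleThroughput(autoscaleMaxThroughput: 4000));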
Consistency Levels
- Different data consistency options, trading off availability and performance
- Strong, Bounded staleness, Session, Consistent prefix, Eventual
- From left (strong) to right (weak): Higher availability, lower latency, higher throughput
- Consistency levels are region-agnostic and guaranteed for all operations, regardless of:
- The region where the reads and writes are served
- Number of regions associated with your Azure Cosmos DB account
- Whether your account is configured with a single or multiple write regions
Choosing The Right Consistency Level
- Each consistency model provides precise availability and performance tradeoffs backed by comprehensive SLAs
- Guaranteed 100% of read requests meet the consistency level selected
- Default:
- Applied to all Cosmos databases and containers under the account
- Reads and queries issued against a container or database use the specified consistency level by default (it can be relaxed per request; see the sketch at the end of this section)
- Read consistency applies to a single read operation within a logical partition; the read can be issued by a remote client or a stored procedure
- Strong:
- Offers a linearizability guarantee; linearizability refers to serving requests concurrently
- Clients never see an uncommitted or partial write, and users are guaranteed to read the latest committed write
- Bounded staleness:
- The maximum staleness of the data in any region can be configured in two ways:
- Number of versions of the item
- The time interval reads might lag behind the writes
- Useful for single-region write accounts with two or more regions
- If the data lag in a region exceeds the configured staleness value, writes for that partition are throttled until staleness is back within the configured upper bound
- Offers stronger consistency guarantees than Session, Consistent prefix and Eventual consistency
- Data is replicated to a local majority (3 replicas within a 4 replica set) in the single region
- Session:
- Reads are guaranteed to honor the read-your-writes and write-follows-reads guarantees. This assumes a single "writer" session, or sharing the session token across multiple writers
- Writes are replicated to a minimum of 3 replicas (in a 4 replica set) in the local region, with asynchronous replication to all other regions
- Consistent prefix:
- Updates made as single document writes see eventual consistency
- Updates made as a batch within a transaction are returned consistent to the transaction in which they're committed
- Write operations within a transaction of multiple documents are always visible together
- Eventual:
- No ordering guarantee for reads
- In the absence of further writes, replicas eventually converge
- Weakest form of consistency because a client might read values that are older than the ones it previously read
- Ideal when the application doesn't require ordering guarantees (Retweets, Likes, non-threaded comments)
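- Clients can only weaken the account default, never strengthen it; a minimal sketch of overriding consistency per client and per request with the .NET SDK (the endpoint, key, container and ToDoItem type are assumptions):
// Client-wide override: must be the account default or a weaker level
CosmosClient client = new CosmosClient(endpoint, key, new CosmosClientOptions
{
    ConsistencyLevel = ConsistencyLevel.Session
});

// Per-request relaxation for a single read
ItemResponse<ToDoItem> response = await container.ReadItemAsync<ToDoItem>(
    "item-id",
    new PartitionKey("partition-key-value"),
    new ItemRequestOptions { ConsistencyLevel = ConsistencyLevel.Eventual });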
Supported APIs
- Considerations when choosing:
- NoSQL is native to Azure Cosmos DB
- MongoDB, PostgreSQL, Cassandra, Gremlin and Table implement the wire protocol of open-source database engines, best suited if the following are true:
- Have existing MongoDB, PostgreSQL, Cassandra or Gremlin applications
- Don't want to rewrite your entire data access layer
- Want to use the open-source developer ecosystem, client-drivers, expertise and resources for your database
- NoSQL:
- Stores data in document format
- Full control over the interface, service and SDK client libraries
- Any new feature that is rolled out to Cosmos DB is first available on API for NoSQL accounts
- MongoDB:
- Stores data in a document structure via BSON format
- Compatible with MongoDB wire protocol but doesn't use any MongoDB related code
- PostgreSQL:
- Managed service for running PostgreSQL at scale with the Citus open source superpower of distributed tables
- Stores data either on a single node or distributed in a multi-node configuration
- Apache Cassandra:
- Stores data in column-oriented schema
- Offers highly distributed, horizontally scaling approach to storing large volumes of data
- Apache Gremlin:
- Allows users to make graph queries and stores data as edges and vertices
- Useful for scenarios involving:
- Dynamic data
- Data with complex relations
- Data that is too complex to be modeled with relational databases
- Want to use the existing Gremlin ecosystem and skills
- Table:
- Stores data in key/value format
- API for Table overcomes Azure Table storage's limitations in latency, scaling, throughput, global distribution, index management and query performance
- Only supports OLTP scenarios
Request Units
- In Azure Cosmos DB, you pay for throughput you provision and the storage consumed on an hourly basis
- The cost of all database operations is expressed in request units (RUs), representing a unit of the system resources required to perform operations; every response reports the RU charge it consumed (see the sketch after this list)
- The type of Cosmos account determines the way RUs get charged, there are 3 modes:
- Provisioned throughput mode: Provision RUs for your application on a per-second basis in increments of 100 RUs per second
- Serverless mode: Don't have to provision throughput when creating resources in the Cosmos account; you're billed for the number of RUs consumed by your database operations at the end of the billing period
- Autoscale mode: Automatically and instantly scale throughput of your database or container based on usage. It doesn't affect availability, latency, throughput or performance of the workload
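- As a baseline, a point read of a 1-KB item costs 1 RU; a minimal sketch of reading the charge back from a response (the container and ToDoItem type are assumptions):
// Inspect the RU charge of any operation from its response
ItemResponse<ToDoItem> readResponse = await container.ReadItemAsync<ToDoItem>(
    "item-id", new PartitionKey("partition-key-value"));
Console.WriteLine($"Point read consumed {readResponse.RequestCharge} RUs");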
Microsoft .NET SDK v3 for Azure Cosmos DB
- Since Cosmos supports multiple API models, it uses the generic terms container and item
- A container can be a collection, graph or table
- An item can be a document, edge/vertex or row and is the content within a container
- azure-cosmos-dotnet-v3 samples
- Client:
CosmosClient client = new CosmosClient(endpoint, key);
- Databases:
- Creating databases
Database database1 = await client.CreateDatabaseAsync(
    id: "adventureworks-1");
Database database2 = await client.CreateDatabaseIfNotExistsAsync(
    id: "adventureworks-2");
- Reading a database by ID
DatabaseResponse readResponse = await database.ReadAsync();
- Deleting a database
await database.DeleteAsync();
- Containers:
- Creating a container
ContainerResponse simpleContainer = await database.CreateContainerIfNotExistsAsync(
    id: containerId,
    partitionKeyPath: partitionKey,
    throughput: 400);
- Get a container by ID
Container container = database.GetContainer(containerId);
ContainerProperties containerProperties = await container.ReadContainerAsync();
- Delete a container
await database.GetContainer(containerId).DeleteContainerAsync();
- Items:
- Creating an item
ItemResponse<SalesOrder> response = await container.CreateItemAsync(salesOrder, new PartitionKey(salesOrder.AccountNumber));
- Read an item
string id = "[id]";
string accountNumber = "[partition-key]";
ItemResponse<SalesOrder> response = await container.ReadItemAsync<SalesOrder>(id, new PartitionKey(accountNumber));
- Query an item
QueryDefinition query = new QueryDefinition(
    "select * from sales s where s.AccountNumber = @AccountInput ")
    .WithParameter("@AccountInput", "Account1");

FeedIterator<SalesOrder> resultSet = container.GetItemQueryIterator<SalesOrder>(
    query,
    requestOptions: new QueryRequestOptions()
    {
        PartitionKey = new PartitionKey("Account1"),
        MaxItemCount = 1
    });
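- The iterator above only defines the query; a sketch of draining it page by page (continuing the SalesOrder example):
// Drain the iterator page by page until the query is fully consumed
while (resultSet.HasMoreResults)
{
    FeedResponse<SalesOrder> page = await resultSet.ReadNextAsync();
    foreach (SalesOrder order in page)
    {
        Console.WriteLine($"Read order for account {order.AccountNumber}, charge: {page.RequestCharge} RUs");
    }
}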
Create Stored Procedures
- Azure Cosmos DB provides language-integrated, transactional execution of JavaScript which lets you write stored procedures, triggers and user-defined functions
- To call any stored procedures, triggers or UDFs, they must be registered (see the registration sketch after the stored procedure examples below)
- Stored procedures can create, update, read, query and delete items inside a Cosmos container
- They're registered per collection and can operate on any document or attachment present in that collection
- The context object provides access to all operations that can be performed, along with access to request and response objects
var helloWorldStoredProc = {
    id: "helloWorld",
    serverScript: function () {
        var context = getContext();
        var response = context.getResponse();
        response.setBody("Hello, World");
    }
}
- Creating an item from a stored procedure is an asynchronous operation; the item is inserted into the container and the ID of the created document is returned
var createDocumentStoredProc = {
    id: "createMyDocument",
    body: function createMyDocument(documentToCreate) {
        var context = getContext();
        var collection = context.getCollection();
        var accepted = collection.createDocument(
            collection.getSelfLink(),
            documentToCreate,
            function (err, documentCreated) {
                if (err) throw new Error('Error' + err.message);
                context.getResponse().setBody(documentCreated.id);
            });
        if (!accepted) return;
    }
}
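- A minimal sketch of registering and calling the helloWorld procedure above via the v3 SDK's Scripts interface (reading the script body from a file is an assumption; registration only needs to happen once per container):
// Register the stored procedure on the container
await container.Scripts.CreateStoredProcedureAsync(new StoredProcedureProperties
{
    Id = "helloWorld",
    Body = File.ReadAllText("helloWorld.js") // the serverScript function shown above
});

// Stored procedures execute transactionally within a single logical partition
StoredProcedureExecuteResponse<string> result =
    await container.Scripts.ExecuteStoredProcedureAsync<string>(
        "helloWorld",
        new PartitionKey("partition-key-value"),
        parameters: null);
Console.WriteLine(result.Resource); // "Hello, World"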
- When defining a stored procedure in the portal, input parameters are provided as a string, even for arrays
- The workaround is to define a function within the procedure to parse the string as an array
function sample(arr) {
    if (typeof arr === "string") arr = JSON.parse(arr);

    arr.forEach(function(a) {
        // Do something here
        console.log(a);
    });
}
- All operations must be completed within a limited amount of time; every collection function returns a Boolean value representing whether the operation will complete
- Transactions can be implemented on items within a container by using a stored procedure
- JavaScript functions can implement a continuation-based model to batch or resume execution
- The continuation value can be anything, and your applications can use it to resume a transaction from a new starting point
Create Triggers and User-Defined Functions
- Azure Cosmos DB supports pretriggers and post-triggers
- Pretriggers are executed before modifying a database item
- Post-triggers are executed after modifying a database item
- Triggers aren't executed automatically; they must be specified for each database operation that should run them
- Use the SDK to register your defined trigger (see the registration sketch after the trigger examples below)
- Pretriggers
- Pretriggers can't have input parameters
- The following example validates an item being created, adding a timestamp property if it's missing
function validateToDoItemTimestamp() {
    var context = getContext();
    var request = context.getRequest();

    // Item to be created in the current operation
    var itemToCreate = request.getBody();

    // Add a timestamp if the item doesn't already have one
    if (!("timestamp" in itemToCreate)) {
        var ts = new Date();
        itemToCreate["timestamp"] = ts.getTime();
    }

    // Update the item that will be created
    request.setBody(itemToCreate);
}
- Post-triggers
- The following example queries for the metadata item and updates it with details about the newly created item
function updateMetadata() {
    var context = getContext();
    var container = context.getCollection();
    var response = context.getResponse();

    // Item that was just created
    var createdItem = response.getBody();

    // Query for the metadata document
    var filterQuery = 'SELECT * FROM root r WHERE r.id = "_metadata"';
    var accept = container.queryDocuments(container.getSelfLink(), filterQuery, updateMetadataCallback);
    if (!accept) throw "Unable to update metadata, abort";

    function updateMetadataCallback(err, items, responseOptions) {
        if (err) throw new Error("Error" + err.message);
        if (items.length != 1) throw 'Unable to find metadata document';

        var metadataItem = items[0];

        // Update metadata
        metadataItem.createdItems += 1;
        metadataItem.createdNames += " " + createdItem.id;
        var accept = container.replaceDocument(metadataItem._self, metadataItem,
            function(err, itemReplaced) {
                if (err) throw "Unable to update metadata, abort";
            });
        if (!accept) throw "Unable to update metadata, abort";
        return;
    }
}
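- As noted earlier, triggers run only when named on the operation; a sketch of registering both triggers above and opting in on a create (the newItem variable and script files are assumptions):
// Register the pretrigger and post-trigger on the container (once)
await container.Scripts.CreateTriggerAsync(new TriggerProperties
{
    Id = "validateToDoItemTimestamp",
    Body = File.ReadAllText("validateToDoItemTimestamp.js"),
    TriggerOperation = TriggerOperation.Create,
    TriggerType = TriggerType.Pre
});
await container.Scripts.CreateTriggerAsync(new TriggerProperties
{
    Id = "updateMetadata",
    Body = File.ReadAllText("updateMetadata.js"),
    TriggerOperation = TriggerOperation.Create,
    TriggerType = TriggerType.Post
});

// Opt in to the triggers on a specific operation
await container.CreateItemAsync(newItem, requestOptions: new ItemRequestOptions
{
    PreTriggers = new List<string> { "validateToDoItemTimestamp" },
    PostTriggers = new List<string> { "updateMetadata" }
});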
- User-defined functions
- The following example calculates the income tax for various income brackets
// Assume a container "Incomes" with items shaped like:
// { "name": "User One", "country": "USA", "income": 70000 }
function tax(income) {
    if (income == undefined)
        throw 'no input';

    if (income < 1000)
        return income * 0.1;
    else if (income < 10000)
        return income * 0.2;
    else
        return income * 0.4;
}
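- UDFs are registered the same way and then referenced in queries under the udf. namespace; a sketch using the tax function above (the script file is an assumption):
// Register the UDF once, then call it from queries with the udf. prefix
await container.Scripts.CreateUserDefinedFunctionAsync(new UserDefinedFunctionProperties
{
    Id = "tax",
    Body = File.ReadAllText("tax.js")
});

QueryDefinition taxQuery = new QueryDefinition(
    "SELECT * FROM Incomes t WHERE udf.tax(t.income) > @taxAmount")
    .WithParameter("@taxAmount", 20000);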
Change Feed in Azure Cosmos DB
- Change feed is a persistent record of changes to a container in the order they occur
- It listens to the container for any changes, outputting the sorted list of modified documents in order of occurrence
- The persisted changes can be processed asynchronously and incrementally in a distributed processing system
- Inserts and updates appear in the change feed, but delete operations currently don't
- Workaround is to add a soft marker on items being deleted
- Ex. add an attribute "deleted" with the value "true" and set a time-to-live value to ensure it's automatically deleted
- Work with the change feed in a push or pull model
- Push model: Change feed processor pushes work to a client that has business logic for processing this work
- Pull model: Pull work from the server, the client has business logic for processing work and also stores state for the last processed work
- It's generally recommended to use the push model, so you don't have to worry about polling the change feed for future changes
- Scenarios where the low-level control of the pull model is beneficial (a sketch follows this list):
- Reading changes from a particular partition key
- Controlling the pace at which your client receives changes for processing
- Doing a one-time read of existing data in the change feed
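- A minimal pull-model sketch using the v3 SDK's change feed iterator (the container and ToDoItem type are assumptions; ChangeFeedMode.Incremental is named LatestVersion in newer SDK versions):
// Read the change feed from the beginning of the container
FeedIterator<ToDoItem> iterator = container.GetChangeFeedIterator<ToDoItem>(
    ChangeFeedStartFrom.Beginning(),
    ChangeFeedMode.Incremental);

while (iterator.HasMoreResults)
{
    FeedResponse<ToDoItem> response = await iterator.ReadNextAsync();
    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        // Caught up: no new changes yet, wait before polling again
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (ToDoItem item in response)
        {
            Console.WriteLine($"Detected change for item with id {item.id}");
        }
    }
}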
- Two ways to read change feed with a push model: Triggers and the change feed processor library
- Azure Functions can be automatically triggered on each new event in your container's change feed
- Change feed processor simplifies the process of reading change feed and distributes the event processing across multiple consumers
- There are 4 main components of implementing the change feed processor:
- Monitored container: Has data from which the change feed is generated
- Lease container: Acts as a state storage and coordinates processing change feed across multiple workers
- Compute instance: Hosts the change feed processor to listen for changes
- Delegate: The code that defines what you want to do with each batch of changes that the change processor reads
- The point of entry is always the monitored container
/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
    CosmosClient cosmosClient,
    IConfiguration configuration)
{
    string databaseName = configuration["SourceDatabaseName"];
    string sourceContainerName = configuration["SourceContainerName"];
    string leaseContainerName = configuration["LeasesContainerName"];

    Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
    ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
        .GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
            .WithInstanceName("consoleHost")
            .WithLeaseContainer(leaseContainer)
            .Build();

    Console.WriteLine("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    Console.WriteLine("Change Feed Processor started.");
    return changeFeedProcessor;
}
- The following is an example of a delegate
/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
    ChangeFeedProcessorContext context,
    IReadOnlyCollection<ToDoItem> changes,
    CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate some asynchronous operation
        await Task.Delay(10);
    }

    Console.WriteLine("Finished handling changes.");
}
- The compute instance name (or unique identifier) should be different for each compute instance you deploy
- The normal life cycle of a host instance is:
- Read the change feed
- If no changes, sleep for some predefined amount of time and go back to the first step
- If changes, send them to the delegate
- When the delegate finishes processing the changes successfully, update the lease store with the latest processed point in time and go back to the first step