Azure Function for CosmosDB Change Feed

Goal

Process CosmosDB document changes in real-time using an Azure Function triggered by the CosmosDB Change Feed.

Setup CosmosDB

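Create a Cosmos DB account with a database that contains a Transactions container (the monitored data) and, optionally, a TransactionsLease container (the function can create the lease container itself when CreateLeaseCollectionIfNotExists is true).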

The lease container tracks Change Feed progress across function instances.

Install Azure Functions Core Tools

https://docs.microsoft.com/en-us/azure/azure-functions/functions-run-local?tabs=windows%2Ccsharp%2Cbash#install-the-azure-functions-core-tools

Create Function Project

func init Bustroker.CosmosDbChangeFeed.Func --dotnet [--docker]

Rename the generated .csproj file from Bustroker_CosmosDbChangeFeed_Func.csproj to Bustroker.CosmosDbChangeFeed.Func.csproj.

For Docker development, see this guide.

Create Change Feed Function

cd Bustroker.CosmosDbChangeFeed.Func
func new --name TransactionsChangeFeed [--template {HttpTrigger, CosmosDbTrigger, etc}]
// TransactionsChangeFeed.cs

using System.Collections.Generic;
using Microsoft.Azure.Documents;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

namespace Arch.Pocs.CosmosDbChangeFeed.Func
{
    /// <summary>
    /// Azure Function that listens to the Cosmos DB Change Feed of the
    /// Transactions collection and logs a summary of each batch it receives.
    /// </summary>
    public static class TransactionsChangeFeed
    {
        /// <summary>
        /// Invoked by the Cosmos DB trigger with a batch of created or updated documents.
        /// </summary>
        /// <param name="input">The changed documents delivered in this batch.</param>
        /// <param name="log">Logger supplied by the Functions runtime.</param>
        [FunctionName("TransactionsChangeFeed")]
        public static void Run([CosmosDBTrigger(
            // Must be the database the HttpEndpoint test function writes to,
            // otherwise inserts made through that endpoint never reach this trigger.
            databaseName: "bustroker-cosmosdbchangefeed-cosmosdb",
            collectionName: "Transactions",
            ConnectionStringSetting = "cosmosDbConnectionString",
            LeaseCollectionName = "TransactionsLease",
            CreateLeaseCollectionIfNotExists = true)]IReadOnlyList<Document> input, ILogger log)
        {
            // Nothing to report for a missing or empty batch.
            if (input == null || input.Count == 0)
            {
                return;
            }

            // Message templates keep the rendered text unchanged while exposing
            // the values as structured log properties.
            log.LogInformation("Documents modified {DocumentCount}", input.Count);
            log.LogInformation("First document Id {DocumentId}", input[0].Id);
        }
    }
}

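Set the cosmosDbConnectionString app setting to your Cosmos DB account connection string (when running locally, add it under Values in local.settings.json). If you create the lease collection manually, its partition key must be /id.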

Test with HTTP Trigger

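Add an HTTP-triggered function to the same project to generate test data. Call it with ?action=insert to write a new transaction (which should fire the Change Feed function) or with ?action=read to list the stored transactions.

Note: The endpoint and key are hard-coded below for brevity only. Handle connection strings and keys securely in Production (app settings or a secret store), never in source code.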

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.Documents.Client;
using Microsoft.Azure.Documents.Linq;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace Arch.Pocs.CosmosDbChangeFeed.Func
{
    /// <summary>
    /// HTTP-triggered test helper that inserts a sample transaction into, or reads
    /// all transactions from, the Transactions collection, selected by the
    /// <c>action</c> query-string parameter.
    /// </summary>
    public static class HttpEndpoint
    {
        // NOTE(review): demo values only. The account key must not live in source
        // code outside of a throwaway sample; load it from configuration instead.
        const string endpointUri = "https://bustroker-cosmosdbchangefeed-cosmosacc.documents.azure.com:443/";
        const string key = "SUP3RK3Y==";
        const string database = "bustroker-cosmosdbchangefeed-cosmosdb";
        const string container = "Transactions";

        // One client shared by every invocation: creating and disposing a
        // DocumentClient per request wastes connections under load.
        private static readonly Lazy<DocumentClient> SharedClient =
            new Lazy<DocumentClient>(() => new DocumentClient(new Uri(endpointUri), key));

        /// <summary>
        /// Dispatches the request according to the <c>action</c> query parameter.
        /// </summary>
        /// <param name="req">Incoming HTTP request; <c>action</c> may be "insert" or "read".</param>
        /// <param name="log">Logger supplied by the Functions runtime.</param>
        /// <returns>
        /// 200 with the new document id ("insert") or the serialized documents ("read");
        /// 400 when the action is missing or not recognized.
        /// </returns>
        [FunctionName("HttpEndpoint")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post", Route = null)] HttpRequest req,
            ILogger log)
        {
            log.LogInformation("C# HTTP trigger function processed a request.");

            // Convert the StringValues query entry to a plain string before switching on it.
            string action = req.Query["action"];

            switch (action)
            {
                case "insert":
                    var transactionId = await InsertTransactionAsync();
                    return new OkObjectResult($"id:{transactionId}");
                case "read":
                    var transactions = await ReadTransactionsAsync();
                    return new OkObjectResult($"transactions:{transactions}");
                default:
                    // An unknown action is a client error, not a success.
                    return new BadRequestObjectResult($"Unrecognized action:{action}");
            }
        }

        /// <summary>
        /// Reads every document in the collection and returns them as a JSON array.
        /// </summary>
        /// <returns>JSON text containing all documents returned by the query.</returns>
        private static async Task<string> ReadTransactionsAsync()
        {
            var options = new FeedOptions
            {
                EnableCrossPartitionQuery = true
            };
            var collectionUri = UriFactory.CreateDocumentCollectionUri(database, container);

            // AsDocumentQuery exposes the asynchronous paging API, so the function
            // thread is not blocked while result pages are fetched.
            var query = SharedClient.Value
                .CreateDocumentQuery(collectionUri, "select * from c", options)
                .AsDocumentQuery();

            var documents = new List<object>();
            while (query.HasMoreResults)
            {
                documents.AddRange(await query.ExecuteNextAsync());
            }

            return JsonConvert.SerializeObject(documents);
        }

        /// <summary>
        /// Inserts a sample transaction (100 EUR) with a freshly generated id.
        /// </summary>
        /// <returns>The id assigned to the inserted transaction.</returns>
        private static async Task<string> InsertTransactionAsync()
        {
            // Transaction is declared elsewhere in the project (not shown here).
            var transaction = new Transaction
            {
                Id = Guid.NewGuid().ToString(),
                Amount = 100,
                Currency = "EUR"
            };

            var collectionUri = UriFactory.CreateDocumentCollectionUri(database, container);

            await SharedClient.Value.CreateDocumentAsync(collectionUri, transaction);

            return transaction.Id;
        }
    }
}