Azure function for CosmosDb Changefeed.

Cosmos

Install func cli

https://docs.microsoft.com/en-us/azure/azure-functions/functions-run-local?tabs=windows%2Ccsharp%2Cbash#install-the-azure-functions-core-tools

Create function project

func init Bustroker.CosmosDbChangeFeed.Func --dotnet [--docker]

I renamed csproj file from Bustroker_CosmosDbChangeFeed_Func.csproj to Bustroker.CosmosDbChangeFeed.Func.csproj.

Check details for development with docker here

Create ChangeFeed function

cd Bustroker.CosmosDbChangeFeed.Func
func new --name TransactionsChangeFeed [--template {HttpTrigger, CosmosDbTrigger, etc}]

And the code

// TransactionsChangeFeed.cs

using System.Collections.Generic;
using Microsoft.Azure.Documents;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

namespace Arch.Pocs.CosmosDbChangeFeed.Func
{
    /// <summary>
    /// Azure Function bound to the Cosmos DB change feed of the <c>Transactions</c> collection.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the trigger's <c>databaseName</c> ("arch.pocs.misc.cosmosdb") differs from the
    /// database the <c>HttpEndpoint</c> test function writes to
    /// ("bustroker-cosmosdbchangefeed-cosmosdb"). Confirm which one is intended; if they do not
    /// match, test inserts presumably never reach this trigger.
    /// </remarks>
    public static class TransactionsChangeFeed
    {
        /// <summary>
        /// Logs the size of the received change feed batch and the id of its first document.
        /// </summary>
        /// <param name="input">Batch of documents delivered by the Cosmos DB trigger.</param>
        /// <param name="log">Logger the informational messages are written to.</param>
        [FunctionName("TransactionsChangeFeed")]
        public static void Run([CosmosDBTrigger(
            databaseName: "arch.pocs.misc.cosmosdb",
            collectionName: "Transactions",
            ConnectionStringSetting = "cosmosDbConnectionString",
            LeaseCollectionName = "TransactionsLease",
            CreateLeaseCollectionIfNotExists = true))]IReadOnlyList<Document> input, ILogger log)
        {
            // Nothing to report for a null or empty batch.
            if (input == null || input.Count == 0)
            {
                return;
            }

            // Message templates (instead of string concatenation) keep the values available
            // as structured log properties; the rendered text is the same as before.
            log.LogInformation("Documents modified {DocumentCount}", input.Count);
            log.LogInformation("First document Id {DocumentId}", input[0].Id);
        }
    }
}

The connection string must be set in the app setting cosmosDbConnectionString. If the lease collection (TransactionsLease in this case) is created manually, bear in mind that its partition key must be /id.

Create HttpTrigger just for test

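Note: The endpoint and key are hard-coded in the code below only to keep the test simple. Make sure connection strings and keys are properly handled (e.g. loaded from app settings or a secret store) in a Production environment!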

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.Documents.Client;
using Microsoft.Azure.Documents.Linq;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace Arch.Pocs.CosmosDbChangeFeed.Func
{
    /// <summary>
    /// Test-only HTTP function that inserts a sample transaction into, or reads all documents
    /// from, the <c>Transactions</c> container, selected by the <c>action</c> query parameter.
    /// </summary>
    public static class HttpEndpoint
    {
        // WARNING: the endpoint and key are hard-coded for this test only. Never commit a real
        // key to source; load it from app settings or a secret store instead.
        const string endpointUri = "https://bustroker-cosmosdbchangefeed-cosmosacc.documents.azure.com:443/";
        const string key = "SUP3RK3Y==";
        const string database = "bustroker-cosmosdbchangefeed-cosmosdb";
        const string container = "Transactions";

        // A single client shared by all invocations, created on first use. Building and
        // disposing a DocumentClient per request re-opens connections on every call.
        private static readonly Lazy<DocumentClient> sharedClient =
            new Lazy<DocumentClient>(() => new DocumentClient(new Uri(endpointUri), key));

        /// <summary>
        /// Dispatches on the <c>action</c> query parameter: <c>insert</c> creates a sample
        /// transaction, <c>read</c> returns all documents of the container as JSON.
        /// </summary>
        /// <param name="req">Incoming HTTP request; only its query string is read.</param>
        /// <param name="log">Logger the informational message is written to.</param>
        /// <returns>
        /// 200 OK with the result text for a known action; 400 Bad Request for a missing or
        /// unrecognized action.
        /// </returns>
        [FunctionName("HttpEndpoint")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post", Route = null)] HttpRequest req,
            ILogger log)
        {
            log.LogInformation("C# HTTP trigger function processed a request.");

            // Convert the query value to a plain string once so the switch works on a string.
            string action = req.Query["action"];

            switch (action)
            {
                case "insert":
                    var transactionId = await InsertTransactionAsync();
                    return new OkObjectResult($"id:{transactionId}");
                case "read":
                    var transactions = await ReadTransactionsAsync();
                    return new OkObjectResult($"transactions:{transactions}");
                default:
                    // An unknown action is a client error, not a successful call.
                    return new BadRequestObjectResult($"Unrecognized action:{action}");
            }
        }

        /// <summary>
        /// Reads every document of the container with a cross-partition query.
        /// </summary>
        /// <returns>The documents serialized as a JSON array.</returns>
        private static async Task<string> ReadTransactionsAsync()
        {
            var options = new FeedOptions
            {
                EnableCrossPartitionQuery = true
            };
            var uri = UriFactory.CreateDocumentCollectionUri(database, container);
            var query = sharedClient.Value
                .CreateDocumentQuery(uri, "select * from c", options)
                .AsDocumentQuery();

            // Drain the query page by page without blocking a thread on network I/O.
            var documents = new List<dynamic>();
            while (query.HasMoreResults)
            {
                documents.AddRange(await query.ExecuteNextAsync<dynamic>());
            }

            return JsonConvert.SerializeObject(documents);
        }

        /// <summary>
        /// Inserts a sample transaction (100 EUR) with a freshly generated id.
        /// </summary>
        /// <returns>The id of the inserted transaction.</returns>
        private static async Task<string> InsertTransactionAsync()
        {
            var transaction = new Transaction
            {
                Id = Guid.NewGuid().ToString(),
                Amount = 100,
                Currency = "EUR"
            };

            var documentCollectionUri = UriFactory.CreateDocumentCollectionUri(database, container);

            await sharedClient.Value.CreateDocumentAsync(documentCollectionUri, transaction);

            return transaction.Id;
        }
    }
}