Azure function for CosmosDb Changefeed.
Cosmos
- Create cosmos account, cosmos container
Database:
arch-pocs-misc-cosmosdb
. Container being monitored:Transactions
Lease container, required for the changefeed:TransactionsLease
. The partition key for the lease container must be/id
.
Install func cli
Create function project
func init Bustroker.CosmosDbChangeFeed.Func --dotnet [--docker]
I renamed csproj
file from Bustroker_CosmosDbChangeFeed_Func.csproj
to Bustroker.CosmosDbChangeFeed.Func.csproj
.
Check details for development with docker here
Create ChangeFeed function
cd Bustroker.CosmosDbChangeFeed.Func
func new --name TransactionsChangeFeed [--template {HttpTrigger, CosmosDbTrigger, etc}]
And the code
// TransactionsChangeFeed.cs
using System.Collections.Generic;
using Microsoft.Azure.Documents;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
namespace Arch.Pocs.CosmosDbChangeFeed.Func
{
public static class TransactionsChangeFeed
{
[FunctionName("TransactionsChangeFeed")]
public static void Run([CosmosDBTrigger(
databaseName: "arch.pocs.misc.cosmosdb",
collectionName: "Transactions",
ConnectionStringSetting = "cosmosDbConnectionString",
LeaseCollectionName = "TransactionsLease",
CreateLeaseCollectionIfNotExists = true))]IReadOnlyList<Document> input, ILogger log)
{
if (input != null && input.Count > 0)
{
log.LogInformation("Documents modified " + input.Count);
log.LogInformation("First document Id " + input[0].Id);
}
}
}
}
The connectionstring needs to be set in appsetting cosmosDbConnectionString
.
If the lease collection (TransactionsLease
in this case) is created manually, bear in mind that the partition key must be /id
.
Create HttpTrigger just for test
Note: Make sure the connection strings are properly handled in Production environment!
using System;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Azure.Documents.Client;
using System.Linq;
namespace Arch.Pocs.CosmosDbChangeFeed.Func
{
public static class HttpEndpoint
{
const string endpointUri = "https://bustroker-cosmosdbchangefeed-cosmosacc.documents.azure.com:443/";
const string key = "SUP3RK3Y==";
const string database = "bustroker-cosmosdbchangefeed-cosmosdb";
const string container = "Transactions";
[FunctionName("HttpEndpoint")]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", "post", Route = null)] HttpRequest req,
ILogger log)
{
log.LogInformation("C# HTTP trigger function processed a request.");
var action = req.Query["action"];
switch (action)
{
case "insert":
var transactionId = await InsertTransactionAsync();
return new OkObjectResult($"id:{transactionId}");
case "read":
var transactions = await ReadTransactionsAsync();
return new OkObjectResult($"transactions:{transactions}");
default:
return new OkObjectResult($"Unrecognized action:{action}");
}
}
private static async Task<string> ReadTransactionsAsync()
{
using (var client = new DocumentClient(new Uri(endpointUri), key))
{
var options = new FeedOptions
{
EnableCrossPartitionQuery = true
};
var uri = UriFactory.CreateDocumentCollectionUri(database, container);
var response = client.CreateDocumentQuery(uri, "select * from c", options).ToList();
return await Task.FromResult(JsonConvert.SerializeObject(response));
}
}
private static async Task<string> InsertTransactionAsync()
{
using (var client = new DocumentClient(new Uri(endpointUri), key))
{
var transaction = new Transaction
{
Id = Guid.NewGuid().ToString(),
Amount = 100,
Currency = "EUR"
};
var documentCollectionUri = UriFactory.CreateDocumentCollectionUri(database, container);
var document = await client.CreateDocumentAsync(documentCollectionUri, transaction);
return transaction.Id;
}
}
}
}