About
In this code snippet, we will look at Durable Azure Functions.
Azure functions are stateless by default. But we can add state by using the Durable Functions extension. This allows you to implement common patterns such as:
1. Function chaining
2. Fan-out/fan-in
3. Async HTTP APIs
4. Monitoring
5. Human interaction
6. Aggregator
I remember watching this video from Microsoft a while ago. It does a great job of explaining the inner workings of durable functions in more detail.
In this post I will include examples of some of the above patterns I have personally used at some point. You can check out Microsoft documentation with great code examples for all of the patterns listed above here.
Let’s see how to use durable functions in the code examples below.
Prerequisites:
Before getting started you need to install the Durable Functions NuGet package into your project:
Microsoft.Azure.Functions.Worker.Extensions.DurableTask and add the following using statement at the top of your code file.
using Microsoft.DurableTask.Client; using Microsoft.DurableTask;
Async HTTP API Code:
using Microsoft.Azure.Functions.Worker; using Microsoft.Azure.Functions.Worker.Http; using Microsoft.Extensions.Logging; using System.Text.Json; using Microsoft.DurableTask.Client; using Microsoft.DurableTask; namespace DemoFunctionApp { public class Function1 { private readonly ILogger _logger; private readonly HttpClient _client; public Function1(ILoggerFactory loggerFactory, IHttpClientFactory httpClientFactory) { _logger = loggerFactory.CreateLogger<Function1>(); _client = httpClientFactory.CreateClient(); } //Define the model for the job input data. public class OrchestratorJobInput { public OrchestratorJobInput(string myFirstProperty, int mySecondProperty) { MyFirstProperty = myFirstProperty; MySecondProperty = mySecondProperty; } public string MyFirstProperty { get; set; } public int MySecondProperty { get; set; } } [Function("DoSomething")] public async Task<HttpResponseData> DoSomething ( [HttpTrigger(AuthorizationLevel.Function, "get")] HttpRequestData req, [DurableClient] DurableTaskClient client, FunctionContext executionContext ) { ILogger<Function1> logger = executionContext.GetLogger<Function1>(); //Get input data from the request. /////////////////////////////////////////////// //Get the data from the request body like so: //string requestBody = await new StreamReader(req.Body).ReadToEndAsync(); // //OrchestratorJobInput jobInputData = JsonSerializer.Deserialize<OrchestratorJobInput>(requestBody); //Or get the data from the headers like so: //req.Headers.TryGetValues("myFirstProperty", out var header1); //req.Headers.TryGetValues("mySecondProperty", out var header2); // //OrchestratorJobInput jobInputData = new OrchestratorJobInput(header1.FirstOrDefault(), int.Parse(header2.FirstOrDefault())); //Or the query parameters like so: var query = System.Web.HttpUtility.ParseQueryString(req.Url.Query); string myFirstProperty = query["myFirstProperty"]; int mySecondProperty = int.Parse(query["mySecondProperty"]); OrchestratorJobInput jobInputData = new OrchestratorJobInput(myFirstProperty, mySecondProperty); ////////////////////////////////////////////////////////////////////////////////// //Start the orchestrator function and return the response with the polling URLs to the client. string instanceId = await client.ScheduleNewOrchestrationInstanceAsync("OrchestratorFunction", jobInputData); logger.LogInformation("Started orchestration with ID = '{instanceId}'.", instanceId); return await client.CreateCheckStatusResponseAsync(req, instanceId); } [Function("OrchestratorFunction")] public async Task<string> OrchestratorFunction([OrchestrationTrigger] TaskOrchestrationContext activityContext) { OrchestratorJobInput jobInput = activityContext.GetInput<OrchestratorJobInput>(); activityContext.CreateReplaySafeLogger<Function1>(); return await activityContext.CallActivityAsync<string>(nameof(SyncDataDurableTask), jobInput); } [Function(nameof(SyncDataDurableTask))] public async Task<string> SyncDataDurableTask([ActivityTrigger] OrchestratorJobInput jobInput, FunctionContext executionContext) { ILogger<Function1> logger = executionContext.GetLogger<Function1>(); logger.LogInformation($"Synchronous durable task started processing the data."); //Do something with the data ... string result = $"Some data: {jobInput.MyFirstProperty}: {jobInput.MySecondProperty}"; return result; } } }
Async HTTP API Result:
Function Chaining Code:
using Microsoft.Azure.Functions.Worker; using Microsoft.Azure.Functions.Worker.Http; using Microsoft.Extensions.Logging; using Microsoft.DurableTask.Client; using Microsoft.DurableTask; namespace DemoFunctionApp { public class Function1 { private readonly ILogger _logger; private readonly HttpClient _client; public Function1(ILoggerFactory loggerFactory, IHttpClientFactory httpClientFactory) { _logger = loggerFactory.CreateLogger<Function1>(); _client = httpClientFactory.CreateClient(); } [Function("HttpChainTrigger")] public async Task<HttpResponseData> HttpChainTrigger ( [HttpTrigger(AuthorizationLevel.Function, "get")] HttpRequestData req, [DurableClient] DurableTaskClient client, FunctionContext executionContext ) { //Start the orchestrator function and return the response with the polling URLs to the client. string instanceId = await client.ScheduleNewOrchestrationInstanceAsync("RunChain", ""); return await client.CreateCheckStatusResponseAsync(req, instanceId); } [Function("RunChain")] public static async Task<object> RunChain([OrchestrationTrigger] TaskOrchestrationContext context) { var f1Result = await context.CallActivityAsync<object>("F1", null); var f2Result = await context.CallActivityAsync<object>("F2", f1Result); return await context.CallActivityAsync<object>("F3", f2Result); } [Function("F1")] public static string F1([ActivityTrigger] string input) { input += "Hello from F1. "; return input; } [Function("F2")] public static object F2([ActivityTrigger] object input) { input += "Hello from F2. "; return input; } [Function("F3")] public static object F3([ActivityTrigger] object input) { input += "Hello from F3. "; return input; } } }
Function Chaining Result:
Fan Out/Fan In (Parallelism) Code:
using Microsoft.Azure.Functions.Worker; using Microsoft.Azure.Functions.Worker.Http; using Microsoft.Extensions.Logging; using Microsoft.DurableTask.Client; using Microsoft.DurableTask; namespace DemoFunctionApp { public class Function1 { private readonly ILogger _logger; private readonly HttpClient _client; public Function1(ILoggerFactory loggerFactory, IHttpClientFactory httpClientFactory) { _logger = loggerFactory.CreateLogger<Function1>(); _client = httpClientFactory.CreateClient(); } [Function("FanOutFanInTrigger")] public async Task<HttpResponseData> FanOutFanInTrigger ( [HttpTrigger(AuthorizationLevel.Function, "get")] HttpRequestData req, [DurableClient] DurableTaskClient client, FunctionContext executionContext ) { //Start the orchestrator function and return the response with the polling URLs to the client. string instanceId = await client.ScheduleNewOrchestrationInstanceAsync("RunFanOutFanIn", ""); return await client.CreateCheckStatusResponseAsync(req, instanceId); } [Function("RunFanOutFanIn")] public static async Task<object> FanOutFanIn( [OrchestrationTrigger] TaskOrchestrationContext context) { //Here I'll just genereate some random data to simulate the work batch. //You can pass the data in from the FanOutFanInTrigger(see first code example wtih the OrchestratorJobInput). int itemCount = 1000; int[] workBatch = new int[itemCount]; Random random = new Random(); for (int i = 0; i < itemCount; i++) workBatch[i] = random.Next(0, 255); int batchSize = 100; int numberOfBatches = workBatch.Length/batchSize; var parallelTasks = new Task<int[]>[numberOfBatches]; //Create a task for each work item. for (int i = 0; i < numberOfBatches; i++) { Task<int[]> task = context.CallActivityAsync<int[]>("ParallelMe", workBatch[i..batchSize]); parallelTasks[i] = task; } //Wait for all tasks to complete. await Task.WhenAll(parallelTasks); //Get the results of all tasks and return the result. for (int i = 0; i < parallelTasks.Length; i++) { int[] result = parallelTasks[i].Result; foreach (var item in result) { workBatch[i] = item; i++; } } return workBatch; } [Function("ParallelMe")] public static int[] ParallelMe([ActivityTrigger] int[] input) { for (int i = 0; i < input.Length; i++) { //Do some work here ... input[i] += 45; if (input[i] > 255) input[i] = 255; } return input; } } }