Azure Function Durable Functions

Azure Functions Durable Functions
Share:

About

In this code snippet, we will look at Durable Azure Functions.

Azure functions are stateless by default. But we can add state by using the Durable Functions extension. This allows you to implement common patterns such as:

    1. Function chaining
    2. Fan-out/fan-in
    3. Async HTTP APIs
    4. Monitoring
    5. Human interaction
    6. Aggregator

I remember watching this video from Microsoft a while ago. It does a great job of explaining the inner workings of durable functions in more detail.

In this post I will include examples of some of the above patterns I have personally used at some point. You can check out Microsoft documentation with great code examples for all of the patterns listed above here.

Let’s see how to use durable functions in the code examples below.

Prerequisites:

Before getting started you need to install the Durable Functions NuGet package into your project:
Microsoft.Azure.Functions.Worker.Extensions.DurableTask and add the following using statement at the top of your code file.

using Microsoft.DurableTask.Client;
using Microsoft.DurableTask;

Async HTTP API Code:

This code example shows how to create an Async HTTP API used for long-running jobs(this approach also gets refferd to as long polling). I had to implement this pattern once for a service that generates files which sometimes took even over half an hour.
using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Http;
using Microsoft.Extensions.Logging;
using System.Text.Json;
using Microsoft.DurableTask.Client;
using Microsoft.DurableTask;

namespace DemoFunctionApp
{
    public class Function1
    {
        private readonly ILogger _logger;
        private readonly HttpClient _client;

        public Function1(ILoggerFactory loggerFactory, IHttpClientFactory httpClientFactory)
        {
            _logger = loggerFactory.CreateLogger<Function1>();
            _client = httpClientFactory.CreateClient();
        }

        //Define the model for the job input data.
        public class OrchestratorJobInput
        {
            public OrchestratorJobInput(string myFirstProperty, int mySecondProperty)
            {
                MyFirstProperty = myFirstProperty;
                MySecondProperty = mySecondProperty;
            }

            public string MyFirstProperty { get; set; }
            public int MySecondProperty { get; set; }
        }

        [Function("DoSomething")]
        public async Task<HttpResponseData> DoSomething
        (
            [HttpTrigger(AuthorizationLevel.Function, "get")] HttpRequestData req,
            [DurableClient] DurableTaskClient client,
            FunctionContext executionContext
        )
        {
            ILogger<Function1> logger = executionContext.GetLogger<Function1>();

            //Get input data from the request. ///////////////////////////////////////////////

            //Get the data from the request body like so:
            //string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            //
            //OrchestratorJobInput jobInputData = JsonSerializer.Deserialize<OrchestratorJobInput>(requestBody);


            //Or get the data from the headers like so:
            //req.Headers.TryGetValues("myFirstProperty", out var header1);
            //req.Headers.TryGetValues("mySecondProperty", out var header2);
            //
            //OrchestratorJobInput jobInputData = new OrchestratorJobInput(header1.FirstOrDefault(), int.Parse(header2.FirstOrDefault()));


            //Or the query parameters like so:
            var query = System.Web.HttpUtility.ParseQueryString(req.Url.Query);
            string myFirstProperty = query["myFirstProperty"];
            int mySecondProperty = int.Parse(query["mySecondProperty"]);

            OrchestratorJobInput jobInputData = new OrchestratorJobInput(myFirstProperty, mySecondProperty);

            //////////////////////////////////////////////////////////////////////////////////


            //Start the orchestrator function and return the response with the polling URLs to the client.
            string instanceId = await client.ScheduleNewOrchestrationInstanceAsync("OrchestratorFunction", jobInputData);

            logger.LogInformation("Started orchestration with ID = '{instanceId}'.", instanceId);

            return await client.CreateCheckStatusResponseAsync(req, instanceId);
        }

        [Function("OrchestratorFunction")]
        public async Task<string> OrchestratorFunction([OrchestrationTrigger] TaskOrchestrationContext activityContext)
        {
            OrchestratorJobInput jobInput = activityContext.GetInput<OrchestratorJobInput>();

            activityContext.CreateReplaySafeLogger<Function1>();
            
            return await activityContext.CallActivityAsync<string>(nameof(SyncDataDurableTask), jobInput);
        }

        [Function(nameof(SyncDataDurableTask))]
        public async Task<string> SyncDataDurableTask([ActivityTrigger] OrchestratorJobInput jobInput, FunctionContext executionContext)
        {
            ILogger<Function1> logger = executionContext.GetLogger<Function1>();
            logger.LogInformation($"Synchronous durable task started processing the data.");

            //Do something with the data ...
            string result = $"Some data: {jobInput.MyFirstProperty}: {jobInput.MySecondProperty}";

            return result;
        }
    }
}

Async HTTP API Result:

As you can see when you send a request you immediately get back a response with the URLs that can be used to poll for the result. If you call the “statusQueryGetUri” you get the job status and result if it has finished.

Function Chaining Code:

This code example shows how to chain together multiple functions. This is a great way to chain together multiple functions compared to deploying them and them chaining them with HTTP calls instead.
using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Http;
using Microsoft.Extensions.Logging;
using Microsoft.DurableTask.Client;
using Microsoft.DurableTask;

namespace DemoFunctionApp
{
    public class Function1
    {
        private readonly ILogger _logger;
        private readonly HttpClient _client;

        public Function1(ILoggerFactory loggerFactory, IHttpClientFactory httpClientFactory)
        {
            _logger = loggerFactory.CreateLogger<Function1>();
            _client = httpClientFactory.CreateClient();
        }

        [Function("HttpChainTrigger")]
        public async Task<HttpResponseData> HttpChainTrigger
        (
            [HttpTrigger(AuthorizationLevel.Function, "get")] HttpRequestData req,
            [DurableClient] DurableTaskClient client,
            FunctionContext executionContext
        )
        {
            //Start the orchestrator function and return the response with the polling URLs to the client.
            string instanceId = await client.ScheduleNewOrchestrationInstanceAsync("RunChain", "");

            return await client.CreateCheckStatusResponseAsync(req, instanceId);
        }

        [Function("RunChain")]
        public static async Task<object> RunChain([OrchestrationTrigger] TaskOrchestrationContext context)
        {
            var f1Result = await context.CallActivityAsync<object>("F1", null);
            var f2Result = await context.CallActivityAsync<object>("F2", f1Result);

            return await context.CallActivityAsync<object>("F3", f2Result);
        }

        [Function("F1")]
        public static string F1([ActivityTrigger] string input)
        {
            input += "Hello from F1. ";
            return input;
        }

        [Function("F2")]
        public static object F2([ActivityTrigger] object input)
        {
            input += "Hello from F2. ";
            return input;
        }

        [Function("F3")]
        public static object F3([ActivityTrigger] object input)
        {
            input += "Hello from F3. ";
            return input;
        }
    }
}

Function Chaining Result:​

As you can see when you send a request you immediately get back a response with the URLs that can be used to poll for the result. If you call the “statusQueryGetUri” you get the job status and result if it has finished.

Fan Out/Fan In (Parallelism) Code:

This code example shows how to use parallelism. Azure Functions are single-threaded so you can’t do parallelism aka multithreading like you would traditionally as described in this post. The caller(client or parent/orchestrator function) simply has to make multiple requests to your function which will cause it to horizontally scale and spawn a new instance with a new thread. Durable functions help you do just that in a few lines of code.
using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Http;
using Microsoft.Extensions.Logging;
using Microsoft.DurableTask.Client;
using Microsoft.DurableTask;

namespace DemoFunctionApp
{
    public class Function1
    {
        private readonly ILogger _logger;
        private readonly HttpClient _client;

        public Function1(ILoggerFactory loggerFactory, IHttpClientFactory httpClientFactory)
        {
            _logger = loggerFactory.CreateLogger<Function1>();
            _client = httpClientFactory.CreateClient();
        }

                [Function("FanOutFanInTrigger")]
        public async Task<HttpResponseData> FanOutFanInTrigger
        (
            [HttpTrigger(AuthorizationLevel.Function, "get")] HttpRequestData req,
            [DurableClient] DurableTaskClient client,
            FunctionContext executionContext
        )
        {
            //Start the orchestrator function and return the response with the polling URLs to the client.
            string instanceId = await client.ScheduleNewOrchestrationInstanceAsync("RunFanOutFanIn", "");

            return await client.CreateCheckStatusResponseAsync(req, instanceId);
        }

        [Function("RunFanOutFanIn")]
        public static async Task<object> FanOutFanIn(
            [OrchestrationTrigger] TaskOrchestrationContext context)
        {
            //Here I'll just genereate some random data to simulate the work batch.
            //You can pass the data in from the FanOutFanInTrigger(see first code example wtih the OrchestratorJobInput). 
            int itemCount = 1000;
            int[] workBatch = new int[itemCount];

            Random random = new Random();
            for (int i = 0; i < itemCount; i++)
                workBatch[i] = random.Next(0, 255);



            int batchSize = 100;
            int numberOfBatches = workBatch.Length/batchSize;
            var parallelTasks = new Task<int[]>[numberOfBatches];
            //Create a task for each work item.
            for (int i = 0; i < numberOfBatches; i++)
            {
                Task<int[]> task = context.CallActivityAsync<int[]>("ParallelMe", workBatch[i..batchSize]);
                parallelTasks[i] = task;
            }



            //Wait for all tasks to complete.
            await Task.WhenAll(parallelTasks);



            //Get the results of all tasks and return the result.
            for (int i = 0; i < parallelTasks.Length; i++)
            {
                int[] result = parallelTasks[i].Result;
                foreach (var item in result)
                {
                    workBatch[i] = item;
                    i++;
                }
            }

            return workBatch;
        }

        [Function("ParallelMe")]
        public static int[] ParallelMe([ActivityTrigger] int[] input)
        {
            for (int i = 0; i < input.Length; i++)
            {
                //Do some work here ...
                input[i] += 45;

                if (input[i] > 255)
                    input[i] = 255;
            } 

            return input;
        }
    }
}

Fan Out/Fan In Result:

As you can see when you send a request you immediately get back a response with the URLs that can be used to poll for the result. If you call the “statusQueryGetUri” you get the job status and result if it has finished.
Share:

Leave a Reply

Your email address will not be published. Required fields are marked *

The following GDPR rules must be read and accepted:
This form collects your name, email and content so that we can keep track of the comments placed on the website. For more info check our privacy policy where you will get more info on where, how and why we store your data.

Advertisment ad adsense adlogger