Azure Durable Functions, deel 5: Parallele execution

We gaan verder met onze inschrijving function workflow. Je zag in de vorige post hoe de engine intern werkt. Dat is op zich goed om te weten, maar we hoeven ons er nu niet al te druk meer over te maken. Echter, je moet wel onthouden dat we in de orchestrator geen code mogen gebruiken die niet deterministisch is. Oftewel: geen code die iedere keer andere resultaten zou kunnen terug geven.

Fan out – Fan in

In ons systeem hebben we besloten de andere user groups op de hoogte te stellen van het voorgenomen bezoek van deze bezoeker. We willen graag van elkaar weten wie nu vaak meetups bezoekt, wie zich altijd aanmeldt maar niet komt, en hoe goed we het nu eigenlijk doen. Even voor de duidelijkheid: dat doen we in het echt dus niet: zelfs al zou het van de wet mogen (en dat mag dus niet) dan nog nemen we privacy heel serieus. Het is maar een voorbeeld…

Nu kunnen we die code relatief makkelijk schrijven. We kunnen een functie maken en daarin een REST api aanroepen. We gaan er even vanuit dat alle usergroups een standaard API hanteren, dus dat is niet zo ingewikkeld.

Maar: Micro Services zijn klein en doen maar 1 ding. Dus een service schrijven die alle REST API’s van alle usergroups aanroept, past daar niet binnen. Daarnaast kan de lijst met usergroups best lang worden, dus waarom zouden we dat niet parallel uitvoeren? Dit zou de hele execution van de totale workflow wel ten goede komen.

Het proces ziet dan als volgt uit:

Fan out, fan in process

We hebben ons start punt. Dat is het einde van onze vorige A_CheckInput. Vervolgens gaan we parallel een bericht sturen naar de usergroups SDN, DevNetNoord, DotNetOost en de anderen. Pas als deze berichten allemaal verstuurd zijn, gaan we verder (en in ons voorbeeld tot nu toe is dat het einde van het proces).

Dit is het Fan out – fan in patroon. We waaieren eerst uit naar de verschillende functies, en dan wachten we tot ze klaar zijn waarna we verder gaan. De resultaten worden gebundeld en we kunnen verder gaan.

In ons voorbeeld doen we even niets met de eventuele resultaten van de REST calls. We zouden in dat geval dus een fire-and-forget actie kunnen doen. Wel fan-out maar niet wachten. Maar ik wil graag het hele proces laten zien. Fan out only is makkelijker te bouwen dus dat kun je zelf wel bedenken.

Goed. Laten we eerst de activity eens maken. Ik ga niet echt de implementatie maken van het aanroepen van een REST API: dat is gewoon standaard .net code.

Als eerste moeten we weer bedenken wat we mee gaan geven aan de usergroups. Ik denk dat een email adres genoeg is. De naam is niet relevant. De activity zelf heeft echter ook informatie nodig over welke usergroup we gaan aanroepen. Ik gebruik even een string met de naam van de groep. Dus:

public class UsergroupNotification
{
    public string UsergroupName { get; set; }
    public string AttendeeEmail { get; set; }
}

Geen verrassingen hier.

De activity zelf is ook niet ingewikkeld, zeker omdat we hier niets in het echt gaan doen:

[FunctionName("A_NotifyUsergroup")]
public static void NotifyUsergroup(
    [ActivityTrigger] UsergroupNotification notification, 
    ILogger log)
{
    log.LogWarning($"We are notifying {notification.UsergroupName} about {notification.AttendeeEmail}.");            
}

We maken weer een standaard activity. De FunctionName krijgt de prefix A_. Het is een ActivityTrigger en als data geven we mee de net aangemaakte UsergroupNotification. Uiteraard willen we een logger erbij.

In de body doe ik niets anders dan een log wegschrijven.

Nu naar de orchestrator. Na de call naar A_CheckInput (als dat goed gegaan is) moeten we deze activity starten. Dit is een eerste opzetje:

        [FunctionName("O_PerformRegistration")]
        public static async Task<RegistrationResult> PerformRegistration(
            [OrchestrationTrigger] DurableOrchestrationContext ctx,
            ILogger log)
        {
            if(!ctx.IsReplaying)
                log.LogWarning("We are in the orchestrator! Yeah!");
            var attendee = ctx.GetInput<Attendee>();

            var isValid = await ctx.CallActivityAsync<bool>("A_ValidateInput", attendee.Email);
            if (!isValid)
            {
                return new RegistrationResult
                {
                    IsSucces = false,
                    Reason = "Not a valid email given."
                };
            }

            await ctx.CallActivityAsync("A_NotifyUsergroup", 
                new UsergroupNotification { 
                    AttendeeEmail = attendee.Email, 
                    UsergroupName = "SDN" });

            await ctx.CallActivityAsync("A_NotifyUsergroup",
                new UsergroupNotification
                {
                    AttendeeEmail = attendee.Email,
                    UsergroupName = "DNN"
                });

            await ctx.CallActivityAsync("A_NotifyUsergroup",
                new UsergroupNotification
                {
                    AttendeeEmail = attendee.Email,
                    UsergroupName = "DNO"
                });


            if (!ctx.IsReplaying)
                log.LogWarning("At the end of the workflow.");

            return new RegistrationResult {
                IsSucces = true,
                Reason = "Everything checked out."
            };
        }

Ik heb even de hele orchestrator geplaatst zodat je ziet waar we zijn.

Dit werkt wel maar is niet echt handig: we roepen de verschillende usergroups nu sequentieel aan. We wachten iedere keer tot er een call geweest is en gaan dan verder. Dat willen we niet: we willen het parallel doen.

Ok. Poging 2: ik vervang de 3 calls door het volgende stuk code:

var allUsergroups = new string[] { "SDN", "DNN", "DNO" };
foreach(var usergroupName in allUsergroups)
{
    var notification = new UsergroupNotification { AttendeeEmail = attendee.Email, UsergroupName = usergroupName };
    await ctx.CallActivityAsync("A_NotifyUsergroup", notification);
}

Dat lost niet veel op. De code is leesbaarder maar het is nog steeds sequentieel. Poging 3 dan maar:

var allUsergroups = new string[] { "SDN", "DNN", "DNO" };
var allCalls = new List<Task>();
foreach(var usergroupName in allUsergroups)
{
    var notification = new UsergroupNotification { AttendeeEmail = attendee.Email, UsergroupName = usergroupName };
    allCalls.Add(ctx.CallActivityAsync("A_NotifyUsergroup", notification));
}

await Task.WhenAll(allCalls);

Dit is veel beter. We maken een lijst van de usergroups, dan maken we een lege lijst van Tasks aan. In de loop roepen we CallActivityAsync aan maar we awaiten het niet. Dan krijgen we een Task terug (we hebben geen return type, dus geen Task<T> maar gewoon Task). De verzamelen we in de lijst. Pas als we alle Tasks aangemaakt hebben, roepen we await Task.WhenAll(allCalls); aan.

Deze gaat alle Tasks opstarten en komt pas terug als alle tasks uitgevoerd zijn. Let op: ook hier geldt dat het systeem iedere keer als er een task gestart wordt, de orchestrator weggooid en daarna opnieuw opstart. Met andere woorden: samen met de call naar A_CheckInput wordt de orchestrator nu 5 keer gestart… Een keer voor de eerste run, daarna na het beeindigen van A_Checkinput en daarna 3 keer na het beeindigen van iedere call naar A_NotifyUsergroup. Je ziet: het aantal invocations kan snel oplopen.

We zouden het resultaat van de calls makkelijk kunnen uitlezen hier. Ze geven immers allemaal een result terug. Dit gaan we niet doen, maar ik vraag je wel even om na te denken hoe je dit soort dingen zou bouwen in een normale Azure Function. Realiseer je eens hoeveel controle code je zou moeten schrijven. Nu doen we een fan-out, fan-in in een paar regels code.

Configuration ophalen

Ik ben nog niet tevreden. De lijst met usergroups is nu hardcoded in de orchestrator. Gezien het tempo waarin Meetups verschijnen en weer verdwijnen, is dat niet optimaal. We moeten die data ergens vandaan halen.

Het ligt voor de hand om die lijst in een storage te hebben. We hebben immers al de TableStorage. Maar in dit voorbeeld maak ik het leven wat eenvoudiger en sla ik het op in de config. Daar kan ik later makkelijk bij komen als het aantal usergroups dat we willen benaderen veranderd.

In de file local.settings.json verander ik de huidige settings in:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
      "FUNCTIONS_WORKER_RUNTIME": "dotnet",
      "UserGroups": "SDN,DNN,DNO"
    }
}

Ik heb dus de “UserGroups” key toegevoegd en de waardes erbij gezet in een comma-seperated list.

Als ik dit later wil deployen naar Azure, zal ik die waardes daar ook moeten toevoegen. De file local.settings.json is immers, zoals de naam al zegt, local.

Dit moeten we even uitlezen:

var allUsergroups = System.Environment.GetEnvironmentVariable("Usergroups")
    .Split(',')
    .ToArray(); 
         
var allCalls = new List<Task>();
foreach(var usergroupName in allUsergroups)
{
    var notification = new UsergroupNotification { AttendeeEmail = attendee.Email, UsergroupName = usergroupName };
    allCalls.Add(ctx.CallActivityAsync("A_NotifyUsergroup", notification));
}

await Task.WhenAll(allCalls);

We lezen de data uit, doen een split op de komma en veranderen dit in een array. De rest blijft hetzelfde.

Zijn we er nu?

Nee.

Dit mag dus niet. Orchestrator functions moeten deterministic zijn. Stel dat tussen de invocation van de eerste call naar A_NotifyUsergroup en de tweede de config wijzigt. Dat kan zo maar gebeuren. Wat betekent dat dan? We hebben al eerder gezien dat we geen code mogen gebruiken die eventueel andere resultaten tussen invocations kan opleveren. Dus dit mag niet.

De oplossing is uiteraard simpel: verplaats die code naar een eigen activity. Immers: de eerste keer dat die activity uitgevoerd wordt, lezen we de settings file daadwerkelijk uit. De keren daarna krijgen we het resultaat daarvan terug uit de TableStorage, ongeacht of de onderliggende data veranderd is. Dat maakt voor deze instance van de workflow niet uit.

Dus onze orchestration ziet er als volgt uit:

if (!ctx.IsReplaying)
    log.LogWarning("Getting all usergroups.");

var allUsergroups = await ctx.CallActivityAsync<string[]>("A_GetUsergroupNames", null);
            
var allCalls = new List<Task>();
foreach(var usergroupName in allUsergroups)
{
    var notification = new UsergroupNotification { AttendeeEmail = attendee.Email, UsergroupName = usergroupName };
    allCalls.Add(ctx.CallActivityAsync("A_NotifyUsergroup", notification));
}

await Task.WhenAll(allCalls);

In regel 4 zien we de call naar de Activity. Deze moet data mee krijgen maar we kunnen daar null voor gebruiken.

De activity:

[FunctionName("A_GetUsergroupNames")]
public static string[] GetUsergroupNames(
    [ActivityTrigger] object input, 
    ILogger log)
{
    var allUsergroups = System.Environment.GetEnvironmentVariable("Usergroups")
        .Split(',')
        .ToArray();

    log.LogWarning("Loaded the usergroup list.");

    return allUsergroups;
}

Het is een activity, dus we moeten een parameter meegeven met als attribuut [ActivityTrigger]. Echter, we geven null mee en we doen er niets mee, dus ik heb er object van gemaakt met als naam input. Er moet toch iets staan.

In de body lezen we de config uit. Hier mag dat wel: dit wordt per instance van de workflow maar een keer uitgevoerd. Het resultaat komt voor deze instance in de TableStorage te staan dus we hebben een deterministische orchestrator.

Als we deze runnen kunnen we in de output ook zien dat het werkt:

Ouput van fan-out, fan-in calls

De volgorde van de output kan varieren, de timings ook. Maar alle usergroups die we hebben staan in de config worden aangeroepen.

Interessant is ook de output. Klik maar op de status query (je weet wel: die in de browser staat onder de naam statusQueryGetUri. Voeg aan het einde van deze url de parameters &showHistory=true&showHistoryOutput=true en zie het resultaat.

// 20191103143449
// http://localhost:7071/runtime/webhooks/durabletask/instances/a9f84fe8ee0f42529b26b9cfeeb68a17?taskHub=DurableFunctionsHub&connection=Storage&code=6ZoZrnZw/qufYwLVultvsgS7RiwKNf1g6t0AjF9YNUMERC84wKrOLA==&showHistory=true&showHistoryOutput=true

{
  "name": "O_PerformRegistration",
  "instanceId": "a9f84fe8ee0f42529b26b9cfeeb68a17",
  "runtimeStatus": "Completed",
  "input": {
    "$type": "MeetingRegistration.Attendee, MeetingRegistration",
    "Name": "Dennis",
    "Email": "dennis@vroegop.org",
    "WantsTweet": true
  },
  "customStatus": null,
  "output": {
    "IsSucces": true,
    "Reason": "Everything checked out."
  },
  "createdTime": "2019-11-03T13:34:20Z",
  "lastUpdatedTime": "2019-11-03T13:34:25Z",
  "historyEvents": [
    {
      "EventType": "ExecutionStarted",
      "Timestamp": "2019-11-03T13:34:20.8306001Z",
      "FunctionName": "O_PerformRegistration"
    },
    {
      "EventType": "TaskCompleted",
      "Result": true,
      "Timestamp": "2019-11-03T13:34:22.6157852Z",
      "ScheduledTime": "2019-11-03T13:34:21.8540948Z",
      "FunctionName": "A_ValidateInput"
    },
    {
      "EventType": "TaskCompleted",
      "Result": [
        "SDN",
        "DNN",
        "DNO"
      ],
      "Timestamp": "2019-11-03T13:34:23.3958736Z",
      "ScheduledTime": "2019-11-03T13:34:23.1352168Z",
      "FunctionName": "A_GetUsergroupNames"
    },
    {
      "EventType": "TaskCompleted",
      "Result": null,
      "Timestamp": "2019-11-03T13:34:24.0132401Z",
      "ScheduledTime": "2019-11-03T13:34:23.7418444Z",
      "FunctionName": "A_NotifyUsergroup"
    },
    {
      "EventType": "TaskCompleted",
      "Result": null,
      "Timestamp": "2019-11-03T13:34:24.158211Z",
      "ScheduledTime": "2019-11-03T13:34:23.7418405Z",
      "FunctionName": "A_NotifyUsergroup"
    },
    {
      "EventType": "TaskCompleted",
      "Result": null,
      "Timestamp": "2019-11-03T13:34:24.5138455Z",
      "ScheduledTime": "2019-11-03T13:34:23.7418457Z",
      "FunctionName": "A_NotifyUsergroup"
    },
    {
      "EventType": "ExecutionCompleted",
      "OrchestrationStatus": "Completed",
      "Result": {
        "IsSucces": true,
        "Reason": "Everything checked out."
      },
      "Timestamp": "2019-11-03T13:34:25.1201956Z"
    }
  ]
}

En zo kunnen we dus zaken parallel laten draaien!

Toch ben ik nog niet helemaal tevreden. Onze orchestrator functie wordt wat groot en er gebeurt te veel in. Daar gaan we in de volgende post wat aan doen.

Gepubliceerd door Dennis Vroegop

Passionate Azure Cloud Solutions Architect. I am a enthusiastic guitar player (though not as good at it as I'd like to be) and in my daytime job I teach software developers to be better at their job. Married to my wonderful wife Diana with who I try to raise our daughter Emma.

Geef een reactie

Vul je gegevens in of klik op een icoon om in te loggen.

WordPress.com logo

Je reageert onder je WordPress.com account. Log uit /  Bijwerken )

Google photo

Je reageert onder je Google account. Log uit /  Bijwerken )

Twitter-afbeelding

Je reageert onder je Twitter account. Log uit /  Bijwerken )

Facebook foto

Je reageert onder je Facebook account. Log uit /  Bijwerken )

Verbinden met %s

Deze site gebruikt Akismet om spam te bestrijden. Ontdek hoe de data van je reactie verwerkt wordt.

%d bloggers liken dit: