Azure Durable Functions, deel 4: de workflow van de workflow

Goed, In het vorige deel hebben we een starter function, een orchestration function en een activity function gemaakt. Het lijkt een hoop werk voor een relatief eenvoudig systeem, maar dat valt wel mee: de rest van de code wordt niet veel ingewikkelder dan dit. Nou ja, wel een beetje maar de plumbing zit er nu ongeveer wel in.

We hebben ook gezien dat als we onze heel eenvoudige workflow starten, er in de output twee keer de log message van de orchestrator stond. En ik vertelde dat dat geen bug is maar dat dat zo hoort.

In deze post, zal ik uitleggen hoe dat werkt en gelijk hoe stateless Azure Functions ineens toch stateful kunnen lijken.

Als je wilt meekijken, raad ik je aan om de gratis tool Azure Storage Explorer te downloaden vanaf deze site. Deze tool gebruik je om te kijken wat er in je storage accounts staat, ook in je emulated versie van je storage account. Dus: we kunnen kijken hoe onze Durable Functions in het echt werken.

De orchestrator aanpassen

Om een goed beeld te krijgen van de werking van Durable Functions, moeten we even onze orchestrator aanpassen. Dat moesten we toch al: onze validator geeft aan of een attendee een geldig email adres meegeeft of niet. Als dat zo is: prima, we gaan door. Maar als dat niet zo is, dan moeten we stoppen met de functies.

Dit was een van de voorbeelden die ik aanhaalde om aan te geven dat we best veel code moeten schrijven om die case af te handelen. In “traditionele” Azure Functions moet je op de een of andere manier bijhouden wat er gebeurt met het resultaat van die Validator functie. Ik zei al dat die functie messages in de queue post om aan te geven wat het resultaat is. Maar: dat betekent dat de functie zelf kennis heeft van de flow en de messages die er allemaal zijn. Dat wil je niet: die functie is minder herbruikbaar op die manier. De functie zoals we die nu geschreven hebben is proces-agnostisch. Hij heeft geen idee waar hij voor gebruikt wordt, het enige wat hij weet is dat hij en boolean terug geeft om aan te geven of een email adres geldig is of niet (nou ja, of er een @ in zit of niet, maar dat is muggenziften,,,)

De orchestrator regelt per definitie de flow. Dat is wat een orchestrator doet. Dus daar gaan we iets met het return resultaat doen.

Ik introduceer een nieuwe DTO: RegistrationResult. Deze geeft de orchestrator terug: is het goed gegaan of niet?

namespace MeetingRegistration
{
    public class RegistrationResult
    {
        public bool IsSucces { get; set; }
        public string Reason { get; set; }
    }
}

Deze spreekt voor zich, denk ik.

Dit moeten we gaan gebruiken in de orchestrator functie:

[FunctionName("O_PerformRegistration")]
public static async Task<RegistrationResult> PerformRegistration(
    [OrchestrationTrigger] DurableOrchestrationContext ctx,
    ILogger log)
{
    log.LogWarning("We are in the orchestrator! Yeah!");
    var attendee = ctx.GetInput<Attendee>();
            
    var isValid = await ctx.CallActivityAsync<bool>("A_ValidateInput", attendee.Email);
    if (!isValid)
    {
        return new RegistrationResult
        {
            IsSucces = false,
            Reason = "Not a valid email given."
        };
    }

    return new RegistrationResult { 
        IsSucces = true, 
        Reason = "Everything checked out." 
    };
}

De return type is veranderd van Task naar Task<RegistrationResult>. Aan het einde van de method geven we de default terug met de info dat alles gelukt is. Als onze activity aangeeft dat er een probleem was (dus het resultaat is False), geven we die info ook terug aan onze aanroepende client.

Als we dit gaan testen, krijgen we het volgende resultaat met een geldig email adres (dus: in de browser de link van statusQueryGetUri klikken of selecteren en pasten)

Succes met het runnen

En als we een ongeldig adres meegeven:

Geen succes met ongeldig email adres

(ik weet het: waarschijnlijk weet je het wel, maar… je hoeft de app niet iedere keer opnieuw op te starten.. gewoon de URL in de browser aanpassen. Je krijgt toch een nieuwe instance van de workflow)

Je ziet dan in het eerste geval de “output” alle data bevat die aangeeft dat alles goed ging, en in het tweede geval dat er een fout was (not a valid email given).

Dit is het resultaat van de orchestration. Maar kunnen we ook kijken wat er in de activities gebeurt? Ik zou de vraag niet stellen als het antwoord niet ‘ja’ was..

In het scherm met de output van het resultaat moeten we de querystring even aanpassen (let op dat je runtime nog steeds draait.. mocht dit niet zo zijn dan kun je gewoon in Visual Studio op F5 drukken. Je hoeft geen nieuwe request aan te maken: deze oude is nog gewoon beschikbaar).

Voeg aan het einde van de URL in de browser de volgende parameter toe:

&showHistory=true

Meer debug output in de browser

Je ziet dat je nu voor alle stappen meer informatie krijgt. Meer nog dan je wellicht zou verwachten (komt goed.. moment!)

Voeg nu de volgende parameter toe aan wat we net al hadden:

&showHistoryOutput=true

En ververs de browser

Nog meer input

Als je even zoekt, zie je nu per stap de return values. In ons geval zien we dat A_ValidateInput false terug geeft, wat resulteerde in die message die we deze attendee niet kunnen accepteren. We kunnen deze attendee helaas niet laten weten dat hij of zij niet mag komen: we hebben geen geldig email adres.

Let op: deze informatie wordt heel snel heel groot. Vandaar dat we twee opties hebben om het al dan niet te laten zien.

Async / await in durable functions

Maar hoe zit het nu met die dubbele log messages? En al die aangeroepen functies die ik zie? Ik ben blij dat je het vraagt.

We komen nu bij de kern van hoe Durable Functions durable kunnen zijn en toch stateless. Heb je de Storage Explorer open staan? Die hebben we namelijk zo nodig. Kijk even mee!

Het hart van het systeem wordt gedefinieerd door de keywords Async / Await. De meeste .net / c# ontwikkelaars kennen deze wel, hoewel maar weinigen echt weten hoe het intern werkt. Mijn uitleg over de werking hieronder is totaal niet correct, maar geeft wel weer hoe de werking zou kunnen zijn. Met andere woorden: ook al werkt het intern anders, zo zou het kunnen werken…

Async / await is een pattern on asynchrone code synchroon uit te voeren. Kijk eens naar de volgende dummy code:

public async Task DoSomethingUsefull()
{
    // Hier komen we binnen
    Console.WriteLine("Hello world");

    // Nu doen we de volgende stap, maar...
    // dit wordt uitgevoerd in een andere thread (niet dus, maar het
    // idee klopt wel)
    // Pas als die thread klaar is, komen we hier terug in deze thread

    await DoSomethingThatLastsLong();
            
    // Als bovenstaande thread klaar is, gaan we hier verder
    // (nog steeds: niet dus...)

}

Dit is ongeveer hoe de meeste mensen denken dat async/ await werkt. En het idee klopt wel, hoewel dit pattern in het echt niet (altijd) met threads werkt.

Je zou verwachten dat onze code van de orchestrator hetzelfde doet.

Echter… dit is niet wat er gebeurt. Dit is namelijk weer een mooi voorbeeld van hoe keywords in C# hergebruikt worden maar iets anders doen in een andere context.

Laten we de volgende dummy orchestrator eens bekijken:

public static async Task DummyOrchestrator(
    [OrchestrationClient] DurableOrchestrationClient client
    )
{
    // Entry point. Hier beginnen we.

    var result = await client.StartNewAsync<string>("A_Activity", SomeData);
    // De thread wacht hier tot de activity klaar is, 
    // en gaat dan verder

    // Meer code..
}

Dit lijkt op de code die we eerder zagen. Maar de compiler maakt er iets heel anders van. De Activity bijvoorbeeld doet het volgende in het echt:

public static string Activity([ActivityTrigger] SomeData someData)
{
    // Doe wat werk
    // Sla de data op in TableStorage in Azure (of Emulator)
    // Return void... 
}

Hoewel deze activity een string terug moet geven, geeft deze niets terug… het resultaat echter wordt opgeslagen in TableStorage.

De orchestrator doet iets als deze pseudocode:

public static async Task DummyOrchestrator(
    [OrchestrationClient] DurableOrchestrationClient client
    )
{
    // Entry point. Hier beginnen we.

    bool a_activityHasRun = GetRunStatus("A_Activity");
    if (!a_activityHasRun)
    {
        client.StarAsync("A_Activity", SomeData);
        return;
    }
    var result = ReadFromTableStorage("A_ActivityResult");

    // Meer code..
}

Hier moeten we even iets langer bij stilstaan.

De workflow wordt gestart. Als eerste wordt er ergens in een tabel in TableStorage gekeken of de eerste activity (A_Activity) al gedraaid heeft voor deze workflow instance. Als dat niet zo is, wordt deze functie uitgevoerd in een aparte context en de orchestrator verlaat het geheugen. De code wordt opgeruimd. Het app-domain verdwijnt. De VM kan zelfs weggegooid worden, afhankelijk van de tijd en de geheugendruk.

Als de activity klaar is met zijn werk en het resultaat opgeslagen heeft in de TableStorage, wordt er een nieuwe instance van de orchestrator gemaakt. En deze begint van voor af aan…

Alleen: dit keer is het resultaat van GetRunStatus(“A_Activity”) true: we hebben hem al uitgevoerd! Dus de method gaat door en haalt dat resultaat op uit de TableStorage. Nu hebben we het resultaat, en we gaan verder.

Als hier nu weer een activity gestart wordt, gebeurt hetzelfde: de orchestrator stopt en verdwijnt uit het geheugen. Als de activity klaar is, wordt de orchestrator weer opgestart en begint alles weer van voor af aan.. ook A_Activity (maar die is al uitgevoerd dus het enige wat er gebeurt is dat het resultaat uit de TableStorage komt.

Onze log message verschijnt daarom dus 2 keer. Await / Async werkt hier dus heel anders dan we gewend zijn. Await hier betekent: stop met verwerken, verlaat de functie en ruim je resources op. Dat is iets heel anders dan de thread pauzeren!

Regels voor orchestrators

Aangezien de code in de orchestrator iedere stap van de workflow opnieuw uitgevoerd wordt zijn er een aantal beperkingen aan wat je kunt doen in die code. Je kunt niet zomaar alles doen wat je wilt: de flow in de method moet deterministisch zijn. Het systeem moet volledig voorspelbaar zijn. Kijk even naar de volgende dummy orchestrator:

[FunctionName("O_MyOrchestrator"]
public static async Task MyOrchestrator(
    [OrchestrationTrigger] DurableOrchestrationContext ctx, 
    ILogger log)
{
    var invocationDate = System.DateTime.UtcNow;
    log.LogInformation($"Code is invoked at {invocationDate}.");

    var result = await ctx.CallActivityAsync<bool>("A_MyActivity", invocationDate);
    var moreResult = await ctx.CallActivityAsync<Attendee>("A_EnrichAttendee", invocationDate);
}

Op zich gebeurt hier niets raars. We halen de tijd op, we loggen de informatie en we roepen twee activities aan. Prima. In activity A_MyActivity geven we die datum mee. Blijkbaar is dat nodig om een berekening uit te voeren.

Maar wat gebeurt er nu echt? Laten we het eens stap voor stap bekijken.

  1. We halen de huidige UTC tijd op
  2. We loggen die tijd
  3. We zorgen ervoor dat de functie A_MyActivity gestart wordt, met de huidige tijd als data
  4. We verlaten deze method.
  5. Als A_MyActivity afgerond is, start deze method weer op
  6. We halen de huidige UTC tijd op
  7. We loggen de nieuwe tijd
  8. We halen het resultaat van A_MyActivity uit de TableStorage
  9. We starten activity A_EnrichAttendee op, met als parameter een nieuwe waarde voor invocationDate…

Wacht even. Dat kan niet kloppen. Als die twee activities iets met die tijd doen, en daar dus afhankelijk zijn, dan kun je in de flow van deze method aflezen dat ze allebei dezelfde waarde krijgen. Maar die krijgen ze niet. Als A_MyActivity uren er over doet om te runnen (en dat kan makkelijk!) dan zit er een behoorlijk verschil tussen de waardes van invocationDate die aan de twee methods doorgegeven worden.

Dit levert potentieel grote problemen op.

Ander voorbeeld: we maken in de eerste regel een GUID aan. Deze hebben we nodig als key in een storage bijvoorbeeld. De eerste activity krijgt deze GUID mee en slaat data op met die GUID als key. De tweede activity heeft die GUID ook nodig om de data weer op te halen. Maar: omdat de orchestrator opnieuw begint krijgen we een nieuwe GUID.

Dit mag dus niet.

Net als het ophalen van data uit een storage, uit een config, uitlezen van een web-resources als REST server, enzovoorts. Alle data die mogelijk tussen twee calls in kan veranderen, mag je niet gebruiken in een orchestrator functie.

Dat klinkt als een beperking, maar dat valt wel mee. We zullen snel zien hoe we hier mee om moeten gaan, maar ik kan je wel alvast de volgende code meegeven:

public static async Task MyOrchestrator(
    [OrchestrationTrigger] DurableOrchestrationContext ctx, 
    ILogger log)
{
    var invocationDate = ctx.CurrentUtcDateTime;

    if(!ctx.IsReplaying)
        log.LogInformation($"Code is invoked at {invocationDate}.");

    var result = await ctx.CallActivityAsync<bool>("A_MyActivity", invocationDate);
    var moreResult = await ctx.CallActivityAsync<Attendee>("A_EnrichAttendee", null);
}

In plaats van System.DateTime.UtcNow vragen we ct.CurrentUtcDateTime op. De eerste keer dat we in deze orchestrator komen voor deze instance van de workflow, geeft dit de huidige datum en tijd terug. De volgende keer als we hier inkomen, nadat A_MyActivity klaar is, dan krijgen we gegarandeerd dezelfde waarde terug. Hoewel die tweede call dus dagen later kan gebeuren (want activities kunnen heel lang duren) krijgen we altijd die eerste waarde terug.

Hetzelfde kunnen we doen met ctx.NewGuid. Die genereert een GUID die bij het opnieuw uitvoeren hetzelfde resultaat terug geeft.

Verder kunnen we aan de context vragen of dit de eerste keer is dat deze regel uitgevoerd wordt: ctx.IsReplaying geeft aan of dit een replay is of niet. Let wel op: als A_MyActivity klaar is, is op dit punt IsReplaying true, maar zo gauw de code na de call naar A_MyActivity is (dus wanneer eigenlijk het resultaat uit de TableStorage gehaald wordt) dan is IsReplaying weer false. We zijn immers hier nog niet geweest.

Ga nu niet proberen de boel te optimaliseren door dit te doen:

public static async Task MyOrchestrator(
    [OrchestrationTrigger] DurableOrchestrationContext ctx, 
    ILogger log)
{
    var invocationDate = ctx.CurrentUtcDateTime;

    if(!ctx.IsReplaying)
        log.LogInformation($"Code is invoked at {invocationDate}.");

    if(!ctx.IsReplaying)
        var result = await ctx.CallActivityAsync<bool>("A_MyActivity", invocationDate);

    if (!ctx.IsReplaying)
        var moreResult = await ctx.CallActivityAsync<Attendee>("A_EnrichAttendee", null);
}

Je moet echt iedere keer die CallActivityAsync aanroepen. Weet je nog? De eerste keer zorgt dit er voor dat de functie uitgevoerd wordt, de tweede, derde en andere keren haalt die code regel het resultaat op uit de TableStorage. Dus die moet iedere keer opnieuw aangeroepen worden.

Ik gebruik die ctx.IsReplaying dan eigenlijk alleen maar voor het loggen van informatie. Dat hoef ik namelijk niet iedere keer te zien. De data is steeds gelijk dus 1 logregel per keer is genoeg.

Stateless durable functions

En nu weet je gelijk hoe het kan dat we stateless kunnen zijn en toch onze data kunnen behouden. Alle data wordt continue opgeslagen in een TableStorage. Deze storage kunnen we zo configureren dat we zelfs geo-redundant zijn. Dus de data staat keurig ergens in de cloud, de functies worden iedere keer opnieuw gestart (en de app-domain opnieuw aangemaakt, zelfs de VM waar het op draait kan opnieuw aangemaakt worden). We zijn volledig serverless en stateless bezig maar voor ons als ontwikkelaar lijkt het alsof we een eenvoudige flow hebben in een method waarin data bewaard blijft tussen de calls in. Het lijkt wel magie!

Even terug naar onze originele code. Als we deze voor de allereerste keer draaien, maakt de runtime alle structuren voor ons aan. Ik heb het al een paar keer over die TableStorage gehad, laten we eens kijken hoe dat er uit ziet. En daar hebben we die Storage Explorer voor nodig.

Als ik onze code nu een keer ga draaien, krijg ik het resultaat dat we al gezien hebben. Ik geef even een geldig email adres mee zodat we de hele flow (hoe kort ook) doorlopen.

Daarna start ik de Storage Explorer. Die ziet er dan bij mij als volgt uit:

Storage Explorer

Zoals je ziet, heeft de runtime van alles aangemaakt. We hebben blob containers, queues, en tables.

Die queues (4 control queues en 1 workitems queue, die het lopende werk bevatten) zijn het hart van het systeem. Ik vertelde in de allereerste post hoe je functions kunt laten communiceren: door het versturen van messages in een queue. Later vertelde ik dat we dat kunnen voorkomen door Durable Functions te gebruiken. Maar wat blijkt nu: dat is precies hoe Durable Functions werken. Ze versturen zelf allemaal berichten in allerlei queues om het werk te synchroniseren. Het verschil: we hoeven het dit keer niet zelf te schrijven. Maar uiteindelijk, onder water, zijn alle orchestrator en activity functions gewoon queue-triggered functions. We zien het alleen niet.

Alle data die we nodig hebben voor het runnen van een workflow staat in de tabel DurableFunctionsHistory. Durable Functions zijn een voorbeeld van event-sourcing: alle data wordt stap voor stap opgeslagen zodat we deze later kunnen herhalen.

In het voorbeeld dat ik hier gaf kun je dat duidelijk zien. Je ziet precies welke stappen er genomen worden. In de kolom EventType staat precies welke stap er genomen wordt en daar achter in de andere kolommen input en result (niet zichtbaar in mijn voorbeeld maar bij jou kun je hem wel zien) staat de data die heen en weer gaat.

We kunnen ook mooi zien wat de flow is:

  1. Orchestrator started. We beginnen de flow dus
  2. Execution started. We gaan een instance uitvoeren
  3. Task Scheduled. Onze A_CheckInput method wordt klaar gezet om te beginnen
  4. Orchestrator Completed. Tot A_CheckInput klaar is, is de orchestrator voorlopig klaar
  5. Orchestrator started. Blijkbaar is A_CheckInput klaar: we gaan opnieuw beginnen
  6. Task Completed. We gaan het resultaat van A_CheckInput ophalen (in de result staat nu dan de waarde true)
  7. Execution Completed. We zijn aan het einde van deze instance flow
  8. Orchestrator Completed. De orchestrator wordt weer afgebroken

Je kunt de flow met alle data, timestamps en dergelijke dus perfect volgen.

En nu?

We hebben weer een hoop besproken. In de volgende post gaan we een wat meer uitdagende activity maken: we gaan de andere usergroups informeren over het feit dat iemand zich aanmeldt.

Tot dan!

Gepubliceerd door Dennis Vroegop

Passionate Azure Cloud Solutions Architect. I am a enthusiastic guitar player (though not as good at it as I'd like to be) and in my daytime job I teach software developers to be better at their job. Married to my wonderful wife Diana with whom I try to raise our daughter Emma.

Eén opmerking over 'Azure Durable Functions, deel 4: de workflow van de workflow'

Geef een reactie

Vul je gegevens in of klik op een icoon om in te loggen.

WordPress.com logo

Je reageert onder je WordPress.com account. Log uit /  Bijwerken )

Twitter-afbeelding

Je reageert onder je Twitter account. Log uit /  Bijwerken )

Facebook foto

Je reageert onder je Facebook account. Log uit /  Bijwerken )

Verbinden met %s

Deze site gebruikt Akismet om spam te bestrijden. Ontdek hoe de data van je reactie verwerkt wordt.

%d bloggers liken dit: