#java #architecture #exeris #jvm-performance #loom

Gdzie kończy się StructuredTaskScope: Budowa warstwy Flow w Exeris

STS daje strukturalną współbieżność wewnątrz scope. Flow daje strukturalne wykonanie rozłożone w czasie. W momencie gdy krok musi zostać zaparkowany i wznowiony po zewnętrznym zdarzeniu — opuściłeś terytorium STS.

To jest piąty artykuł w serii Exeris Kernel.

Seria stopniowo buduje jeden spójny obraz architektoniczny: artykuł pierwszy zastąpił ThreadLocal przez ScopedValue na ścieżce propagacji kontekstu; artykuł drugi dowodził że niektóre warstwy nie zyskują nic z polimorfizmu runtime’owego; artykuł trzeci pokazał gdzie StructuredTaskScope faktycznie zarabia na swoje miejsce; artykuł czwarty wypchnął TLS poza stertę.

Ten jest o tym gdzie STS przestaje być właściwym narzędziem — i co trzeba było zbudować zamiast.


Ograniczenie na które wciąż trafiałem

Presja przyszła z konkretnego miejsca: budowałem warstwę orkiestracji w Exeris Kernel — tę część która prowadzi wieloetapowe procesy biznesowe z kompensacją i podpinaniem zewnętrznych zdarzeń.

Oczywistym punktem startowym był jakiś istniejący enterprise’owy framework do sag. Rozważałem Camundę, Axon i kilka innych w tej kategorii, i odrzuciłem wszystkie. Elastyczności jakiej wymagały pozostałe ograniczenia kernel’a — zero-allocation hot paths, propagacja ScopedValue, off-heap state — nie dało się dorobić do frameworka którego model orkiestracji był już z góry przesądzony. Każdy z nich oznaczał też oddzielny proces operacyjny — Axon Server, Camunda engine — z własną JVM, śladem pamięciowym i cyklem życia. Wymiar kosztowo-wydajnościowy zmierzyłem porządnie dopiero później, ale narzut operacyjny był widoczny od pierwszego dnia.

Więc zszedłem o poziom niżej. Na pierwszy rzut oka StructuredTaskScope wydawał się naturalnym dopasowaniem. Forkujesz pracę, joinujesz ją, dostajesz strukturalne gwarancje cyklu życia. Project Loom uczynił to tanim i czystym.

Potem trafiłem na krok który musiał czekać na potwierdzenie płatności.

Nie kilka milisekund. Nie do momentu gdy downstream’owy serwis odpowie synchronicznie. Krok musiał oddać wykonanie, utrwalić swój stan i wznowić się gdy nadejdzie zewnętrzne zdarzenie — możliwie wiele godzin później, możliwie po restarcie JVM.

StructuredTaskScope nie ma na to modelu. Jest strukturalny w przestrzeni — wszystkie sforkowane zadania są ograniczone do scope’a żyjącego na pojedynczym stosie wywołań. Nie jest strukturalny w czasie. W momencie gdy granica wykonania musi rozciągać się przez czas a nie przez scope, opuściłeś terytorium StructuredTaskScope całkowicie.

To jest ograniczenie które zadecydowało o architekturze.


Co STS dalej robi wewnątrz Flow

Zanim pójdę dalej: StructuredTaskScope nie został zastąpiony. Jest dalej używany w kilku miejscach w kernel’u — i nie zawsze do tego co pokazują standardowe wprowadzenia do STS.

Pierwsze użycie jest oczywiste. Wewnątrz pojedynczego kroku Flow, kiedy akcja musi wywołać dwa niezależne serwisy i połączyć wyniki przed zwróceniem FlowOutcome.CONTINUE. Wykonanie jest ograniczone do Virtual Thread’a tego kroku. STS posiada fan-out; Flow posiada granicę kroku. Standardowa współbieżność strukturalna.

Drugie jest na L3, wewnątrz subsystemu Events. InMemoryEventBus.publishAndAwait otwiera scope, forkuje jeden Virtual Thread na każdy zarejestrowany handler, joinuje i wraca gdy każdy handler skończył. CommunityEventLoop.dispatchBatch ma ten sam kształt w innym tempie: na każdą wydrenowaną paczkę zdarzeń forkuje jeden VT na zarejestrowany batch processor, joinuje gdy wszystkie skończą. Oba to czyste ad-hoc fan-outy z deterministycznym punktem joina — dokładnie ten przypadek do którego STS było projektowane.

Trzecie jest najsubtelniejsze. OutboxOrchestrator.ownerLoop otwiera scope i forkuje dokładnie jedno zadanie — swoją długożyjącą pętlę poll-and-flush. Po co używać STS dla pojedynczego forka? Bo Java 26 wymusza regułę owner-thread: open(), fork(), join() i close() muszą wszystkie wydarzyć się na tym samym wątku. Orchestrator spawnuje dedykowany owner Virtual Thread właśnie po to żeby spełnić to ograniczenie, dzięki czemu lifecycle scope pętli jest jawny i strukturalny zamiast niejawny i śledzony przez wątki.

Punkt w którym Flow przejmuje to granica między krokami — moment w którym krok zwraca coś innego niż “skończyłem, jedź dalej”. Poniżej tej granicy STS pozostaje właściwym narzędziem, we wszystkich trzech wzorcach powyżej. Flow podejmuje pracę tam gdzie kończy się stos wywołań.


Model wykonania

Model wykonania przyszedł z miejsca którego się nie spodziewałem. Po wykluczeniu enterprise’owych frameworków saga i zejściu o poziom niżej do STS, dalej musiałem zdecydować jak ma wyglądać sama maszyna stanów sagi. Pierwsza analogia która przyszła mi do głowy to maszyna stanów TLS którą właśnie kończyłem implementować (opisana w artykule czwartym). TLS to jedna z najbardziej rygorystycznie wyspecyfikowanych maszyn stanów w powszechnie używanym oprogramowaniu — stany jawne, przejścia deterministyczne, I/O ograniczone do granic stanów. Orkiestracja sagi, strukturalnie, ma ten sam kształt. Spróbowałem zaimitować wzorzec TLS. Pasował i tak już zostało.

Każda instancja Flow działa na własnym Virtual Thread’zie, uruchamianym przez CoreFlowRuntime:

// From CoreFlowRuntime.launch()
Thread thread = Thread.ofVirtual()
    .name("exeris-flow-"
          + instance.key().instanceIdMost()
          + '-'
          + instance.key().instanceIdLeast())
    .unstarted(() -> runInstance(instance, startStep));
runningThreads.add(thread);
thread.start();

Ten Virtual Thread wykonuje kroki w pętli dopóki nie wydarzy się jedna z trzech rzeczy:

// From CoreFlowRuntime.applyOutcome()
return switch (outcome) {
    case CONTINUE -> applyContinueOutcome(instance, step, stepIndex);
    case COMPLETE -> {
        applyCompleteOutcome(instance, step);
        yield -1;
    }
    case PARK -> {
        applyParkOutcome(instance, stepIndex);
        yield -1;
    }
    default -> -1;
};

CONTINUE — przejdź do następnego kroku, zostań na tym samym Virtual Thread’zie. COMPLETE — short-circuit bezpośrednio do stanu terminalnego, pomijając pozostałe kroki. PARK — to jest granica. Virtual Thread kończy działanie. Stan zostaje utrwalony. Wykonanie jest zawieszone do jawnego wywołania wake().

Rysunek 1: Przejścia FlowState i granica Virtual Thread'a.

Ta ścieżka PARK to dokładnie to co nie ma odpowiednika w StructuredTaskScope. Kiedy applyParkOutcome się wykonuje, serializuje bieżący FlowSnapshot — indeks kroku, stos kompensacji, stan, timeout — i rejestruje instancję w parked map. Późniejsze wywołanie wake() rozwiązuje ten snapshot, albo z pamięci, albo z FlowSnapshotStore, i uruchamia nowy Virtual Thread który wznawia od currentStep + 1.


Definiowanie Flow

Zanim Flow może zostać uruchomiony, musi zostać skompilowany w plan wykonania. API buildera jest celowo jawne — deklarujesz kroki, ich akcje kompensacyjne i wszelkie nieliniowne przejścia:

FlowExecutionPlan orderFulfillment = engine.plans()
    .newDefinition("order-fulfillment")
    .step("validate-order",
          ctx -> {
              validateOrder(ctx);
              return FlowOutcome.CONTINUE;
          },
          ctx -> { /* no rollback needed for validation */ })
    .step("charge-payment",
          ctx -> {
              initiatePayment(ctx);
              return FlowOutcome.PARK; // wait for async confirmation
          },
          ctx -> refundPayment(ctx))
    .step("ship-order",
          ctx -> {
              dispatchShipment(ctx);
              return FlowOutcome.COMPLETE;
          },
          null)
    .build();

engine.plans().compile(orderFulfillment);

compile() waliduje graf kroków, buduje macierz sąsiedztwa i tablicę indeksów nextStep, i rejestruje plan w planCatalog. Po tym punkcie silnik może schedulować instancje tej definicji.

Trade-off tutaj jest świadomy: definicja jest closed-world w momencie kompilacji. Dodawanie lub przestawianie kroków gdy w locie są istniejące instancje to problem migracji schematu — i silnik nie rozwiązuje tego po cichu. Jeśli saga w locie była zaparkowana na kroku który po redeployu już nie istnieje, na wake’u rzucane jest EX-FLOW-7002 z SCHEMA_MISMATCH. Bezpieczny wzorzec to blue/green z drainem sag przed przełączeniem ruchu. Zachowałem to ograniczenie jako jawne zamiast ukrywać je za version-transparent routing’iem, bo transparentne wersjonowanie które tak naprawdę nie gwarantuje poprawności jest gorsze niż widoczne tarcie.


Gdy wake przychodzi z zewnątrz

Park/wake z bezpośrednim wywołaniem scheduler.wake() pokrywa przypadek gdy kontrolujesz obie strony. Trudniejszy przypadek to choreografia: Flow ma się wznowić gdy nadejdzie zdarzenie z zewnętrznego systemu — bramki płatniczej, serwisu magazynowego, akcji użytkownika.

Tutaj wchodzą FlowChoreographyBridge i sealed typ ChoreographyDecision. Rejestrujesz mapper który tłumaczy przychodzące deskryptory zdarzeń na decyzje routingu:

engine.registerChoreographyMapper(
    descriptor -> {
        // EventDescriptor carries the flow instance ID in streamIdHigh/streamIdLow.
        // The convention: the event producer encodes the target instance UUID
        // in those fields when publishing a choreography trigger.
        long instanceMost = descriptor.streamIdHigh();
        long instanceLeast = descriptor.streamIdLow();
        if (instanceMost == 0 && instanceLeast == 0) {
            return ChoreographyDecision.ignore();
        }
        return ChoreographyDecision.wake(instanceMost, instanceLeast);
    },
    List.of("payment.confirmed"),
    eventBus
);

Implementacja bridge’a używa pattern matchingu z Java 21 na sealed hierarchii żeby dispatchować bez instanceof chainów:

// From FlowChoreographyBridge.handle()
switch (decision) {
    case ChoreographyDecision.Wake(long most, long least) -> {
        scheduler.lookupParked(most, least).ifPresent(scheduler::wake);
        // Stale or duplicate wake event: idempotent no-op if instance no longer parked
    }
    case ChoreographyDecision.Start(FlowExecutionPlan plan, long most, long least) -> {
        scheduler.schedule(plan, newContext(plan, most, least));
    }
    case ChoreographyDecision.Ignore() -> { /* intentional no-op */ }
}

Sealed interface ChoreographyDecision daje gwarancje wyczerpywalności od kompilatora. Dodanie nowego wariantu decyzji bez obsługi w bridge’u jest błędem kompilacji, a nie cichym runtime miss’em. To ten sam wzorzec zastosowany do FlowOutcome w pętli wykonywania kroków — sealed types jako bariera architektoniczna.

Sam mapper to @FunctionalInterface. Domyka się nad dowolnym stanem korelacji jakiego potrzebujesz. Bridge nie narzuca jak masz rozwiązywać identyfikatory korelacji — to jest twoja logika domenowa.


Events i Flow: Dwie warstwy, jeden kontrakt deduplikacji

Subsystem Events (L3) i silnik Flow (L4) mają jawny podział deduplikacji który warto wyłożyć wprost, bo łatwo założyć że jedna warstwa załatwia wszystko.

EventBus dostarcza at-least-once. Nie ma wbudowanej deduplikacji na poziomie busa — to świadoma decyzja projektowa. Wbudowana deduplikacja bus-level wymaga stanu współdzielonego między wszystkimi subskrybentami: albo ConcurrentHashMap na stercie (presja na GC), albo distributed lock (latencja). Żadne z tych nie jest akceptowalne na poziomie wydajności do którego subsystem Events celuje.

IdempotencyGuard na L4 pokrywa przypadek Flow-specific: dedup na poziomie kroku per instancja Sagi. Podział to:

WarstwaDeduplikacjaMechanizm
EventBus (L3)Brak — at-least-onceOdpowiedzialność subskrybenta
Flow Engine (L4)Na krok, na instancjęIdempotencyGuard.tryClaimStep()
AplikacjaNa mutację stanuEventDescriptor.eventUuidHigh/Low jako klucz dedup

To znaczy że zdarzenie choreography wake może przyjść dwa razy — retry Outboxa, reconnect brokera, duplikat sieciowy. Bridge wywołuje scheduler.lookupParked(), a potem scheduler.wake(). Jeśli Flow nie jest już zaparkowany (został już zbudzony przez pierwsze dostarczenie), lookupParked() zwraca empty i drugi wake jest cichym no-opem. Jeśli Flow się budzi i wchodzi ponownie w krok który już wykonał, IdempotencyGuard pomija go i przechodzi dalej. Dwie sieci bezpieczeństwa, żadna z nich nie wymaga koordynacji między tymi dwoma warstwami.

Punktem integracji między nimi jest FlowProgressPublisher: kiedy Flow osiąga stan terminalny, opcjonalnie publikuje zdarzenie FlowProgress do EventBus. Tylko przejścia terminalne emitują — pośrednie zmiany stanu są celowo pomijane, żeby uniknąć alokacji na hot path schedulingu.


Idempotencja przy replay

Replay przy crash-recovery i ponowne wake’i z choreografii tworzą prawdziwy problem deduplikacji: krok który już raz wykonał się pomyślnie nie powinien wykonać się ponownie jeśli silnik zrestartuje się w trakcie Flow.

IdempotencyGuard obsługuje to na poziomie kroku:

// From CoreFlowRuntime.runStep() — called inside synchronized(instance.monitor())
if (guard != null && !guard.tryClaimStep(
        instance.key().instanceIdMost(),
        instance.key().instanceIdLeast(),
        stepIndex)) {
    // Step already claimed — skip and advance
    int nextGuardIndex = instance.plan().nextStep(stepIndex);
    if (nextGuardIndex < 0) {
        complete(instance);
        return -1;
    }
    return nextGuardIndex;
}

Domyślna implementacja CoreIdempotencyGuard jest oparta o stertę: ConcurrentMap<FlowKey, ConcurrentMap<Integer, Boolean>>. Wewnętrzna mapa kluczowana po indeksie kroku sprawia że releaseInstance() to usunięcie całego wpisu instancji w O(1), a nie O(claimed steps). Przy complete() albo FAILED_ROLLEDBACK guard zwalnia claim instancji.

Customowe implementacje IdempotencyGuard — oparte o Redis, bazę danych lub dowolny inny współdzielony store — można wpiąć przez KernelProviders.IDEMPOTENCY_GUARD przed submitnięciem do silnika.

W nadchodzącym modelu distributed (ADR-013) idempotencja staje się dwuwarstwowa: heap CAS w pamięci w CoreIdempotencyGuard i trwały CAS na FlowSnapshot.schemaVersion w JdbcFlowSnapshotStore. ADR-013 wymaga żeby te dwie warstwy zgadzały się co do stanów terminalnych — jeśli heap guard raportuje “już zrobione”, a trwały wiersz twierdzi inaczej, wygrywa trwała odpowiedź a stan heapu jest reconcile’owany przy następnym loadzie. Żadna z warstw nie może być jedynym źródłem prawdy w deploycie distributed.

To nie jest uniwersalne. Pokrywa tylko przypadki w których ścieżka wykonania jest dalej pod kontrolą kernel’a. Zewnętrzne side effects — płatność została zainicjowana ale odpowiedź została utracona — pozostają odpowiedzialnością wywołującego do uczynienia idempotentnymi. Guard zapobiega ponownemu wykonaniu przez kernel kroku który już zaclaim’ował. Nie może zapobiec temu żeby serwis downstream zobaczył duplikat wywołania jeśli krok wykonał się pomyślnie przed crashem.


Co dalej pozostaje prawdą

Kompensacja jest potężna, ale ma swoją cenę: każdy krok który może zostać wycofany musi być napisany jako idempotentna operacja odwrotna. Jeśli twoja akcja kompensacyjna ma side effects których nie da się cofnąć — notyfikacja już wysłana, rekord audytowy już zapisany — modelujesz biznesowy invariant którego kompensacja nie może zaspokoić. Silnik dostarcza mechanizm. Poprawność dalej należy do ciebie.

terminalStateCatalog w CoreFlowRuntime rośnie od start() aż do close(). To jest in-process’owa bariera idempotencji która zapobiega ponownemu schedulowaniu już-terminalnych Flow’ów w obrębie pojedynczego runtime lifetime’u. Jest to akceptowalne dla obecnego modelu operacyjnego, ale jest to znane ograniczenie: bardzo długo żyjące runtime’y z wysokim wolumenem Flow zobaczą jak ta mapa rośnie. Bounded retention jest w roadmapie v0.7 — ale każda polityka musi zachować semantykę bariery. Flow terminalny nie może być ponownie wykonywalny.

Choreography wake przeżywający restart nie jest jeszcze wspierany. Po restarcie zaparkowane Flow przeżywają tylko jako snapshoty w FlowSnapshotStore; lookupParked() obecnie rozwiązuje tylko z indeksu in-memory. Roadmap v0.7 zamyka to przez JdbcFlowSnapshotStore z entry pointem do parked-enumeration — na starcie silnik rehydratuje routing wake’ów z trwałego store’a. Choreografia w steady-state zostaje na fast path O(1) w pamięci; probe store’a jest tylko fallbackiem przy missie.

Jedno ograniczenie z tej nadchodzącej implementacji JDBC warto tu odnotować: JdbcFlowSnapshotStore ma jawnie zakazane używanie ThreadLocal do propagacji kontekstu. DataSource jest wstrzykiwany przez konstruktor przez BootstrapContext. To samo ograniczenie które zbanowało ThreadLocal z warstwy propagacji kontekstu kernel’a sięga aż do warstwy distributed saga state.

Panama FFM nie pojawia się w tej warstwie bezpośrednio. Ograniczenie alokacji jest egzekwowane na hot path schedulingu przez FlowZeroAllocTck, ale stan orkiestracji żyje na stercie. Model off-heap ownership z warstwy transport nie jest właściwym modelem dla maszyny stanów która musi robić checkpoint, restore i ewoluować przez restarty JVM.

Jednym miejscem gdzie filozofia zero-allocation sięga do tej warstwy jest EventDescriptor — metadane routingu używane przez subsystem Events i bridge choreografii. Siedem prymitywnych pól: dwie pary long dla UUID zdarzenia i streamu, dwa int dla ordinala i bitmapy flag, jeden long dla timestampa. Wire codec pakuje to w dokładnie 64 bajty — jedna linia cache CPU, z jawnym paddingiem MemoryLayout żeby ją wypełnić. C2 skalaryzuje rekord przez Escape Analysis już dziś. Kiedy JEP 401 osiągnie GA, dodanie modyfikatora value wymaga zerowych zmian w polach — nagłówki obiektów znikają za darmo.


Podsumowanie

StructuredTaskScope jest strukturalny w przestrzeni; Flow jest strukturalny w czasie. Rozróżnienie staje się widoczne dopiero na granicy między krokami, gdzie FlowOutcome.PARK kończy Virtual Thread, utrwala stan i pozwala innemu Virtual Thread’owi wznowić się później. Żadna abstrakcja STS nie mapuje się na ten kształt. Choreografia obsługuje przypadek gdy wake przychodzi z zewnątrz — sealed ChoreographyDecision i FlowChoreographyMapper czynią ścieżkę event-driven type-safe i wyczerpującą wobec kompilatora, więc dodanie nowego wariantu decyzji jest błędem kompilacji a nie cichym runtime miss’em.

Trade-offy pozostają widoczne: migracja schematu wymaga drain-and-switch, poprawność kompensacji to odpowiedzialność wywołującego, a terminal state catalog jest scope’owany do runtime’a. Żadnego z tych problemów Flow nie udaje że rozwiązuje transparentnie.

Następnym konkretnym krokiem jest distributed saga state, zdecydowany w ADR-013. Model: współdzielony trwały FlowSnapshotStore (referencyjna implementacja: JdbcFlowSnapshotStore nad Postgresem w v0.7 Sprint 3) jako źródło prawdy, z Kafka choreography wake dla cross-service saga recovery (Sprint 5/6). Optimistic concurrency przez FlowSnapshot.schemaVersion — już w SPI z SCHEMA_VERSION_INITIAL = 1L — rozwiązuje współbieżne próby advance’u z dwóch nodów kernel’a bez koordynatora.

Trzy podejścia zostały odrzucone zanim wylądowałem tam gdzie jestem. Distributed lock service (ZooKeeper/etcd) kładzie blokującą koordynację na ścieżce advance’u sagi — niespójne z kontraktem No-Waste-Compute. Replikacja stanu oparta o CRDT jest semantycznie niespójna dla modelu sagi: stos kompensacji i kolejność kroków nie są przemienne, więc współbieżne advance’y nie mogą być bezpiecznie merge’owane na poziomie struktury danych. Single-leader coordinator z Raft/Paxos wprowadza z powrotem scentralizowany stateful komponent którego kernel celowo unika.

Co pozostaje: cross-service choreography wake, parked-instance enumeration na starcie i wire-through schemaVersion w RuntimeFlowInstance.toSnapshot(). Ograniczenie migracji schematu (drain-and-switch) to osobny problem którego distributed saga state nie łagodzi — wymaga jawnego wersjonowania definicji FlowSnapshot, co jest scope’owane do późniejszego milestone’a.


Zobacz Exeris Kernel — zero-allocation architektura w działającym kodzie: 🔗 exeris-systems/exeris-kernel

Subsystem Flow znajduje się w exeris-kernel-core i exeris-kernel-spi. TCK pokrywający pełny cykl życia — submit, park, wake, kompensacja, crash-recovery — jest w exeris-kernel-tck.