Skip to content

Workflow patterns

Complete tests for the workflow shapes you'll hit most often: sequential steps, child contexts, parallel branches, partial failures, long waits, and polling. Each section shows a handler and the test that exercises it. For the assertion vocabulary used inside each test, see Assertions.

Sequential steps

A sequential workflow runs steps one after another, passing results between them. Verify that all steps ran and that the final result reflects the full chain.

import { withDurableExecution, DurableContext } from "@aws/durable-execution-sdk-js";
import { LocalDurableTestRunner } from "@aws/durable-execution-sdk-js-testing";
import { ExecutionStatus, OperationType } from "@aws-sdk/client-lambda";

const handler = withDurableExecution(async (event: { orderId: string }, context: DurableContext) => {
  const validated = await context.step("validate", () => ({ orderId: event.orderId, status: "validated" }));
  const paid = await context.step("payment", () => ({ ...validated, payment: "completed" }));
  const fulfilled = await context.step("fulfillment", () => ({ ...paid, fulfillment: "shipped" }));
  return fulfilled;
});

let runner: LocalDurableTestRunner;

beforeAll(async () => {
  await LocalDurableTestRunner.setupTestEnvironment();
});

afterAll(async () => {
  await LocalDurableTestRunner.teardownTestEnvironment();
});

beforeEach(() => {
  runner = new LocalDurableTestRunner({ handlerFunction: handler });
});

it("executes all steps in order", async () => {
  const result = await runner.run({ payload: { orderId: "order-123" } });

  expect(result.getStatus()).toBe(ExecutionStatus.SUCCEEDED);

  const ops = result.getOperations().filter(op => op.getType() === OperationType.STEP);
  expect(ops.length).toBe(3);
  expect(ops.map(op => op.getName())).toEqual(["validate", "payment", "fulfillment"]);
});
from aws_durable_execution_sdk_python import DurableContext, durable_execution, durable_step
from aws_durable_execution_sdk_python.types import StepContext
from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python.lambda_service import OperationType
from aws_durable_execution_sdk_python_testing.runner import DurableFunctionTestRunner


@durable_step
def validate(ctx: StepContext, order_id: str) -> dict:
    return {"order_id": order_id, "status": "validated"}


@durable_step
def payment(ctx: StepContext, order: dict) -> dict:
    return {**order, "payment": "completed"}


@durable_step
def fulfillment(ctx: StepContext, order: dict) -> dict:
    return {**order, "fulfillment": "shipped"}


@durable_execution
def handler(event: dict, context: DurableContext) -> dict:
    validated = context.step(validate(event["order_id"]), name="validate")
    paid = context.step(payment(validated), name="payment")
    return context.step(fulfillment(paid), name="fulfillment")


def test_executes_all_steps_in_order():
    runner = DurableFunctionTestRunner(handler=handler)
    with runner:
        result = runner.run(input='{"order_id": "order-123"}', timeout=10)

    assert result.status is InvocationStatus.SUCCEEDED

    step_ops = [op for op in result.operations if op.operation_type == OperationType.STEP]
    assert len(step_ops) == 3
    assert [op.name for op in step_ops] == ["validate", "payment", "fulfillment"]
import static org.junit.jupiter.api.Assertions.*;

import java.util.Map;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.lambda.model.OperationType;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.model.ExecutionStatus;
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;

class SequentialWorkflowTest {

    record Order(String orderId) {}

    @Test
    void executesAllStepsInOrder() {
        var runner = LocalDurableTestRunner.create(
            Order.class,
            (input, context) -> {
                var validated = context.step("validate", Map.class,
                    ctx -> Map.of("orderId", input.orderId(), "status", "validated"));
                var paid = context.step("payment", Map.class,
                    ctx -> Map.of("orderId", input.orderId(), "payment", "completed"));
                return context.step("fulfillment", Map.class,
                    ctx -> Map.of("orderId", input.orderId(), "fulfillment", "shipped"));
            }
        );

        var result = runner.runUntilComplete(new Order("order-123"));

        assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());

        var stepOps = result.getOperations().stream()
            .filter(op -> op.getType() == OperationType.STEP)
            .toList();
        assertEquals(3, stepOps.size());
        assertEquals("validate", stepOps.get(0).getName());
        assertEquals("payment", stepOps.get(1).getName());
        assertEquals("fulfillment", stepOps.get(2).getName());
    }
}

Child contexts

Child contexts group operations under a named scope. The test result exposes the child context as a CONTEXT operation. You can inspect its type and walk its child operations.

import { withDurableExecution, DurableContext } from "@aws/durable-execution-sdk-js";
import { LocalDurableTestRunner } from "@aws/durable-execution-sdk-js-testing";
import { ExecutionStatus, OperationType } from "@aws-sdk/client-lambda";

const handler = withDurableExecution(async (event: unknown, context: DurableContext) => {
  return await context.runInChildContext("process", async (child) => {
    const a = await child.step("step-a", () => "result-a");
    const b = await child.step("step-b", () => "result-b");
    return `${a}:${b}`;
  });
});

let runner: LocalDurableTestRunner;

beforeAll(async () => {
  await LocalDurableTestRunner.setupTestEnvironment();
});

afterAll(async () => {
  await LocalDurableTestRunner.teardownTestEnvironment();
});

beforeEach(() => {
  runner = new LocalDurableTestRunner({ handlerFunction: handler });
});

it("executes steps inside a child context", async () => {
  const result = await runner.run();

  expect(result.getStatus()).toBe(ExecutionStatus.SUCCEEDED);
  expect(result.getResult()).toBe("result-a:result-b");

  const ctx = runner.getOperation("process");
  expect(ctx.getType()).toBe(OperationType.CONTEXT);

  const children = ctx.getChildOperations();
  expect(children?.length).toBe(2);
  expect(children?.map(c => c.getName())).toEqual(["step-a", "step-b"]);
});
from aws_durable_execution_sdk_python import DurableContext, durable_execution, durable_step
from aws_durable_execution_sdk_python.types import StepContext
from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python.lambda_service import OperationType
from aws_durable_execution_sdk_python_testing.runner import DurableFunctionTestRunner


@durable_step
def step_a(ctx: StepContext) -> str:
    return "result-a"


@durable_step
def step_b(ctx: StepContext) -> str:
    return "result-b"


@durable_execution
def handler(event, context: DurableContext) -> str:
    def process(child: DurableContext) -> str:
        a = child.step(step_a())
        b = child.step(step_b())
        return f"{a}:{b}"

    return context.run_in_child_context(process, name="process")


def test_executes_steps_inside_child_context():
    runner = DurableFunctionTestRunner(handler=handler)
    with runner:
        result = runner.run(timeout=10)

    assert result.status is InvocationStatus.SUCCEEDED

    ctx_ops = [op for op in result.operations if op.operation_type == OperationType.CONTEXT]
    assert len(ctx_ops) == 1
    assert ctx_ops[0].name == "process"
import static org.junit.jupiter.api.Assertions.*;

import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.lambda.model.OperationType;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.model.ExecutionStatus;
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;

class ChildContextTest {

    @Test
    void executesStepsInsideChildContext() {
        var runner = LocalDurableTestRunner.create(
            Void.class,
            (input, context) -> context.runInChildContext("process", String.class, child -> {
                var a = child.step("step-a", String.class, ctx -> "result-a");
                var b = child.step("step-b", String.class, ctx -> "result-b");
                return a + ":" + b;
            })
        );

        var result = runner.runUntilComplete(null);

        assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
        assertEquals("result-a:result-b", result.getResult(String.class));

        var ctx = result.getOperation("process");
        assertNotNull(ctx);
        assertEquals(OperationType.CONTEXT, ctx.getType());
    }
}

Parallel operations

Parallel branches execute concurrently. Assert on the final result to verify all branches completed.

import { withDurableExecution, DurableContext } from "@aws/durable-execution-sdk-js";
import { LocalDurableTestRunner } from "@aws/durable-execution-sdk-js-testing";
import { ExecutionStatus } from "@aws-sdk/client-lambda";

const handler = withDurableExecution(async (event: unknown, context: DurableContext) => {
  const results = await context.parallel("fetch-all", [
    async (ctx) => await ctx.step("fetch-a", () => "data-a"),
    async (ctx) => await ctx.step("fetch-b", () => "data-b"),
    async (ctx) => await ctx.step("fetch-c", () => "data-c"),
  ]);
  return results.getSucceeded();
});

let runner: LocalDurableTestRunner;

beforeAll(async () => {
  await LocalDurableTestRunner.setupTestEnvironment();
});

afterAll(async () => {
  await LocalDurableTestRunner.teardownTestEnvironment();
});

beforeEach(() => {
  runner = new LocalDurableTestRunner({ handlerFunction: handler });
});

it("executes branches in parallel and collects results", async () => {
  const result = await runner.run();

  expect(result.getStatus()).toBe(ExecutionStatus.SUCCEEDED);
  expect(result.getResult()).toEqual(["data-a", "data-b", "data-c"]);
});
from aws_durable_execution_sdk_python import DurableContext, durable_execution, durable_step
from aws_durable_execution_sdk_python.types import StepContext
from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python_testing.runner import DurableFunctionTestRunner


@durable_step
def fetch_a(ctx: StepContext) -> str:
    return "data-a"


@durable_step
def fetch_b(ctx: StepContext) -> str:
    return "data-b"


@durable_step
def fetch_c(ctx: StepContext) -> str:
    return "data-c"


@durable_execution
def handler(event, context: DurableContext) -> list:
    results = context.parallel(
        [
            lambda ctx: ctx.step(fetch_a()),
            lambda ctx: ctx.step(fetch_b()),
            lambda ctx: ctx.step(fetch_c()),
        ],
        name="fetch-all",
    )
    return results.get_succeeded()


def test_executes_branches_in_parallel():
    runner = DurableFunctionTestRunner(handler=handler)
    with runner:
        result = runner.run(timeout=10)

    assert result.status is InvocationStatus.SUCCEEDED
import static org.junit.jupiter.api.Assertions.*;

import org.junit.jupiter.api.Test;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.model.ExecutionStatus;
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;

class ParallelWorkflowTest {

    @Test
    void executesBranchesInParallel() {
        var runner = LocalDurableTestRunner.create(
            Void.class,
            (input, context) -> {
                try (var parallel = context.parallel("fetch-all")) {
                    parallel.branch("fetch-a", String.class, ctx -> "data-a");
                    parallel.branch("fetch-b", String.class, ctx -> "data-b");
                    parallel.branch("fetch-c", String.class, ctx -> "data-c");
                }
                return null;
            }
        );

        var result = runner.runUntilComplete(null);

        assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
    }
}

Partial failures

When a step fails after earlier steps have succeeded, the execution fails but the completed steps remain in the operation history. Assert on the status of individual steps to verify which ones ran before the failure.

import { withDurableExecution, DurableContext } from "@aws/durable-execution-sdk-js";
import { LocalDurableTestRunner } from "@aws/durable-execution-sdk-js-testing";
import { ExecutionStatus, OperationStatus } from "@aws-sdk/client-lambda";

const handler = withDurableExecution(async (event: unknown, context: DurableContext) => {
  await context.step("step-1", () => "ok");
  await context.step("step-2", () => "ok");
  await context.step("step-3", () => { throw new Error("step-3 failed"); });
});

let runner: LocalDurableTestRunner;

beforeAll(async () => {
  await LocalDurableTestRunner.setupTestEnvironment();
});

afterAll(async () => {
  await LocalDurableTestRunner.teardownTestEnvironment();
});

beforeEach(() => {
  runner = new LocalDurableTestRunner({ handlerFunction: handler });
});

it("records which steps succeeded before the failure", async () => {
  const result = await runner.run();

  expect(result.getStatus()).toBe(ExecutionStatus.FAILED);

  const step1 = runner.getOperation("step-1");
  expect(step1.getStatus()).toBe(OperationStatus.SUCCEEDED);

  const step2 = runner.getOperation("step-2");
  expect(step2.getStatus()).toBe(OperationStatus.SUCCEEDED);

  expect(result.getError().errorMessage).toContain("step-3 failed");
});
from aws_durable_execution_sdk_python import DurableContext, durable_execution, durable_step
from aws_durable_execution_sdk_python.types import StepContext
from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python.lambda_service import OperationStatus
from aws_durable_execution_sdk_python_testing.runner import DurableFunctionTestRunner


@durable_step
def step_1(ctx: StepContext) -> str:
    return "ok"


@durable_step
def step_2(ctx: StepContext) -> str:
    return "ok"


@durable_step
def step_3(ctx: StepContext) -> str:
    raise RuntimeError("step-3 failed")


@durable_execution
def handler(event, context: DurableContext):
    context.step(step_1(), name="step-1")
    context.step(step_2(), name="step-2")
    context.step(step_3(), name="step-3")


def test_records_which_steps_succeeded_before_failure():
    runner = DurableFunctionTestRunner(handler=handler)
    with runner:
        result = runner.run(timeout=10)

    assert result.status is InvocationStatus.FAILED

    s1 = result.get_step("step-1")
    assert s1.status is OperationStatus.SUCCEEDED

    s2 = result.get_step("step-2")
    assert s2.status is OperationStatus.SUCCEEDED

    assert result.error is not None
import static org.junit.jupiter.api.Assertions.*;

import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.lambda.model.OperationStatus;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.model.ExecutionStatus;
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;

class PartialFailuresTest {

    @Test
    void recordsWhichStepsSucceededBeforeFailure() {
        var runner = LocalDurableTestRunner.create(
            Void.class,
            (input, context) -> {
                context.step("step-1", String.class, ctx -> "ok");
                context.step("step-2", String.class, ctx -> "ok");
                context.step("step-3", String.class, ctx -> {
                    throw new RuntimeException("step-3 failed");
                });
                return null;
            }
        );

        var result = runner.runUntilComplete(null);

        assertEquals(ExecutionStatus.FAILED, result.getStatus());

        assertEquals(OperationStatus.SUCCEEDED, result.getOperation("step-1").getStatus());
        assertEquals(OperationStatus.SUCCEEDED, result.getOperation("step-2").getStatus());
        assertTrue(result.getError().isPresent());
    }
}

Long waits

Workflows with long waits (hours or days) would make tests impractical without time skipping. Each SDK handles this differently.

Pass { skipTime: true } to setupTestEnvironment() and the runner advances fake timers automatically. Waits complete instantly.

import { withDurableExecution, DurableContext } from "@aws/durable-execution-sdk-js";
import { LocalDurableTestRunner } from "@aws/durable-execution-sdk-js-testing";
import { ExecutionStatus, OperationType } from "@aws-sdk/client-lambda";

const handler = withDurableExecution(async (event: unknown, context: DurableContext) => {
  await context.wait("cooling-off", { hours: 24 });
  return await context.step("after-wait", () => "done");
});

let runner: LocalDurableTestRunner;

beforeAll(async () => {
  await LocalDurableTestRunner.setupTestEnvironment({ skipTime: true });
});

afterAll(async () => {
  await LocalDurableTestRunner.teardownTestEnvironment();
});

beforeEach(() => {
  runner = new LocalDurableTestRunner({ handlerFunction: handler });
});

it("completes instantly with time skipping enabled", async () => {
  const result = await runner.run();

  expect(result.getStatus()).toBe(ExecutionStatus.SUCCEEDED);
  expect(result.getResult()).toBe("done");

  const wait = runner.getOperation("cooling-off");
  expect(wait.getType()).toBe(OperationType.WAIT);
  expect(wait.getWaitDetails()?.waitSeconds).toBe(86400);
});

Set the DURABLE_EXECUTION_TIME_SCALE environment variable to scale context.wait() durations. Set it to 0.0 for instant waits, or to a small fraction such as 0.01 to run waits at 100x speed. The scale does not apply to step retry delays.

import os

os.environ["DURABLE_EXECUTION_TIME_SCALE"] = "0.0"

from aws_durable_execution_sdk_python import DurableContext, durable_execution, durable_step
from aws_durable_execution_sdk_python.types import StepContext
from aws_durable_execution_sdk_python.config import Duration
from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python.lambda_service import OperationType
from aws_durable_execution_sdk_python_testing.runner import DurableFunctionTestRunner


@durable_step
def after_wait(ctx: StepContext) -> str:
    return "done"


@durable_execution
def handler(event, context: DurableContext) -> str:
    context.wait(Duration.from_hours(24), name="cooling-off")
    return context.step(after_wait(), name="after-wait")


def test_completes_with_long_wait():
    runner = DurableFunctionTestRunner(handler=handler)
    with runner:
        result = runner.run(timeout=10)

    assert result.status is InvocationStatus.SUCCEEDED

    wait_ops = [op for op in result.operations if op.operation_type == OperationType.WAIT]
    assert len(wait_ops) == 1
    assert wait_ops[0].name == "cooling-off"

runUntilComplete() calls advanceTime() automatically, which immediately completes STARTED waits without waiting for real time.

import static org.junit.jupiter.api.Assertions.*;

import java.time.Duration;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.lambda.model.OperationType;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.model.ExecutionStatus;
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;

class LongWaitsTest {

    @Test
    void completesWithLongWait() {
        var runner = LocalDurableTestRunner.create(
            Void.class,
            (input, context) -> {
                context.wait("cooling-off", Duration.ofHours(24));
                return context.step("after-wait", String.class, ctx -> "done");
            }
        );

        var result = runner.runUntilComplete(null);

        assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
        assertEquals("done", result.getResult(String.class));

        var waitOps = result.getOperations().stream()
            .filter(op -> op.getType() == OperationType.WAIT)
            .toList();
        assertEquals(1, waitOps.size());
        assertEquals("cooling-off", waitOps.get(0).getName());
    }
}

Polling with waitForCondition

waitForCondition polls a check function until it signals done. The test runner drives the polling loop the same way it drives retries.

import { withDurableExecution, DurableContext } from "@aws/durable-execution-sdk-js";
import { LocalDurableTestRunner } from "@aws/durable-execution-sdk-js-testing";
import { ExecutionStatus } from "@aws-sdk/client-lambda";

interface PollState {
  attempts: number;
  done: boolean;
}

const handler = withDurableExecution(async (event: unknown, context: DurableContext) => {
  return await context.waitForCondition<PollState>(
    "poll-job",
    async (state) => {
      const next = { attempts: state.attempts + 1, done: state.attempts >= 2 };
      return next;
    },
    {
      initialState: { attempts: 0, done: false },
      waitStrategy: (state) =>
        state.done
          ? { shouldContinue: false }
          : { shouldContinue: true, delay: { seconds: 1 } },
    },
  );
});

let runner: LocalDurableTestRunner;

beforeAll(async () => {
  await LocalDurableTestRunner.setupTestEnvironment({ skipTime: true });
});

afterAll(async () => {
  await LocalDurableTestRunner.teardownTestEnvironment();
});

beforeEach(() => {
  runner = new LocalDurableTestRunner({ handlerFunction: handler });
});

it("polls until condition is met", async () => {
  const result = await runner.run();

  expect(result.getStatus()).toBe(ExecutionStatus.SUCCEEDED);
  expect(result.getResult<PollState>()?.done).toBe(true);
});
from dataclasses import dataclass
from aws_durable_execution_sdk_python import DurableContext, durable_execution
from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python.types import WaitForConditionCheckContext
from aws_durable_execution_sdk_python.waits import (
    WaitForConditionConfig,
    WaitForConditionDecision,
)
from aws_durable_execution_sdk_python.config import Duration
from aws_durable_execution_sdk_python_testing.runner import DurableFunctionTestRunner


@dataclass
class PollState:
    attempts: int
    done: bool


def check(state: PollState, ctx: WaitForConditionCheckContext) -> PollState:
    return PollState(attempts=state.attempts + 1, done=state.attempts >= 2)


def wait_strategy(state: PollState, attempts_made: int) -> WaitForConditionDecision:
    if state.done:
        return WaitForConditionDecision.stop_polling()
    return WaitForConditionDecision.continue_waiting(Duration.from_seconds(1))


@durable_execution
def handler(event, context: DurableContext) -> PollState:
    return context.wait_for_condition(
        check,
        WaitForConditionConfig(
            wait_strategy=wait_strategy,
            initial_state=PollState(attempts=0, done=False),
        ),
        name="poll-job",
    )


def test_polls_until_condition_is_met():
    runner = DurableFunctionTestRunner(handler=handler)
    with runner:
        result = runner.run(timeout=30)

    assert result.status is InvocationStatus.SUCCEEDED
import static org.junit.jupiter.api.Assertions.*;

import org.junit.jupiter.api.Test;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.config.WaitForConditionConfig;
import software.amazon.lambda.durable.model.ExecutionStatus;
import software.amazon.lambda.durable.model.WaitForConditionResult;
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;

class PollingTest {

    record PollState(int attempts, boolean done) {}

    @Test
    void pollsUntilConditionIsMet() {
        var runner = LocalDurableTestRunner.create(
            Void.class,
            (input, context) -> context.waitForCondition(
                "poll-job",
                PollState.class,
                (state, ctx) -> {
                    var next = new PollState(state.attempts() + 1, state.attempts() >= 2);
                    return next.done()
                        ? WaitForConditionResult.stopPolling(next)
                        : WaitForConditionResult.continuePolling(next);
                },
                WaitForConditionConfig.<PollState>builder()
                    .initialState(new PollState(0, false))
                    .build()
            )
        );

        var result = runner.runUntilComplete(null);

        assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
        assertTrue(result.getResult(PollState.class).done());
    }
}

See also