Workflow patterns¶
Complete tests for the workflow shapes you'll hit most often: sequential steps, child contexts, parallel branches, partial failures, long waits, and polling. Each section shows a handler and the test that exercises it. For the assertion vocabulary used inside each test, see Assertions.
Sequential steps¶
A sequential workflow runs steps one after another, passing results between them. Verify that all steps ran and that the final result reflects the full chain.
import { withDurableExecution, DurableContext } from "@aws/durable-execution-sdk-js";
import { LocalDurableTestRunner } from "@aws/durable-execution-sdk-js-testing";
import { ExecutionStatus, OperationType } from "@aws-sdk/client-lambda";
/**
 * Order workflow with three sequential steps. Each step spreads the previous
 * step's result into its own, so the value returned by "fulfillment" carries
 * the fields added by "validate" and "payment" as well.
 */
const handler = withDurableExecution(
  async (event: { orderId: string }, context: DurableContext) => {
    const validated = await context.step("validate", () => {
      return { orderId: event.orderId, status: "validated" };
    });

    const paid = await context.step("payment", () => {
      return { ...validated, payment: "completed" };
    });

    // The last step's output is the workflow result.
    return await context.step("fulfillment", () => {
      return { ...paid, fulfillment: "shipped" };
    });
  },
);
let runner: LocalDurableTestRunner;

// Start the local test environment once for the suite and tear it down at the end.
beforeAll(async () => {
  await LocalDurableTestRunner.setupTestEnvironment();
});

afterAll(async () => {
  await LocalDurableTestRunner.teardownTestEnvironment();
});

// Every test gets its own runner bound to the handler under test.
beforeEach(() => {
  runner = new LocalDurableTestRunner({ handlerFunction: handler });
});

it("executes all steps in order", async () => {
  const result = await runner.run({ payload: { orderId: "order-123" } });

  expect(result.getStatus()).toBe(ExecutionStatus.SUCCEEDED);

  // The final result must reflect the whole chain: one field per step plus the
  // original order id. Without this check a step that dropped earlier fields
  // would still pass the name-only assertion below.
  expect(result.getResult()).toEqual({
    orderId: "order-123",
    status: "validated",
    payment: "completed",
    fulfillment: "shipped",
  });

  // Exactly three STEP operations, recorded in execution order.
  const stepNames = result
    .getOperations()
    .filter((op) => op.getType() === OperationType.STEP)
    .map((op) => op.getName());
  expect(stepNames).toHaveLength(3);
  expect(stepNames).toEqual(["validate", "payment", "fulfillment"]);
});
from aws_durable_execution_sdk_python import DurableContext, durable_execution, durable_step
from aws_durable_execution_sdk_python.types import StepContext
from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python.lambda_service import OperationType
from aws_durable_execution_sdk_python_testing.runner import DurableFunctionTestRunner
@durable_step
def validate(ctx: StepContext, order_id: str) -> dict:
    """Return a new order record for ``order_id`` with status "validated"."""
    validated_order = {"order_id": order_id, "status": "validated"}
    return validated_order
@durable_step
def payment(ctx: StepContext, order: dict) -> dict:
    """Return a copy of ``order`` with ``payment`` set to "completed"."""
    paid_order = dict(order)
    paid_order["payment"] = "completed"
    return paid_order
@durable_step
def fulfillment(ctx: StepContext, order: dict) -> dict:
    """Return a copy of ``order`` with ``fulfillment`` set to "shipped"."""
    shipped_order = dict(order)
    shipped_order["fulfillment"] = "shipped"
    return shipped_order
@durable_execution
def handler(event: dict, context: DurableContext) -> dict:
    """Run validate, payment and fulfillment in sequence.

    Each step receives the previous step's result; the result of the
    "fulfillment" step is returned as the workflow result.
    """
    order = context.step(validate(event["order_id"]), name="validate")
    order = context.step(payment(order), name="payment")
    order = context.step(fulfillment(order), name="fulfillment")
    return order
def test_executes_all_steps_in_order():
    """The workflow succeeds and records its three steps in execution order."""
    runner = DurableFunctionTestRunner(handler=handler)
    with runner:
        result = runner.run(input='{"order_id": "order-123"}', timeout=10)

    assert result.status is InvocationStatus.SUCCEEDED

    # Only STEP operations are of interest here.
    step_names = [
        op.name for op in result.operations if op.operation_type == OperationType.STEP
    ]
    assert len(step_names) == 3
    assert step_names == ["validate", "payment", "fulfillment"]
import static org.junit.jupiter.api.Assertions.*;
import java.util.Map;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.lambda.model.OperationType;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.model.ExecutionStatus;
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;
/**
 * Runs a three-step order workflow through the local test runner and checks
 * the execution status, the accumulated final result, and the order in which
 * the steps were recorded.
 */
class SequentialWorkflowTest {
    record Order(String orderId) {}

    @Test
    void executesAllStepsInOrder() {
        var runner = LocalDurableTestRunner.create(
            Order.class,
            (input, context) -> {
                // Each step builds on the previous step's output, so the final
                // map carries one field per step plus the original order id.
                var validated = context.step("validate", Map.class,
                    ctx -> Map.of("orderId", input.orderId(), "status", "validated"));
                var paid = context.step("payment", Map.class,
                    ctx -> Map.of(
                        "orderId", validated.get("orderId"),
                        "status", validated.get("status"),
                        "payment", "completed"));
                return context.step("fulfillment", Map.class,
                    ctx -> Map.of(
                        "orderId", paid.get("orderId"),
                        "status", paid.get("status"),
                        "payment", paid.get("payment"),
                        "fulfillment", "shipped"));
            }
        );

        var result = runner.runUntilComplete(new Order("order-123"));

        assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());

        // The final result must reflect the full chain of steps.
        assertEquals(
            Map.of(
                "orderId", "order-123",
                "status", "validated",
                "payment", "completed",
                "fulfillment", "shipped"),
            result.getResult(Map.class));

        // Exactly three STEP operations, recorded in execution order.
        var stepOps = result.getOperations().stream()
            .filter(op -> op.getType() == OperationType.STEP)
            .toList();
        assertEquals(3, stepOps.size());
        assertEquals("validate", stepOps.get(0).getName());
        assertEquals("payment", stepOps.get(1).getName());
        assertEquals("fulfillment", stepOps.get(2).getName());
    }
}
Child contexts¶
Child contexts group operations under a named scope. The test result exposes the child
context as a CONTEXT operation. You can inspect its type and walk its child
operations.
import { withDurableExecution, DurableContext } from "@aws/durable-execution-sdk-js";
import { LocalDurableTestRunner } from "@aws/durable-execution-sdk-js-testing";
import { ExecutionStatus, OperationType } from "@aws-sdk/client-lambda";
/**
 * Runs two steps inside a child context named "process" and returns their
 * results joined with a colon.
 */
const handler = withDurableExecution(async (event: unknown, context: DurableContext) => {
  const joined = await context.runInChildContext("process", async (child) => {
    const first = await child.step("step-a", () => "result-a");
    const second = await child.step("step-b", () => "result-b");
    return [first, second].join(":");
  });
  return joined;
});
let runner: LocalDurableTestRunner;

// Suite-level setup and teardown of the local test environment.
beforeAll(() => LocalDurableTestRunner.setupTestEnvironment());
afterAll(() => LocalDurableTestRunner.teardownTestEnvironment());

// Every test gets its own runner bound to the handler under test.
beforeEach(() => {
  runner = new LocalDurableTestRunner({ handlerFunction: handler });
});

it("executes steps inside a child context", async () => {
  const execution = await runner.run();
  expect(execution.getStatus()).toBe(ExecutionStatus.SUCCEEDED);
  expect(execution.getResult()).toBe("result-a:result-b");

  // The child context is looked up by name and appears as a CONTEXT operation.
  const processContext = runner.getOperation("process");
  expect(processContext.getType()).toBe(OperationType.CONTEXT);

  // Its child operations are the two steps, in execution order.
  const childNames = processContext.getChildOperations()?.map((child) => child.getName());
  expect(childNames?.length).toBe(2);
  expect(childNames).toEqual(["step-a", "step-b"]);
});
from aws_durable_execution_sdk_python import DurableContext, durable_execution, durable_step
from aws_durable_execution_sdk_python.types import StepContext
from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python.lambda_service import OperationType
from aws_durable_execution_sdk_python_testing.runner import DurableFunctionTestRunner
@durable_step
def step_a(ctx: StepContext) -> str:
    """Return the fixed value "result-a"."""
    outcome = "result-a"
    return outcome
@durable_step
def step_b(ctx: StepContext) -> str:
    """Return the fixed value "result-b"."""
    outcome = "result-b"
    return outcome
@durable_execution
def handler(event, context: DurableContext) -> str:
    """Run ``step_a`` and ``step_b`` inside a child context named "process".

    Returns the two step results joined with a colon.
    """

    def process(child: DurableContext) -> str:
        # Both steps run on the child context, not on the parent.
        first = child.step(step_a())
        second = child.step(step_b())
        return "{}:{}".format(first, second)

    return context.run_in_child_context(process, name="process")
def test_executes_steps_inside_child_context():
    """The workflow succeeds and records exactly one CONTEXT operation, "process"."""
    runner = DurableFunctionTestRunner(handler=handler)
    with runner:
        result = runner.run(timeout=10)

    assert result.status is InvocationStatus.SUCCEEDED

    context_ops = [
        op for op in result.operations if op.operation_type == OperationType.CONTEXT
    ]
    assert len(context_ops) == 1
    only_context = context_ops[0]
    assert only_context.name == "process"
import static org.junit.jupiter.api.Assertions.*;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.lambda.model.OperationType;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.model.ExecutionStatus;
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;
/**
 * Runs two steps inside a child context named "process" and checks the final
 * result and the type of the recorded child-context operation.
 */
class ChildContextTest {
    @Test
    void executesStepsInsideChildContext() {
        var runner = LocalDurableTestRunner.create(
            Void.class,
            (input, context) -> context.runInChildContext("process", String.class, child -> {
                var first = child.step("step-a", String.class, ctx -> "result-a");
                var second = child.step("step-b", String.class, ctx -> "result-b");
                return String.join(":", first, second);
            })
        );

        var execution = runner.runUntilComplete(null);

        assertEquals(ExecutionStatus.SUCCEEDED, execution.getStatus());
        assertEquals("result-a:result-b", execution.getResult(String.class));

        // The child context is recorded as an operation of type CONTEXT.
        var processContext = execution.getOperation("process");
        assertNotNull(processContext);
        assertEquals(OperationType.CONTEXT, processContext.getType());
    }
}
Parallel operations¶
Parallel branches execute concurrently. Assert on the final result to verify all branches completed.
import { withDurableExecution, DurableContext } from "@aws/durable-execution-sdk-js";
import { LocalDurableTestRunner } from "@aws/durable-execution-sdk-js-testing";
import { ExecutionStatus } from "@aws-sdk/client-lambda";
/**
 * Runs three fetch steps as branches of one parallel operation named
 * "fetch-all" and returns the results of the branches that succeeded.
 */
const handler = withDurableExecution(async (event: unknown, context: DurableContext) => {
  const batch = await context.parallel("fetch-all", [
    async (branch) => {
      return await branch.step("fetch-a", () => "data-a");
    },
    async (branch) => {
      return await branch.step("fetch-b", () => "data-b");
    },
    async (branch) => {
      return await branch.step("fetch-c", () => "data-c");
    },
  ]);

  return batch.getSucceeded();
});
let runner: LocalDurableTestRunner;

// Suite-level setup and teardown of the local test environment.
beforeAll(() => LocalDurableTestRunner.setupTestEnvironment());
afterAll(() => LocalDurableTestRunner.teardownTestEnvironment());

// Every test gets its own runner bound to the handler under test.
beforeEach(() => {
  runner = new LocalDurableTestRunner({ handlerFunction: handler });
});

it("executes branches in parallel and collects results", async () => {
  const execution = await runner.run();

  expect(execution.getStatus()).toBe(ExecutionStatus.SUCCEEDED);

  // One entry per branch, in the order the branches were declared.
  const expectedData = ["data-a", "data-b", "data-c"];
  expect(execution.getResult()).toEqual(expectedData);
});
from aws_durable_execution_sdk_python import DurableContext, durable_execution, durable_step
from aws_durable_execution_sdk_python.types import StepContext
from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python_testing.runner import DurableFunctionTestRunner
@durable_step
def fetch_a(ctx: StepContext) -> str:
    """Return the fixed value "data-a"."""
    payload = "data-a"
    return payload
@durable_step
def fetch_b(ctx: StepContext) -> str:
    """Return the fixed value "data-b"."""
    payload = "data-b"
    return payload
@durable_step
def fetch_c(ctx: StepContext) -> str:
    """Return the fixed value "data-c"."""
    payload = "data-c"
    return payload
@durable_execution
def handler(event, context: DurableContext) -> list:
    """Run the three fetch steps as branches of one parallel operation.

    Returns the results of the branches that succeeded.
    """
    # Each branch receives its own context and runs a single step on it.
    branches = [
        lambda ctx: ctx.step(fetch_a()),
        lambda ctx: ctx.step(fetch_b()),
        lambda ctx: ctx.step(fetch_c()),
    ]
    outcome = context.parallel(branches, name="fetch-all")
    return outcome.get_succeeded()
def test_executes_branches_in_parallel():
    """The parallel workflow runs to completion with status SUCCEEDED."""
    runner = DurableFunctionTestRunner(handler=handler)
    with runner:
        result = runner.run(timeout=10)

    final_status = result.status
    assert final_status is InvocationStatus.SUCCEEDED
import static org.junit.jupiter.api.Assertions.*;
import org.junit.jupiter.api.Test;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.model.ExecutionStatus;
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;
/**
 * Registers three branches on one parallel operation named "fetch-all" and
 * checks that the execution succeeds.
 */
class ParallelWorkflowTest {
    @Test
    void executesBranchesInParallel() {
        var runner = LocalDurableTestRunner.create(
            Void.class,
            (input, context) -> {
                // NOTE(review): presumably closing the parallel scope waits for the
                // registered branches to finish — confirm against the SDK docs.
                try (var fetchAll = context.parallel("fetch-all")) {
                    fetchAll.branch("fetch-a", String.class, ctx -> "data-a");
                    fetchAll.branch("fetch-b", String.class, ctx -> "data-b");
                    fetchAll.branch("fetch-c", String.class, ctx -> "data-c");
                }
                // The handler produces no result of its own.
                return null;
            }
        );

        var execution = runner.runUntilComplete(null);

        assertEquals(ExecutionStatus.SUCCEEDED, execution.getStatus());
    }
}
Partial failures¶
When a step fails after earlier steps have succeeded, the execution fails but the completed steps remain in the operation history. Assert on the status of individual steps to verify which ones ran before the failure.
import { withDurableExecution, DurableContext } from "@aws/durable-execution-sdk-js";
import { LocalDurableTestRunner } from "@aws/durable-execution-sdk-js-testing";
import { ExecutionStatus, OperationStatus } from "@aws-sdk/client-lambda";
/**
 * Two steps that succeed followed by a third that always throws, used to
 * show what the operation history looks like after a failure.
 */
const handler = withDurableExecution(async (event: unknown, context: DurableContext) => {
  const succeed = () => "ok";

  await context.step("step-1", succeed);
  await context.step("step-2", succeed);

  // The third step always throws.
  await context.step("step-3", () => {
    throw new Error("step-3 failed");
  });
});
let runner: LocalDurableTestRunner;

// Suite-level setup and teardown of the local test environment.
beforeAll(() => LocalDurableTestRunner.setupTestEnvironment());
afterAll(() => LocalDurableTestRunner.teardownTestEnvironment());

// Every test gets its own runner bound to the handler under test.
beforeEach(() => {
  runner = new LocalDurableTestRunner({ handlerFunction: handler });
});

it("records which steps succeeded before the failure", async () => {
  const execution = await runner.run();

  expect(execution.getStatus()).toBe(ExecutionStatus.FAILED);

  // The steps that ran before the failure stay in the history as SUCCEEDED.
  for (const stepName of ["step-1", "step-2"]) {
    const completedStep = runner.getOperation(stepName);
    expect(completedStep.getStatus()).toBe(OperationStatus.SUCCEEDED);
  }

  // The execution error carries the message thrown by the failing step.
  const failure = execution.getError();
  expect(failure.errorMessage).toContain("step-3 failed");
});
from aws_durable_execution_sdk_python import DurableContext, durable_execution, durable_step
from aws_durable_execution_sdk_python.types import StepContext
from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python.lambda_service import OperationStatus
from aws_durable_execution_sdk_python_testing.runner import DurableFunctionTestRunner
@durable_step
def step_1(ctx: StepContext) -> str:
    """Return the fixed value "ok"."""
    outcome = "ok"
    return outcome
@durable_step
def step_2(ctx: StepContext) -> str:
    """Return the fixed value "ok"."""
    outcome = "ok"
    return outcome
@durable_step
def step_3(ctx: StepContext) -> str:
    """Always raise ``RuntimeError`` with the message "step-3 failed"."""
    message = "step-3 failed"
    raise RuntimeError(message)
@durable_execution
def handler(event, context: DurableContext):
    """Run step-1, step-2 and step-3 in order; step-3 always raises."""
    named_steps = (
        ("step-1", step_1),
        ("step-2", step_2),
        ("step-3", step_3),
    )
    for step_name, make_step in named_steps:
        context.step(make_step(), name=step_name)
def test_records_which_steps_succeeded_before_failure():
    """The execution fails, but step-1 and step-2 are recorded as SUCCEEDED."""
    runner = DurableFunctionTestRunner(handler=handler)
    with runner:
        result = runner.run(timeout=10)

    assert result.status is InvocationStatus.FAILED

    # The steps that ran before the failure stay in the history.
    for step_name in ("step-1", "step-2"):
        completed = result.get_step(step_name)
        assert completed.status is OperationStatus.SUCCEEDED

    assert result.error is not None
import static org.junit.jupiter.api.Assertions.*;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.lambda.model.OperationStatus;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.model.ExecutionStatus;
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;
/**
 * Runs two steps that succeed followed by one that throws, then checks that
 * the execution failed while the first two steps are recorded as SUCCEEDED.
 */
class PartialFailuresTest {
    @Test
    void recordsWhichStepsSucceededBeforeFailure() {
        var runner = LocalDurableTestRunner.create(
            Void.class,
            (input, context) -> {
                context.step("step-1", String.class, ctx -> "ok");
                context.step("step-2", String.class, ctx -> "ok");
                // The third step always throws.
                context.step("step-3", String.class, ctx -> {
                    throw new RuntimeException("step-3 failed");
                });
                return null;
            }
        );

        var execution = runner.runUntilComplete(null);

        assertEquals(ExecutionStatus.FAILED, execution.getStatus());

        // The steps that ran before the failure stay in the history.
        for (var stepName : new String[] {"step-1", "step-2"}) {
            assertEquals(OperationStatus.SUCCEEDED, execution.getOperation(stepName).getStatus());
        }

        assertTrue(execution.getError().isPresent());
    }
}
Long waits¶
Workflows with long waits (hours or days) would make tests impractical without time skipping. Each SDK handles this differently.
In the TypeScript SDK, pass { skipTime: true } to setupTestEnvironment() and the runner
advances fake timers automatically, so waits complete instantly.
import { withDurableExecution, DurableContext } from "@aws/durable-execution-sdk-js";
import { LocalDurableTestRunner } from "@aws/durable-execution-sdk-js-testing";
import { ExecutionStatus, OperationType } from "@aws-sdk/client-lambda";
/**
 * Waits 24 hours on a wait named "cooling-off", then runs one step and
 * returns that step's result.
 */
const handler = withDurableExecution(async (event: unknown, context: DurableContext) => {
  await context.wait("cooling-off", { hours: 24 });

  const outcome = await context.step("after-wait", () => "done");
  return outcome;
});
let runner: LocalDurableTestRunner;

// skipTime is switched on for the whole suite when the environment is set up.
beforeAll(() => LocalDurableTestRunner.setupTestEnvironment({ skipTime: true }));
afterAll(() => LocalDurableTestRunner.teardownTestEnvironment());

// Every test gets its own runner bound to the handler under test.
beforeEach(() => {
  runner = new LocalDurableTestRunner({ handlerFunction: handler });
});

it("completes instantly with time skipping enabled", async () => {
  const execution = await runner.run();

  expect(execution.getStatus()).toBe(ExecutionStatus.SUCCEEDED);
  expect(execution.getResult()).toBe("done");

  // The wait is still recorded, with the configured 24 hours expressed in seconds.
  const coolingOff = runner.getOperation("cooling-off");
  expect(coolingOff.getType()).toBe(OperationType.WAIT);

  const secondsPerDay = 24 * 60 * 60;
  expect(coolingOff.getWaitDetails()?.waitSeconds).toBe(secondsPerDay);
});
In the Python SDK, set the DURABLE_EXECUTION_TIME_SCALE environment variable to scale
context.wait() durations. Set it to 0.0 for instant waits, or to a small fraction such as
0.01 to run waits at 100x speed. The scale does not apply to step retry delays.
import os
# A scale of 0.0 makes context.wait() durations elapse instantly in this test module.
# NOTE(review): this is set before the SDK imports that follow; presumably the
# SDK reads the variable when it loads — confirm before reordering.
os.environ["DURABLE_EXECUTION_TIME_SCALE"] = "0.0"
from aws_durable_execution_sdk_python import DurableContext, durable_execution, durable_step
from aws_durable_execution_sdk_python.types import StepContext
from aws_durable_execution_sdk_python.config import Duration
from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python.lambda_service import OperationType
from aws_durable_execution_sdk_python_testing.runner import DurableFunctionTestRunner
@durable_step
def after_wait(ctx: StepContext) -> str:
    """Return the fixed value "done"."""
    outcome = "done"
    return outcome
@durable_execution
def handler(event, context: DurableContext) -> str:
    """Wait 24 hours on a wait named "cooling-off", then run ``after_wait``."""
    cooling_off = Duration.from_hours(24)
    context.wait(cooling_off, name="cooling-off")

    outcome = context.step(after_wait(), name="after-wait")
    return outcome
def test_completes_with_long_wait():
    """The workflow succeeds and records exactly one WAIT operation, "cooling-off"."""
    runner = DurableFunctionTestRunner(handler=handler)
    with runner:
        result = runner.run(timeout=10)

    assert result.status is InvocationStatus.SUCCEEDED

    recorded_waits = [
        op for op in result.operations if op.operation_type == OperationType.WAIT
    ]
    assert len(recorded_waits) == 1
    only_wait = recorded_waits[0]
    assert only_wait.name == "cooling-off"
In the Java SDK, runUntilComplete() calls advanceTime() automatically, which immediately
completes waits in the STARTED state without waiting for real time.
import static org.junit.jupiter.api.Assertions.*;
import java.time.Duration;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.lambda.model.OperationType;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.model.ExecutionStatus;
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;
/**
 * Runs a workflow containing a 24-hour wait through the local test runner and
 * checks the final result and the recorded WAIT operation.
 */
class LongWaitsTest {
    @Test
    void completesWithLongWait() {
        var runner = LocalDurableTestRunner.create(
            Void.class,
            (input, context) -> {
                context.wait("cooling-off", Duration.ofHours(24));
                return context.step("after-wait", String.class, ctx -> "done");
            }
        );

        var execution = runner.runUntilComplete(null);

        assertEquals(ExecutionStatus.SUCCEEDED, execution.getStatus());
        assertEquals("done", execution.getResult(String.class));

        // Exactly one WAIT operation is recorded, and it is the named wait.
        var recordedWaits = execution.getOperations().stream()
            .filter(op -> op.getType() == OperationType.WAIT)
            .toList();
        assertEquals(1, recordedWaits.size());
        var onlyWait = recordedWaits.get(0);
        assertEquals("cooling-off", onlyWait.getName());
    }
}
Polling with waitForCondition¶
waitForCondition polls a check function until it signals done. The test runner drives
the polling loop the same way it drives retries.
import { withDurableExecution, DurableContext } from "@aws/durable-execution-sdk-js";
import { LocalDurableTestRunner } from "@aws/durable-execution-sdk-js-testing";
import { ExecutionStatus } from "@aws-sdk/client-lambda";
/** State passed from one polling attempt to the next. */
interface PollState {
  /** Number of checks performed so far. */
  attempts: number;
  /** Set to true once polling should stop. */
  done: boolean;
}
/**
 * Polls with waitForCondition under the name "poll-job". Each check increments
 * `attempts` and sets `done` once the incoming state has seen at least two
 * attempts; the wait strategy stops on `done` and otherwise asks for another
 * check after one second.
 */
const handler = withDurableExecution(async (event: unknown, context: DurableContext) => {
  return await context.waitForCondition<PollState>(
    "poll-job",
    // `done` is derived from the attempt count before it is incremented.
    async (state) => ({ attempts: state.attempts + 1, done: state.attempts >= 2 }),
    {
      initialState: { attempts: 0, done: false },
      waitStrategy: (state) => {
        if (state.done) {
          return { shouldContinue: false };
        }
        return { shouldContinue: true, delay: { seconds: 1 } };
      },
    },
  );
});
let runner: LocalDurableTestRunner;

// skipTime is switched on for the whole suite when the environment is set up.
beforeAll(() => LocalDurableTestRunner.setupTestEnvironment({ skipTime: true }));
afterAll(() => LocalDurableTestRunner.teardownTestEnvironment());

// Every test gets its own runner bound to the handler under test.
beforeEach(() => {
  runner = new LocalDurableTestRunner({ handlerFunction: handler });
});

it("polls until condition is met", async () => {
  const execution = await runner.run();

  expect(execution.getStatus()).toBe(ExecutionStatus.SUCCEEDED);

  // The returned state is the one on which polling stopped.
  const finalState = execution.getResult<PollState>();
  expect(finalState?.done).toBe(true);
});
from dataclasses import dataclass
from aws_durable_execution_sdk_python import DurableContext, durable_execution
from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python.types import WaitForConditionCheckContext
from aws_durable_execution_sdk_python.waits import (
WaitForConditionConfig,
WaitForConditionDecision,
)
from aws_durable_execution_sdk_python.config import Duration
from aws_durable_execution_sdk_python_testing.runner import DurableFunctionTestRunner
@dataclass
class PollState:
    """State passed from one polling attempt to the next."""

    # Number of checks performed so far.
    attempts: int
    # Set to True once polling should stop.
    done: bool
def check(state: PollState, ctx: WaitForConditionCheckContext) -> PollState:
    """Return the next poll state.

    ``attempts`` is incremented by one; ``done`` is set once the incoming
    state has already seen at least two attempts.
    """
    finished = state.attempts >= 2
    next_attempts = state.attempts + 1
    return PollState(attempts=next_attempts, done=finished)
def wait_strategy(state: PollState, attempts_made: int) -> WaitForConditionDecision:
    """Stop polling once ``state.done`` is set; otherwise wait one second and continue."""
    if not state.done:
        retry_delay = Duration.from_seconds(1)
        return WaitForConditionDecision.continue_waiting(retry_delay)
    return WaitForConditionDecision.stop_polling()
@durable_execution
def handler(event, context: DurableContext) -> PollState:
    """Poll with ``check`` under the name "poll-job" and return the final state."""
    polling_config = WaitForConditionConfig(
        wait_strategy=wait_strategy,
        initial_state=PollState(attempts=0, done=False),
    )
    return context.wait_for_condition(check, polling_config, name="poll-job")
def test_polls_until_condition_is_met():
    """The polling workflow runs to completion with status SUCCEEDED."""
    runner = DurableFunctionTestRunner(handler=handler)
    with runner:
        result = runner.run(timeout=30)

    final_status = result.status
    assert final_status is InvocationStatus.SUCCEEDED
import static org.junit.jupiter.api.Assertions.*;
import org.junit.jupiter.api.Test;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.config.WaitForConditionConfig;
import software.amazon.lambda.durable.model.ExecutionStatus;
import software.amazon.lambda.durable.model.WaitForConditionResult;
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;
/**
 * Drives a waitForCondition polling loop through the local test runner and
 * checks that the execution succeeds with a final state marked done.
 */
class PollingTest {
    record PollState(int attempts, boolean done) {}

    @Test
    void pollsUntilConditionIsMet() {
        var runner = LocalDurableTestRunner.create(
            Void.class,
            (input, context) -> context.waitForCondition(
                "poll-job",
                PollState.class,
                (state, ctx) -> {
                    // `done` is derived from the attempt count before it is incremented.
                    boolean finished = state.attempts() >= 2;
                    var next = new PollState(state.attempts() + 1, finished);
                    if (finished) {
                        return WaitForConditionResult.stopPolling(next);
                    }
                    return WaitForConditionResult.continuePolling(next);
                },
                WaitForConditionConfig.<PollState>builder()
                    .initialState(new PollState(0, false))
                    .build()
            )
        );

        var execution = runner.runUntilComplete(null);

        assertEquals(ExecutionStatus.SUCCEEDED, execution.getStatus());

        var finalState = execution.getResult(PollState.class);
        assertTrue(finalState.done());
    }
}
See also¶
- Authoring: Set up the test runner and write your first test.
- Assertions: Inspect individual operations after a test run.
- Parallel
- Child Context
- Wait for Condition