Authoring¶
Install the testing SDK¶
Write a minimal test¶
Create a runner with your handler, call run(), and assert on the result.
In TypeScript, call LocalDurableTestRunner.setupTestEnvironment() in beforeAll and
LocalDurableTestRunner.teardownTestEnvironment() in afterAll. Create a new runner
instance in beforeEach so each test starts with a clean state.
import { withDurableExecution, DurableContext } from "@aws/durable-execution-sdk-js";
import {
LocalDurableTestRunner,
} from "@aws/durable-execution-sdk-js-testing";
import { ExecutionStatus } from "@aws-sdk/client-lambda";
/**
 * Durable handler under test: runs one step named "greet" and returns
 * whatever that step produces.
 */
const handler = withDurableExecution(
  async (event: unknown, context: DurableContext) => {
    return await context.step("greet", () => "hello");
  },
);
// Reassigned in beforeEach so that every test works with its own runner.
let runner: LocalDurableTestRunner;

// The test environment is set up once for the whole file...
beforeAll(async () => {
  await LocalDurableTestRunner.setupTestEnvironment();
});

// ...while the runner itself is rebuilt before each test.
beforeEach(() => {
  runner = new LocalDurableTestRunner({ handlerFunction: handler });
});

afterAll(async () => {
  await LocalDurableTestRunner.teardownTestEnvironment();
});

it("returns the expected result", async () => {
  const execution = await runner.run();

  expect(execution.getStatus()).toBe(ExecutionStatus.SUCCEEDED);
  expect(execution.getResult()).toBe("hello");
});
In Python, use DurableFunctionTestRunner as a context manager. The context manager
starts the scheduler thread on entry and stops it on exit.
from aws_durable_execution_sdk_python import DurableContext, durable_execution, durable_step
from aws_durable_execution_sdk_python.types import StepContext
from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python_testing.runner import DurableFunctionTestRunner
@durable_step
def greet(ctx: StepContext) -> str:
    """Step body: always returns the string "hello"."""
    return "hello"


@durable_execution
def handler(event, context: DurableContext) -> str:
    """Durable handler under test: runs the greet step and returns its result."""
    greeting = context.step(greet())
    return greeting
def test_returns_expected_result():
    """Run the handler to completion and check its status and result."""
    runner = DurableFunctionTestRunner(handler=handler)

    with runner:
        outcome = runner.run(timeout=10)

    assert outcome.status is InvocationStatus.SUCCEEDED
    # The expected value includes the double quotes -- presumably the result is
    # the JSON-serialized return value; confirm against the runner's result type.
    assert outcome.result == '"hello"'
In Java, use LocalDurableTestRunner.create() with the input type and a handler
function. Call runUntilComplete() to drive the full replay loop until the execution
completes or fails.
import static org.junit.jupiter.api.Assertions.*;
import org.junit.jupiter.api.Test;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.model.ExecutionStatus;
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;
/** Minimal example: a handler with a single "greet" step, run to completion. */
class MinimalTest {

    /** Checks that the execution succeeds and yields the step's value. */
    @Test
    void returnsExpectedResult() {
        var runner = LocalDurableTestRunner.create(
                Void.class,
                (event, durable) -> durable.step("greet", String.class, step -> "hello"));

        // The handler never reads its input, so null is passed.
        var execution = runner.runUntilComplete(null);

        assertEquals(ExecutionStatus.SUCCEEDED, execution.getStatus());
        assertEquals("hello", execution.getResult(String.class));
    }
}
Test a failed execution¶
When your handler throws outside a step, or a step exhausts all retries, the execution fails. Assert on the status and inspect the error.
import { withDurableExecution, DurableContext } from "@aws/durable-execution-sdk-js";
import { LocalDurableTestRunner } from "@aws/durable-execution-sdk-js-testing";
import { ExecutionStatus } from "@aws-sdk/client-lambda";
/**
 * Durable handler under test: throws outside any step when `event.fail` is
 * set, and returns "ok" otherwise.
 */
const handler = withDurableExecution(
  async (event: { fail: boolean }, context: DurableContext) => {
    if (!event.fail) {
      return "ok";
    }
    throw new Error("intentional failure");
  },
);
// Reassigned in beforeEach so that every test works with its own runner.
let runner: LocalDurableTestRunner;

beforeAll(async () => {
  await LocalDurableTestRunner.setupTestEnvironment();
});

beforeEach(() => {
  runner = new LocalDurableTestRunner({ handlerFunction: handler });
});

afterAll(async () => {
  await LocalDurableTestRunner.teardownTestEnvironment();
});

it("reports a failed execution", async () => {
  // The payload makes the handler throw.
  const execution = await runner.run({ payload: { fail: true } });

  expect(execution.getStatus()).toBe(ExecutionStatus.FAILED);

  const failure = execution.getError();
  expect(failure.errorMessage).toContain("intentional failure");
});
from aws_durable_execution_sdk_python import DurableContext, durable_execution
from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python_testing.runner import DurableFunctionTestRunner
@durable_execution
def handler(event: dict, context: DurableContext):
    """Durable handler under test: raises when the event's "fail" entry is truthy."""
    if not event.get("fail"):
        return "ok"
    raise ValueError("intentional failure")
def test_reports_failed_execution():
    """Run the handler with a failing input and check the failure is reported."""
    runner = DurableFunctionTestRunner(handler=handler)

    with runner:
        # The input is passed as a JSON string; the handler reads it as a dict,
        # so the runner presumably deserializes it -- confirm against the SDK.
        outcome = runner.run(input='{"fail": true}', timeout=10)

    assert outcome.status is InvocationStatus.FAILED
    assert outcome.error is not None
import static org.junit.jupiter.api.Assertions.*;
import org.junit.jupiter.api.Test;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.model.ExecutionStatus;
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;
/** Example: a handler that throws outside any step produces a FAILED execution. */
class TestFailureTest {

    /** Handler input; {@code fail} selects the throwing branch. */
    record Input(boolean fail) {}

    /** Checks that a throwing handler yields FAILED status and an error. */
    @Test
    void reportsFailedExecution() {
        var runner = LocalDurableTestRunner.create(
                Input.class,
                (request, durable) -> {
                    if (!request.fail()) {
                        return "ok";
                    }
                    throw new RuntimeException("intentional failure");
                });

        var execution = runner.runUntilComplete(new Input(true));

        assertEquals(ExecutionStatus.FAILED, execution.getStatus());
        assertTrue(execution.getError().isPresent());
    }
}
Test retries¶
The test runner drives the full retry loop via replay. Configure a retry strategy on the step, and the runner re-invokes the handler as many times as needed.
import { withDurableExecution, DurableContext, createRetryStrategy } from "@aws/durable-execution-sdk-js";
import { LocalDurableTestRunner } from "@aws/durable-execution-sdk-js-testing";
import { ExecutionStatus } from "@aws-sdk/client-lambda";
// Counts calls to the step body; the test suite resets it before each test.
let attempts = 0;

/**
 * Durable handler under test: runs one step named "flaky" whose body throws
 * on its first two calls and returns "done" from the third call on. The step
 * is configured with a retry strategy of at most three attempts.
 */
const handler = withDurableExecution(
  async (event: unknown, context: DurableContext) => {
    const flakyStep = () => {
      attempts += 1;
      if (attempts >= 3) {
        return "done";
      }
      throw new Error("transient error");
    };

    return await context.step("flaky", flakyStep, {
      retryStrategy: createRetryStrategy({ maxAttempts: 3 }),
    });
  },
);
// Reassigned in beforeEach so that every test works with its own runner.
let runner: LocalDurableTestRunner;

beforeAll(async () => {
  // skipTime is enabled so the test does not wait on real timers.
  await LocalDurableTestRunner.setupTestEnvironment({ skipTime: true });
});

beforeEach(() => {
  // Reset the shared call counter along with the runner.
  attempts = 0;
  runner = new LocalDurableTestRunner({ handlerFunction: handler });
});

afterAll(async () => {
  await LocalDurableTestRunner.teardownTestEnvironment();
});

it("retries and eventually succeeds", async () => {
  const execution = await runner.run();

  expect(execution.getStatus()).toBe(ExecutionStatus.SUCCEEDED);
  expect(execution.getResult()).toBe("done");
});
from aws_durable_execution_sdk_python import DurableContext, durable_execution, durable_step
from aws_durable_execution_sdk_python.types import StepContext
from aws_durable_execution_sdk_python.config import StepConfig
from aws_durable_execution_sdk_python.retries import create_retry_strategy, RetryStrategyConfig
from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python_testing.runner import DurableFunctionTestRunner
# Counts calls to the step body; the test resets it before running.
attempts = 0


@durable_step
def flaky(ctx: StepContext) -> str:
    """Step body: raises on the first two calls, returns "done" from the third on."""
    global attempts
    attempts += 1
    if attempts >= 3:
        return "done"
    raise RuntimeError("transient error")


@durable_execution
def handler(event, context: DurableContext) -> str:
    """Durable handler under test: runs the flaky step with max_attempts=3."""
    strategy = create_retry_strategy(RetryStrategyConfig(max_attempts=3))
    step_config = StepConfig(retry_strategy=strategy)
    return context.step(flaky(), config=step_config)
def test_retries_and_eventually_succeeds():
    """Run the handler and check that the retried step eventually succeeds."""
    global attempts
    attempts = 0  # start every run from a clean call count

    runner = DurableFunctionTestRunner(handler=handler)

    with runner:
        outcome = runner.run(timeout=30)

    assert outcome.status is InvocationStatus.SUCCEEDED
    # The expected value includes the double quotes -- presumably the result is
    # the JSON-serialized return value; confirm against the runner's result type.
    assert outcome.result == '"done"'
import static org.junit.jupiter.api.Assertions.*;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.lambda.model.OperationStatus;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.config.StepConfig;
import software.amazon.lambda.durable.model.ExecutionStatus;
import software.amazon.lambda.durable.retry.RetryStrategies;
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;
/** Example: a step that throws twice before succeeding, run with a retry strategy. */
class TestRetriesTest {

    /** Checks that the execution succeeds once the step body stops throwing. */
    @Test
    void retriesAndEventuallySucceeds() {
        // Shared with the step lambda so calls are counted across invocations.
        var calls = new AtomicInteger(0);

        var retryingStep = StepConfig.builder()
                .retryStrategy(RetryStrategies.exponentialBackoff(3))
                .build();

        var runner = LocalDurableTestRunner.create(
                Void.class,
                (event, durable) -> durable.step("flaky", String.class, step -> {
                    int call = calls.incrementAndGet();
                    if (call >= 3) {
                        return "done";
                    }
                    throw new RuntimeException("transient error");
                }, retryingStep));

        // The handler never reads its input, so null is passed.
        var execution = runner.runUntilComplete(null);

        assertEquals(ExecutionStatus.SUCCEEDED, execution.getStatus());
        assertEquals("done", execution.getResult(String.class));
    }
}
Skip time in tests¶
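Retry backoffs and context.wait() durations use real time in production. Each test
runner provides a way to shorten or skip these delays so tests finish quickly; the
mechanism, and whether it covers retry delays, differs by language as described below.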
TypeScript: Pass { skipTime: true } to setupTestEnvironment(). The runner swaps its
real timer for a queue that fires immediately. Both context.wait() delays and step
retry delays complete in zero wall-clock time.
Python: Set the DURABLE_EXECUTION_TIME_SCALE environment variable to a float that
multiplies context.wait() durations. Set it to 0 for instant waits, or to a small
fraction such as 0.01 to run waits at 100x speed. The scale does not apply to step
retry delays: they run for the configured next_attempt_delay_seconds in real
wall-clock time. Keep retry delays short in tests, or configure a retry strategy
with a low initial_delay_seconds.
Java: runUntilComplete() calls advanceTime() after each invocation. advanceTime()
immediately marks PENDING step retries as READY and completes STARTED waits without
real-time sleeps. If you call run() directly, call advanceTime() yourself between
invocations.
See Workflow patterns: Long waits for a worked example.
Test branching logic¶
Run the same handler with different inputs to cover each branch. Each test case gets its own runner instance.
import { withDurableExecution, DurableContext } from "@aws/durable-execution-sdk-js";
import { LocalDurableTestRunner } from "@aws/durable-execution-sdk-js-testing";
import { ExecutionStatus } from "@aws-sdk/client-lambda";
/**
 * Durable handler under test: runs the "premium-path" step when
 * `event.premium` is true and the "standard-path" step otherwise, returning
 * the chosen step's result.
 */
const handler = withDurableExecution(
  async (event: { premium: boolean }, context: DurableContext) => {
    if (!event.premium) {
      return await context.step("standard-path", () => "standard");
    }
    return await context.step("premium-path", () => "premium");
  },
);
// Reassigned in beforeEach so that each branch is tested with its own runner.
let runner: LocalDurableTestRunner;

beforeAll(async () => {
  await LocalDurableTestRunner.setupTestEnvironment();
});

beforeEach(() => {
  runner = new LocalDurableTestRunner({ handlerFunction: handler });
});

afterAll(async () => {
  await LocalDurableTestRunner.teardownTestEnvironment();
});

it("takes the premium path", async () => {
  const execution = await runner.run({ payload: { premium: true } });

  expect(execution.getStatus()).toBe(ExecutionStatus.SUCCEEDED);
  expect(execution.getResult()).toBe("premium");
});

it("takes the standard path", async () => {
  const execution = await runner.run({ payload: { premium: false } });

  expect(execution.getStatus()).toBe(ExecutionStatus.SUCCEEDED);
  expect(execution.getResult()).toBe("standard");
});
from aws_durable_execution_sdk_python import DurableContext, durable_execution, durable_step
from aws_durable_execution_sdk_python.types import StepContext
from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python_testing.runner import DurableFunctionTestRunner
@durable_step
def premium_path(ctx: StepContext) -> str:
    """Step body for the premium branch: returns "premium"."""
    return "premium"


@durable_step
def standard_path(ctx: StepContext) -> str:
    """Step body for the standard branch: returns "standard"."""
    return "standard"


@durable_execution
def handler(event: dict, context: DurableContext) -> str:
    """Durable handler under test: picks a step from the event's "premium" entry."""
    if not event.get("premium"):
        return context.step(standard_path())
    return context.step(premium_path())
def _run_handler(payload: str):
    """Run the handler once on a fresh runner with the given input string."""
    runner = DurableFunctionTestRunner(handler=handler)
    with runner:
        return runner.run(input=payload, timeout=10)


def test_takes_premium_path():
    """A truthy "premium" entry selects the premium step."""
    outcome = _run_handler('{"premium": true}')

    assert outcome.status is InvocationStatus.SUCCEEDED
    assert outcome.result == '"premium"'


def test_takes_standard_path():
    """A false "premium" entry selects the standard step."""
    outcome = _run_handler('{"premium": false}')

    assert outcome.status is InvocationStatus.SUCCEEDED
    assert outcome.result == '"standard"'
import static org.junit.jupiter.api.Assertions.*;
import org.junit.jupiter.api.Test;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.model.ExecutionStatus;
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;
/** Example: one handler, two inputs, one test per branch. */
class TestBranchingTest {

    /** Handler input; {@code premium} selects which step runs. */
    record Input(boolean premium) {}

    /** Builds a fresh runner around the branching handler for each test. */
    private LocalDurableTestRunner<Input, String> newRunner() {
        return LocalDurableTestRunner.create(
                Input.class,
                (request, durable) -> {
                    if (!request.premium()) {
                        return durable.step("standard-path", String.class, step -> "standard");
                    }
                    return durable.step("premium-path", String.class, step -> "premium");
                });
    }

    /** premium = true must run the "premium-path" step. */
    @Test
    void takesPremiumPath() {
        var execution = newRunner().runUntilComplete(new Input(true));

        assertEquals(ExecutionStatus.SUCCEEDED, execution.getStatus());
        assertEquals("premium", execution.getResult(String.class));
    }

    /** premium = false must run the "standard-path" step. */
    @Test
    void takesStandardPath() {
        var execution = newRunner().runUntilComplete(new Input(false));

        assertEquals(ExecutionStatus.SUCCEEDED, execution.getStatus());
        assertEquals("standard", execution.getResult(String.class));
    }
}
See also¶
- API Reference: Full reference for the runner, result, and operation classes.
- Workflow patterns: Complete tests for common workflow shapes.
- Assertions: Inspect steps, waits, and callbacks after a test run.
- Cloud Runner: Run the same tests against a deployed Lambda function.
- SAM CLI: Local and remote invocation with SAM CLI.