Skip to content

Authoring

Install the testing SDK

npm install --save-dev @aws/durable-execution-sdk-js-testing
pip install aws-durable-execution-sdk-python-testing

In Java, add the testing dependency to your Maven build file with test scope:

<!-- NOTE(review): no <version> element here — presumably managed by a parent
     POM or a dependencyManagement BOM; add an explicit <version> otherwise. -->
<dependency>
  <groupId>software.amazon.lambda</groupId>
  <artifactId>aws-durable-execution-sdk-java-testing</artifactId>
  <scope>test</scope>
</dependency>

Write a minimal test

Create a runner with your handler, call run(), and assert on the result.

Call setupTestEnvironment() in beforeAll and teardownTestEnvironment() in afterAll. Create a new runner instance in beforeEach so each test starts with a clean state.

// Minimal durable handler: a single step that returns a greeting.
import { withDurableExecution, DurableContext } from "@aws/durable-execution-sdk-js";
import { LocalDurableTestRunner } from "@aws/durable-execution-sdk-js-testing";
import { ExecutionStatus } from "@aws-sdk/client-lambda";

const handler = withDurableExecution(
  async (event: unknown, context: DurableContext) => {
    const greeting = await context.step("greet", () => "hello");
    return greeting;
  }
);

let runner: LocalDurableTestRunner;

// Start the local test environment once for the whole suite.
beforeAll(async () => {
  await LocalDurableTestRunner.setupTestEnvironment();
});

// Tear it down after every test has run.
afterAll(async () => {
  await LocalDurableTestRunner.teardownTestEnvironment();
});

// A fresh runner per test keeps execution state isolated.
beforeEach(() => {
  runner = new LocalDurableTestRunner({ handlerFunction: handler });
});

it("returns the expected result", async () => {
  const execution = await runner.run();

  expect(execution.getStatus()).toBe(ExecutionStatus.SUCCEEDED);
  expect(execution.getResult()).toBe("hello");
});

In Python, use DurableFunctionTestRunner as a context manager. The context manager starts the scheduler thread on entry and stops it on exit.

# Minimal durable handler: one step that returns a greeting.
from aws_durable_execution_sdk_python import DurableContext, durable_execution, durable_step
from aws_durable_execution_sdk_python.types import StepContext
from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python_testing.runner import DurableFunctionTestRunner


@durable_step
def greet(ctx: StepContext) -> str:
    """Single step: produce the greeting."""
    return "hello"


@durable_execution
def handler(event, context: DurableContext) -> str:
    """Durable entry point: run the greet step and return its result."""
    return context.step(greet())


def test_returns_expected_result():
    # The context manager starts the scheduler on entry and stops it on exit.
    test_runner = DurableFunctionTestRunner(handler=handler)
    with test_runner:
        outcome = test_runner.run(timeout=10)

    # Results are JSON-serialized, hence the quoted string.
    assert outcome.status is InvocationStatus.SUCCEEDED
    assert outcome.result == '"hello"'

In Java, use LocalDurableTestRunner.create() with the input type and a handler function. Call runUntilComplete() to drive the full replay loop until the execution completes or fails.

import static org.junit.jupiter.api.Assertions.*;

import org.junit.jupiter.api.Test;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.model.ExecutionStatus;
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;

class MinimalTest {

    @Test
    void returnsExpectedResult() {
        // Inline handler: one step named "greet" that yields a greeting.
        var testRunner = LocalDurableTestRunner.create(
            Void.class,
            (input, context) -> context.step("greet", String.class, ctx -> "hello")
        );

        // Drives the full replay loop until the execution finishes.
        var outcome = testRunner.runUntilComplete(null);

        assertEquals(ExecutionStatus.SUCCEEDED, outcome.getStatus());
        assertEquals("hello", outcome.getResult(String.class));
    }
}

Test a failed execution

When your handler throws outside a step, or a step exhausts all retries, the execution fails. Assert on the status and inspect the error.

import { withDurableExecution, DurableContext } from "@aws/durable-execution-sdk-js";
import { LocalDurableTestRunner } from "@aws/durable-execution-sdk-js-testing";
import { ExecutionStatus } from "@aws-sdk/client-lambda";

// Throwing outside any step fails the whole execution (no step-level retry).
const handler = withDurableExecution(
  async (event: { fail: boolean }, context: DurableContext) => {
    if (event.fail) {
      throw new Error("intentional failure");
    }
    return "ok";
  }
);

let runner: LocalDurableTestRunner;

beforeAll(async () => {
  await LocalDurableTestRunner.setupTestEnvironment();
});

afterAll(async () => {
  await LocalDurableTestRunner.teardownTestEnvironment();
});

// Fresh runner per test so each execution starts clean.
beforeEach(() => {
  runner = new LocalDurableTestRunner({ handlerFunction: handler });
});

it("reports a failed execution", async () => {
  const execution = await runner.run({ payload: { fail: true } });

  expect(execution.getStatus()).toBe(ExecutionStatus.FAILED);
  expect(execution.getError().errorMessage).toContain("intentional failure");
});
from aws_durable_execution_sdk_python import DurableContext, durable_execution
from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python_testing.runner import DurableFunctionTestRunner


@durable_execution
def handler(event: dict, context: DurableContext):
    """Fail the whole execution when the event carries a truthy 'fail' key."""
    # Raising outside a step fails the execution rather than retrying a step.
    if event.get("fail"):
        raise ValueError("intentional failure")
    return "ok"


def test_reports_failed_execution():
    test_runner = DurableFunctionTestRunner(handler=handler)
    with test_runner:
        outcome = test_runner.run(input='{"fail": true}', timeout=10)

    assert outcome.status is InvocationStatus.FAILED
    assert outcome.error is not None
import static org.junit.jupiter.api.Assertions.*;

import org.junit.jupiter.api.Test;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.model.ExecutionStatus;
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;

class TestFailureTest {

    record Input(boolean fail) {}

    @Test
    void reportsFailedExecution() {
        // Throwing outside any step fails the whole execution.
        var testRunner = LocalDurableTestRunner.create(
            Input.class,
            (input, context) -> {
                if (!input.fail()) {
                    return "ok";
                }
                throw new RuntimeException("intentional failure");
            }
        );

        var outcome = testRunner.runUntilComplete(new Input(true));

        assertEquals(ExecutionStatus.FAILED, outcome.getStatus());
        assertTrue(outcome.getError().isPresent());
    }
}

Test retries

The test runner drives the full retry loop via replay. Configure a retry strategy on the step, and the runner re-invokes the handler as many times as needed.

import { withDurableExecution, DurableContext, createRetryStrategy } from "@aws/durable-execution-sdk-js";
import { LocalDurableTestRunner } from "@aws/durable-execution-sdk-js-testing";
import { ExecutionStatus } from "@aws-sdk/client-lambda";

// Module-level counter lets the step fail twice before it succeeds.
let attempts = 0;

const handler = withDurableExecution(async (event: unknown, context: DurableContext) => {
  const flaky = () => {
    attempts++;
    if (attempts < 3) {
      throw new Error("transient error");
    }
    return "done";
  };
  return await context.step("flaky", flaky, {
    retryStrategy: createRetryStrategy({ maxAttempts: 3 }),
  });
});

let runner: LocalDurableTestRunner;

// skipTime collapses retry backoff so the test finishes without real waits.
beforeAll(async () => {
  await LocalDurableTestRunner.setupTestEnvironment({ skipTime: true });
});

afterAll(async () => {
  await LocalDurableTestRunner.teardownTestEnvironment();
});

beforeEach(() => {
  attempts = 0; // reset so each test observes a fresh retry sequence
  runner = new LocalDurableTestRunner({ handlerFunction: handler });
});

it("retries and eventually succeeds", async () => {
  const execution = await runner.run();

  expect(execution.getStatus()).toBe(ExecutionStatus.SUCCEEDED);
  expect(execution.getResult()).toBe("done");
});
from aws_durable_execution_sdk_python import DurableContext, durable_execution, durable_step
from aws_durable_execution_sdk_python.types import StepContext
from aws_durable_execution_sdk_python.config import StepConfig
from aws_durable_execution_sdk_python.retries import create_retry_strategy, RetryStrategyConfig
from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python_testing.runner import DurableFunctionTestRunner

# Module-level counter lets the step fail twice before it succeeds.
attempts = 0


@durable_step
def flaky(ctx: StepContext) -> str:
    """Fail on the first two attempts, succeed on the third."""
    global attempts
    attempts += 1
    if attempts < 3:
        raise RuntimeError("transient error")
    return "done"


@durable_execution
def handler(event, context: DurableContext) -> str:
    """Run the flaky step with an explicit retry budget of three attempts."""
    step_config = StepConfig(
        retry_strategy=create_retry_strategy(RetryStrategyConfig(max_attempts=3))
    )
    return context.step(flaky(), config=step_config)


def test_retries_and_eventually_succeeds():
    global attempts
    attempts = 0  # reset so repeated runs start from a clean count

    test_runner = DurableFunctionTestRunner(handler=handler)
    with test_runner:
        outcome = test_runner.run(timeout=30)

    assert outcome.status is InvocationStatus.SUCCEEDED
    assert outcome.result == '"done"'
import static org.junit.jupiter.api.Assertions.*;

import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.Test;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.config.StepConfig;
import software.amazon.lambda.durable.model.ExecutionStatus;
import software.amazon.lambda.durable.retry.RetryStrategies;
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;

class TestRetriesTest {

    @Test
    void retriesAndEventuallySucceeds() {
        // AtomicInteger lets the step lambda mutate the counter: locals
        // captured by a lambda must be effectively final.
        var attempts = new AtomicInteger(0);

        // Allow up to three attempts for the step.
        var config = StepConfig.builder()
            .retryStrategy(RetryStrategies.exponentialBackoff(3))
            .build();

        var runner = LocalDurableTestRunner.create(
            Void.class,
            (input, context) -> context.step("flaky", String.class, ctx -> {
                // Fail the first two attempts; succeed on the third.
                if (attempts.incrementAndGet() < 3) {
                    throw new RuntimeException("transient error");
                }
                return "done";
            }, config)
        );

        var result = runner.runUntilComplete(null);

        assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
        assertEquals("done", result.getResult(String.class));
    }
}

Skip time in tests

Retry backoffs and context.wait() durations use real time in production. The test runner collapses these delays so tests finish in milliseconds.

In JavaScript, pass { skipTime: true } to setupTestEnvironment(). The runner swaps its real timer for a queue that fires immediately. Both context.wait() delays and step retry delays complete in zero wall-clock time.

await LocalDurableTestRunner.setupTestEnvironment({ skipTime: true });

In Python, set the DURABLE_EXECUTION_TIME_SCALE environment variable to a float that multiplies context.wait() durations. Set it to 0 for instant waits, or to a small fraction such as 0.01 to run waits at 100x speed. Step retry delays use the configured next_attempt_delay_seconds at real wall-clock time, and the scale does not apply to them. Keep retry delays short in tests, or configure a retry strategy with a low initial_delay_seconds.

DURABLE_EXECUTION_TIME_SCALE=0 pytest tests/my_wait_tests.py

In Java, runUntilComplete() calls advanceTime() after each invocation. advanceTime() immediately marks PENDING step retries as READY and completes STARTED waits without real-time sleeps. If you call run() directly, call advanceTime() yourself between invocations.

runner.runUntilComplete(input); // advanceTime() runs after each invocation

See Workflow patterns: Long waits for a worked example.

Test branching logic

Run the same handler with different inputs to cover each branch. Each test case gets its own runner instance.

import { withDurableExecution, DurableContext } from "@aws/durable-execution-sdk-js";
import { LocalDurableTestRunner } from "@aws/durable-execution-sdk-js-testing";
import { ExecutionStatus } from "@aws-sdk/client-lambda";

// Branch on the event's premium flag; each branch runs its own named step.
const handler = withDurableExecution(
  async (event: { premium: boolean }, context: DurableContext) => {
    if (!event.premium) {
      return await context.step("standard-path", () => "standard");
    }
    return await context.step("premium-path", () => "premium");
  }
);

let runner: LocalDurableTestRunner;

beforeAll(async () => {
  await LocalDurableTestRunner.setupTestEnvironment();
});

afterAll(async () => {
  await LocalDurableTestRunner.teardownTestEnvironment();
});

// One runner per test case keeps execution state isolated.
beforeEach(() => {
  runner = new LocalDurableTestRunner({ handlerFunction: handler });
});

it("takes the premium path", async () => {
  const execution = await runner.run({ payload: { premium: true } });

  expect(execution.getStatus()).toBe(ExecutionStatus.SUCCEEDED);
  expect(execution.getResult()).toBe("premium");
});

it("takes the standard path", async () => {
  const execution = await runner.run({ payload: { premium: false } });

  expect(execution.getStatus()).toBe(ExecutionStatus.SUCCEEDED);
  expect(execution.getResult()).toBe("standard");
});
from aws_durable_execution_sdk_python import DurableContext, durable_execution, durable_step
from aws_durable_execution_sdk_python.types import StepContext
from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python_testing.runner import DurableFunctionTestRunner


@durable_step
def premium_path(ctx: StepContext) -> str:
    """Step taken for premium events."""
    return "premium"


@durable_step
def standard_path(ctx: StepContext) -> str:
    """Step taken for non-premium events."""
    return "standard"


@durable_execution
def handler(event: dict, context: DurableContext) -> str:
    """Branch on the 'premium' flag and run the matching step."""
    if event.get("premium"):
        return context.step(premium_path())
    return context.step(standard_path())


def _run(payload: str):
    # Each test case gets its own runner so no state leaks between tests.
    test_runner = DurableFunctionTestRunner(handler=handler)
    with test_runner:
        return test_runner.run(input=payload, timeout=10)


def test_takes_premium_path():
    outcome = _run('{"premium": true}')

    assert outcome.status is InvocationStatus.SUCCEEDED
    assert outcome.result == '"premium"'


def test_takes_standard_path():
    outcome = _run('{"premium": false}')

    assert outcome.status is InvocationStatus.SUCCEEDED
    assert outcome.result == '"standard"'
import static org.junit.jupiter.api.Assertions.*;

import org.junit.jupiter.api.Test;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.model.ExecutionStatus;
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;

class TestBranchingTest {

    record Input(boolean premium) {}

    // Fresh runner per test case so executions stay isolated.
    private LocalDurableTestRunner<Input, String> createRunner() {
        return LocalDurableTestRunner.create(
            Input.class,
            (input, context) -> input.premium()
                ? context.step("premium-path", String.class, ctx -> "premium")
                : context.step("standard-path", String.class, ctx -> "standard")
        );
    }

    @Test
    void takesPremiumPath() {
        var outcome = createRunner().runUntilComplete(new Input(true));

        assertEquals(ExecutionStatus.SUCCEEDED, outcome.getStatus());
        assertEquals("premium", outcome.getResult(String.class));
    }

    @Test
    void takesStandardPath() {
        var outcome = createRunner().runUntilComplete(new Input(false));

        assertEquals(ExecutionStatus.SUCCEEDED, outcome.getStatus());
        assertEquals("standard", outcome.getResult(String.class));
    }
}

See also

  • API Reference Full reference for the runner, result, and operation classes.
  • Workflow patterns Complete tests for common workflow shapes.
  • Assertions Inspect steps, waits, and callbacks after a test run.
  • Cloud Runner Run the same tests against a deployed Lambda function.
  • SAM CLI Local and remote invocation with SAM CLI.