Skip to content

Assertions

After run() completes, the test result contains the full operation history. You can look up operations by name, filter by type, or walk the history to assert on what ran and how it ran.

Assert on a step

Look up a step by name and check its status and result.

Use runner.getOperation(name) to get a handle to the operation, then call getStepDetails() to access the result.

import { withDurableExecution, DurableContext } from "@aws/durable-execution-sdk-js";
import { LocalDurableTestRunner, OperationStatus } from "@aws/durable-execution-sdk-js-testing";
import { ExecutionStatus, OperationType } from "@aws-sdk/client-lambda";

/** Durable handler that runs a single step named "compute" and returns that step's value. */
const handler = withDurableExecution(async (_event: unknown, ctx: DurableContext) => {
  const answer = await ctx.step("compute", () => 42);
  return answer;
});

// Runner shared by the tests below; a fresh instance is assigned before each test.
let runner: LocalDurableTestRunner;

// Set the test environment up once for the suite...
beforeAll(async () => {
  await LocalDurableTestRunner.setupTestEnvironment();
});

// ...and tear it down once every test has finished.
afterAll(async () => {
  await LocalDurableTestRunner.teardownTestEnvironment();
});

beforeEach(() => {
  runner = new LocalDurableTestRunner({ handlerFunction: handler });
});

it("asserts on a step operation", async () => {
  const result = await runner.run();
  expect(result.getStatus()).toBe(ExecutionStatus.SUCCEEDED);

  // Look the step up by the name it was given in the handler.
  const computeStep = runner.getOperation("compute");
  expect(computeStep.getType()).toBe(OperationType.STEP);
  expect(computeStep.getStatus()).toBe(OperationStatus.SUCCEEDED);

  // The step details carry the value the step function returned.
  const stepDetails = computeStep.getStepDetails();
  expect(stepDetails?.result).toBe(42);
});

Use result.get_step(name) to get the StepOperation. The operation's result attribute holds the raw serialized payload.

from aws_durable_execution_sdk_python import DurableContext, durable_execution, durable_step
from aws_durable_execution_sdk_python.types import StepContext
from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python.lambda_service import OperationStatus
from aws_durable_execution_sdk_python_testing.runner import DurableFunctionTestRunner


@durable_step
def compute(ctx: StepContext) -> int:
    """Durable step that always produces the integer 42."""
    answer = 42
    return answer


@durable_execution
def handler(event, context: DurableContext) -> int:
    """Run the ``compute`` step through the durable context and return its value."""
    step_value = context.step(compute())
    return step_value


def test_asserts_on_step_operation():
    """Run the handler in the test runner, then check the recorded "compute" step."""
    runner = DurableFunctionTestRunner(handler=handler)
    with runner:
        result = runner.run(timeout=10)

    # The execution as a whole must have succeeded.
    assert result.status is InvocationStatus.SUCCEEDED

    # Look the step up by name and check its status and payload.
    compute_step = result.get_step("compute")
    assert compute_step.status is OperationStatus.SUCCEEDED
    assert compute_step.result is not None

Use result.getOperation(name) to get the TestOperation, then call getStepResult(Class<T>) to deserialize the result.

import static org.junit.jupiter.api.Assertions.*;

import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.lambda.model.OperationStatus;
import software.amazon.awssdk.services.lambda.model.OperationType;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.model.ExecutionStatus;
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;

/** Checks that a single named step is recorded with the expected type, status and result. */
class AssertStepTest {

    @Test
    void assertsOnStepOperation() {
        // Handler under test: one step named "compute" that yields 42.
        var runner = LocalDurableTestRunner.create(
            Void.class,
            (input, context) -> context.step("compute", Integer.class, ctx -> 42)
        );

        var result = runner.runUntilComplete(null);
        assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());

        // Look the step up by name on the result.
        var computeOp = result.getOperation("compute");
        assertNotNull(computeOp);
        assertEquals(OperationType.STEP, computeOp.getType());
        assertEquals(OperationStatus.SUCCEEDED, computeOp.getStatus());

        // Read the step result back as an Integer before comparing.
        assertEquals(42, computeOp.getStepResult(Integer.class));
    }
}

Assert on a wait

Look up a wait operation and check that it was scheduled with the expected duration.

getWaitDetails() returns waitSeconds and scheduledEndTimestamp.

import { withDurableExecution, DurableContext } from "@aws/durable-execution-sdk-js";
import { LocalDurableTestRunner, OperationStatus } from "@aws/durable-execution-sdk-js-testing";
import { ExecutionStatus, OperationType } from "@aws-sdk/client-lambda";

/** Durable handler that waits 30 seconds under the name "my-wait" and then returns "done". */
const handler = withDurableExecution(async (_event: unknown, ctx: DurableContext) => {
  await ctx.wait("my-wait", { seconds: 30 });

  return "done";
});

// Runner shared by the tests below; a fresh instance is assigned before each test.
let runner: LocalDurableTestRunner;

// The environment is set up once, with the skipTime option enabled.
beforeAll(async () => {
  await LocalDurableTestRunner.setupTestEnvironment({ skipTime: true });
});

afterAll(async () => {
  await LocalDurableTestRunner.teardownTestEnvironment();
});

beforeEach(() => {
  runner = new LocalDurableTestRunner({ handlerFunction: handler });
});

it("asserts on a wait operation", async () => {
  const result = await runner.run();
  expect(result.getStatus()).toBe(ExecutionStatus.SUCCEEDED);

  // Look the wait up by the name it was given in the handler.
  const waitOp = runner.getOperation("my-wait");
  expect(waitOp.getType()).toBe(OperationType.WAIT);
  expect(waitOp.getStatus()).toBe(OperationStatus.SUCCEEDED);

  // The wait details expose the requested duration and the scheduled end time.
  expect(waitOp.getWaitDetails()?.waitSeconds).toBe(30);
  expect(waitOp.getWaitDetails()?.scheduledEndTimestamp).toBeInstanceOf(Date);
});

Use result.get_wait(name) to get the wait operation. Its scheduled_end_timestamp attribute is a datetime indicating when the wait was scheduled to end.

from aws_durable_execution_sdk_python import DurableContext, durable_execution
from aws_durable_execution_sdk_python.config import Duration
from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python.lambda_service import OperationStatus, OperationType
from aws_durable_execution_sdk_python_testing.runner import DurableFunctionTestRunner


@durable_execution
def handler(event, context: DurableContext) -> str:
    """Wait 30 seconds under the name "my-wait", then return "done"."""
    pause = Duration.from_seconds(30)
    context.wait(pause, name="my-wait")
    return "done"


def test_asserts_on_wait_operation():
    """Run the handler in the test runner, then check the recorded "my-wait" wait."""
    runner = DurableFunctionTestRunner(handler=handler)
    with runner:
        result = runner.run(timeout=10)

    # The execution as a whole must have succeeded.
    assert result.status is InvocationStatus.SUCCEEDED

    # Look the wait up by name; it must have completed and carry an end timestamp.
    wait_op = result.get_wait("my-wait")
    assert wait_op.status is OperationStatus.SUCCEEDED
    assert wait_op.scheduled_end_timestamp is not None

getWaitDetails().scheduledEndTimestamp() returns the scheduled end time as an Instant.

import static org.junit.jupiter.api.Assertions.*;

import java.time.Duration;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.lambda.model.OperationStatus;
import software.amazon.awssdk.services.lambda.model.OperationType;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.model.ExecutionStatus;
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;

/** Checks that a named wait is recorded with the expected type, status and a scheduled end time. */
class AssertWaitTest {

    @Test
    void assertsOnWaitOperation() {
        // Handler under test: wait 30 seconds under the name "my-wait", then return "done".
        var runner = LocalDurableTestRunner.create(
            Void.class,
            (input, context) -> {
                context.wait(Duration.ofSeconds(30), "my-wait");
                return "done";
            }
        );

        var result = runner.runUntilComplete(null);
        assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());

        // Look the wait up by name on the result.
        var waitOp = result.getOperation("my-wait");
        assertNotNull(waitOp);
        assertEquals(OperationType.WAIT, waitOp.getType());
        assertEquals(OperationStatus.SUCCEEDED, waitOp.getStatus());

        // The wait details must report when the wait was scheduled to end.
        assertNotNull(waitOp.getWaitDetails().scheduledEndTimestamp());
    }
}

Assert on a callback

For callbacks, the test drives the execution to the point where the callback is waiting, then completes it from the test.

Use waitForData(WaitingOperationStatus.SUBMITTED) to wait until the callback submitter has run, then call sendCallbackSuccess() to complete it.

import { withDurableExecution, DurableContext } from "@aws/durable-execution-sdk-js";
import { LocalDurableTestRunner, WaitingOperationStatus } from "@aws/durable-execution-sdk-js-testing";
import { ExecutionStatus } from "@aws-sdk/client-lambda";

/** Durable handler that waits on a callback named "approval" and returns the callback's result. */
const handler = withDurableExecution(async (_event: unknown, ctx: DurableContext) => {
  const outcome = await ctx.waitForCallback("approval", async (callbackId) => {
    // In production this would notify an external system
    void callbackId;
  });
  return outcome;
});

// Runner shared by the tests below; a fresh instance is assigned before each test.
let runner: LocalDurableTestRunner;

// Set the test environment up once for the suite...
beforeAll(async () => {
  await LocalDurableTestRunner.setupTestEnvironment();
});

// ...and tear it down once every test has finished.
afterAll(async () => {
  await LocalDurableTestRunner.teardownTestEnvironment();
});

beforeEach(() => {
  runner = new LocalDurableTestRunner({ handlerFunction: handler });
});

it("completes a callback from the test", async () => {
  // Start the execution without awaiting it; the test completes the callback while it runs.
  const runPromise = runner.run();

  // Wait until the callback has been submitted, then complete it with a JSON payload.
  const approval = runner.getOperation("approval");
  await approval.waitForData(WaitingOperationStatus.SUBMITTED);
  await approval.sendCallbackSuccess(JSON.stringify("approved"));

  // With the callback completed, the run can be awaited to its end.
  const result = await runPromise;
  expect(result.getStatus()).toBe(ExecutionStatus.SUCCEEDED);
  expect(result.getResult()).toBe("approved");
});

Use run_async() to start the execution, wait_for_callback() to get the callback ID, send_callback_success() to complete it, then wait_for_result() to get the final result. All calls must happen inside the with runner: block.

from aws_durable_execution_sdk_python import DurableContext, durable_execution
from aws_durable_execution_sdk_python.types import WaitForCallbackContext
from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python_testing.runner import DurableFunctionTestRunner


@durable_execution
def handler(event, context: DurableContext) -> str:
    """Wait on a callback named "approval" and return whatever it is completed with."""

    def submit(callback_id: str, ctx: WaitForCallbackContext) -> None:
        """No-op submitter; in production this would notify an external system."""

    approval = context.wait_for_callback(submit, name="approval")
    return approval


def test_completes_callback_from_test():
    """Start the execution, complete its "approval" callback from the test, then check it succeeded."""
    runner = DurableFunctionTestRunner(handler=handler)
    with runner:
        # Start without blocking so the test can complete the callback itself.
        arn = runner.run_async(timeout=10)

        # Fetch the callback ID, then complete the callback with a JSON payload.
        approval_id = runner.wait_for_callback(execution_arn=arn, name="approval", timeout=10)
        runner.send_callback_success(callback_id=approval_id, result=b'"approved"')

        # Collect the final result of the execution.
        result = runner.wait_for_result(execution_arn=arn, timeout=10)

    assert result.status is InvocationStatus.SUCCEEDED

Use run() to reach the PENDING state, getCallbackId() to get the callback ID, completeCallback() to complete it, then run() again to finish the execution.

import static org.junit.jupiter.api.Assertions.*;

import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.lambda.model.OperationStatus;
import software.amazon.awssdk.services.lambda.model.OperationType;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.model.ExecutionStatus;
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;

/** Drives an execution to a pending callback, completes the callback from the test, then finishes the run. */
class AssertCallbackTest {

    @Test
    void completesCallbackFromTest() {
        // Handler under test: create a callback named "approval" and return its value.
        var runner = LocalDurableTestRunner.create(
            String.class,
            (input, context) -> {
                var approval = context.createCallback("approval", String.class);
                return approval.get();
            }
        );

        // First invocation: the callback is created and the execution is left pending.
        var firstRun = runner.run("test");
        assertEquals(ExecutionStatus.PENDING, firstRun.getStatus());

        // The callback operation exists and has started, but is not yet complete.
        var approvalOp = runner.getOperation("approval");
        assertNotNull(approvalOp);
        assertEquals(OperationType.CALLBACK, approvalOp.getType());
        assertEquals(OperationStatus.STARTED, approvalOp.getStatus());

        // Complete the callback from the test with a JSON string payload.
        var approvalId = runner.getCallbackId("approval");
        runner.completeCallback(approvalId, "\"approved\"");

        // Second invocation: the callback is complete, so the execution finishes.
        var result = runner.run("test");
        assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
        assertEquals("approved", result.getResult(String.class));
    }
}

Assert on a child context

Child contexts appear as CONTEXT operations in the result. You can walk their child operations to assert on what ran inside the context.

import { withDurableExecution, DurableContext } from "@aws/durable-execution-sdk-js";
import { LocalDurableTestRunner } from "@aws/durable-execution-sdk-js-testing";
import { ExecutionStatus, OperationType } from "@aws-sdk/client-lambda";

/** Durable handler that runs a "compute" step inside a child context named "process". */
const handler = withDurableExecution(async (_event: unknown, ctx: DurableContext) => {
  const processed = await ctx.runInChildContext("process", async (childCtx) => {
    const answer = await childCtx.step("compute", () => 42);
    return answer;
  });
  return processed;
});

// Runner shared by the tests below; a fresh instance is assigned before each test.
let runner: LocalDurableTestRunner;

// Set the test environment up once for the suite...
beforeAll(async () => {
  await LocalDurableTestRunner.setupTestEnvironment();
});

// ...and tear it down once every test has finished.
afterAll(async () => {
  await LocalDurableTestRunner.teardownTestEnvironment();
});

beforeEach(() => {
  runner = new LocalDurableTestRunner({ handlerFunction: handler });
});

it("asserts on child context and its operations", async () => {
  const result = await runner.run();

  expect(result.getStatus()).toBe(ExecutionStatus.SUCCEEDED);

  // The child context is recorded as a CONTEXT operation carrying the context's result.
  const ctx = runner.getOperation("process");
  expect(ctx.getType()).toBe(OperationType.CONTEXT);
  expect(ctx.getContextDetails()?.result).toBe(42);

  // Walk the operations that ran inside the child context.
  const children = ctx.getChildOperations();
  expect(children?.length).toBe(1);
  // Guard the indexed element too: an empty list then produces a normal assertion
  // failure instead of a TypeError, and the line compiles under noUncheckedIndexedAccess.
  expect(children?.[0]?.getName()).toBe("compute");
});
from aws_durable_execution_sdk_python import DurableContext, durable_execution, durable_step
from aws_durable_execution_sdk_python.types import StepContext
from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python.lambda_service import OperationType
from aws_durable_execution_sdk_python_testing.runner import DurableFunctionTestRunner


@durable_step
def compute(ctx: StepContext) -> int:
    """Durable step that always produces the integer 42."""
    answer = 42
    return answer


@durable_execution
def handler(event, context: DurableContext) -> int:
    """Run the ``compute`` step inside a child context named "process" and return its value."""

    def process(child: DurableContext) -> int:
        """Body of the child context: run the ``compute`` step on the child."""
        step_value = child.step(compute())
        return step_value

    outcome = context.run_in_child_context(process, name="process")
    return outcome


def test_asserts_on_child_context():
    """Run the handler in the test runner and check that one CONTEXT operation named "process" was recorded."""
    runner = DurableFunctionTestRunner(handler=handler)
    with runner:
        result = runner.run(timeout=10)

    # The execution as a whole must have succeeded.
    assert result.status is InvocationStatus.SUCCEEDED

    # Keep only the operations recorded as child contexts.
    context_ops = [
        candidate
        for candidate in result.operations
        if candidate.operation_type == OperationType.CONTEXT
    ]
    assert len(context_ops) == 1
    assert context_ops[0].name == "process"
import static org.junit.jupiter.api.Assertions.*;

import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.lambda.model.OperationType;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.model.ExecutionStatus;
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;

/** Checks that a child context is recorded as a CONTEXT operation with context details. */
class AssertChildContextTest {

    @Test
    void assertsOnChildContextAndItsOperations() {
        // Handler under test: a child context "process" that runs a "compute" step yielding 42.
        var runner = LocalDurableTestRunner.create(
            Void.class,
            (input, context) -> context.runInChildContext("process", Integer.class,
                child -> child.step("compute", Integer.class, ctx -> 42))
        );

        var result = runner.runUntilComplete(null);

        // The execution returns the value produced inside the child context.
        assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
        assertEquals(42, result.getResult(Integer.class));

        // Look the child context up by name on the result.
        var processOp = result.getOperation("process");
        assertNotNull(processOp);
        assertEquals(OperationType.CONTEXT, processOp.getType());
        assertNotNull(processOp.getContextDetails());
    }
}

Filter operations by status

When a step retries, the operation history contains one entry per attempt. Filter by status to count failures and successes separately.

Pass { status: OperationStatus.FAILED } to result.getOperations() to filter.

import { withDurableExecution, DurableContext, createRetryStrategy } from "@aws/durable-execution-sdk-js";
import { LocalDurableTestRunner } from "@aws/durable-execution-sdk-js-testing";
import { ExecutionStatus, OperationStatus } from "@aws-sdk/client-lambda";

// Number of times the "flaky" step function has been invoked.
let callCount = 0;

/**
 * Durable handler whose "flaky" step throws on its first two invocations and
 * returns "ok" on the third, under a retry strategy limited to 3 attempts.
 */
const handler = withDurableExecution(async (_event: unknown, ctx: DurableContext) => {
  const flakyResult = await ctx.step(
    "flaky",
    () => {
      callCount += 1;
      if (callCount < 3) {
        throw new Error("not yet");
      }
      return "ok";
    },
    { retryStrategy: createRetryStrategy({ maxAttempts: 3 }) },
  );
  return flakyResult;
});

// Runner shared by the tests below; a fresh instance is assigned before each test.
let runner: LocalDurableTestRunner;

// The environment is set up once, with the skipTime option enabled.
beforeAll(async () => {
  await LocalDurableTestRunner.setupTestEnvironment({ skipTime: true });
});

afterAll(async () => {
  await LocalDurableTestRunner.teardownTestEnvironment();
});

beforeEach(() => {
  // Reset the invocation counter so every test starts from the first attempt.
  callCount = 0;
  runner = new LocalDurableTestRunner({ handlerFunction: handler });
});

it("filters operations by status", async () => {
  const result = await runner.run();
  expect(result.getStatus()).toBe(ExecutionStatus.SUCCEEDED);

  // Two attempts threw before the step succeeded.
  const failures = result.getOperations({ status: OperationStatus.FAILED });
  expect(failures.length).toBe(2);

  // Exactly one attempt went through.
  const successes = result.getOperations({ status: OperationStatus.SUCCEEDED });
  expect(successes.length).toBe(1);
});

Use result.get_all_operations() to get a flat list including nested operations, then filter by op.status.

from aws_durable_execution_sdk_python import DurableContext, durable_execution, durable_step
from aws_durable_execution_sdk_python.types import StepContext
from aws_durable_execution_sdk_python.config import StepConfig
from aws_durable_execution_sdk_python.retries import create_retry_strategy, RetryStrategyConfig
from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python.lambda_service import OperationStatus
from aws_durable_execution_sdk_python_testing.runner import DurableFunctionTestRunner

# Number of times `flaky` has been invoked; the test resets it to 0 before running.
call_count = 0


@durable_step
def flaky(ctx: StepContext) -> str:
    """Raise on the first two invocations and return "ok" from the third one on."""
    global call_count
    call_count += 1
    if call_count >= 3:
        return "ok"
    raise RuntimeError("not yet")


@durable_execution
def handler(event, context: DurableContext) -> str:
    """Run the ``flaky`` step under a retry strategy limited to 3 attempts."""
    retry_strategy = create_retry_strategy(RetryStrategyConfig(max_attempts=3))
    step_config = StepConfig(retry_strategy=retry_strategy)
    return context.step(flaky(), config=step_config)


def test_filters_operations_by_status():
    """Run the retrying handler and count its operations by status: two failed, one succeeded."""
    global call_count
    call_count = 0  # start from the first attempt

    runner = DurableFunctionTestRunner(handler=handler)
    with runner:
        result = runner.run(timeout=30)

    assert result.status is InvocationStatus.SUCCEEDED

    def with_status(wanted):
        """Return every recorded operation whose status is `wanted`."""
        return [op for op in result.get_all_operations() if op.status is wanted]

    failed_ops = with_status(OperationStatus.FAILED)
    assert len(failed_ops) == 2

    succeeded_ops = with_status(OperationStatus.SUCCEEDED)
    assert len(succeeded_ops) == 1

result.getFailedOperations() and result.getSucceededOperations() return pre-filtered lists.

import static org.junit.jupiter.api.Assertions.*;

import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.lambda.model.OperationStatus;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.config.StepConfig;
import software.amazon.lambda.durable.model.ExecutionStatus;
import software.amazon.lambda.durable.retry.RetryStrategies;
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;

/** Checks that failed and succeeded operations of a retried step can be counted separately. */
class FilterByStatusTest {

    @Test
    void filtersOperationsByStatus() {
        // Counts invocations of the step body across retries.
        var attempts = new AtomicInteger(0);
        var stepConfig = StepConfig.builder()
            .retryStrategy(RetryStrategies.exponentialBackoff(3))
            .build();

        // Handler under test: the "flaky" step throws twice, then returns "ok".
        var runner = LocalDurableTestRunner.create(
            Void.class,
            (input, context) -> context.step("flaky", String.class, ctx -> {
                if (attempts.incrementAndGet() < 3) {
                    throw new RuntimeException("not yet");
                }
                return "ok";
            }, stepConfig)
        );

        var result = runner.runUntilComplete(null);
        assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());

        // The result exposes pre-filtered views by status.
        var failures = result.getFailedOperations();
        assertEquals(2, failures.size());

        var successes = result.getSucceededOperations();
        assertEquals(1, successes.size());
    }
}

See also