Assertions
After run() completes, the test result contains the full operation history. You can
look up operations by name, filter by type, or walk the history to assert on what ran
and how it ran.
Assert on a step
Look up a step by name and check its status and result.
In TypeScript, use runner.getOperation(name) to get a handle to the operation, then call
getStepDetails() to access the result.
import { withDurableExecution, DurableContext } from "@aws/durable-execution-sdk-js";
import { LocalDurableTestRunner, OperationStatus } from "@aws/durable-execution-sdk-js-testing";
import { ExecutionStatus, OperationType } from "@aws-sdk/client-lambda";
/** Durable handler under test: a single step named "compute" that yields 42. */
const handler = withDurableExecution(
  async (event: unknown, context: DurableContext) => {
    const computed = await context.step("compute", () => 42);
    return computed;
  },
);

let runner: LocalDurableTestRunner;

// The test environment is set up once for the file and torn down once at the end.
beforeAll(async () => {
  await LocalDurableTestRunner.setupTestEnvironment();
});

afterAll(async () => {
  await LocalDurableTestRunner.teardownTestEnvironment();
});

// Every test starts from a fresh runner bound to the handler above.
beforeEach(() => {
  runner = new LocalDurableTestRunner({ handlerFunction: handler });
});

it("asserts on a step operation", async () => {
  const result = await runner.run();
  expect(result.getStatus()).toBe(ExecutionStatus.SUCCEEDED);

  // Look the step up by the name it was given in the handler.
  const computeStep = runner.getOperation("compute");
  expect(computeStep.getType()).toBe(OperationType.STEP);
  expect(computeStep.getStatus()).toBe(OperationStatus.SUCCEEDED);
  expect(computeStep.getStepDetails()?.result).toBe(42);
});
In Python, use result.get_step(name) to get the StepOperation. Its result attribute holds the
raw serialized payload.
from aws_durable_execution_sdk_python import DurableContext, durable_execution, durable_step
from aws_durable_execution_sdk_python.types import StepContext
from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python.lambda_service import OperationStatus
from aws_durable_execution_sdk_python_testing.runner import DurableFunctionTestRunner
@durable_step
def compute(ctx: StepContext) -> int:
    """Step body used by the handler; always returns 42."""
    return 42


@durable_execution
def handler(event, context: DurableContext) -> int:
    """Durable handler under test: runs the ``compute`` step and returns its result."""
    return context.step(compute())


def test_asserts_on_step_operation():
    """Run the handler and assert on the recorded ``compute`` step."""
    runner = DurableFunctionTestRunner(handler=handler)
    with runner:
        result = runner.run(timeout=10)
        assert result.status is InvocationStatus.SUCCEEDED

        # Look the step up by name; only the presence of a result is checked here.
        step = result.get_step("compute")
        assert step.status is OperationStatus.SUCCEEDED
        assert step.result is not None
In Java, use result.getOperation(name) to get the TestOperation, then call
getStepResult(Class<T>) to deserialize the result.
import static org.junit.jupiter.api.Assertions.*;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.lambda.model.OperationStatus;
import software.amazon.awssdk.services.lambda.model.OperationType;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.model.ExecutionStatus;
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;
/** Checks that a named step is recorded with the expected type, status and result. */
class AssertStepTest {

    @Test
    void assertsOnStepOperation() {
        // Handler under test: a single step named "compute" that produces 42.
        var runner = LocalDurableTestRunner.create(
                Void.class,
                (input, context) -> context.step("compute", Integer.class, ctx -> 42));

        var result = runner.runUntilComplete(null);
        assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());

        // Look the step up by name, then assert on what was recorded for it.
        var computeStep = result.getOperation("compute");
        assertNotNull(computeStep);
        assertEquals(OperationType.STEP, computeStep.getType());
        assertEquals(OperationStatus.SUCCEEDED, computeStep.getStatus());
        assertEquals(42, computeStep.getStepResult(Integer.class));
    }
}
Assert on a wait
Look up a wait operation and check that it was scheduled with the expected duration.
In TypeScript, getWaitDetails() returns waitSeconds and scheduledEndTimestamp.
import { withDurableExecution, DurableContext } from "@aws/durable-execution-sdk-js";
import { LocalDurableTestRunner, OperationStatus } from "@aws/durable-execution-sdk-js-testing";
import { ExecutionStatus, OperationType } from "@aws-sdk/client-lambda";
/** Durable handler under test: waits 30 seconds under the name "my-wait", then returns "done". */
const handler = withDurableExecution(
  async (event: unknown, context: DurableContext) => {
    await context.wait("my-wait", { seconds: 30 });
    return "done";
  },
);

let runner: LocalDurableTestRunner;

// NOTE(review): skipTime presumably lets the 30-second wait elapse without a
// real delay — confirm against the testing SDK documentation.
beforeAll(async () => {
  await LocalDurableTestRunner.setupTestEnvironment({ skipTime: true });
});

afterAll(async () => {
  await LocalDurableTestRunner.teardownTestEnvironment();
});

// Every test starts from a fresh runner bound to the handler above.
beforeEach(() => {
  runner = new LocalDurableTestRunner({ handlerFunction: handler });
});

it("asserts on a wait operation", async () => {
  const result = await runner.run();
  expect(result.getStatus()).toBe(ExecutionStatus.SUCCEEDED);

  const waitOp = runner.getOperation("my-wait");
  expect(waitOp.getType()).toBe(OperationType.WAIT);
  expect(waitOp.getStatus()).toBe(OperationStatus.SUCCEEDED);

  // The wait details carry the requested duration and the scheduled end time.
  expect(waitOp.getWaitDetails()?.waitSeconds).toBe(30);
  expect(waitOp.getWaitDetails()?.scheduledEndTimestamp).toBeInstanceOf(Date);
});
In Python, use result.get_wait(name). The scheduled_end_timestamp attribute is a datetime
indicating when the wait was scheduled to end.
from aws_durable_execution_sdk_python import DurableContext, durable_execution
from aws_durable_execution_sdk_python.config import Duration
from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python.lambda_service import OperationStatus, OperationType
from aws_durable_execution_sdk_python_testing.runner import DurableFunctionTestRunner
@durable_execution
def handler(event, context: DurableContext) -> str:
    """Durable handler under test: waits 30 seconds as ``my-wait``, then returns ``"done"``."""
    context.wait(Duration.from_seconds(30), name="my-wait")
    return "done"


def test_asserts_on_wait_operation():
    """Run the handler and assert on the recorded ``my-wait`` operation."""
    runner = DurableFunctionTestRunner(handler=handler)
    with runner:
        result = runner.run(timeout=10)
        assert result.status is InvocationStatus.SUCCEEDED

        # Look the wait up by name and check that an end time was scheduled.
        wait = result.get_wait("my-wait")
        assert wait.status is OperationStatus.SUCCEEDED
        assert wait.scheduled_end_timestamp is not None
In Java, getWaitDetails().scheduledEndTimestamp() returns the scheduled end time as an
Instant.
import static org.junit.jupiter.api.Assertions.*;
import java.time.Duration;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.lambda.model.OperationStatus;
import software.amazon.awssdk.services.lambda.model.OperationType;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.model.ExecutionStatus;
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;
/** Checks that a named wait is recorded with the expected type, status and scheduled end time. */
class AssertWaitTest {

    @Test
    void assertsOnWaitOperation() {
        // Handler under test: waits 30 seconds as "my-wait", then returns "done".
        var runner = LocalDurableTestRunner.create(
                Void.class,
                (input, context) -> {
                    context.wait(Duration.ofSeconds(30), "my-wait");
                    return "done";
                });

        var result = runner.runUntilComplete(null);
        assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());

        var waitOp = result.getOperation("my-wait");
        assertNotNull(waitOp);
        assertEquals(OperationType.WAIT, waitOp.getType());
        assertEquals(OperationStatus.SUCCEEDED, waitOp.getStatus());

        // Only the presence of a scheduled end time is asserted, not its value.
        assertNotNull(waitOp.getWaitDetails().scheduledEndTimestamp());
    }
}
Assert on a callback
For callbacks, the test drives the execution to the point where the callback is waiting, then completes it from the test.
In TypeScript, use waitForData(WaitingOperationStatus.SUBMITTED) to wait until the callback submitter
has run, then call sendCallbackSuccess() to complete it.
import { withDurableExecution, DurableContext } from "@aws/durable-execution-sdk-js";
import { LocalDurableTestRunner, WaitingOperationStatus } from "@aws/durable-execution-sdk-js-testing";
import { ExecutionStatus } from "@aws-sdk/client-lambda";
/**
 * Durable handler under test: waits for a callback named "approval" and
 * returns whatever value the callback is completed with.
 */
const handler = withDurableExecution(
  async (event: unknown, context: DurableContext) => {
    const approval = await context.waitForCallback("approval", async (callbackId) => {
      // In production this would notify an external system
      void callbackId;
    });
    return approval;
  },
);

let runner: LocalDurableTestRunner;

// The test environment is set up once for the file and torn down once at the end.
beforeAll(async () => {
  await LocalDurableTestRunner.setupTestEnvironment();
});

afterAll(async () => {
  await LocalDurableTestRunner.teardownTestEnvironment();
});

// Every test starts from a fresh runner bound to the handler above.
beforeEach(() => {
  runner = new LocalDurableTestRunner({ handlerFunction: handler });
});

it("completes a callback from the test", async () => {
  // Start the execution without awaiting it: it cannot finish until the
  // callback below has been completed.
  const pendingRun = runner.run();

  const approval = runner.getOperation("approval");
  await approval.waitForData(WaitingOperationStatus.SUBMITTED);
  await approval.sendCallbackSuccess(JSON.stringify("approved"));

  const result = await pendingRun;
  expect(result.getStatus()).toBe(ExecutionStatus.SUCCEEDED);
  expect(result.getResult()).toBe("approved");
});
In Python, use run_async() to start the execution, wait_for_callback() to get the callback ID,
send_callback_success() to complete it, then wait_for_result() to get the final
result. All calls must happen inside the with runner: block.
from aws_durable_execution_sdk_python import DurableContext, durable_execution
from aws_durable_execution_sdk_python.types import WaitForCallbackContext
from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python_testing.runner import DurableFunctionTestRunner
@durable_execution
def handler(event, context: DurableContext) -> str:
    """Durable handler under test: waits for the ``approval`` callback and returns its result."""

    def submit(callback_id: str, ctx: WaitForCallbackContext) -> None:
        pass  # In production this would notify an external system

    return context.wait_for_callback(submit, name="approval")


def test_completes_callback_from_test():
    """Start the execution, complete its ``approval`` callback from the test, then check the outcome."""
    runner = DurableFunctionTestRunner(handler=handler)
    # Every runner call below stays inside the ``with runner:`` block.
    with runner:
        execution_arn = runner.run_async(timeout=10)
        callback_id = runner.wait_for_callback(
            execution_arn=execution_arn, name="approval", timeout=10
        )
        # The callback result is passed as bytes holding a JSON-encoded string.
        runner.send_callback_success(callback_id=callback_id, result=b'"approved"')
        result = runner.wait_for_result(execution_arn=execution_arn, timeout=10)
        assert result.status is InvocationStatus.SUCCEEDED
In Java, use run() to reach the PENDING state, getCallbackId() to get the callback ID,
completeCallback() to complete it, then run() again to finish the execution.
import static org.junit.jupiter.api.Assertions.*;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.lambda.model.OperationStatus;
import software.amazon.awssdk.services.lambda.model.OperationType;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.model.ExecutionStatus;
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;
/** Drives an execution up to a pending callback, completes it from the test, and finishes the run. */
class AssertCallbackTest {

    @Test
    void completesCallbackFromTest() {
        // Handler under test: creates the "approval" callback and returns its value.
        var runner = LocalDurableTestRunner.create(
                String.class,
                (input, context) -> {
                    var cb = context.createCallback("approval", String.class);
                    return cb.get();
                });

        // First invocation: creates the callback and suspends
        var pending = runner.run("test");
        assertEquals(ExecutionStatus.PENDING, pending.getStatus());

        var approvalOp = runner.getOperation("approval");
        assertNotNull(approvalOp);
        assertEquals(OperationType.CALLBACK, approvalOp.getType());
        assertEquals(OperationStatus.STARTED, approvalOp.getStatus());

        // Complete the callback from the test
        var callbackId = runner.getCallbackId("approval");
        runner.completeCallback(callbackId, "\"approved\"");

        // Second invocation: callback is complete, execution finishes
        var result = runner.run("test");
        assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
        assertEquals("approved", result.getResult(String.class));
    }
}
Assert on a child context
Child contexts appear as CONTEXT operations in the result. You can walk their child
operations to assert on what ran inside the context.
import { withDurableExecution, DurableContext } from "@aws/durable-execution-sdk-js";
import { LocalDurableTestRunner } from "@aws/durable-execution-sdk-js-testing";
import { ExecutionStatus, OperationType } from "@aws-sdk/client-lambda";
/**
 * Durable handler under test: runs a child context named "process" whose only
 * operation is a step named "compute" that yields 42.
 */
const handler = withDurableExecution(async (event: unknown, context: DurableContext) => {
  return await context.runInChildContext("process", async (child) => {
    return await child.step("compute", () => 42);
  });
});

let runner: LocalDurableTestRunner;

// The test environment is set up once for the file and torn down once at the end.
beforeAll(async () => {
  await LocalDurableTestRunner.setupTestEnvironment();
});

afterAll(async () => {
  await LocalDurableTestRunner.teardownTestEnvironment();
});

// Every test starts from a fresh runner bound to the handler above.
beforeEach(() => {
  runner = new LocalDurableTestRunner({ handlerFunction: handler });
});

it("asserts on child context and its operations", async () => {
  const result = await runner.run();
  expect(result.getStatus()).toBe(ExecutionStatus.SUCCEEDED);

  const ctx = runner.getOperation("process");
  expect(ctx.getType()).toBe(OperationType.CONTEXT);
  expect(ctx.getContextDetails()?.result).toBe(42);

  const children = ctx.getChildOperations();
  expect(children?.length).toBe(1);
  // Optional-chain the element access too: if the list is empty, the test
  // fails on the assertion instead of throwing a TypeError, and the line
  // type-checks under noUncheckedIndexedAccess.
  expect(children?.[0]?.getName()).toBe("compute");
});
from aws_durable_execution_sdk_python import DurableContext, durable_execution, durable_step
from aws_durable_execution_sdk_python.types import StepContext
from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python.lambda_service import OperationType
from aws_durable_execution_sdk_python_testing.runner import DurableFunctionTestRunner
@durable_step
def compute(ctx: StepContext) -> int:
    """Step body run inside the child context; always returns 42."""
    return 42


@durable_execution
def handler(event, context: DurableContext) -> int:
    """Durable handler under test: runs ``compute`` inside a child context named ``process``."""

    def process(child: DurableContext) -> int:
        return child.step(compute())

    return context.run_in_child_context(process, name="process")


def test_asserts_on_child_context():
    """Run the handler and assert that exactly one CONTEXT operation, ``process``, was recorded."""
    runner = DurableFunctionTestRunner(handler=handler)
    with runner:
        result = runner.run(timeout=10)
        assert result.status is InvocationStatus.SUCCEEDED

        # Keep only the CONTEXT-type entries from the recorded operations.
        ctx_ops = [op for op in result.operations if op.operation_type == OperationType.CONTEXT]
        assert len(ctx_ops) == 1
        assert ctx_ops[0].name == "process"
import static org.junit.jupiter.api.Assertions.*;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.lambda.model.OperationType;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.model.ExecutionStatus;
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;
/** Checks that a child context is recorded as a CONTEXT operation with context details. */
class AssertChildContextTest {

    @Test
    void assertsOnChildContextAndItsOperations() {
        // Handler under test: a child context "process" that runs one step, "compute".
        var runner = LocalDurableTestRunner.create(
                Void.class,
                (input, context) -> context.runInChildContext(
                        "process",
                        Integer.class,
                        child -> child.step("compute", Integer.class, ctx -> 42)));

        var result = runner.runUntilComplete(null);
        assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
        assertEquals(42, result.getResult(Integer.class));

        var processContext = result.getOperation("process");
        assertNotNull(processContext);
        assertEquals(OperationType.CONTEXT, processContext.getType());
        assertNotNull(processContext.getContextDetails());
    }
}
Filter operations by status
When a step retries, the operation history contains one entry per attempt. Filter by status to count failures and successes separately.
In TypeScript, pass { status: OperationStatus.FAILED } to result.getOperations() to filter.
import { withDurableExecution, DurableContext, createRetryStrategy } from "@aws/durable-execution-sdk-js";
import { LocalDurableTestRunner } from "@aws/durable-execution-sdk-js-testing";
import { ExecutionStatus, OperationStatus } from "@aws-sdk/client-lambda";
// Number of times the "flaky" step body has run; reset before every test.
let callCount = 0;

/**
 * Durable handler under test: a step named "flaky" that throws on its first
 * two runs and returns "ok" on the third, configured with maxAttempts: 3.
 */
const handler = withDurableExecution(
  async (event: unknown, context: DurableContext) => {
    const retryStrategy = createRetryStrategy({ maxAttempts: 3 });
    return await context.step(
      "flaky",
      () => {
        callCount++;
        if (callCount < 3) {
          throw new Error("not yet");
        }
        return "ok";
      },
      { retryStrategy },
    );
  },
);

let runner: LocalDurableTestRunner;

// NOTE(review): skipTime presumably avoids real delays between retry
// attempts — confirm against the testing SDK documentation.
beforeAll(async () => {
  await LocalDurableTestRunner.setupTestEnvironment({ skipTime: true });
});

afterAll(async () => {
  await LocalDurableTestRunner.teardownTestEnvironment();
});

beforeEach(() => {
  callCount = 0;
  runner = new LocalDurableTestRunner({ handlerFunction: handler });
});

it("filters operations by status", async () => {
  const result = await runner.run();
  expect(result.getStatus()).toBe(ExecutionStatus.SUCCEEDED);

  // Two failed attempts followed by one successful attempt are expected.
  const failedOps = result.getOperations({ status: OperationStatus.FAILED });
  expect(failedOps.length).toBe(2);

  const succeededOps = result.getOperations({ status: OperationStatus.SUCCEEDED });
  expect(succeededOps.length).toBe(1);
});
In Python, use result.get_all_operations() to get a flat list including nested operations, then
filter by op.status.
from aws_durable_execution_sdk_python import DurableContext, durable_execution, durable_step
from aws_durable_execution_sdk_python.types import StepContext
from aws_durable_execution_sdk_python.config import StepConfig
from aws_durable_execution_sdk_python.retries import create_retry_strategy, RetryStrategyConfig
from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python.lambda_service import OperationStatus
from aws_durable_execution_sdk_python_testing.runner import DurableFunctionTestRunner
# Number of times the ``flaky`` step body has run; reset at the start of the test.
call_count = 0


@durable_step
def flaky(ctx: StepContext) -> str:
    """Step body that raises on its first two runs and returns ``"ok"`` on the third."""
    global call_count
    call_count += 1
    if call_count < 3:
        raise RuntimeError("not yet")
    return "ok"


@durable_execution
def handler(event, context: DurableContext) -> str:
    """Durable handler under test: runs ``flaky`` with a retry strategy of ``max_attempts=3``."""
    config = StepConfig(
        retry_strategy=create_retry_strategy(RetryStrategyConfig(max_attempts=3))
    )
    return context.step(flaky(), config=config)


def test_filters_operations_by_status():
    """Run the handler and count recorded operations by status."""
    global call_count
    call_count = 0

    runner = DurableFunctionTestRunner(handler=handler)
    with runner:
        result = runner.run(timeout=30)
        assert result.status is InvocationStatus.SUCCEEDED

        # Two failed attempts followed by one successful attempt are expected.
        failed_ops = [op for op in result.get_all_operations()
                      if op.status is OperationStatus.FAILED]
        assert len(failed_ops) == 2

        succeeded_ops = [op for op in result.get_all_operations()
                         if op.status is OperationStatus.SUCCEEDED]
        assert len(succeeded_ops) == 1
In Java, result.getFailedOperations() and result.getSucceededOperations() return pre-filtered
lists.
import static org.junit.jupiter.api.Assertions.*;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.lambda.model.OperationStatus;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.config.StepConfig;
import software.amazon.lambda.durable.model.ExecutionStatus;
import software.amazon.lambda.durable.retry.RetryStrategies;
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;
/** Checks that failed and succeeded operations of a retried step can be counted separately. */
class FilterByStatusTest {

    @Test
    void filtersOperationsByStatus() {
        // Counts how many times the step body has run.
        var attempts = new AtomicInteger(0);

        // NOTE(review): the argument 3 is presumably the maximum number of attempts — confirm.
        var retryConfig = StepConfig.builder()
                .retryStrategy(RetryStrategies.exponentialBackoff(3))
                .build();

        // Handler under test: the "flaky" step throws on its first two runs and returns "ok" on the third.
        var runner = LocalDurableTestRunner.create(
                Void.class,
                (input, context) -> context.step(
                        "flaky",
                        String.class,
                        ctx -> {
                            if (attempts.incrementAndGet() < 3) {
                                throw new RuntimeException("not yet");
                            }
                            return "ok";
                        },
                        retryConfig));

        var result = runner.runUntilComplete(null);
        assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());

        // Two failed attempts followed by one successful attempt are expected.
        var failedOps = result.getFailedOperations();
        assertEquals(2, failedOps.size());

        var succeededOps = result.getSucceededOperations();
        assertEquals(1, succeededOps.size());
    }
}
See also
- Authoring: Set up the test runner and write your first test.
- Cloud Runner: Run tests against a deployed Lambda function.
- Step
- Wait