

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 调用模型进行实时推理
<a name="realtime-endpoints-test-endpoints"></a>

使用 Amazon SageMaker AI 将模型部署到终端节点后，您可以通过向模型发送推理请求来与模型进行交互。要向模型发送推理请求，您需要调用承载该模型的端点。您可以使用 Amazon SageMaker Studio AWS SDKs、或调用您的终端节点 AWS CLI。

## 使用 Amazon SageMaker Studio 调用您的模型
<a name="realtime-endpoints-test-endpoints-studio"></a>

将模型部署到终端节点后，您可以通过 Amazon SageMaker Studio 查看终端节点，并通过发送单个推理请求来测试您的终端节点。

**注意**  
SageMaker AI 仅支持在 Studio 中对实时端点进行端点测试。

**向端点发送测试推理请求**

1. 启动 Amazon SageMaker Studio。

1. 在左侧导航窗格中，选择**部署**。

1. 从下拉菜单中选择**端点**。

1. 按名称查找端点，然后在表中选择名称。**端点**面板中列出的端点名称是在部署模型时定义的。Studio 工作区将在新选项卡中打开**端点**页面。

1. 选择**测试推理**选项卡。

1. 在**测试选项**中，选择以下选项之一：

   1. 选择**测试示例请求**，立即向端点发送请求。使用 **JSON 编辑器**提供 JSON 格式的示例数据，然后选择**发送请求**向端点提交请求。提交请求后，Studio 会在 JSON 编辑器右侧的卡片中显示推理输出。

   1. 选择**使用 Python SDK 示例代码**，查看向端点发送请求的代码。然后，从**推理请求示例**部分复制代码示例，并在测试环境中运行代码。

卡片顶部显示了发送到端点的请求类型（仅接受 JSON）。卡片中显示了以下字段：
+ **状态** – 显示以下状态类型之一：
  + `Success` – 请求成功。
  + `Failed` – 请求失败。响应显示在**失败原因**下方。
  + `Pending` – 当推理请求处于待处理状态时，状态会显示一个旋转的圆形图标。
+ **执行时长** – 调用耗费的时间（结束时间减去开始时间），以毫秒为单位。
+ **请求时间** – 自发送请求以来过去的分钟数。
+ **结果时间** – 自返回结果以来过去的分钟数。

## 使用调用您的模型 适用于 Python (Boto3) 的 AWS SDK
<a name="realtime-endpoints-test-endpoints-api"></a>

如果要在应用程序代码中调用模型端点，则可以使用其中之一 AWS SDKs，包括 适用于 Python (Boto3) 的 AWS SDK。使用该 SDK 调用端点时，您需要使用以下 Python 方法之一：
+ `invoke_endpoint`：向模型端点发送推理请求，并返回模型生成的响应。

  在模型完成生成推理负载后，此方法将其作为一个响应返回。有关更多信息，请参阅《AWS SDK for Python (Boto3) API 参考》中的 [invoke\$1endpoint](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker-runtime/client/invoke_endpoint.html)**。
+ `invoke_endpoint_with_response_stream`：向模型端点发送推理请求，并在模型生成响应时以增量方式流式传输响应。

  使用这种方法，您的应用程序会在响应部分可用时立即收到这些部分。有关更多信息，请参阅《AWS SDK for Python (Boto3) API 参考》中的 [invoke\$1endpoint](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker-runtime/client/invoke_endpoint.html)**。

  此方法仅用于调用支持推理流的模型。

在应用程序代码中使用这些方法之前，必须初始化 A SageMaker I Runtime 客户端，并且必须指定终端节点的名称。以下示例为接下来的示例设置了客户端和端点：

```
import boto3

sagemaker_runtime = boto3.client(
    "sagemaker-runtime", region_name='aws_region')

endpoint_name='endpoint-name'
```

### 调用以获取推理响应
<a name="test-invoke-endpoint"></a>

以下示例使用 `invoke_endpoint` 方法，通过 适用于 Python (Boto3) 的 AWS SDK调用端点：

```
# Gets inference from the model hosted at the specified endpoint:
response = sagemaker_runtime.invoke_endpoint(
    EndpointName=endpoint_name, 
    Body=bytes('{"features": ["This is great!"]}', 'utf-8')
    )

# Decodes and prints the response body:
print(response['Body'].read().decode('utf-8'))
```

此示例提供`Body`字段中的输入数据， SageMaker 让 AI 传递给模型。此数据的格式必须与用于训练的数据格式相同。示例将响应赋值给 `response` 变量。

`response` 变量提供了对 HTTP 状态、已部署模型的名称以及其他字段的访问。以下代码段将打印 HTTP 状态代码：

```
print(response["HTTPStatusCode"])
```

### 调用以流式处理推理响应
<a name="test-invoke-endpoint-with-response-stream"></a>

如果您部署了支持推理流的模型，则可调用该模型以流的形式接收其推理负载部分。模型在生成推理响应时，会以增量方式交付这些部分。应用程序在接收推理流时，无需等待模型生成整个响应负载。取而代之的是，当响应的部分内容可用时，应用程序会立即收到。

通过在应用程序中使用推理流，您可以创建交互，在交互中用户会认为推理速度很快，因为他们能立即获得第一部分。您可以实施流式处理以支持快速的交互式体验，例如聊天机器人、虚拟助手和音乐生成器。例如，您可以创建一个聊天机器人，以增量方式显示大型语言模型 (LLM) 生成的文本。

要获取推理流，您可以使用 `invoke_endpoint_with_response_stream` 方法。在响应正文中，SDK 提供了 `EventStream` 对象，该对象以一系列 `PayloadPart` 对象的形式给出推理。

**Example 推理流**  
以下示例是 `PayloadPart` 对象流：  

```
{'PayloadPart': {'Bytes': b'{"outputs": [" a"]}\n'}}
{'PayloadPart': {'Bytes': b'{"outputs": [" challenging"]}\n'}}
{'PayloadPart': {'Bytes': b'{"outputs": [" problem"]}\n'}}
. . .
```
在每个负载部分中，`Bytes` 字段提供模型推理响应的一部分。此部分可以是模型生成的任何内容类型，如文本、图像或音频数据。在此示例中，这些部分是 JSON 对象，其中包含 LLM 生成的文本。  
通常，负载部分包含来自模型的离散数据块。在本示例中，离散块是整个 JSON 对象。有时，流媒式响应会将数据块分成多个负载部分，或者将多个数据块组合成一个负载部分。以下示例显示了一个 JSON 格式的数据块，该数据块分为两个负载部分：  

```
{'PayloadPart': {'Bytes': b'{"outputs": '}}
{'PayloadPart': {'Bytes': b'[" problem"]}\n'}}
```
在编写处理推理流的应用程序代码时，应包括处理这些偶尔的数据拆分和组合的逻辑。作为一种策略，您可以编写代码，在应用程序接收负载部分的同时，连接 `Bytes` 的内容。通过连接此处的示例 JSON 数据，可以将这些数据组合成一个以换行符分隔的 JSON 正文。然后，您的代码可以通过解析每行上的整个 JSON 对象来处理流。  
以下示例显示了您在连接 `Bytes` 的以下示例内容时，创建的以换行符分隔的 JSON：  

```
{"outputs": [" a"]}
{"outputs": [" challenging"]}
{"outputs": [" problem"]}
. . .
```

**Example 用于处理推理流的代码**  

以下示例 Python 类 `SmrInferenceStream` 演示了如何处理以 JSON 格式发送文本数据的推理流：

```
import io
import json

# Example class that processes an inference stream:
class SmrInferenceStream:
    
    def __init__(self, sagemaker_runtime, endpoint_name):
        self.sagemaker_runtime = sagemaker_runtime
        self.endpoint_name = endpoint_name
        # A buffered I/O stream to combine the payload parts:
        self.buff = io.BytesIO() 
        self.read_pos = 0
        
    def stream_inference(self, request_body):
        # Gets a streaming inference response 
        # from the specified model endpoint:
        response = self.sagemaker_runtime\
            .invoke_endpoint_with_response_stream(
                EndpointName=self.endpoint_name, 
                Body=json.dumps(request_body), 
                ContentType="application/json"
        )
        # Gets the EventStream object returned by the SDK:
        event_stream = response['Body']
        for event in event_stream:
            # Passes the contents of each payload part
            # to be concatenated:
            self._write(event['PayloadPart']['Bytes'])
            # Iterates over lines to parse whole JSON objects:
            for line in self._readlines():
                resp = json.loads(line)
                part = resp.get("outputs")[0]
                # Returns parts incrementally:
                yield part
    
    # Writes to the buffer to concatenate the contents of the parts:
    def _write(self, content):
        self.buff.seek(0, io.SEEK_END)
        self.buff.write(content)

    # The JSON objects in buffer end with '\n'.
    # This method reads lines to yield a series of JSON objects:
    def _readlines(self):
        self.buff.seek(self.read_pos)
        for line in self.buff.readlines():
            self.read_pos += len(line)
            yield line[:-1]
```

此示例通过执行以下操作处理推理流：
+ 初始化 A SageMaker I 运行时客户端并设置模型端点的名称。在获得推理流之前，端点托管的模型必须支持推理流。
+ 在示例 `stream_inference` 方法中，接收请求正文并将其传递给 SDK 的 `invoke_endpoint_with_response_stream` 方法。
+ 遍历 SDK 返回的 `EventStream` 对象中的每个事件。
+ 从每个事件中获取 `PayloadPart` 对象中 `Bytes` 对象的内容。
+ 在示例 `_write` 方法中，写入缓冲区以连接 `Bytes` 对象的内容。组合后的内容构成以换行符分隔的 JSON 正文。
+ 使用示例 `_readlines` 方法获取一系列可迭代的 JSON 对象。
+ 在每个 JSON 对象中，获取推理的一部分。
+ 使用 `yield` 表达式，以增量方式返回这些部分。

以下示例创建并使用了 `SmrInferenceStream` 对象：

```
request_body = {"inputs": ["Large model inference is"],
                "parameters": {"max_new_tokens": 100,
                               "enable_sampling": "true"}}
smr_inference_stream = SmrInferenceStream(
    sagemaker_runtime, endpoint_name)
stream = smr_inference_stream.stream_inference(request_body)
for part in stream:
    print(part, end='')
```

此示例将请求正文传递给 `stream_inference` 方法。该方法将遍历响应，以打印推理流返回的每个部分。

此示例假设指定端点处的模型是生成文本的 LLM。此示例的输出是生成的文本正文，文本以增量方式打印：

```
a challenging problem in machine learning. The goal is to . . .
```

## 使用调用您的模型 AWS CLI
<a name="realtime-endpoints-test-endpoints-cli"></a>

您可以通过使用 AWS Command Line Interface (AWS CLI) 运行命令来调用您的模型端点。 AWS CLI 支持使用 `invoke-endpoint` 命令发送标准推理请求，并支持使用 `invoke-endpoint-async` 命令发送异步推理请求。

**注意**  
 AWS CLI 不支持流式推理请求。

以下示例使用 `invoke-endpoint` 命令，向模型端点发送推理请求：

```
aws sagemaker-runtime invoke-endpoint \
    --endpoint-name endpoint_name \
    --body fileb://$file_name \
    output_file.txt
```

对于 `--endpoint-name` 参数，请提供创建端点时指定的端点名称。对于`--body`参数，提供 SageMaker AI 要传递给模型的输入数据。数据的格式必须与用于训练的数据格式相同。此示例显示了如何向端点发送二进制数据。

有关在将文件内容传递给的参数`fileb://`时何时使用 `file://` over 的更多信息 AWS CLI，请参阅[本地文件参数的最佳实践](https://aws.amazon.com/blogs/developer/best-practices-for-local-file-parameters/)。

有关更多信息以及可以传递的其他参数，请参阅《AWS CLI 命令参考》中的 [https://docs.aws.amazon.com/cli/latest/reference/sagemaker-runtime/invoke-endpoint.html](https://docs.aws.amazon.com/cli/latest/reference/sagemaker-runtime/invoke-endpoint.html)**。

如果 `invoke-endpoint` 命令成功，则将返回如下所示的响应：

```
{
    "ContentType": "<content_type>; charset=utf-8",
    "InvokedProductionVariant": "<Variant>"
}
```

如果命令不成功，请检查输入负载的格式是否正确。

可通过检查文件输出文件（在此例中为 `output_file.txt`），查看调用的输出。

```
more output_file.txt
```

## 使用适用于 Python 的 AWS 软件开发工具包调用您的模型
<a name="realtime-endpoints-test-endpoints-sdk"></a>

### 调用以双向流式传输推理请求和响应
<a name="realtime-endpoints-test-endpoints-sdk-overview"></a>

如果您想在应用程序代码中调用模型端点以支持双向流式传输，则可以使用[新的实验性 Python SDK](https://github.com/awslabs/aws-sdk-python)，该软件开发工具包支持双向流传输功能并支持 HTTP/2。此 SDK 支持您的客户端应用程序和 SageMaker 终端节点之间的实时、双向通信，使您能够以增量方式发送推理请求，同时在模型生成流响应时接收流式响应。这对于交互式应用程序特别有用，在这些应用程序中，客户端和服务器都需要通过持久连接持续交换数据。

**注意**  
新的实验性 SDK 不同于标准 Boto3 SDK，它支持用于数据交换的永久双向连接。在使用实验性 Python SDK 时，对于任何非实验性用例，我们强烈建议严格固定到某个 SDK 版本。

要通过双向流媒体调用您的终端节点，请使用`invoke_endpoint_with_bidirectional_stream`方法。此方法可建立持久连接，允许您将多个有效载荷区块流式传输到模型，同时在模型处理数据时实时接收响应。在您明确关闭输入流或端点关闭连接之前，连接将保持打开状态，最多支持 30 分钟的连接时间。

### 先决条件
<a name="realtime-endpoints-test-endpoints-sdk-prereq"></a>

在应用程序代码中使用双向流媒体之前，您必须：

1. 安装实验 SageMaker 运行时 HTTP/2 软件开发工具包

1. 为您的 SageMaker Runtime 客户端设置 AWS 凭据

1. 部署支持向端点进行双向流式传输的 SageMaker 模型

### 设置双向流媒体客户端
<a name="realtime-endpoints-test-endpoints-sdk-setup-client"></a>

以下示例显示如何初始化双向流式传输所需的组件：

```
from sagemaker_runtime_http2.client import SageMakerRuntimeHTTP2Client
from sagemaker_runtime_http2.config import Config, HTTPAuthSchemeResolver
from smithy_aws_core.identity import EnvironmentCredentialsResolver
from smithy_aws_core.auth.sigv4 import SigV4AuthScheme

# Configuration
AWS_REGION = "us-west-2"
BIDI_ENDPOINT = f"https://runtime.sagemaker.{AWS_REGION}.amazonaws.com:8443"
ENDPOINT_NAME = "your-endpoint-name"

# Initialize the client configuration
config = Config(
    endpoint_uri=BIDI_ENDPOINT,
    region=AWS_REGION,
    aws_credentials_identity_resolver=EnvironmentCredentialsResolver(),
    auth_scheme_resolver=HTTPAuthSchemeResolver(),
    auth_schemes={"aws.auth#sigv4": SigV4AuthScheme(service="sagemaker")}
)

# Create the SageMaker Runtime HTTP/2 client
client = SageMakerRuntimeHTTP2Client(config=config)
```

### 完整的双向流媒体客户端
<a name="realtime-endpoints-test-endpoints-sdk-complete-client"></a>

以下示例演示如何创建双向流式传输客户端，该客户端将多个文本负载发送到 SageMaker 端点并实时处理响应：

```
import asyncio
import logging
from sagemaker_runtime_http2.client import SageMakerRuntimeHTTP2Client
from sagemaker_runtime_http2.config import Config, HTTPAuthSchemeResolver
from sagemaker_runtime_http2.models import (
    InvokeEndpointWithBidirectionalStreamInput, 
    RequestStreamEventPayloadPart, 
    RequestPayloadPart
)
from smithy_aws_core.identity import EnvironmentCredentialsResolver
from smithy_aws_core.auth.sigv4 import SigV4AuthScheme

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class SageMakerBidirectionalClient:
    
    def __init__(self, endpoint_name, region="us-west-2"):
        self.endpoint_name = endpoint_name
        self.region = region
        self.client = None
        self.stream = None
        self.response_task = None
        self.is_active = False
        
    def _initialize_client(self):
        bidi_endpoint = f"runtime.sagemaker.{self.region}.amazonaws.com:8443"
        config = Config(
            endpoint_uri=bidi_endpoint,
            region=self.region,
            aws_credentials_identity_resolver=EnvironmentCredentialsResolver(),
            auth_scheme_resolver=HTTPAuthSchemeResolver(),
            auth_schemes={"aws.auth#sigv4": SigV4AuthScheme(service="sagemaker")}
        )
        self.client = SageMakerRuntimeHTTP2Client(config=config)
    
    async def start_session(self):
        """Establish a bidirectional streaming connection with the endpoint."""
        if not self.client:
            self._initialize_client()
            
        logger.info(f"Starting session with endpoint: {self.endpoint_name}")
        self.stream = await self.client.invoke_endpoint_with_bidirectional_stream(
            InvokeEndpointWithBidirectionalStreamInput(endpoint_name=self.endpoint_name)
        )
        self.is_active = True
        
        # Start processing responses concurrently
        self.response_task = asyncio.create_task(self._process_responses())
    
    async def send_message(self, message):
        """Send a single message to the endpoint."""
        if not self.is_active:
            raise RuntimeError("Session not active. Call start_session() first.")
            
        logger.info(f"Sending message: {message}")
        payload = RequestPayloadPart(bytes_=message.encode('utf-8'))
        event = RequestStreamEventPayloadPart(value=payload)
        await self.stream.input_stream.send(event)
    
    async def send_multiple_messages(self, messages, delay=1.0):
        """Send multiple messages with a delay between each."""
        for message in messages:
            await self.send_message(message)
            await asyncio.sleep(delay)
    
    async def end_session(self):
        """Close the bidirectional streaming connection."""
        if not self.is_active:
            return
            
        await self.stream.input_stream.close()
        self.is_active = False
        logger.info("Stream closed")
        
        # Cancel the response processing task
        if self.response_task and not self.response_task.done():
            self.response_task.cancel()
    
    async def _process_responses(self):
        """Process incoming responses from the endpoint."""
        try:
            output = await self.stream.await_output()
            output_stream = output[1]
            
            while self.is_active:
                result = await output_stream.receive()
                
                if result is None:
                    logger.info("No more responses")
                    break
                
                if result.value and result.value.bytes_:
                    response_data = result.value.bytes_.decode('utf-8')
                    logger.info(f"Received: {response_data}")
                    
        except Exception as e:
            logger.error(f"Error processing responses: {e}")

# Example usage
async def run_bidirectional_client():
    client = SageMakerBidirectionalClient(endpoint_name="your-endpoint-name")
    
    try:
        # Start the session
        await client.start_session()
        
        # Send multiple messages
        messages = [
            "I need help with", 
            "my account balance", 
            "I can help with that", 
            "and recent charges"
        ]
        await client.send_multiple_messages(messages)
        
        # Wait for responses to be processed
        await asyncio.sleep(2)
        
        # End the session
        await client.end_session()
        logger.info("Session ended successfully")
        
    except Exception as e:
        logger.error(f"Client error: {e}")
        await client.end_session()

if __name__ == "__main__":
    asyncio.run(run_bidirectional_client())
```

客户端使用端口 8443 上的区域端点 URI 初始化 SageMaker Runtime HTTP/2 客户端，这是双向流媒体连接所必需的。start\$1 `session()` 方法调用`invoke_endpoint_with_bidirectional_stream()`以建立持久连接，并创建一个异步任务来同时处理传入的响应。

该`send_event()`方法将负载数据封装在相应的请求对象中，并通过输入流发送它们，而该`_process_responses()`方法则在端点到达时持续监听和处理来自端点的响应。这种双向方法可以实现实时交互，在这种交互中，发送请求和接收响应都通过同一个连接同时发生。