

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 調用模型以進行即時推論
<a name="realtime-endpoints-test-endpoints"></a>

使用 Amazon SageMaker AI 將模型部署到端點後，您可以傳送推論請求來與模型互動。若要將推論請求傳送至模型，您可以調用負責託管該模型的端點。您可以使用 Amazon SageMaker Studio、 AWS SDK 或 AWS CLI來調用端點。

## 使用 Amazon SageMaker Studio 調用您的模型
<a name="realtime-endpoints-test-endpoints-studio"></a>

將模型部署至端點後，您可以透過 Amazon SageMaker Studio 檢視端點，並傳送單一推論請求來測試端點。

**注意**  
SageMaker AI 僅支援針對即時端點在 Studio 進行端點測試。

**將測試推論請求傳送至您的端點**

1. 啟動 Amazon SageMaker Studio。

1. 在左側導覽窗格中選擇**部署**。

1. 從下拉式清單中，選擇**端點**。

1. 依名稱尋找您的端點，然後在資料表中選擇名稱。**端點**面板中列出的端點名稱是在部署模型時所定義的。Studio工作區會在新索引標籤中開啟**端點**頁面。

1. 選擇**測試推論**索引標籤。

1. 針對**測試選項**選取以下任一選項：

   1. 選取**測試範例請求**，以立即將請求傳送至端點。使用 **JSON 編輯器**，以 JSON 格式提供範例資料，然後選擇**傳送請求**，將請求提交至端點。提交請求之後，Studio 會在 JSON 編輯器右側的卡片中顯示推論輸出。

   1. 選取**使用 Python SDK 範例程式碼**，檢視傳送請求至端點的程式碼。接著，從**範例推論請求**區段複製程式碼範例，並從測試環境執行程式碼。

卡片頂端會顯示傳送至端點的請求類型 (僅接受 JSON)。卡片會顯示下列欄位：
+ **狀態** — 會顯示以下其中一項狀態類型：
  + `Success` — 請求已成功。
  + `Failed` — 請求失敗。回應會顯示在 **Failure Reason** (失敗原因) 底下。
  + `Pending` — 當推論請求處於待處理時，狀態會顯示旋轉的圓形圖示。
+ **執行長度** — 調用所花費的時間 (結束時間減去開始時間)，以毫秒為單位。
+ **請求時間** — 自發送請求以來已過了多少分鐘。
+ **結果時間** — 自傳回結果後已過了多少分鐘。

## 使用 叫用您的模型 適用於 Python (Boto3) 的 AWS SDK
<a name="realtime-endpoints-test-endpoints-api"></a>

如果您想要在應用程式程式碼中叫用模型端點，您可以使用其中一個 AWS SDKs，包括 適用於 Python (Boto3) 的 AWS SDK。若要使用此 SDK 調用您的端點，請使用下列其中一種 Python 方法：
+ `invoke_endpoint` — 將推論請求傳送至模型端點，並傳回模型產生的回應。

  此方法會傳回推論承載，以作為模型完成生成後的一個回應。如需更多資訊，請參閱*適用於 Python 的AWS SDK (Boto3) API 參考*中的 [invoke\$1endpoint](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker-runtime/client/invoke_endpoint.html)。
+ `invoke_endpoint_with_response_stream` — 將推論請求傳送至模型端點，並在模型產生推論時以累加方式串流回應。

  透過這種方法，應用程式會在回應的某部分可用時即接收到該回應部分。如需更多資訊，請參閱*適用於 Python 的AWS SDK (Boto3) API 參考*中的 [invoke\$1endpoint](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker-runtime/client/invoke_endpoint.html)。

  只能使用此方法來調用支援推論串流的模型。

您必須先啟用 SageMaker AI 執行時期用戶端，並且指定端點名稱，才能在應用程式程式碼中使用這些方法。下列範例會針對隨後其餘範例設定用戶端和端點：

```
import boto3

sagemaker_runtime = boto3.client(
    "sagemaker-runtime", region_name='aws_region')

endpoint_name='endpoint-name'
```

### 調用以取得推論回應
<a name="test-invoke-endpoint"></a>

下列範例會使用 `invoke_endpoint` 方法透過 適用於 Python (Boto3) 的 AWS SDK來調用端點：

```
# Gets inference from the model hosted at the specified endpoint:
response = sagemaker_runtime.invoke_endpoint(
    EndpointName=endpoint_name, 
    Body=bytes('{"features": ["This is great!"]}', 'utf-8')
    )

# Decodes and prints the response body:
print(response['Body'].read().decode('utf-8'))
```

此範例在 `Body` 欄位中提供輸入資料，供 SageMaker AI 傳遞至模型。此資料的格式必須與用於訓練的格式相同。此範例會將回應指派至 `response` 變數。

`response` 變數可讓您存取 HTTP 狀態、已部署模型的名稱以及其他欄位。下列程式碼片段會列印 HTTP 狀態程式碼：

```
print(response["HTTPStatusCode"])
```

### 調用以串流推論回應
<a name="test-invoke-endpoint-with-response-stream"></a>

如果您部署了支援推論串流的模型，您可以調用模型來接收其推論承載作為串流的部分。模型會在模型產生這些部分時累加地交付這些部分。當應用程式收到推論串流時，應用程式不需要等待模型產生整個回應承載。相反地，應用程式會在可用時接收部分回應。

藉由在應用程式中使用推論串流，您可以建立互動，讓使用者認為推論速度很快，因為他們會立即取得第一部分。您可以實作串流以支援快速互動體驗，例如聊天機器人、虛擬助理和音樂產生器。例如，您可以建立一個聊天機器人，以累加方式顯示由大型語言模型 (LLM) 產生的文字。

若要取得推論串流，您可以使用 `invoke_endpoint_with_response_stream` 方法。在回應內文中，SDK 提供了一個 `EventStream` 物件，它會將推論作為一系列 `PayloadPart` 物件來提供。

**Example 推論串流**  
下列為 `PayloadPart` 物件串流的範例。  

```
{'PayloadPart': {'Bytes': b'{"outputs": [" a"]}\n'}}
{'PayloadPart': {'Bytes': b'{"outputs": [" challenging"]}\n'}}
{'PayloadPart': {'Bytes': b'{"outputs": [" problem"]}\n'}}
. . .
```
在每個承載部分中，`Bytes` 欄位會提供來自模型的推論回應之一部分。此部分可以是模型產生的任何內容類型，例如文字、影像或音訊資料。在此範例中，這些部分是 JSON 物件，其中包含來自 LLM 的生成文字。  
通常，承載部分包含來自模型的離散資料區塊。在此範例中，離散區塊是整個 JSON 物件。有時候，回應會將區塊拆分為多個承載部分，或者將多個區塊合併為一個承載部分。下列範例會顯示 JSON 格式的資料區塊，這些資料分割為兩個承載部分：  

```
{'PayloadPart': {'Bytes': b'{"outputs": '}}
{'PayloadPart': {'Bytes': b'[" problem"]}\n'}}
```
當您撰寫處理推論串流的應用程式程式碼時，請包含處理這些偶爾分割和組合資料的邏輯。作為一種策略，您可以編寫代碼來串連應用程式接收承載部分的 `Bytes` 內容。透過在此處串連 JSON 資料範例，您可以將資料合併為以換行符號分隔的 JSON 主體。接著，您的程式碼可透過剖析每行上的整個 JSON 物件來處理串流。  
下列範例會顯示當您串連 `Bytes` 範例內容時所建立的以換行符號分隔之 JSON：  

```
{"outputs": [" a"]}
{"outputs": [" challenging"]}
{"outputs": [" problem"]}
. . .
```

**Example 處理推論串流的程式碼**  

下列範例 Python 類別 (`SmrInferenceStream`) 示範如何處理以 JSON 格式傳送文字資料的推論串流：

```
import io
import json

# Example class that processes an inference stream:
class SmrInferenceStream:
    
    def __init__(self, sagemaker_runtime, endpoint_name):
        self.sagemaker_runtime = sagemaker_runtime
        self.endpoint_name = endpoint_name
        # A buffered I/O stream to combine the payload parts:
        self.buff = io.BytesIO() 
        self.read_pos = 0
        
    def stream_inference(self, request_body):
        # Gets a streaming inference response 
        # from the specified model endpoint:
        response = self.sagemaker_runtime\
            .invoke_endpoint_with_response_stream(
                EndpointName=self.endpoint_name, 
                Body=json.dumps(request_body), 
                ContentType="application/json"
        )
        # Gets the EventStream object returned by the SDK:
        event_stream = response['Body']
        for event in event_stream:
            # Passes the contents of each payload part
            # to be concatenated:
            self._write(event['PayloadPart']['Bytes'])
            # Iterates over lines to parse whole JSON objects:
            for line in self._readlines():
                resp = json.loads(line)
                part = resp.get("outputs")[0]
                # Returns parts incrementally:
                yield part
    
    # Writes to the buffer to concatenate the contents of the parts:
    def _write(self, content):
        self.buff.seek(0, io.SEEK_END)
        self.buff.write(content)

    # The JSON objects in buffer end with '\n'.
    # This method reads lines to yield a series of JSON objects:
    def _readlines(self):
        self.buff.seek(self.read_pos)
        for line in self.buff.readlines():
            self.read_pos += len(line)
            yield line[:-1]
```

此範例會執行下列動作來處理推論串流：
+ 啟用 SageMaker AI 執行時期用戶端並命名模型端點。取得推論串流之前，端點託管的模型必須支援推論串流。
+ 在範例 `stream_inference` 方法中，接收請求內文並將其傳遞給 SDK 的 `invoke_endpoint_with_response_stream` 方法。
+ 迭代 SDK 傳回的 `EventStream` 物件中之每個事件。
+ 從每個事件中，取得 `PayloadPart` 物件中 `Bytes` 物件的內容。
+ 在範例 `_write` 方法中，寫入緩衝區以串連 `Bytes` 物件的內容。合併的內容會形成以換行符號分隔的 JSON 主體。
+ 使用範例 `_readlines` 方法來獲取可迭代的一系列 JSON 物件。
+ 在每個 JSON 物件中，您都會取得一段推論。
+ 使用 `yield` 表達式，以累加方式傳回片段。

下列範例會建立並使用 `SmrInferenceStream` 物件：

```
request_body = {"inputs": ["Large model inference is"],
                "parameters": {"max_new_tokens": 100,
                               "enable_sampling": "true"}}
smr_inference_stream = SmrInferenceStream(
    sagemaker_runtime, endpoint_name)
stream = smr_inference_stream.stream_inference(request_body)
for part in stream:
    print(part, end='')
```

此範例會將請求內容傳遞給 `stream_inference` 方法。它會不斷回應以印出推論串流傳回的每個部分。

此範例假設位於指定端點的模型是產生文字的 LLM。此範例的輸出是以累加方式列印的文字產生內文：

```
a challenging problem in machine learning. The goal is to . . .
```

## 使用 叫用您的模型 AWS CLI
<a name="realtime-endpoints-test-endpoints-cli"></a>

您可以使用 AWS Command Line Interface () 執行命令來叫用模型端點AWS CLI。 AWS CLI 支援使用 `invoke-endpoint` 命令的標準推論請求，並支援使用 `invoke-endpoint-async` 命令的非同步推論請求。

**注意**  
 AWS CLI 不支援串流推論請求。

下列範例會使用 `invoke-endpoint` 命令，將推論請求傳送至模型端點：

```
aws sagemaker-runtime invoke-endpoint \
    --endpoint-name endpoint_name \
    --body fileb://$file_name \
    output_file.txt
```

請針對 `--endpoint-name` 參數提供您在建立端點時為端點指定的名稱。請針對 `--body` 參數提供要傳遞至模型之 SageMaker AI 的輸入資料。資料的格式必須與用於訓練的格式相同。此範例顯示如何將二進位資料傳送至您的端點。

如需將檔案內容傳遞至 參數`fileb://`時何時使用 `file://` 的詳細資訊 AWS CLI，請參閱[本機檔案參數的最佳實務](https://aws.amazon.com/blogs/developer/best-practices-for-local-file-parameters/)。

若要取得更多資訊，並查看您可以傳遞的其他參數，請參閱 *AWS CLI 命令推論*中的 [https://docs.aws.amazon.com/cli/latest/reference/sagemaker-runtime/invoke-endpoint.html](https://docs.aws.amazon.com/cli/latest/reference/sagemaker-runtime/invoke-endpoint.html)。

如果 `invoke-endpoint` 命令成功，它會傳回類似以下的回應：

```
{
    "ContentType": "<content_type>; charset=utf-8",
    "InvokedProductionVariant": "<Variant>"
}
```

如果命令未成功，請檢查輸入承載的格式是否正確。

檢查檔案輸出檔案來檢視調用的輸出 (在此範例中為 `output_file.txt`)。

```
more output_file.txt
```

## 使用適用於 Python 的 AWS SDK 叫用您的模型
<a name="realtime-endpoints-test-endpoints-sdk"></a>

### 叫用 以雙向串流推論請求和回應
<a name="realtime-endpoints-test-endpoints-sdk-overview"></a>

如果您想要在應用程式程式碼中調用模型端點以支援雙向串流，您可以使用[新的適用於 Python 的實驗性 SDK](https://github.com/awslabs/aws-sdk-python)，其支援雙向串流功能，並支援 HTTP/2。此 SDK 可讓您在用戶端應用程式與 SageMaker 端點之間進行即時雙向通訊，讓您可以在模型產生推論請求的同時接收串流回應。這對於用戶端和伺服器都需要透過持久性連線持續交換資料的互動式應用程式特別有用。

**注意**  
新的實驗性 SDK 與標準 Boto3 SDK 不同，並支援資料交換的持久性雙向連線。使用實驗性 Python 開發套件時，我們強烈建議對任何非實驗性使用案例嚴格鎖定開發套件版本。

若要使用雙向串流叫用您的端點，請使用 `invoke_endpoint_with_bidirectional_stream`方法。此方法會建立持久性連線，可讓您將多個承載區塊串流至模型，同時在模型處理資料時即時接收回應。在您明確關閉輸入串流或端點關閉連線之前，連線會保持開啟狀態，最多支援 30 分鐘的連線時間。

### 先決條件
<a name="realtime-endpoints-test-endpoints-sdk-prereq"></a>

在應用程式程式碼中使用雙向串流之前，您必須：

1. 安裝實驗性 SageMaker 執行期 HTTP/2 SDK

1. 為您的 SageMaker 執行期用戶端設定 AWS 登入資料

1. 部署支援雙向串流到 SageMaker 端點的模型

### 設定雙向串流用戶端
<a name="realtime-endpoints-test-endpoints-sdk-setup-client"></a>

下列範例示範如何初始化雙向串流所需的元件：

```
from sagemaker_runtime_http2.client import SageMakerRuntimeHTTP2Client
from sagemaker_runtime_http2.config import Config, HTTPAuthSchemeResolver
from smithy_aws_core.identity import EnvironmentCredentialsResolver
from smithy_aws_core.auth.sigv4 import SigV4AuthScheme

# Configuration
AWS_REGION = "us-west-2"
BIDI_ENDPOINT = f"https://runtime.sagemaker.{AWS_REGION}.amazonaws.com:8443"
ENDPOINT_NAME = "your-endpoint-name"

# Initialize the client configuration
config = Config(
    endpoint_uri=BIDI_ENDPOINT,
    region=AWS_REGION,
    aws_credentials_identity_resolver=EnvironmentCredentialsResolver(),
    auth_scheme_resolver=HTTPAuthSchemeResolver(),
    auth_schemes={"aws.auth#sigv4": SigV4AuthScheme(service="sagemaker")}
)

# Create the SageMaker Runtime HTTP/2 client
client = SageMakerRuntimeHTTP2Client(config=config)
```

### 完成雙向串流用戶端
<a name="realtime-endpoints-test-endpoints-sdk-complete-client"></a>

下列範例示範如何建立雙向串流用戶端，將多個文字承載傳送至 SageMaker 端點，並即時處理回應：

```
import asyncio
import logging
from sagemaker_runtime_http2.client import SageMakerRuntimeHTTP2Client
from sagemaker_runtime_http2.config import Config, HTTPAuthSchemeResolver
from sagemaker_runtime_http2.models import (
    InvokeEndpointWithBidirectionalStreamInput, 
    RequestStreamEventPayloadPart, 
    RequestPayloadPart
)
from smithy_aws_core.identity import EnvironmentCredentialsResolver
from smithy_aws_core.auth.sigv4 import SigV4AuthScheme

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class SageMakerBidirectionalClient:
    
    def __init__(self, endpoint_name, region="us-west-2"):
        self.endpoint_name = endpoint_name
        self.region = region
        self.client = None
        self.stream = None
        self.response_task = None
        self.is_active = False
        
    def _initialize_client(self):
        bidi_endpoint = f"runtime.sagemaker.{self.region}.amazonaws.com:8443"
        config = Config(
            endpoint_uri=bidi_endpoint,
            region=self.region,
            aws_credentials_identity_resolver=EnvironmentCredentialsResolver(),
            auth_scheme_resolver=HTTPAuthSchemeResolver(),
            auth_schemes={"aws.auth#sigv4": SigV4AuthScheme(service="sagemaker")}
        )
        self.client = SageMakerRuntimeHTTP2Client(config=config)
    
    async def start_session(self):
        """Establish a bidirectional streaming connection with the endpoint."""
        if not self.client:
            self._initialize_client()
            
        logger.info(f"Starting session with endpoint: {self.endpoint_name}")
        self.stream = await self.client.invoke_endpoint_with_bidirectional_stream(
            InvokeEndpointWithBidirectionalStreamInput(endpoint_name=self.endpoint_name)
        )
        self.is_active = True
        
        # Start processing responses concurrently
        self.response_task = asyncio.create_task(self._process_responses())
    
    async def send_message(self, message):
        """Send a single message to the endpoint."""
        if not self.is_active:
            raise RuntimeError("Session not active. Call start_session() first.")
            
        logger.info(f"Sending message: {message}")
        payload = RequestPayloadPart(bytes_=message.encode('utf-8'))
        event = RequestStreamEventPayloadPart(value=payload)
        await self.stream.input_stream.send(event)
    
    async def send_multiple_messages(self, messages, delay=1.0):
        """Send multiple messages with a delay between each."""
        for message in messages:
            await self.send_message(message)
            await asyncio.sleep(delay)
    
    async def end_session(self):
        """Close the bidirectional streaming connection."""
        if not self.is_active:
            return
            
        await self.stream.input_stream.close()
        self.is_active = False
        logger.info("Stream closed")
        
        # Cancel the response processing task
        if self.response_task and not self.response_task.done():
            self.response_task.cancel()
    
    async def _process_responses(self):
        """Process incoming responses from the endpoint."""
        try:
            output = await self.stream.await_output()
            output_stream = output[1]
            
            while self.is_active:
                result = await output_stream.receive()
                
                if result is None:
                    logger.info("No more responses")
                    break
                
                if result.value and result.value.bytes_:
                    response_data = result.value.bytes_.decode('utf-8')
                    logger.info(f"Received: {response_data}")
                    
        except Exception as e:
            logger.error(f"Error processing responses: {e}")

# Example usage
async def run_bidirectional_client():
    client = SageMakerBidirectionalClient(endpoint_name="your-endpoint-name")
    
    try:
        # Start the session
        await client.start_session()
        
        # Send multiple messages
        messages = [
            "I need help with", 
            "my account balance", 
            "I can help with that", 
            "and recent charges"
        ]
        await client.send_multiple_messages(messages)
        
        # Wait for responses to be processed
        await asyncio.sleep(2)
        
        # End the session
        await client.end_session()
        logger.info("Session ended successfully")
        
    except Exception as e:
        logger.error(f"Client error: {e}")
        await client.end_session()

if __name__ == "__main__":
    asyncio.run(run_bidirectional_client())
```

用戶端會使用連接埠 8443 上的區域端點 URI 初始化 SageMaker 執行期 HTTP/2 用戶端，這是雙向串流連線所需的。start\$1`session()` 方法會呼叫 `invoke_endpoint_with_bidirectional_stream()` 來建立持久性連線，並建立非同步任務來同時處理傳入的回應。

`send_event()` 方法會將承載資料包裝在適當的請求物件中，並透過輸入串流傳送，同時`_process_responses()`方法會在端點到達時持續接聽和處理來自端點的回應。這種雙向方法可啟用即時互動，其中傳送請求和接收回應都會同時透過相同的連線進行。