

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

# Invocar modelos para inferência em tempo real
<a name="realtime-endpoints-test-endpoints"></a>

Depois de usar o Amazon SageMaker AI para implantar um modelo em um endpoint, você pode interagir com o modelo enviando solicitações de inferência para ele. Para enviar uma solicitação de inferência para um modelo, você invoca o endpoint que o hospeda. Você pode invocar seus endpoints usando o Amazon SageMaker Studio AWS SDKs, o ou o. AWS CLI

## Invoque seu modelo usando o Amazon Studio SageMaker
<a name="realtime-endpoints-test-endpoints-studio"></a>

Depois de implantar seu modelo em um endpoint, você pode visualizar o endpoint por meio do Amazon SageMaker Studio e testar seu endpoint enviando solicitações de inferência únicas.

**nota**  
SageMaker A IA só oferece suporte a testes de endpoint no Studio para endpoints em tempo real.

**Para enviar uma solicitação de inferência de teste para seu endpoint**

1. Inicie o Amazon SageMaker Studio.

1. No painel de navegação à esquerda, escolha **Implantações**.

1. No menu suspenso, escolha **Endpoints.**

1. Encontre seu endpoint pelo nome e escolha o nome na tabela. Os nomes dos endpoints listados no painel **Endpoints** são definidos quando você implanta um modelo. A área de trabalho do Studio abre a página de **Endpoint** em uma nova guia.

1. Escolha a guia **Testar inferência**.

1. Em **Opções de teste**, escolha uma das seguintes opções:

   1. Selecione **Testar a solicitação de amostra** para enviar imediatamente uma solicitação ao endpoint. Use o **editor JSON** para fornecer dados de amostra no formato JSON e escolha **Enviar solicitação**para enviar a solicitação ao endpoint. Depois de enviar a solicitação, o Studio mostra a saída da inferência em um cartão à direita do editor JSON.

   1. Selecione **Usar código de exemplo do Python SDK** para visualizar o código e enviar uma solicitação ao endpoint. Em seguida, copie o exemplo de código da seção **Exemplo de solicitação de inferência** e execute o código no ambiente de teste.

A parte superior do cartão mostra o tipo de solicitação que foi enviada ao endpoint (somente JSON é aceito). O cartão mostra os seguintes campos:
+ **Status**: exibe um dos seguintes tipos de status:
  + `Success`: a solicitação foi bem-sucedida.
  + `Failed`: a solicitação falhou. Uma resposta aparece em **Motivo da falha**.
  + `Pending`: enquanto a solicitação de inferência está pendente, o status mostra um ícone circular giratório.
+ **Duração da execução**: quanto tempo demorou a invocação (hora de término menos a hora de início) em milissegundos.
+ **Tempo da solicitação**: quantos minutos se passaram desde que a solicitação foi enviada.
+ **Tempo do resultado**: quantos minutos se passaram desde que o resultado foi devolvido.

## Invoque seu modelo usando o AWS SDK para Python (Boto3)
<a name="realtime-endpoints-test-endpoints-api"></a>

Se quiser invocar um endpoint de modelo no código do seu aplicativo, você pode usar um dos AWS SDKs, incluindo o. AWS SDK para Python (Boto3) Para invocar o endpoint com esse SDK, use um dos seguintes métodos Python:
+ `invoke_endpoint`: envia uma solicitação de inferência para um endpoint do modelo e exibe a resposta que o modelo gera. 

  Esse método retorna a carga útil de inferência como uma resposta depois que o modelo termina de gerá-la. Para mais informações, consulte a [invoke\$1endpoint](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker-runtime/client/invoke_endpoint.html) no *AWS SDK para Referência API Python (Boto3)*.
+ `invoke_endpoint_with_response_stream`: envia uma solicitação de inferência para um endpoint do modelo e transmite a resposta de forma incremental enquanto o modelo a gera. 

  Com esse método, a aplicação recebe partes da resposta à medida que se tornam disponíveis. Para mais informações, consulte a [invoke\$1endpoint](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker-runtime/client/invoke_endpoint.html) no *AWS SDK para Referência API Python (Boto3)*.

  Use esse método somente para invocar modelos que seja compatível comm streaming de inferência.

Antes de usar esses métodos no código do aplicativo, você deve inicializar um cliente do SageMaker AI Runtime e especificar o nome do seu endpoint. O seguinte exemplo configura o cliente e o endpoint para os demais seguintes exemplos:

```
import boto3

sagemaker_runtime = boto3.client(
    "sagemaker-runtime", region_name='aws_region')

endpoint_name='endpoint-name'
```

### Invoque para obter uma resposta de inferência
<a name="test-invoke-endpoint"></a>

O seguinte exemplo usa o método `invoke_endpoint` para invocar um endpoint com o AWS SDK para Python (Boto3):

```
# Gets inference from the model hosted at the specified endpoint:
response = sagemaker_runtime.invoke_endpoint(
    EndpointName=endpoint_name, 
    Body=bytes('{"features": ["This is great!"]}', 'utf-8')
    )

# Decodes and prints the response body:
print(response['Body'].read().decode('utf-8'))
```

Este exemplo fornece dados de entrada no `Body` campo para a SageMaker IA passar para o modelo. Esses dados devem estar no mesmo formato usado para treinamento. O exemplo designa a resposta para a variável `response`.

A variável `response` fornece acesso ao status HTTP, ao nome do modelo implantado e a outros campos. O trecho a seguir imprime o código de status de HTTP:

```
print(response["HTTPStatusCode"])
```

### Invoque para obter uma resposta de fluxo
<a name="test-invoke-endpoint-with-response-stream"></a>

Se você implantou um modelo que é compatível com streaming de inferência, você pode invocar o modelo para receber sua carga útil de inferência como um fluxo de partes. O modelo entrega essas peças de forma incremental à medida que o modelo as gera. Quando uma aplicação recebe um fluxo de inferência, a aplicação não precisa esperar que o modelo gere toda a carga útil da resposta. Em vez disso, a aplicação recebe imediatamente partes da resposta à medida que se tornam disponíveis. 

Ao consumir um fluxo de inferência em sua aplicação, você pode criar interações em que seus usuários percebam que a inferência é rápida porque obtêm a primeira parte imediatamente. Você pode implementar o streaming para oferecer apoio a experiências interativas rápidas, como chatbots, assistentes virtuais e geradores de música. Por exemplo, você pode criar um chatbot que mostre incrementalmente o texto gerado por um modelo de linguagem grande (LLM).

Para obter um fluxo de inferência, você pode usar o método `invoke_endpoint_with_response_stream`. No corpo da resposta, o SDK fornece um objeto `EventStream`, que fornece a inferência como uma série de objetos `PayloadPart`.

**Example Fluxo de inferência**  
O seguinte exemplo é um fluxo de objetos `PayloadPart`:  

```
{'PayloadPart': {'Bytes': b'{"outputs": [" a"]}\n'}}
{'PayloadPart': {'Bytes': b'{"outputs": [" challenging"]}\n'}}
{'PayloadPart': {'Bytes': b'{"outputs": [" problem"]}\n'}}
. . .
```
Em cada parte da carga útil, o campo `Bytes` fornece uma parte da resposta de inferência do modelo. Essa parte pode ser qualquer tipo de conteúdo gerado por um modelo, como texto, imagem ou dados de áudio. Neste exemplo, as partes são objetos JSON que contêm texto gerado de um LLM.  
Normalmente, a parte da carga útil contém um bloco discreto de dados do modelo. Neste exemplo, os blocos discretos são objetos JSON inteiros. Ocasionalmente, a resposta de streaming divide as partes em várias partes da carga útil ou combina várias partes em uma parte da carga útil. O seguinte exemplo mostra um bloco de dados no formato JSON dividido em duas partes da carga útil:  

```
{'PayloadPart': {'Bytes': b'{"outputs": '}}
{'PayloadPart': {'Bytes': b'[" problem"]}\n'}}
```
Ao escrever um código de aplicação que processa um fluxo de inferência, inclua uma lógica que manipule essas divisões e combinações ocasionais de dados. Como uma estratégia, você pode escrever um código que concatene o conteúdo `Bytes` enquanto sua aplicação recebe as partes da carga útil. Ao concatenar os dados JSON de exemplo aqui, você combinaria os dados em um corpo JSON delimitado por nova linha. Em seguida, seu código poderia processar o fluxo analisando todo o objeto JSON em cada linha.  
O seguinte exemplo mostra o JSON delimitado por nova linha que você criaria ao concatenar o conteúdo de exemplo de `Bytes`:  

```
{"outputs": [" a"]}
{"outputs": [" challenging"]}
{"outputs": [" problem"]}
. . .
```

**Example Código para processar um fluxo de inferência**  

O seguinte exemplo de classe Python, `SmrInferenceStream`, demonstra como você pode processar um fluxo de inferência que envia dados de texto no formato JSON:

```
import io
import json

# Example class that processes an inference stream:
class SmrInferenceStream:
    
    def __init__(self, sagemaker_runtime, endpoint_name):
        self.sagemaker_runtime = sagemaker_runtime
        self.endpoint_name = endpoint_name
        # A buffered I/O stream to combine the payload parts:
        self.buff = io.BytesIO() 
        self.read_pos = 0
        
    def stream_inference(self, request_body):
        # Gets a streaming inference response 
        # from the specified model endpoint:
        response = self.sagemaker_runtime\
            .invoke_endpoint_with_response_stream(
                EndpointName=self.endpoint_name, 
                Body=json.dumps(request_body), 
                ContentType="application/json"
        )
        # Gets the EventStream object returned by the SDK:
        event_stream = response['Body']
        for event in event_stream:
            # Passes the contents of each payload part
            # to be concatenated:
            self._write(event['PayloadPart']['Bytes'])
            # Iterates over lines to parse whole JSON objects:
            for line in self._readlines():
                resp = json.loads(line)
                part = resp.get("outputs")[0]
                # Returns parts incrementally:
                yield part
    
    # Writes to the buffer to concatenate the contents of the parts:
    def _write(self, content):
        self.buff.seek(0, io.SEEK_END)
        self.buff.write(content)

    # The JSON objects in buffer end with '\n'.
    # This method reads lines to yield a series of JSON objects:
    def _readlines(self):
        self.buff.seek(self.read_pos)
        for line in self.buff.readlines():
            self.read_pos += len(line)
            yield line[:-1]
```

Este exemplo processa o fluxo de inferência fazendo o seguinte:
+ Inicializa um cliente SageMaker AI Runtime e define o nome de um endpoint do modelo. Antes de obter um fluxo de inferência, o modelo que o endpoint hospeda deve oferecer compatibilidade com o streaming de inferência.
+ No método de exemplo `stream_inference`, recebe um corpo de solicitação e o passa para o método `invoke_endpoint_with_response_stream` do SDK.
+ Itera sobre cada evento no objeto `EventStream` que o SDK retorna.
+ De cada evento, obtém o conteúdo do objeto `Bytes` no objeto `PayloadPart`.
+ No método `_write` de exemplo, grava em um buffer para concatenar o conteúdo dos objetos `Bytes`. O conteúdo combinado forma um corpo JSON delimitado por nova linha.
+ Usa o método `_readlines` de exemplo para obter uma série iterável de objetos JSON.
+ Em cada objeto JSON, obtém uma parte da inferência.
+ Com a expressão `yield`, retorna as peças de forma incremental.

O seguinte exemplo cria e usa um objeto `SmrInferenceStream`:

```
request_body = {"inputs": ["Large model inference is"],
                "parameters": {"max_new_tokens": 100,
                               "enable_sampling": "true"}}
smr_inference_stream = SmrInferenceStream(
    sagemaker_runtime, endpoint_name)
stream = smr_inference_stream.stream_inference(request_body)
for part in stream:
    print(part, end='')
```

Este exemplo passa um corpo de solicitação para o método `stream_inference`. Ele itera sobre a resposta para imprimir cada peça que o fluxo de inferência retorna.

O exemplo pressupõe que o modelo no endpoint especificado é um LLM que gera texto. A saída desse exemplo é um corpo de texto gerado que é impresso de forma incremental:

```
a challenging problem in machine learning. The goal is to . . .
```

## Invoque seu modelo usando o AWS CLI
<a name="realtime-endpoints-test-endpoints-cli"></a>

Você pode invocar o endpoint do seu modelo executando comandos com o AWS Command Line Interface ()AWS CLI. A AWS CLI oferece apoio a solicitações de inferência padrão com o comando `invoke-endpoint` e oferece apoio a solicitações de inferência assíncronas com o comando `invoke-endpoint-async`.

**nota**  
O AWS CLI não suporta solicitações de inferência de streaming.

O seguinte exemplo usa o comando `invoke-endpoint` para enviar uma solicitação de inferência para um endpoint do modelo:

```
aws sagemaker-runtime invoke-endpoint \
    --endpoint-name endpoint_name \
    --body fileb://$file_name \
    output_file.txt
```

Para o parâmetro `--endpoint-name`, forneça o nome do endpoint que você especificou quando o criou. Para o `--body` parâmetro, forneça dados de entrada para que a SageMaker IA passe para o modelo. Os dados devem estar no mesmo formato usado para treinamento. Este exemplo mostra como enviar dados binários para o seu endpoint.

Para obter mais informações sobre quando usar `file://` over `fileb://` ao passar o conteúdo de um arquivo para um parâmetro do AWS CLI, consulte [Melhores práticas para parâmetros de arquivos locais](https://aws.amazon.com/blogs/developer/best-practices-for-local-file-parameters/).

Para obter mais informações e ver parâmetros adicionais que você pode passar, consulte [https://docs.aws.amazon.com/cli/latest/reference/sagemaker-runtime/invoke-endpoint.html](https://docs.aws.amazon.com/cli/latest/reference/sagemaker-runtime/invoke-endpoint.html) na *Referência de Comandos AWS CLI *.

Se o comando `invoke-endpoint` for bem-sucedido, ele retornará uma resposta como a seguinte:

```
{
    "ContentType": "<content_type>; charset=utf-8",
    "InvokedProductionVariant": "<Variant>"
}
```

Se o comando não for bem-sucedido, verifique se a carga útil de entrada está no formato correto.

Visualize a saída da invocação verificando o arquivo de saída do arquivo (`output_file.txt` neste exemplo).

```
more output_file.txt
```

## Invoque seu modelo usando o AWS SDK para Python
<a name="realtime-endpoints-test-endpoints-sdk"></a>

### Invocar para transmitir bidirecionalmente uma solicitação e resposta de inferência
<a name="realtime-endpoints-test-endpoints-sdk-overview"></a>

Se você quiser invocar um endpoint de modelo no código do seu aplicativo para suportar streaming bidirecional, você pode usar o [novo SDK experimental para Python](https://github.com/awslabs/aws-sdk-python) que oferece suporte ao recurso de streaming bidirecional com suporte a HTTP/2. Esse SDK permite a comunicação bidirecional em tempo real entre seu aplicativo cliente e o SageMaker endpoint, permitindo que você envie solicitações de inferência de forma incremental e, ao mesmo tempo, receba respostas de streaming à medida que o modelo as gera. Isso é particularmente útil para aplicativos interativos em que o cliente e o servidor precisam trocar dados continuamente por meio de uma conexão persistente.

**nota**  
O novo SDK experimental é diferente do SDK Boto3 padrão e oferece suporte a conexões bidirecionais persistentes para troca de dados. Ao usar o SDK experimental do Python, recomendamos fortemente a fixação estrita em uma versão do SDK para qualquer caso de uso não experimental.

Para invocar seu endpoint com streaming bidirecional, use o método. `invoke_endpoint_with_bidirectional_stream` Esse método estabelece uma conexão persistente que permite transmitir vários fragmentos de carga útil para seu modelo enquanto recebe respostas em tempo real enquanto o modelo processa os dados. A conexão permanece aberta até que você feche explicitamente o fluxo de entrada ou o endpoint feche a conexão, suportando até 30 minutos de tempo de conexão.

### Pré-requisitos
<a name="realtime-endpoints-test-endpoints-sdk-prereq"></a>

Antes de usar o streaming bidirecional no código do aplicativo, você deve:

1. Instale o SDK experimental SageMaker Runtime HTTP/2

1. Configurar AWS credenciais para seu cliente SageMaker Runtime

1. Implemente um modelo que suporte streaming bidirecional para um endpoint SageMaker 

### Configurar o cliente de streaming bidirecional
<a name="realtime-endpoints-test-endpoints-sdk-setup-client"></a>

O exemplo a seguir mostra como inicializar os componentes necessários para streaming bidirecional:

```
from sagemaker_runtime_http2.client import SageMakerRuntimeHTTP2Client
from sagemaker_runtime_http2.config import Config, HTTPAuthSchemeResolver
from smithy_aws_core.identity import EnvironmentCredentialsResolver
from smithy_aws_core.auth.sigv4 import SigV4AuthScheme

# Configuration
AWS_REGION = "us-west-2"
BIDI_ENDPOINT = f"https://runtime.sagemaker.{AWS_REGION}.amazonaws.com:8443"
ENDPOINT_NAME = "your-endpoint-name"

# Initialize the client configuration
config = Config(
    endpoint_uri=BIDI_ENDPOINT,
    region=AWS_REGION,
    aws_credentials_identity_resolver=EnvironmentCredentialsResolver(),
    auth_scheme_resolver=HTTPAuthSchemeResolver(),
    auth_schemes={"aws.auth#sigv4": SigV4AuthScheme(service="sagemaker")}
)

# Create the SageMaker Runtime HTTP/2 client
client = SageMakerRuntimeHTTP2Client(config=config)
```

### Cliente de streaming bidirecional completo
<a name="realtime-endpoints-test-endpoints-sdk-complete-client"></a>

O exemplo a seguir demonstra como criar um cliente de streaming bidirecional que envia várias cargas de texto para um SageMaker endpoint e processa as respostas em tempo real:

```
import asyncio
import logging
from sagemaker_runtime_http2.client import SageMakerRuntimeHTTP2Client
from sagemaker_runtime_http2.config import Config, HTTPAuthSchemeResolver
from sagemaker_runtime_http2.models import (
    InvokeEndpointWithBidirectionalStreamInput, 
    RequestStreamEventPayloadPart, 
    RequestPayloadPart
)
from smithy_aws_core.identity import EnvironmentCredentialsResolver
from smithy_aws_core.auth.sigv4 import SigV4AuthScheme

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class SageMakerBidirectionalClient:
    
    def __init__(self, endpoint_name, region="us-west-2"):
        self.endpoint_name = endpoint_name
        self.region = region
        self.client = None
        self.stream = None
        self.response_task = None
        self.is_active = False
        
    def _initialize_client(self):
        bidi_endpoint = f"runtime.sagemaker.{self.region}.amazonaws.com:8443"
        config = Config(
            endpoint_uri=bidi_endpoint,
            region=self.region,
            aws_credentials_identity_resolver=EnvironmentCredentialsResolver(),
            auth_scheme_resolver=HTTPAuthSchemeResolver(),
            auth_schemes={"aws.auth#sigv4": SigV4AuthScheme(service="sagemaker")}
        )
        self.client = SageMakerRuntimeHTTP2Client(config=config)
    
    async def start_session(self):
        """Establish a bidirectional streaming connection with the endpoint."""
        if not self.client:
            self._initialize_client()
            
        logger.info(f"Starting session with endpoint: {self.endpoint_name}")
        self.stream = await self.client.invoke_endpoint_with_bidirectional_stream(
            InvokeEndpointWithBidirectionalStreamInput(endpoint_name=self.endpoint_name)
        )
        self.is_active = True
        
        # Start processing responses concurrently
        self.response_task = asyncio.create_task(self._process_responses())
    
    async def send_message(self, message):
        """Send a single message to the endpoint."""
        if not self.is_active:
            raise RuntimeError("Session not active. Call start_session() first.")
            
        logger.info(f"Sending message: {message}")
        payload = RequestPayloadPart(bytes_=message.encode('utf-8'))
        event = RequestStreamEventPayloadPart(value=payload)
        await self.stream.input_stream.send(event)
    
    async def send_multiple_messages(self, messages, delay=1.0):
        """Send multiple messages with a delay between each."""
        for message in messages:
            await self.send_message(message)
            await asyncio.sleep(delay)
    
    async def end_session(self):
        """Close the bidirectional streaming connection."""
        if not self.is_active:
            return
            
        await self.stream.input_stream.close()
        self.is_active = False
        logger.info("Stream closed")
        
        # Cancel the response processing task
        if self.response_task and not self.response_task.done():
            self.response_task.cancel()
    
    async def _process_responses(self):
        """Process incoming responses from the endpoint."""
        try:
            output = await self.stream.await_output()
            output_stream = output[1]
            
            while self.is_active:
                result = await output_stream.receive()
                
                if result is None:
                    logger.info("No more responses")
                    break
                
                if result.value and result.value.bytes_:
                    response_data = result.value.bytes_.decode('utf-8')
                    logger.info(f"Received: {response_data}")
                    
        except Exception as e:
            logger.error(f"Error processing responses: {e}")

# Example usage
async def run_bidirectional_client():
    client = SageMakerBidirectionalClient(endpoint_name="your-endpoint-name")
    
    try:
        # Start the session
        await client.start_session()
        
        # Send multiple messages
        messages = [
            "I need help with", 
            "my account balance", 
            "I can help with that", 
            "and recent charges"
        ]
        await client.send_multiple_messages(messages)
        
        # Wait for responses to be processed
        await asyncio.sleep(2)
        
        # End the session
        await client.end_session()
        logger.info("Session ended successfully")
        
    except Exception as e:
        logger.error(f"Client error: {e}")
        await client.end_session()

if __name__ == "__main__":
    asyncio.run(run_bidirectional_client())
```

O cliente inicializa o cliente SageMaker Runtime HTTP/2 com o URI do endpoint regional na porta 8443, que é necessário para conexões de streaming bidirecionais. O `session()` método start\$1 chama `invoke_endpoint_with_bidirectional_stream()` para estabelecer a conexão persistente e cria uma tarefa assíncrona para processar as respostas recebidas simultaneamente.

O `send_event()` método agrupa os dados da carga nos objetos de solicitação apropriados e os envia pelo fluxo de entrada, enquanto escuta e processa `_process_responses()` continuamente as respostas do endpoint à medida que elas chegam. Essa abordagem bidirecional permite a interação em tempo real, na qual o envio de solicitações e o recebimento de respostas acontecem simultaneamente na mesma conexão.