Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Nozioni di base su Kinesis Data Streams per Amazon DynamoDB
Questa sezione descrive come utilizzare Kinesis Data Streams per le tabelle Amazon DynamoDB con la console Amazon DynamoDB, il () e l'API AWS Command Line Interface .AWS CLI
Creazione di un flusso di dati Amazon Kinesis attivo
Tutti questi esempi utilizzano la tabella DynamoDB Music che è stata creata come parte del tutorial Nozioni di base su DynamoDB.
Per ulteriori informazioni su come creare consumatori e connettere il flusso di dati Kinesis ad altri servizi AWS , consulta Leggere i dati dal flusso di dati Amazon Kinesis, nella Guida per gli sviluppatori del flusso di dati Amazon Kinesis.
Quando utilizzi per la prima volta le partizioni KDS, consigliamo di impostarle in modo che aumentino o diminuiscano in base ai modelli di utilizzo. Dopo aver accumulato ulteriori dati sui modelli di utilizzo, è possibile adattare le partizioni nel flusso di conseguenza.
- Console
-
-
Accedi Console di gestione AWS e apri la console Kinesis all'indirizzo. https://console.aws.amazon.com/kinesis/
-
Scegli Crea flusso di dati e segui le istruzioni per creare un flusso denominato samplestream.
-
Apri la console DynamoDB all'indirizzo. https://console.aws.amazon.com/dynamodb/
-
Nel riquadro di navigazione sul lato sinistro della console scegli Tables (Tabelle).
-
Seleziona la tabella Music.
-
Scegli la scheda Exports and streams (Esportazioni e flussi).
-
(Facoltativo) Nella sezione Dettagli del flusso di dati Amazon Kinesis, è possibile modificare la precisione del timestamp del record da microsecondi (impostazione predefinita) a millisecondi.
-
Scegli samplestream dall'elenco a discesa.
-
Seleziona il pulsante Attiva.
- AWS CLI
-
-
Crea un flusso di dati Kinesis denominato samplestream utilizzando il comando create-stream.
aws kinesis create-stream --stream-name samplestream --shard-count 3
Prima di configurare il numero di partizioni per il flusso di dati Kinesis, consulta Considerazioni sulla gestione delle partizioni per Kinesis Data Streams.
-
Verificare che il flusso Kinesis sia attivo e pronto per l'uso utilizzando il comando describe-stream.
aws kinesis describe-stream --stream-name samplestream
-
Abilitare lo streaming di Kinesis sulla tabella DynamoDB utilizzando il comando DynamoDB enable-kinesis-streaming-destination. Sostituisci il valore stream-arn con quello restituito da describe-stream nella fase precedente. Facoltativamente, abilita lo streaming con una precisione più granulare (microsecondi) dei valori di timestamp restituiti su ogni record.
Abilita lo streaming con la precisione del timestamp in microsecondi:
aws dynamodb enable-kinesis-streaming-destination \
--table-name Music \
--stream-arn arn:aws:kinesis:us-west-2:12345678901:stream/samplestream
--enable-kinesis-streaming-configuration ApproximateCreationDateTimePrecision=MICROSECOND
Oppure abilita lo streaming con la precisione del timestamp predefinita (millisecondi):
aws dynamodb enable-kinesis-streaming-destination \
--table-name Music \
--stream-arn arn:aws:kinesis:us-west-2:12345678901:stream/samplestream
-
Abilitare lo streaming di Kinesis sulla tabella DynamoDB utilizzando il comando describe-kinesis-streaming-destination DynamoDB.
aws dynamodb describe-kinesis-streaming-destination --table-name Music
-
Scrivere i dati nella tabella DynamoDB utilizzando il comando put-item, come descritto nella Guida per gli sviluppatori di DynamoDB.
aws dynamodb put-item \
--table-name Music \
--item \
'{"Artist": {"S": "No One You Know"}, "SongTitle": {"S": "Call Me Today"}, "AlbumTitle": {"S": "Somewhat Famous"}, "Awards": {"N": "1"}}'
aws dynamodb put-item \
--table-name Music \
--item \
'{"Artist": {"S": "Acme Band"}, "SongTitle": {"S": "Happy Day"}, "AlbumTitle": {"S": "Songs About Life"}, "Awards": {"N": "10"} }'
-
Utilizza il comando get-record della CLI di Kinesis per recuperare il contenuto del flusso Kinesis. Quindi utilizzare il seguente frammento di codice per deserializzare il contenuto del flusso.
/**
* Takes as input a Record fetched from Kinesis and does arbitrary processing as an example.
*/
public void processRecord(Record kinesisRecord) throws IOException {
ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData();
JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array());
JsonNode dynamoDBRecord = rootNode.get("dynamodb");
JsonNode oldItemImage = dynamoDBRecord.get("OldImage");
JsonNode newItemImage = dynamoDBRecord.get("NewImage");
Instant recordTimestamp = fetchTimestamp(dynamoDBRecord);
/**
* Say for example our record contains a String attribute named "stringName" and we want to fetch the value
* of this attribute from the new item image. The following code fetches this value.
*/
JsonNode attributeNode = newItemImage.get("stringName");
JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute
String attributeValue = attributeValueNode.textValue();
System.out.println(attributeValue);
}
private Instant fetchTimestamp(JsonNode dynamoDBRecord) {
JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime");
JsonNode timestampPrecisionJson = dynamoDBRecord.get("ApproximateCreationDateTimePrecision");
if (timestampPrecisionJson != null && timestampPrecisionJson.equals("MICROSECOND")) {
return Instant.EPOCH.plus(timestampJson.longValue(), ChronoUnit.MICROS);
}
return Instant.ofEpochMilli(timestampJson.longValue());
}
- Java
-
-
Seguire le istruzioni contenute nella guida per gli sviluppatori di Kinesis Data Streams per creare un flusso di dati Kinesis denominato samplestream tramite Java.
Prima di configurare il numero di partizioni per il flusso di dati Kinesis, consulta Considerazioni sulla gestione delle partizioni per Kinesis Data Streams.
-
Utilizzo il seguente frammento di codice per abilitare lo streamingKinesis sulla tabella DynamoDB. Facoltativamente, abilita lo streaming con una precisione più granulare (microsecondi) dei valori di timestamp restituiti su ogni record.
Abilita lo streaming con la precisione del timestamp in microsecondi:
EnableKinesisStreamingConfiguration enableKdsConfig = EnableKinesisStreamingConfiguration.builder()
.approximateCreationDateTimePrecision(ApproximateCreationDateTimePrecision.MICROSECOND)
.build();
EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder()
.tableName(tableName)
.streamArn(kdsArn)
.enableKinesisStreamingConfiguration(enableKdsConfig)
.build();
EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);
Oppure abilita lo streaming con la precisione del timestamp predefinita (millisecondi):
EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder()
.tableName(tableName)
.streamArn(kdsArn)
.build();
EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);
-
Segui le istruzioni contenute nella guida per gli sviluppatori di Kinesis Data Streams per leggere dal flusso di dati creato.
-
Utilizza il seguente frammento di codice per deserializzare il contenuto del flusso.
/**
* Takes as input a Record fetched from Kinesis and does arbitrary processing as an example.
*/
public void processRecord(Record kinesisRecord) throws IOException {
ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData();
JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array());
JsonNode dynamoDBRecord = rootNode.get("dynamodb");
JsonNode oldItemImage = dynamoDBRecord.get("OldImage");
JsonNode newItemImage = dynamoDBRecord.get("NewImage");
Instant recordTimestamp = fetchTimestamp(dynamoDBRecord);
/**
* Say for example our record contains a String attribute named "stringName" and we wanted to fetch the value
* of this attribute from the new item image, the below code would fetch this.
*/
JsonNode attributeNode = newItemImage.get("stringName");
JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute
String attributeValue = attributeValueNode.textValue();
System.out.println(attributeValue);
}
private Instant fetchTimestamp(JsonNode dynamoDBRecord) {
JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime");
JsonNode timestampPrecisionJson = dynamoDBRecord.get("ApproximateCreationDateTimePrecision");
if (timestampPrecisionJson != null && timestampPrecisionJson.equals("MICROSECOND")) {
return Instant.EPOCH.plus(timestampJson.longValue(), ChronoUnit.MICROS);
}
return Instant.ofEpochMilli(timestampJson.longValue());
}
Applicazione di modifiche a un flusso di dati Amazon Kinesis attivo
Questa sezione descrive come apportare modifiche a una configurazione attiva di Kinesis Data Streams for DynamoDB utilizzando la console e l'API. AWS CLI
Console di gestione AWS
AWS CLI
-
Chiama describe-kinesis-streaming-destination per confermare che il flusso sia ACTIVE.
-
Chiama UpdateKinesisStreamingDestination, come in questo esempio:
aws dynamodb update-kinesis-streaming-destination --table-name enable_test_table --stream-arn arn:aws:kinesis:us-east-1:12345678901:stream/enable_test_stream --update-kinesis-streaming-configuration ApproximateCreationDateTimePrecision=MICROSECOND
-
Chiama describe-kinesis-streaming-destination per confermare che il flusso sia UPDATING.
-
Chiama describe-kinesis-streaming-destination periodicamente fino a quando lo stato dello streaming non è nuovamente ACTIVE. In genere sono necessari fino a 5 minuti perché gli aggiornamenti alla precisione del timestamp entrino in vigore. Una volta che lo stato è aggiornato, significa che l’aggiornamento è completo e il nuovo valore di precisione verrà applicato ai record futuri.
-
Scrivi nella tabella usando putItem.
-
Utilizza il comando get-records di Kinesis per recuperare il contenuto del flusso.
-
Verifica che ApproximateCreationDateTime delle scritture presenti la precisione desiderata.
API Java
-
Fornisci un frammento di codice che costruisce una richiesta UpdateKinesisStreamingDestination e una risposta UpdateKinesisStreamingDestination.
-
Fornisci un frammento di codice che costruisce una richiesta DescribeKinesisStreamingDestination e una DescribeKinesisStreamingDestination response.
-
Chiama describe-kinesis-streaming-destination periodicamente fino a quando lo stato dello streaming non è nuovamente ACTIVE, a indicare che l’aggiornamento è completo e il nuovo valore di precisione verrà applicato ai record futuri.
-
Effettua le scritture sulla tabella.
-
Effettua la lettura dal flusso e deserializza il contenuto del flusso.
-
Verifica che ApproximateCreationDateTime delle scritture presenti la precisione desiderata.