View a markdown version of this page

Automatización de la ingesat del flujo de datos en una base de datos de Snowflake mediante Snowflake Snowpipe, Amazon S3, Amazon SNS y Amazon Data Firehose - Recomendaciones de AWS

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Automatización de la ingesat del flujo de datos en una base de datos de Snowflake mediante Snowflake Snowpipe, Amazon S3, Amazon SNS y Amazon Data Firehose

Bikash Chandra Rout, Amazon Web Services

Resumen

Este patrón describe cómo puede utilizar los servicios de la nube de Amazon Web Services (AWS) para procesar un flujo continuo de datos y cargarlo en una base de datos de Snowflake. El patrón utiliza Amazon Data Firehose para entregar los datos a Amazon Simple Storage Service (Amazon S3), Amazon Simple Notification Service (Amazon SNS), y así enviar notificaciones cuando se reciben nuevos datos. También utiliza Snowflake Snowpipe para cargar los datos en una base de datos de Snowflake.

Si sigue este patrón, podrá disponer de datos generados de forma continua para analizarlos en cuestión de segundos, evitando así múltiples comandos COPY manuales. También podrá conseguir un soporte completo para datos semiestructurados al cargarlos.

Requisitos previos y limitaciones

Requisitos previos 

  • Un activo. Cuenta de AWS

  • Un origen de datos que envía datos de forma continua a un flujo de entrega de Firehose.

  • Un bucket de S3 existente que recibe los datos desde el flujo de entrega de Firehose.

  • Una cuenta de Snowflake activa.

Limitaciones

  • Snowflake Snowpipe no se conecta directamente a Firehose.

Arquitectura

Los datos que ingiere Firehose van a Amazon S3, Amazon SNS, Snowflake Snowpipe y Base de datos de Snowflake.

Pila de tecnología

  • Amazon Data Firehose

  • Amazon SNS

  • Amazon S3

  • Snowflake Snowpipe

  • Base de datos de Snowflake

Tools (Herramientas)

  • Amazon Data Firehose es un servicio totalmente gestionado para entregar datos de streaming en tiempo real a destinos como Amazon S3, Amazon Redshift, OpenSearch Amazon Service, Splunk y cualquier punto de enlace HTTP personalizado o punto de enlace HTTP propiedad de proveedores de servicios externos compatibles.

  • Amazon Simple Storage Service (Amazon S3) es un servicio de almacenamiento para Internet.

  • Amazon Simple Notification Service (Amazon SNS) coordina y administra la entrega o el envío de mensajes a los puntos de enlace o clientes suscritos.

  • Snowflake: Snowflake es un almacén de datos analíticos que se proporciona como (SaaS). Software-as-a-Service

  • Snowflake Snowpipe: Snowpipe carga los datos de los archivos tan pronto como están disponibles en una etapa de Snowflake.

Epics

TareaDescripciónHabilidades requeridas

Cree un archivo CSV en Snowflake.

Inicie sesión en Snowflake y ejecute el comando CREATE FILE FORMAT para crear un archivo CSV con un delimitador de campo específico. Para más información sobre este y otros comandos de Snowflake, consulte la sección Información adicional.

Desarrollador

Cree una etapa de Snowflake externa.

Ejecute el comando CREATE STAGE para crear una etapa de Snowflake externa que haga referencia al archivo CSV que creó anteriormente. Importante: Necesitará la URL del bucket de S3, su clave de acceso y su clave de AWS acceso secreta. AWS Ejecute el comando SHOW STAGES para comprobar que se haya creado la etapa de Snowflake.

Desarrollador

Cree la tabla de destino de Snowflake.

Ejecute el comando CREATE TABLE para crear la tabla de Snowflake.

Desarrollador

Cree una canalización.

Ejecute el comando CREATE PIPE y asegúrese de que auto_ingest=true esté en el comando. Ejecute el comando SHOW PIPES para comprobar que se haya creado la canalización. Copie y guarde el valor de columna notification_channel. Este valor se utilizará para configurar las notificaciones de eventos de Amazon S3.

Desarrollador
TareaDescripciónHabilidades requeridas

Cree una política de ciclo de vida de 30 días para el bucket de S3.

Inicie sesión en la consola Amazon S3 Consola de administración de AWS y ábrala. Seleccione el bucket de S3 que tiene los datos de Firehose. A continuación, seleccione la pestaña Administración en el bucket de S3 y elija Agregar regla de ciclo de vida. Escriba un nombre para la regla en el cuadro de diálogo Regla de ciclo de vida y configure una regla del ciclo de vida de 30 días para su bucket. Para obtener ayuda con esta y otras explicaciones, consulte la sección Recursos relacionados.

Administrador de sistemas, desarrollador

Cree una política de IAM para el bucket de S3.

Abra la consola AWS Identity and Access Management (IAM) y elija Políticas. Elija Crear política y, a continuación, elija la pestaña JSON. Copie y pegue la política de la sección Información adicional en el campo JSON. Esta política concederá los permisos PutObject y DeleteObject, así como los permisos GetObject, GetObjectVersion y ListBucket. Elija Revisar política, escriba un nombre para la política y después elija Crear política.

Administrador de sistemas, desarrollador

Asigne la política a un rol de IAM.

Abra la consola de IAM, elija Roles y, a continuación, elija Crear rol. Elija Otra cuenta de AWS como entidad de confianza. Introduzca su Cuenta de AWS ID y seleccione Requerir ID externo. Introduzca un marcador de posición de ID que podrá cambiar más adelante. Elija Siguiente y asigne la política de IAM que creó anteriormente. Luego, cree el rol de IAM.

Administrador de sistemas, desarrollador

Copie el nombre de recurso de Amazon (ARN) para el rol de IAM.

Abra la consola de IAM; y elija Roles. Elija el rol de IAM que creó anteriormente y, a continuación, copie y almacene el ARN del rol.

Administrador de sistemas, desarrollador
TareaDescripciónHabilidades requeridas

Cree una integración de almacenamiento en Snowflake.

Inicie sesión en Snowflake y ejecute el comando CREATE STORAGE INTEGRATION. Esto modificará la relación de confianza, concederá acceso a Snowflake y proporcionará el ID externo de su etapa de Snowflake.

Administrador de sistemas, desarrollador

Recupere el rol de IAM para su cuenta de Snowflake.

Ejecute el comando DESC INTEGRATION para recuperar el ARN del rol de IAM.

importante

<integration_ name> es el nombre de la integración de almacenamiento de Snowflake que creó anteriormente.

Administrador de sistemas, desarrollador

Registre los valores de dos columnas.

Copie y guarde los valores de las columnas storage_aws_iam_user_arn y storage_aws_external_id.

Administrador de sistemas, desarrollador
TareaDescripciónHabilidades requeridas

Modifique la política del rol de IAM.

Abra la consola de IAM; y elija Roles. Elija el rol de IAM que creó anteriormente y seleccione la pestaña Relaciones de confianza. Elija Editar relación de confianza. Reemplace snowflake_external_id por el valor storage_aws_external_id que copió anteriormente. Reemplace snowflake_user_arn por el valor storage_aws_iam_user_arn que copió anteriormente. A continuación, seleccione Actualizar la política de confianza.

Administrador de sistemas, desarrollador
TareaDescripciónHabilidades requeridas

Active las notificaciones de eventos para el bucket de S3.

Abra la consola de Amazon S3 y elija un bucket. Seleccione Propiedades y, en Configuración avanzada, seleccione Eventos. Seleccione Añadir notificación y especifique un nombre para este evento. Si no introduce un nombre, se generará un identificador único global (GUID) y se utilizará para el nombre.

Administrador de sistemas, desarrollador

Configure las notificaciones de Amazon SNS para el bucket de S3.

En Eventos, elija ObjectCreate (Todos) y, a continuación, elija SQS Queue en la lista desplegable Enviar a. En la lista de SNS, elija Agregar ARN de cola SQS y pegue el valor notification_channel que copió anteriormente. A continuación, elija Guardar.

Administrador de sistemas, desarrollador

Suscriba la cola de SQS de Snowflake al tema de SNS.

Suscriba la cola de SQS de Snowflake al tema de SNS que ha creado. Para obtener más información sobre este paso, consulte la sección Recursos relacionados.

Administrador de sistemas, desarrollador
TareaDescripciónHabilidades requeridas

Revise y pruebe Snowpipe.

Inicie sesión en Snowflake y abra la etapa de Snowflake. Coloque los archivos en su bucket de S3 y compruebe si la tabla Snowflake los carga. Amazon S3 enviará notificaciones de SNS a Snowpipe cuando aparezcan nuevos objetos en el bucket de S3.

Administrador de sistemas, desarrollador

Recursos relacionados

Información adicional

Crear un formato de archivo:

CREATE FILE FORMAT <name> TYPE = 'CSV' FIELD_DELIMITER = '|' SKIP_HEADER = 1;

Crear una etapa externa:

externalStageParams (for Amazon S3) ::= URL = 's3://[//]' [ { STORAGE_INTEGRATION = } | { CREDENTIALS = ( { { AWS_KEY_ID = `` AWS_SECRET_KEY = `` [ AWS_TOKEN = `` ] } | AWS_ROLE = `` } ) ) }` ] [ ENCRYPTION = ( [ TYPE = 'AWS_CSE' ] [ MASTER_KEY = '' ] | [ TYPE = 'AWS_SSE_S3' ] | [ TYPE = 'AWS_SSE_KMS' [ KMS_KEY_ID = '' ] | [ TYPE = NONE ] )

Creación de una tabla:

CREATE [ OR REPLACE ] [ { [ LOCAL | GLOBAL ] TEMP[ORARY] | VOLATILE } | TRANSIENT ] TABLE [ IF NOT EXISTS ] <table_name> ( <col_name> <col_type> [ { DEFAULT <expr> | { AUTOINCREMENT | IDENTITY } [ ( <start_num> , <step_num> ) | START <num> INCREMENT <num> ] } ] /* AUTOINCREMENT / IDENTITY supported only for numeric data types (NUMBER, INT, etc.) */ [ inlineConstraint ] [ , <col_name> <col_type> ... ] [ , outoflineConstraint ] [ , ... ] ) [ CLUSTER BY ( <expr> [ , <expr> , ... ] ) ] [ STAGE_FILE_FORMAT = ( { FORMAT_NAME = '<file_format_name>' | TYPE = { CSV | JSON | AVRO | ORC | PARQUET | XML } [ formatTypeOptions ] } ) ] [ STAGE_COPY_OPTIONS = ( copyOptions ) ] [ DATA_RETENTION_TIME_IN_DAYS = <num> ] [ COPY GRANTS ] [ COMMENT = '<string_literal>' ]

Mostrar etapas:

SHOW STAGES;

Creación de una canalización:

CREATE [ OR REPLACE ] PIPE [ IF NOT EXISTS ] [ AUTO_INGEST = [ TRUE | FALSE ] ] [ AWS_SNS_TOPIC = ] [ INTEGRATION = '' ] [ COMMENT = '' ] AS

Mostrar canalizaciones:

SHOW PIPES [ LIKE '<pattern>' ] [ IN { ACCOUNT | [ DATABASE ] <db_name> | [ SCHEMA ] <schema_name> } ]

Crear una integración de almacenamiento:

CREATE STORAGE INTEGRATION <integration_name> TYPE = EXTERNAL_STAGE STORAGE_PROVIDER = S3 ENABLED = TRUE STORAGE_AWS_ROLE_ARN = '<iam_role>' STORAGE_ALLOWED_LOCATIONS = ('s3://<bucket>/<path>/', 's3://<bucket>/<path>/') [ STORAGE_BLOCKED_LOCATIONS = ('s3://<bucket>/<path>/', 's3://<bucket>/<path>/') ]

Ejemplo:

create storage integration s3_int type = external_stage storage_provider = s3 enabled = true storage_aws_role_arn = 'arn:aws:iam::001234567890:role/myrole' storage_allowed_locations = ('s3://amzn-s3-demo-bucket1/mypath1/', 's3://amzn-s3-demo-bucket2/mypath2/') storage_blocked_locations = ('s3://amzn-s3-demo-bucket1/mypath1/sensitivedata/', 's3://amzn-s3-demo-bucket2/mypath2/sensitivedata/');

Para obtener más información sobre este paso, consulte Configuración de una integración de almacenamiento de Snowflake para acceder a Amazon S3 en la documentación de Snowflake.

Describir una integración:

DESC INTEGRATION <integration_name>;

Política de bucket de S3:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:PutObject", "s3:GetObject", "s3:GetObjectVersion", "s3:DeleteObject", "s3:DeleteObjectVersion" ], "Resource": "arn:aws:s3::://*" }, { "Effect": "Allow", "Action": "s3:ListBucket", "Resource": "arn:aws:s3:::", "Condition": { "StringLike": { "s3:prefix": [ "/*" ] } } } ] }