View a markdown version of this page

Création d'un canal d'entrée ML dans AWS Clean Rooms ML - AWS Clean Rooms

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Création d'un canal d'entrée ML dans AWS Clean Rooms ML

Prérequis :

  • Et Compte AWS avec accès à AWS Clean Rooms

  • Une collaboration configurée dans l' AWS Clean Rooms endroit où vous souhaitez créer le canal d'entrée ML

  • Autorisations permettant d'interroger des données et de créer des canaux d'entrée ML dans le cadre de la collaboration.

  • (Facultatif) Un algorithme de modèle existant à associer au canal d'entrée ML, ou des autorisations pour en créer un nouveau

  • (Facultatif) Tables avec des règles d'analyse qui peuvent être exécutées pour le modèle que vous avez spécifié.

  • (Facultatif) Modèle de requête ou d'analyse SQL existant à utiliser pour générer le jeu de données

  • (Facultatif) Un rôle de service existant avec les autorisations appropriées, ou des autorisations pour créer un nouveau rôle de service

  • (Facultatif) Une AWS KMS clé personnalisée si vous souhaitez utiliser votre propre clé de chiffrement

  • Autorisations appropriées pour créer et gérer des modèles de machine learning dans le cadre de la collaboration

Un canal d'entrée ML est un ensemble de données créé à partir d'une requête de données spécifique. Les membres capables d'interroger des données peuvent préparer leurs données pour l'entraînement et l'inférence en créant un canal d'entrée ML. La création d'un canal d'entrée ML permet d'utiliser ces données dans différents modèles d'entraînement au sein d'une même collaboration. Vous devez créer des canaux d'entrée ML distincts pour l'entraînement et l'inférence.

Pour créer un canal d'entrée ML, vous devez spécifier la requête SQL utilisée pour interroger les données d'entrée et créer le canal d'entrée ML. Les résultats de cette requête ne sont jamais partagés avec aucun membre et restent dans les limites de Clean Rooms ML. Le nom de ressource Amazon (ARN) de référence est utilisé dans les étapes suivantes pour entraîner un modèle ou exécuter une inférence.

Console
Pour créer un canal d'entrée ML (console)
  1. Connectez-vous à la AWS Clean Rooms console AWS Management Console et ouvrez-la à l'adresse https://console.aws.amazon.com/cleanrooms.

  2. Dans le volet de navigation de gauche, sélectionnez Collaborations.

  3. Sur la page Collaborations, choisissez la collaboration dans laquelle vous souhaitez créer un canal d'entrée ML.

  4. Une fois la collaboration ouverte, choisissez l'onglet Modèles ML.

  5. Sous Modèles ML personnalisés, dans la section Canaux d'entrée ML, choisissez Créer un canal d'entrée ML.

  6. Sur la page Créer un canal d'entrée ML, pour les détails du canal d'entrée ML, procédez comme suit :

    1. Dans Nom, entrez un nom unique pour votre chaîne.

    2. (Facultatif) Dans Description, entrez une description de votre chaîne.

    3. Pour Algorithme de modèle associé, sélectionnez l'algorithme à utiliser.

      Choisissez Associer un algorithme de modèle pour en ajouter un nouveau.

  7. Pour Dataset, choisissez une méthode pour générer le jeu de données d'entraînement :

    • Choisissez SQL query pour utiliser les résultats d'une requête SQL comme ensemble de données d'apprentissage.

      Si vous avez choisi SQL query, saisissez votre requête dans le champ de requête SQL.

      (Facultatif) Pour importer une requête que vous avez utilisée récemment, choisissez Importer à partir de requêtes récentes.

    • Choisissez le modèle d'analyse pour utiliser les résultats d'un modèle d'analyse comme jeu de données d'apprentissage.

      Avertissement

      La génération de données synthétiques empêche de déduire des attributs individuels, que des individus spécifiques soient présents dans l'ensemble de données d'origine ou que des attributs d'apprentissage de ces individus soient présents. Cependant, cela n'empêche pas les valeurs littérales de l'ensemble de données d'origine, y compris les informations personnelles identifiables (PII), d'apparaître dans l'ensemble de données synthétique.

      Nous recommandons d'éviter dans le jeu de données d'entrée les valeurs associées à une seule personne concernée, car elles peuvent permettre de réidentifier une personne concernée. Par exemple, si un seul utilisateur vit dans un code postal, la présence de ce code postal dans le jeu de données synthétique confirmera que cet utilisateur figurait dans le jeu de données d'origine. Des techniques telles que la troncation de valeurs de haute précision ou le remplacement de catalogues peu courants par d'autres peuvent être utilisées pour atténuer ce risque. Ces transformations peuvent faire partie de la requête utilisée pour créer le canal d'entrée ML.

    1. Si aucune table n'est associée, choisissez Associer une table pour ajouter des tables avec une règle d'analyse pouvant être exécutée pour le modèle spécifié.

    2. Choisissez le type de travailleur à utiliser lors de la création de ce canal de données. Le type de travailleur par défaut est CR.1X. Spécifiez le nombre de travailleurs à utiliser. Le numéro de travailleur par défaut est 16. Pour définir les propriétés de Spark :

      1. Développez les propriétés Spark.

      2. Choisissez Ajouter des propriétés Spark.

      3. Dans la boîte de dialogue des propriétés de Spark, choisissez un nom de propriété dans la liste déroulante et entrez une valeur.

      Les tableaux suivants fournissent une définition pour chaque propriété.

      Pour plus d'informations sur les propriétés de Spark, consultez la section Propriétés de Spark dans la documentation d'Apache Spark.

      Note

      Vous pouvez configurer un maximum de 50 propriétés Spark. La valeur de chaque propriété peut comporter jusqu'à 500 caractères.

      Nom de propriété Description Valeur par défaut

      Défaillances de Spark.Task.Max

      Contrôle le nombre de fois consécutives qu'une tâche peut échouer avant que la tâche échoue. Nécessite une valeur supérieure ou égale à 1. Le nombre de tentatives autorisées est égal à cette valeur moins 1. Le nombre d'échecs est réinitialisé en cas de réussite d'une tentative. Les échecs liés aux différentes tâches ne s'accumulent pas jusqu'à atteindre cette limite.

      4

      spark.sql.files.max PartitionBytes

      Définit le nombre maximum d'octets à regrouper dans une seule partition lors de la lecture à partir de sources basées sur des fichiers telles que Parquet, JSON et ORC.

      128 MO

      Spark.Hadoop.FS.S3.max Réessaie

      Définit le nombre maximal de tentatives pour les opérations sur les fichiers Amazon S3.

      (aucun)

      spark.network.timeout

      Définit le délai d'expiration par défaut pour toutes les interactions réseau. Remplace les paramètres de délai d'expiration suivants s'ils ne sont pas configurés :

      • spark.storage.block ManagerHeartbeatTimeoutMs

      • Spark.shuffle.io. Délai de connexion

      • Spark.rpc.AskTimeout

      • Spark.rpc. Délai de recherche expiré

      années 120

      spark.rdd.compress

      Spécifie s'il faut compresser les partitions RDD sérialisées à l'aide de spark.io.compression.codec. S'applique à StorageLevel.MEMORY _ONLY_SER en Java et Scala, ou à StorageLevel.MEMORY _ONLY en Python. Réduit l'espace de stockage mais nécessite un temps de traitement supplémentaire du processeur.

      false

      spark.shuffle.spill.compress

      Spécifie s'il faut compresser les données du shuffle spill à l'aide de spark.io.compression.codec.

      true

      spark.shuffle.compress

      Spécifie s'il faut compresser les fichiers de sortie cartographique. La compression utilise spark.io.compression.codec.

      true

      spark.shuffle.service.index.cache.size

      Définit la limite de taille du cache, en octets, sauf indication contraire.

      100 m

      Spark.shuffle.io.Max Réessaie

      Définit le nombre maximal de tentatives pour les extractions qui échouent en raison d' IO-related exceptions.

      3

      spark.shuffle.io.RetryWait

      Définit le temps d'attente entre les nouvelles tentatives d'extraction. Le délai maximal causé par une nouvelle tentative est de 15 secondes par défaut, calculé comme MaxRetries * RetryWait.

      5s

      Spark.shuffle.io. Délai de connexion

      Définit le délai d'attente pour que les connexions établies entre les serveurs shuffle et les clients soient marquées comme inactives et fermées s'il y a toujours des demandes de récupération en suspens mais aucun trafic sur le canal.

      (valeur de spark.network.timeout)

      spark.driver.max ResultSize

      Définit la limite de taille totale des résultats sérialisés de toutes les partitions pour chaque action Spark, en octets. Doit être d'au moins 1 million, ou 0 pour un nombre illimité.

      1 g

      spark.memory.fraction

      Définit la fraction (espace de stockage - 300 Mo) utilisée pour l'exécution et le stockage. Plus cette valeur est faible, plus les fuites et les expulsions de données mises en cache sont fréquentes. Il est recommandé de laisser cette valeur à la valeur par défaut.

      0.6

      spark.scheduler.mode

      Définit le mode de planification entre les tâches soumises à une même tâche SparkContext. Peut être réglé sur FAIR pour utiliser le partage équitable au lieu de mettre les tâches en file d'attente les unes après les autres. Valeurs soutenues : FAIR, FIFO.

      FIFO

      spark.sql.adaptive.advisory PartitionSizeInBytes

      Définit la taille cible en octets pour les partitions shuffle lors de l'optimisation adaptative lorsque spark.sql.adaptive.enabled est vrai. Contrôle la taille de la partition lors de la fusion de petites partitions ou du fractionnement de partitions asymétriques.

      (valeur de PostShuffleInputSize spark.sql.adaptive.shuffle.target)

      spark.sql.adaptive.auto BroadcastJoinThreshold

      Définit la taille maximale de la table en octets pour la diffusion vers les nœuds de travail lors des jointures. S'applique uniquement dans le cadre adaptatif. Utilise la même valeur par défaut que BroadcastJoinThreshold spark.sql.auto. Réglez sur -1 pour désactiver la diffusion.

      (aucun)

      spark.sql.adaptive.coalesce Partitions.enabled

      Spécifie s'il faut fusionner les partitions shuffle contiguës sur la base de spark.sql.adaptive.advisory afin d'optimiser la taille des tâches. PartitionSizeInBytes Nécessite que spark.sql.adaptive.enabled soit vrai.

      true

      spark.sql.adaptive.coalesce Partitions.initialPartitionNum

      Définit le nombre initial de partitions shuffle avant la fusion. Nécessite que spark.sql.adaptive.enabled et spark.sql.adaptive.coalesce soient vrais. Partitions.enabled La valeur par défaut est spark.sql.shuffle.partitions.

      (aucun)

      spark.sql.adaptive.coalesce Partitions.minPartitionSize

      Définit la taille minimale des partitions shuffle coalescées afin d'éviter que les partitions ne deviennent trop petites lors de l'optimisation adaptative.

      1 Mo

      spark.sql.adaptive.coalesce Partitions.parallelismFirst

      Spécifie s'il faut calculer la taille des partitions en fonction du parallélisme des clusters au lieu de PartitionSizeInBytes spark.sql.adaptive.advisory lors de la fusion des partitions. Génère des tailles de partition inférieures à la taille cible configurée afin de maximiser le parallélisme. Nous vous recommandons de définir ce paramètre sur false sur les clusters occupés afin d'améliorer l'utilisation des ressources en évitant de trop petites tâches.

      true

      spark.sql.adaptive.enabled

      Spécifie s'il faut activer l'exécution adaptative des requêtes afin de réoptimiser les plans de requêtes pendant l'exécution des requêtes, sur la base de statistiques d'exécution précises.

      true

      spark.sql.adaptive.force OptimizeSkewedJoin

      Spécifie s'il faut forcer l'activation OptimizeSkewedJoin même si cela introduit un shuffle supplémentaire.

      false

      spark.sql.adaptive.local ShuffleReader.enabled

      Spécifie s'il faut utiliser des lecteurs de shuffle locaux lorsque le partitionnement aléatoire n'est pas requis, par exemple après la conversion des jointures par tri-fusion en jointures par hachage par diffusion. Nécessite que spark.sql.adaptive.enabled soit vrai.

      true

      spark.sql.adaptive.max ShuffledHashJoinLocalMapThreshold

      Définit la taille de partition maximale en octets pour la création de cartes de hachage locales. Privilégie les jointures par hachage mélangées par rapport aux jointures par tri-fusion lorsque :

      • Cette valeur est égale ou supérieure à spark.sql.adaptive.advisory PartitionSizeInBytes

      • Toutes les tailles de partition se situent dans cette limite

      Remplace le paramètre SortMergeJoin spark.sql.join.prefer.

      0 octet

      spark.sql.adaptive.optimize SkewsInRebalancePartitions.enabled

      Spécifie s'il faut optimiser les partitions shuffle asymétriques en les divisant en partitions plus petites sur la base de spark.sql.adaptive.advisory. PartitionSizeInBytes Nécessite que spark.sql.adaptive.enabled soit vrai.

      true

      spark.sql.adaptive.rebalance PartitionsSmallPartitionFactor

      Définit le facteur de seuil de taille pour la fusion des partitions lors du fractionnement. Les partitions inférieures à ce facteur multiplié par PartitionSizeInBytes spark.sql.adaptive.advisory sont fusionnées.

      0.2

      spark.sql.adaptive.skew Join.enabled

      Spécifie s'il faut gérer l'inclinaison des données dans les jointures mélangées en divisant et éventuellement en répliquant les partitions asymétriques. S'applique aux jointures par tri-fusion et aux jointures de hachage mélangées. Nécessite que spark.sql.adaptive.enabled soit vrai.

      true

      spark.sql.adaptive.skew Join.skewedPartitionFactor

      Détermine le facteur de taille qui détermine l'inclinaison de la partition. Une partition est inclinée lorsque sa taille dépasse les deux valeurs suivantes :

      • Ce facteur multiplié par la taille médiane de la partition

      • La valeur de spark.sql.adaptive.skew Join.skewedPartitionThresholdInBytes

      5

      spark.sql.adaptive.skew Join.skewedPartitionThresholdInBytes

      Définit le seuil de taille en octets pour identifier les partitions asymétriques. Une partition est inclinée lorsque sa taille dépasse les deux valeurs suivantes :

      • Ce seuil

      • La taille de partition médiane multipliée par spark.sql.adaptive.skew Join.skewedPartitionFactor

      Nous recommandons de définir cette valeur à une valeur supérieure à PartitionSizeInBytes spark.sql.adaptive.advisory.

      256 MO

      Spark.SQL. Délai de diffusion

      Contrôle le délai d'expiration en secondes pour les opérations de diffusion lors des jointures de diffusion.

      300 secondes

      spark.sql.cbo.enabled

      Spécifie s'il faut activer l'optimisation basée sur les coûts (CBO) pour l'estimation des statistiques du plan.

      false

      spark.sql.cbo.join Reorder.dp.star.filter

      Spécifie s'il faut appliquer l'heuristique du filtre de jointure en étoile lors de l'énumération des jointures basée sur les coûts.

      false

      spark.sql.cbo.join Reorder.dp.threshold

      Définit le nombre maximum de nœuds joints autorisés dans l'algorithme de programmation dynamique.

      12

      spark.sql.cbo.join Reorder.enabled

      Spécifie s'il faut activer la réorganisation des jointures dans le cadre de l'optimisation basée sur les coûts (CBO).

      false

      spark.sql.cbo.plan Stats.enabled

      Spécifie s'il faut extraire le nombre de lignes et les statistiques des colonnes du catalogue lors de la génération du plan logique.

      false

      spark.sql.cbo.star SchemaDetection

      Spécifie s'il faut activer la réorganisation des jointures en fonction de la détection du schéma en étoile.

      false

      spark.sql.files.max PartitionNum

      Définit le nombre maximal cible de partitions de fichiers fractionnées pour les sources basées sur des fichiers (Parquet, JSON et ORC). Redimensionne les partitions lorsque le nombre initial dépasse cette valeur. Il s'agit d'une cible suggérée et non d'une limite garantie.

      (aucun)

      spark.sql.files.max RecordsPerFile

      Définit le nombre maximum d'enregistrements à écrire dans un seul fichier. Aucune limite ne s'applique lorsqu'il est réglé sur zéro ou sur une valeur négative.

      0

      spark.sql.files.min PartitionNum

      Définit le nombre minimum cible de partitions de fichiers fractionnées pour les sources basées sur des fichiers (Parquet, JSON et ORC). La valeur par défaut est spark.sql.leafNodeDefaultParallelism. Il s'agit d'une cible suggérée et non d'une limite garantie.

      (aucun)

      spark.sql.in MemoryColumnarStorage.batchSize

      Contrôle la taille du lot pour la mise en cache en colonnes. L'augmentation de la taille améliore l'utilisation de la mémoire et la compression, mais augmente le risque d'erreurs liées au manque de mémoire.

      10 000

      spark.sql.in MemoryColumnarStorage.compressed

      Spécifie s'il faut sélectionner automatiquement les codecs de compression pour les colonnes en fonction des statistiques de données.

      true

      spark.sql.in MemoryColumnarStorage.enableVectorizedReader

      Spécifie s'il faut activer la lecture vectorisée pour la mise en cache en colonnes.

      true

      spark.sql.legacy.allow HashOnMapType

      Spécifie s'il faut autoriser les opérations de hachage sur les structures de données de type carte. Cet ancien paramètre assure la compatibilité avec la gestion des types de carte dans les anciennes versions de Spark.

      (aucun)

      spark.sql.legacy.allow NegativeScaleOfDecimal

      Spécifie s'il faut autoriser les valeurs d'échelle négatives dans les définitions de type décimal. Cet ancien paramètre maintient la compatibilité avec les anciennes versions de Spark qui prenaient en charge les échelles décimales négatives.

      (aucun)

      spark.sql.legacy.cast ComplexTypesToString.enabled

      Spécifie s'il faut activer le comportement existant pour convertir des types complexes en chaînes. Maintient la compatibilité avec les règles de conversion de type des anciennes versions de Spark.

      (aucun)

      spark.sql.legacy.char VarcharAsString

      Spécifie s'il faut traiter les types CHAR et VARCHAR comme des types STRING. Cet ancien paramètre assure la compatibilité avec la gestion des types de chaînes des anciennes versions de Spark.

      (aucun)

      spark.sql.legacy.create EmptyCollectionUsingStringType

      Spécifie s'il faut créer des collections vides à l'aide d'éléments de type chaîne. Cet ancien paramètre assure la compatibilité avec le comportement d'initialisation des collections des anciennes versions de Spark.

      (aucun)

      spark.sql.legacy.exposant LiteralAsDecimal.enabled

      Spécifie s'il faut interpréter les littéraux exponentiels comme des types décimaux. Cet ancien paramètre assure la compatibilité avec la gestion littérale numérique des anciennes versions de Spark.

      (aucun)

      spark.sql.legacy.json.allow EmptyString.enabled

      Spécifie s'il faut autoriser les chaînes vides dans le traitement JSON. Cet ancien paramètre maintient la compatibilité avec le comportement d'analyse JSON des anciennes versions de Spark.

      (aucun)

      spark.sql.legacy.parquet.int96 RebaseModelRead

      Spécifie s'il faut utiliser l'ancien mode de rebase de l'horodatage INT96 lors de la lecture des fichiers Parquet. Cet ancien paramètre assure la compatibilité avec la gestion des horodatages des anciennes versions de Spark.

      (aucun)

      spark.sql.legacy.time ParserPolicy

      Contrôle le comportement d'analyse temporelle à des fins de rétrocompatibilité. Cet ancien paramètre détermine la manière dont les horodatages et les dates sont analysés à partir de chaînes.

      (aucun)

      spark.sql.legacy.type Coercion.datetimeToString.enabled

      Spécifie s'il faut activer le comportement de coercition de type existant lors de la conversion de valeurs date/heure en chaînes. Maintient la compatibilité avec les règles de conversion date/heure des anciennes versions de Spark.

      (aucun)

      spark.sql.max SinglePartitionBytes

      Définit la taille maximale de la partition en octets. Le planificateur introduit des opérations de shuffle pour les partitions plus grandes afin d'améliorer le parallélisme.

      128 m

      spark.sql.MetadatacachTl Seconds

      Contrôle la durée de vie (TTL) des caches de métadonnées. S'applique aux métadonnées des fichiers de partition et aux caches du catalogue de sessions. Nécessite :

      • Une valeur positive supérieure à zéro

      • spark.sql.CatalogImplementation défini sur hive

      • PartitionFileCacheSize spark.sql.hive.filesource supérieur à zéro

      • FilesourcePartitions spark.sql.hive.manage défini sur true

      -1000 ms

      spark.sql.optimizer.collapse ProjectAlwaysInline

      Spécifie s'il faut réduire les projections adjacentes et les expressions en ligne, même si cela entraîne une duplication.

      false

      spark.sql.optimizer.dynamic PartitionPruning.enabled

      Spécifie s'il faut générer des prédicats pour les colonnes de partition utilisées comme clés de jointure.

      true

      spark.sql.optimizer.enable CsvExpressionOptimization

      Spécifie s'il faut optimiser les expressions CSV dans l'optimiseur SQL en supprimant les colonnes inutiles des opérations from_csv.

      true

      spark.sql.optimizer.enable JsonExpressionOptimization

      Spécifie s'il faut optimiser les expressions JSON dans l'optimiseur SQL en :

      • Supprimer les colonnes inutiles des opérations from_json

      • Simplification des combinaisons from_json et to_json

      • Optimisation des opérations named_struct

      true

      Spark.SQL.Optimizer.Règles exclues

      Définit les règles d'optimisation à désactiver, identifiées par des noms de règles séparés par des virgules. Certaines règles ne peuvent pas être désactivées car elles sont nécessaires pour être correctes. L'optimiseur enregistre les règles qui ont été désactivées avec succès.

      (aucun)

      spark.sql.optimizer.runtime.bloom Filter.applicationSideScanSizeThreshold

      Définit la taille de scan agrégée minimale en octets requise pour injecter un filtre Bloom côté application.

      10 GO

      spark.sql.optimizer.runtime.bloom Filter.creationSideThreshold

      Définit le seuil de taille maximale pour injecter un filtre Bloom côté création.

      10 MO

      spark.sql.optimizer.runtime.bloom Filter.enabled

      Spécifie s'il faut insérer un filtre Bloom pour réduire les données de mélange lorsqu'un côté d'une jointure aléatoire comporte un prédicat sélectif.

      true

      spark.sql.optimizer.runtime.bloom Filter.expectedNumItems

      Définit le nombre par défaut d'éléments attendus dans le filtre Bloom d'exécution.

      1000000

      spark.sql.optimizer.runtime.bloom Filter.maxNumBits

      Définit le nombre maximum de bits autorisés dans le filtre Bloom d'exécution.

      67108864

      spark.sql.optimizer.runtime.bloom Filter.maxNumItems

      Définit le nombre maximum d'éléments attendus autorisés dans le filtre Bloom d'exécution.

      4000000

      spark.sql.optimizer.runtime.bloom Filter.numBits

      Définit le nombre de bits par défaut utilisés dans le filtre Bloom d'exécution.

      8388608

      spark.sql.optimizer.runtime.rowlevel OperationGroupFilter.enabled

      Spécifie s'il faut activer le filtrage des groupes d'exécution pour les opérations au niveau des lignes. Permet aux sources de données de :

      • Réduisez des groupes entiers de données (tels que des fichiers ou des partitions) à l'aide de filtres de source de données

      • Exécuter des requêtes d'exécution pour identifier les enregistrements correspondants

      • Supprimez les groupes inutiles pour éviter des réécritures coûteuses

      Limites:

      • Toutes les expressions ne peuvent pas être converties en filtres de source de données

      • Certaines expressions nécessitent une évaluation Spark (telles que les sous-requêtes)

      true

      spark.sql.optimizer.runtime Filter.number.threshold

      Définit le nombre total de filtres d'exécution injectés (non DPP). Cela permet d'éviter que les pilotes OOM ne contiennent trop de filtres Bloom.

      10

      spark.sql.optimizer.runtime Filter.semiJoinReduction.enabled

      Spécifie s'il faut insérer une semi-jointure pour réduire les données de mélange lorsqu'un côté d'une jointure aléatoire comporte un prédicat sélectif.

      false

      spark.sql.parquet.AggregatePushdown

      Spécifie s'il faut transférer les agrégats vers Parquet à des fins d'optimisation. Supporte :

      • MIN et MAX pour les types booléen, entier, flottant et date

      • COUNT pour tous les types de données

      Lance une exception si les statistiques sont absentes du pied de page d'un fichier Parquet.

      false

      spark.sql.parquet.columnar ReaderBatchSize

      Contrôle le nombre de lignes dans chaque lot de lecteurs vectorisés Parquet. Choisissez une valeur qui équilibre le surcoût des performances et l'utilisation de la mémoire afin d'éviter les erreurs liées à l'épuisement de la mémoire.

      4096

      spark.sql.parquet.enable VectorizedReader

      Spécifie s'il faut activer le décodage vectoriel du parquet.

      true

      spark.sql.shuffle.partitions

      Définit le nombre de partitions par défaut pour le brassage des données lors des jointures ou des agrégations. Ne peut pas être modifié entre les redémarrages de requêtes de streaming structurées à partir du même emplacement de point de contrôle.

      200

      spark.sql.shuffled HashJoinFactor

      Définit le facteur de multiplication utilisé pour déterminer l'éligibilité à la jointure par hachage aléatoire. Une jointure par hachage aléatoire est sélectionnée lorsque la taille des données du petit côté multipliée par ce facteur est inférieure à la taille des données du grand côté.

      3

      spark.sql.sources.parallel PartitionDiscovery.threshold

      Définit le nombre maximum de chemins pour la liste des fichiers côté pilote avec des sources basées sur des fichiers (Parquet, JSON et ORC). En cas de dépassement lors de la découverte de partitions, les fichiers sont répertoriés à l'aide d'une tâche distribuée Spark distincte.

      32

      spark.sql.statistics.histogram.enabled

      Spécifie s'il faut générer des histogrammes à hauteur égale lors du calcul des statistiques des colonnes afin d'améliorer la précision de l'estimation. Nécessite une analyse de table supplémentaire en plus de celle requise pour les statistiques de base sur les colonnes.

      false

      spark.dynamic Allocation.executorIdleTimeout

      Définit la durée pendant laquelle un exécuteur doit être inactif avant d'être supprimé lorsque l'allocation dynamique est activée.

      Années 60

      spark.dynamic Allocation.schedulerBacklogTimeout

      Définit la durée pendant laquelle les tâches en attente doivent être retardées avant que de nouveaux exécuteurs ne soient demandés lorsque l'allocation dynamique est activée.

      1s

      spark.dynamic Allocation.sustainedSchedulerBacklogTimeout

      Identique à spark.dynamicAllocation.schedulerBacklogTimeout, mais utilisé uniquement pour les requêtes d'exécuteur suivantes.

      (valeur de spark.dynamicAllocation.schedulerBacklogTimeout)

      spark.scheduler.min RegisteredResourcesRatio

      Définit le ratio minimum de ressources enregistrées (ressources enregistrées/total des ressources attendues) à attendre avant le début de la planification. Spécifié sous la forme d'un double compris entre 0,0 et 1,0. Que le ratio minimum de ressources ait été atteint ou non, le temps maximum d'attente avant le début de la planification est contrôlé par RegisteredResourcesWaitingTime spark.scheduler.max.

      0.8

      spark.scheduler.max RegisteredResourcesWaitingTime

      Définit le délai maximal d'attente pour que les ressources soient enregistrées avant le début de la planification.

      Années 30

      spark.sql.hive.metastore PartitionPruningFallbackOnException

      Spécifie s'il convient de revenir à l'extraction de toutes les partitions depuis le métastore Hive et de procéder à l'élagage des partitions côté client Spark en cas de rencontre MetaException depuis le métastore.

      false

      Nom de propriété Description Valeur par défaut

      spark.sql.auto BroadcastJoinThreshold

      Définit la taille maximale de la table en octets pour la diffusion vers les nœuds de travail lors des jointures. Réglez sur -1 pour désactiver la diffusion.

      10 Mo (-1 pour CR.4X 32 travailleurs)

      spark.dynamic Allocation.enabled

      Spécifie s'il faut utiliser l'allocation dynamique des ressources, qui augmente ou diminue le nombre d'exécuteurs enregistrés auprès de cette application en fonction de la charge de travail.

      true

      codec spark.io de compression

      Définit le codec utilisé pour compresser les données internes telles que les partitions RDD, le journal des événements, les variables de diffusion et les sorties shuffle. Valeurs prises en charge : lz4, snappy, zstd, gzip.

      snappy

      spark.sql.session.TimeZone

      Définit le fuseau horaire de session pour la gestion des horodatages sous forme de chaînes littérales et pour la conversion d'objets Java. Accepte :

      • Region-based Identifiants au area/city format (par exemple America/Los _Angeles)

      • Décalages de zone au HH:mm:ss format (+/-) HH, (+/-) HH:mm ou (+/-) (tel que -08 ou + 01:00)

      • UTC ou Z comme alias pour + 00:00

      UTC

    3. Pour Conservation des données en jours, entrez le nombre de jours pendant lesquels les données sont conservées.

    4. Pour le format des résultats, choisissez CSV ou Parquet comme format de données que le canal d'entrée ML doit utiliser.

  8. Pour l'accès au service, choisissez le nom du rôle de service existant qui sera utilisé pour accéder à cette table ou choisissez Créer et utiliser un nouveau rôle de service.

  9. Pour le chiffrement, choisissez le secret de chiffrement avec une clé KMS personnalisée pour spécifier votre propre clé KMS et les informations associées. Sinon, Clean Rooms ML gérera le chiffrement.

  10. (Facultatif) Pour Compute payer, sélectionnez le membre de la collaboration qui prend en charge les coûts de calcul des requêtes.

    Note

    S'il n'existe qu'un seul payeur candidat pour le calcul des requêtes dans le cadre de la collaboration, c'est ce payeur par défaut.

  11. (Facultatif) Pour le payeur de génération de données synthétiques, sélectionnez le membre de la collaboration qui prend en charge les coûts de génération de données synthétiques.

    Note

    Cette option apparaît lorsque le canal d'entrée ML utilise un modèle d'analyse configuré pour la sortie de données synthétiques. S'il n'y a qu'un seul payeur candidat à la génération de données synthétiques dans le cadre de la collaboration, c'est ce payeur par défaut.

  12. Choisissez Create ML input channel.

    La création du canal d'entrée ML prendra quelques minutes. Vous pouvez voir une liste des canaux d'entrée ML dans l'onglet Modèles ML.

Note

Une fois le canal d'entrée ML créé, vous ne pouvez pas le modifier.

API

Pour créer un canal d'entrée ML (API)

Exécutez le code suivant avec vos paramètres spécifiques :

import boto3 acr_client = boto3.client('cleanroomsml') acr_client.create_ml_input_channel( name="ml_input_channel_name", membershipIdentifier='membership_id', configuredModelAlgorithmAssociations=[configured_model_algorithm_association_arn], retentionInDays=1, inputChannel={ "dataSource": { "protectedQueryInputParameters": { "sqlParameters": { "queryString": "select * from table", "computeConfiguration": { "worker": { "type": "CR.1X", "number": 16, "properties": { "spark": { "spark configuration key": "spark configuration value", } } } }, "resultFormat": "PARQUET" } } }, "roleArn": "arn:aws:iam::111122223333:role/role_name" } ) channel_arn = resp['ML Input Channel ARN']