

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# Apache Hudi mit Apache Flink verwenden
<a name="tutorial-hudi-for-flink"></a>

Apache Hudi ist ein Open-Source-Datenmanagement-Framework mit Operationen auf Datensatzebene wie Einfügen, Aktualisieren, Hochsetzen und Löschen, mit denen Sie die Datenverwaltung und die Entwicklung von Datenleitungen vereinfachen können. In Kombination mit effizientem Datenmanagement in Amazon S3 können Sie mit Hudi Daten in Echtzeit aufnehmen und aktualisieren. Hudi verwaltet Metadaten aller Operationen, die Sie mit dem Datensatz ausführen, sodass alle Aktionen atomar und konsistent bleiben. 

Apache Hudi ist auf Amazon EMR auf EKS mit Apache Flink mit Amazon EMR-Versionen 7.2.0 und höher verfügbar. In den folgenden Schritten erfahren Sie, wie Sie beginnen und Apache Hudi-Jobs einreichen können.

## Reichen Sie einen Apache Hudi-Job ein
<a name="tutorial-hudi-for-flink-submit-jobs"></a>

In den folgenden Schritten erfahren Sie, wie Sie einen Apache Hudi-Job einreichen.

1. Erstellen Sie eine AWS Glue-Datenbank mit dem Namen`default`.

   ```
   aws glue create-database --database-input "{\"Name\":\"default\"}"
   ```

1. Folgen Sie dem [SQL-Beispiel für den Flink-Kubernetes-Operator, um die Datei](https://github.com/apache/flink-kubernetes-operator/tree/main/examples/flink-sql-runner-example) zu erstellen. `flink-sql-runner.jar`

1. Erstellen Sie ein Hudi-SQL-Skript wie das Folgende.

   ```
   CREATE CATALOG hudi_glue_catalog WITH (
   'type' = 'hudi',
   'mode' = 'hms',
   'table.external' = 'true',
   'default-database' = 'default',
   'hive.conf.dir' = '/glue/confs/hive/conf/',
   'catalog.path' = 's3://<hudi-example-bucket>/FLINK_HUDI/warehouse/'
   );
   
   USE CATALOG hudi_glue_catalog;
   CREATE DATABASE IF NOT EXISTS hudi_db;
   use hudi_db;
   
   CREATE TABLE IF NOT EXISTS hudi-flink-example-table(
       uuid VARCHAR(20),
       name VARCHAR(10),
       age INT,
       ts TIMESTAMP(3),
       `partition` VARCHAR(20)
   )
   PARTITIONED BY (`partition`)
   WITH (
     'connector' = 'hudi',
     'path' = 's3://<hudi-example-bucket>/hudi-flink-example-table',
     'hive_sync.enable' = 'true',
     'hive_sync.mode' = 'glue',
     'hive_sync.table' = 'hudi-flink-example-table',
     'hive_sync.db' = 'hudi_db',
     'compaction.delta_commits' = '1',
     'hive_sync.partition_fields' = 'partition',
     'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
     'table.type' = 'COPY_ON_WRITE'
   );
   
   EXECUTE STATEMENT SET
   BEGIN
   
   INSERT INTO hudi-flink-example-table VALUES
       ('id1','Alex',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
       ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
       ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
       ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
       ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
       ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
       ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
       ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
   
   END;
   ```

1. Laden Sie Ihr Hudi-SQL-Skript und die `flink-sql-runner.jar` Datei an einen S3-Speicherort hoch.

1. Stellen `hudi.enabled` Sie in Ihrer `FlinkDeployments` YAML-Datei auf ein. `true`

   ```
   spec:
     flinkConfiguration:
       hudi.enabled: "true"
   ```

1. Erstellen Sie eine YAML-Datei, um Ihre Konfiguration auszuführen. Diese Beispieldatei trägt den Namen`hudi-write.yaml`.

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     name: hudi-write-example
   spec:
     flinkVersion: v1_18
     flinkConfiguration:
       taskmanager.numberOfTaskSlots: "2"
       hudi.enabled: "true"
     executionRoleArn: "<JobExecutionRole>"
     emrReleaseLabel: "emr-7.12.0-flink-latest"
     jobManager:
       highAvailabilityEnabled: false
       replicas: 1
       resource:
         memory: "2048m"
         cpu: 1
     taskManager:
       resource:
         memory: "2048m"
         cpu: 1
     job:
       jarURI: local:///opt/flink/usrlib/flink-sql-runner.jar
       args: ["/opt/flink/scripts/hudi-write.sql"]
       parallelism: 1
       upgradeMode: stateless
     podTemplate:
       spec:
         initContainers:
           - name: flink-sql-script-download
             args: 
               - s3
               - cp
               - s3://<s3_location>/hudi-write.sql
               - /flink-scripts
             image: amazon/aws-cli:latest
             imagePullPolicy: Always
             resources: {}
             terminationMessagePath: /dev/termination-log
             terminationMessagePolicy: File
             volumeMounts:
               - mountPath: /flink-scripts
                 name: flink-scripts
           - name: flink-sql-runner-download
             args: 
               - s3
               - cp
               - s3://<s3_location>/flink-sql-runner.jar
               - /flink-artifacts
             image: amazon/aws-cli:latest
             imagePullPolicy: Always
             resources: {}
             terminationMessagePath: /dev/termination-log
             terminationMessagePolicy: File
             volumeMounts:
               - mountPath: /flink-artifacts
                 name: flink-artifact
         containers:
           - name: flink-main-container
             volumeMounts:
               - mountPath: /opt/flink/scripts
                 name: flink-scripts
               - mountPath: /opt/flink/usrlib
                 name: flink-artifact
         volumes:
           - emptyDir: {}
             name: flink-scripts
           - emptyDir: {}
             name: flink-artifact
   ```

1. Senden Sie einen Flink Hudi-Job an den [Flink](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/jobruns-flink-kubernetes-operator.html) Kubernetes-Operator.

   ```
   kubectl apply -f hudi-write.yaml
   ```