

# Use a cluster with Iceberg
<a name="emr-iceberg-use-cluster"></a>

This section includes information for using Iceberg with Spark, Trino, Flink, and Hive.

# Use an Iceberg cluster with Spark
<a name="emr-iceberg-use-spark-cluster"></a>

Starting with Amazon EMR version 6.5.0, you can use Iceberg with your Spark cluster with no requirement to include bootstrap actions. For Amazon EMR versions 6.4.0 and earlier, you can use a bootstrap action to pre-install all necessary dependencies.

In this tutorial, you use the AWS CLI to work with Iceberg on an Amazon EMR Spark cluster. To use the console to create a cluster with Iceberg installed, follow the steps in [Build an Apache Iceberg data lake using Amazon Athena, Amazon EMR, and AWS Glue](https://aws.amazon.com/blogs//big-data/build-an-apache-iceberg-data-lake-using-amazon-athena-amazon-emr-and-aws-glue/).

## Create an Iceberg cluster
<a name="emr-iceberg-create-cluster"></a>

You can create a cluster with Iceberg installed using the AWS Management Console, the AWS CLI or the Amazon EMR API. In this tutorial, you use the AWS CLI to work with Iceberg on an Amazon EMR cluster. To use the console to create a cluster with Iceberg installed, follow the steps in [Build an Apache Iceberg data lake using Amazon Athena, Amazon EMR, and AWS Glue](https://aws.amazon.com/blogs//big-data/build-an-apache-iceberg-data-lake-using-amazon-athena-amazon-emr-and-aws-glue/). 

To use Iceberg on Amazon EMR with the AWS CLI, first create a cluster with the following steps. For information on specifying the Iceberg classification using the AWS CLI, see [Supply a configuration using the AWS CLI when you create a cluster](emr-configure-apps-create-cluster.md#emr-configure-apps-create-cluster-cli) or [Supply a configuration using the Java SDK when you create a cluster](emr-configure-apps-create-cluster.md#emr-configure-apps-create-cluster-sdk).

1. Create a `configurations.json` file with the following content:

   ```
   [{
       "Classification":"iceberg-defaults",
       "Properties":{"iceberg.enabled":"true"}
   }]
   ```

1. Next, create a cluster with the following configuration. Replace the example Amazon S3 bucket path and the subnet ID with your own.

   ```
   aws emr create-cluster --release-label emr-6.5.0 \
   --applications Name=Spark \
   --configurations file://configurations.json \
   --region us-east-1 \
   --name My_Spark_Iceberg_Cluster \
   --log-uri s3://amzn-s3-demo-bucket/ \
   --instance-type m5.xlarge \
   --instance-count 2 \
   --service-role EMR_DefaultRole_V2 \ 
   --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0
   ```

Alternatively, you can create an Amazon EMR cluster including the Spark application and include the file `/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar` as a JAR dependency in a Spark job. For more information, see [Submitting Applications](https://spark.apache.org/docs/latest/submitting-applications.html#submitting-applications).

To include the jar as a dependency in a Spark job, add the following configuration property to the Spark application:

```
--conf "spark.jars=/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar"
```

For more information about Spark job dependencies, see [Dependency Management](https://spark.apache.org/docs/latest/running-on-kubernetes.html#dependency-management) in the Apache Spark document [Running Spark on Kubernetes](https://spark.apache.org/docs/latest/running-on-kubernetes.html).

## Initialize a Spark session for Iceberg
<a name="emr-iceberg-initialize-spark-session"></a>

The following examples demonstrate how to launch the interactive Spark shell, use Spark submit, or use Amazon EMR Notebooks to work with Iceberg on Amazon EMR.

------
#### [ spark-shell ]

1. Connect to the master node using SSH. For more information, see [Connect to the master node using SSH](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html) in the *Amazon EMR Management Guide*.

1. Enter the following command to launch the Spark shell. To use the PySpark shell, replace `spark-shell` with `pyspark`.

   ```
   spark-shell \
       --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
       --conf spark.sql.catalog.my_catalog.warehouse=s3://amzn-s3-demo-bucket/prefix/
       --conf spark.sql.catalog.my_catalog.type=glue \
       --conf spark.sql.defaultCatalog=my_catalog \
       --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
   ```

------
#### [ spark-submit ]

1. Connect to the master node using SSH. For more information, see [Connect to the master node using SSH](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html) in the *Amazon EMR Management Guide*.

1. Enter the following command to launch the Spark session for Iceberg.

   ```
   spark-submit \
   --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
   --conf spark.sql.catalog.my_catalog.warehouse=s3://amzn-s3-demo-bucket1/prefix \
   --conf spark.sql.catalog.my_catalog.type=glue \
   --conf spark.sql.defaultCatalog=my_catalog \
   --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
   ```

------
#### [ EMR Studio notebooks ]

To initialize a Spark session using EMR Studio notebooks, configure your Spark session using the `%%configure` magic command in your Amazon EMR notebook, as in the following example. For more information, see [Use EMR Notebooks magics](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-studio-magics.html#emr-magics) in the *Amazon EMR Management Guide*.

```
%%configure -f{
"conf":{
    "spark.sql.catalog.my_catalog":"org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.my_catalog.type":"glue",
    "spark.sql.catalog.my_catalog.warehouse":"s3://amzn-s3-demo-bucket1/prefix/",
    "spark.sql.defaultCatalog":"my_catalog",
    "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
    }
}
```

------
#### [ CLI ]

To initialize a Spark cluster using the CLI and set all of the Spark Iceberg session default configurations, run the following sample. For more information about specifying a configuration classification using the AWS CLI and Amazon EMR API, see [Configure applications](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-configure-apps.html).

```
[
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.sql.catalog.my_catalog":"org.apache.iceberg.spark.SparkCatalog",
      "spark.sql.catalog.my_catalog.type":"glue",
      "spark.sql.catalog.my_catalog.warehouse":"s3://amzn-s3-demo-bucket1/prefix/",
      "spark.sql.defaultCatalog":"my_catalog",
      "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
    }
  }
]
```

------

## Write to an Iceberg table
<a name="emr-iceberg-write-to-table"></a>

The following example shows how to create a DataFrame and write it as an Iceberg dataset. The examples demonstrate working with datasets using the Spark shell while connected to the master node using SSH as the default hadoop user.

**Note**  
To paste code samples into the Spark shell, type `:paste` at the prompt, paste the example, and then press `CTRL+D`.

------
#### [ PySpark ]

Spark includes a Python-based shell, `pyspark`, that you can use to prototype Spark programs written in Python. Invoke `pyspark` on the master node.

```
## Create a DataFrame.
data = spark.createDataFrame([
 ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
 ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
 ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
 ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z")
],["id", "creation_date", "last_update_time"])

## Write a DataFrame as a Iceberg dataset to the Amazon S3 location.
spark.sql("""CREATE TABLE IF NOT EXISTS dev.db.iceberg_table (id string,
creation_date string,
last_update_time string)
USING iceberg
location 's3://amzn-s3-demo-bucket/example-prefix/db/iceberg_table'""")

data.writeTo("dev.db.iceberg_table").append()
```

------
#### [ Scala ]

```
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._

// Create a DataFrame.
val data = Seq(
("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
("103", "2015-01-01", "2015-01-01T13:51:40.519832Z")
).toDF("id", "creation_date", "last_update_time")

// Write a DataFrame as a Iceberg dataset to the Amazon S3 location.
spark.sql("""CREATE TABLE IF NOT EXISTS dev.db.iceberg_table (id string,
creation_date string,
last_update_time string)
USING iceberg
location 's3://amzn-s3-demo-bucket/example-prefix/db/iceberg_table'""")

data.writeTo("dev.db.iceberg_table").append()
```

------

## Read from an Iceberg table
<a name="emr-iceberg-read-from-table"></a>

------
#### [ PySpark ]

```
df = spark.read.format("iceberg").load("dev.db.iceberg_table")
df.show()
```

------
#### [ Scala ]

```
val df = spark.read.format("iceberg").load("dev.db.iceberg_table")
df.show()
```

------
#### [ Spark SQL ]

```
SELECT * from dev.db.iceberg_table LIMIT 10
```

------

## Using AWS Glue Data Catalog with Spark Iceberg
<a name="emr-iceberg-glue-catalog-config-spark"></a>

You can connect to AWS Glue Data Catalog from Spark Iceberg. This section shows different commands for connecting.

### Connect to the default AWS Glue catalog in your default region
<a name="emr-iceberg-glue-catalog-config-spark"></a>

This sample shows how to connect, using the Glue catalog type. If you don't specify a catalog ID, it uses the default:

```
spark-submit \
    --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.my_catalog.warehouse=s3://amzn-s3-demo-bucket1/prefix \
    --conf spark.sql.catalog.my_catalog.type=glue \
    --conf spark.sql.defaultCatalog=my_catalog \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
```

### Connect to an AWS Glue catalog with a specific catalog ID
<a name="emr-iceberg-glue-catalog-config-spark"></a>

This sample shows how to connect, using a catalog ID:

```
spark-submit \
    --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.my_catalog.warehouse=s3://amzn-s3-demo-bucket1/prefix \
    --conf spark.sql.catalog.my_catalog.type=glue \
    --conf spark.sql.catalog.my_catalog.glue.id=AWS Glue catalog ID \
    --conf spark.sql.defaultCatalog=my_catalog \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
```

This command can be used for connecting to an AWS Glue catalog in a different account, or to an RMS catalog, or to a federated catalog.

## Using Iceberg REST Catalog (IRC) with Spark Iceberg
<a name="emr-iceberg-rest-catalog-config"></a>

The sections that follow detail how to configure Iceberg integration with a catalog.

### Connect to AWS Glue Data Catalog IRC endpoint
<a name="emr-iceberg-rest-catalog-config-gdc"></a>

The following shows a sample `spark-submit` command for using Iceberg REST:

```
spark-submit \
    --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.my_catalog.warehouse=glue catalog ID \
    --conf spark.sql.catalog.my_catalog.type=rest \
    --conf spark.sql.catalog.my_catalog.uri=glue endpoint URI/iceberg \
    --conf spark.sql.catalog.my_catalog.rest.sigv4-enabled=true \
    --conf spark.sql.catalog.my_catalog.rest.signing-name=glue \
    --conf spark.sql.defaultCatalog=my_catalog \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
```

To use it on a runtime-role enabled cluster, the following additional spark configuration settings are necessary:

```
"spark.hadoop.fs.s3.credentialsResolverClass": "software.amazon.glue.GlueTableCredentialsResolver",
"spark.hadoop.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
"spark.hadoop.glue.id": glue catalog ID
"spark.hadoop.glue.endpoint": "glue endpoint"
```

For AWS Glue endpoint URL list for each region, see [AWS Glue endpoints and quotas](https://docs.aws.amazon.com/general/latest/gr/glue.html).

### Connect to an arbitrary IRC endpoint
<a name="emr-iceberg-rest-catalog-config-arbitrary"></a>

The following shows a sample `spark-submit` command for using an IRC endpoint:

```
spark-submit \
    --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.my_catalog.warehouse=warehouse name \
    --conf spark.sql.catalog.my_catalog.type=rest \
    --conf spark.sql.catalog.my_catalog.uri=your rest endpoint \
    --conf spark.sql.defaultCatalog=my_catalog \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
```

## Configuration differences when you use Iceberg SparkCatalog versus SparkSessionCatalog
<a name="emr-iceberg-spark-catalog"></a>

Iceberg makes available two ways to create Spark Iceberg catalogs. You can set the Spark configuration to either `SparkCatalog` or to `SparkSessionCatalog`. 

### Using Iceberg SparkCatalog
<a name="emr-iceberg-spark-catalog-spark-catalog"></a>

The following shows the command for using **SparkCatalog** as the Spark Iceberg catalog:

```
spark-shell \
--conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.my_catalog.warehouse=s3://amzn-s3-demo-bucket1/prefix \
--conf spark.sql.catalog.my_catalog.type=glue \
--conf spark.sql.defaultCatalog=my_catalog
```

Considerations for this approach:
+ You can access Iceberg tables but no other tables.
+ The catalog name cannot be **spark\$1catalog**. This is the name of the initial catalog in Spark. It always connects to a Hive metastore. It is the default catalog in Spark, unless the user overwrites it using `spark.sql.defaultCatalog`.
+ You can set the `spark.sql.defaultCatalog` to your catalog name to make that the default catalog.

### Using Iceberg SparkSessionCatalog
<a name="emr-iceberg-spark-catalog-spark-session"></a>

The following shows the command for using **SparkSessionCatalog** as the Spark Iceberg catalog:

```
spark-shell \
    --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
    --conf spark.sql.catalog.spark_catalog.warehouse=s3://amzn-s3-demo-bucket1/prefix \
    --conf spark.sql.catalog.spark_catalog.type=glue
```

Considerations for this approach:
+ If a table is not found as an Iceberg table, Spark will try to see if it is a table in the Hive metastore. See [Using the AWS Glue Data Catalog as the catalog for Hive](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-hive-metastore-glue.html) for more information.
+ The catalog name must be **spark\$1catalog**.

## Using Iceberg Spark extensions
<a name="emr-iceberg-spark-catalog-extensions"></a>

Iceberg offers Spark extension `org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions` that users can set through the Spark extensions config `spark.sql.extensions`. The extensions enable key Iceberg features such as row level DELETE, UPDATE and MERGE, Iceberg-specific Spark data-definition language statements and procedures, such as compaction, snapshot expiration, branching and tagging, and so on. See the following for more details:
+ Iceberg Spark write extensions: [Spark Writes](https://iceberg.apache.org/docs/nightly/spark-writes/)
+ Iceberg Spark DDL extensions: [ALTER TABLE SQL extensions](https://iceberg.apache.org/docs/nightly/spark-ddl/#alter-table-sql-extensions/)
+ Iceberg Spark procedure extensions: [Spark Procedures](https://iceberg.apache.org/docs/nightly/spark-ddl/#alter-table-sql-extensions/)

## Considerations for using Iceberg with Spark
<a name="spark-considerations-catalog"></a>
+ Amazon EMR 6.5.0 does not support Iceberg running on Amazon EMR on EKS by default. An Amazon EMR 6.5.0 custom image is available so that you can pass `--jars local:///usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar` as a `spark-submit` parameter to create Iceberg tables on Amazon EMR on EKS. For more information, see [Submit a Spark workload in Amazon EMR using a custom image](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/docker-custom-images-steps.html#docker-custom-images-submit) in the *Amazon EMR on EKS Development Guide*. You can also contact Support for assistance. Starting with Amazon EMR 6.6.0, Iceberg is supported on Amazon EMR on EKS.
+ When using AWS Glue as a catalog for Iceberg, make sure the database in which you are creating a table exists in AWS Glue. If you are using services such as AWS Lake Formation and you're unable to load the catalog, make sure you have proper access to the service to execute the command.
+ If you use Iceberg SparkSessionCatalog, as described in [Configuration differences when you use Iceberg SparkCatalog versus SparkSessionCatalog](#emr-iceberg-spark-catalog), you must follow the configuration steps described in [Configure AWS Glue Data Catalog as the Apache Hive metastore](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-hive-metastore-glue.html), in addition to configuring Spark Iceberg AWS Glue Data Catalog settings.

# Use an Iceberg cluster with Trino
<a name="emr-iceberg-use-trino-cluster"></a>

Starting with Amazon EMR version 6.6.0, you can use Iceberg with your Trino cluster. 

In this tutorial, you use the AWS CLI to work with Iceberg on an Amazon EMR Trino cluster. To use the console to create a cluster with Iceberg installed, follow the steps in [Build an Apache Iceberg data lake using Amazon Athena, Amazon EMR, and AWS Glue](https://aws.amazon.com/blogs//big-data/build-an-apache-iceberg-data-lake-using-amazon-athena-amazon-emr-and-aws-glue/).

## Create an Iceberg cluster
<a name="emr-iceberg-create-cluster-trino"></a>

To use Iceberg on Amazon EMR with the AWS CLI, first create a cluster with the following steps. For information on specifying the Iceberg classification using the AWS CLI, see [Supply a configuration using the AWS CLI when you create a cluster](emr-configure-apps-create-cluster.md#emr-configure-apps-create-cluster-cli) or [Supply a configuration using the Java SDK when you create a cluster](emr-configure-apps-create-cluster.md#emr-configure-apps-create-cluster-sdk).

1. Create an `configurations.json` file with the following content. For example, if you want to use the Hive metastore as your catalog, your file should have the following content.

   ```
   [
     {
       "Classification": "trino-connector-iceberg",
       "Properties": {
         "connector.name": "iceberg",
         "hive.metastore.uri": "thrift://localhost:9083"
       }
     }
   ]
   ```

   If you want to use the AWS Glue Data Catalog as your store, your file should have the following content.

   ```
   [
     {
       "Classification": "trino-connector-iceberg",
       "Properties": {
         "connector.name": "iceberg",
         "iceberg.catalog.type": "glue"
       }
     }
   ]
   ```

   Starting with Amazon EMR 7.7.0, include the property *fs.native-s3.enabled=true*

   ```
   [
     { 
       "Classification": "trino-connector-iceberg",
       "Properties": {
         "connector.name": "iceberg",
         "iceberg.catalog.type": "glue",
         "fs.native-s3.enabled": "true"
       }           
     }                 
   ]
   ```

1. Create a cluster with the following configuration, replacing the example Amazon S3 bucket path and key name with your own.

   ```
   aws emr create-cluster --release-label emr-6.7.0 \
   --applications Name=Trino \
   --region us-east-1 \
   --name My_Trino_Iceberg_Cluster \
   --log-uri s3://amzn-s3-demo-bucket \
   --configurations file://configurations.json \
   --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=c3.4xlarge InstanceGroupType=CORE,InstanceCount=3,InstanceType=c3.4xlarge \ 
   --use-default-roles \
   --ec2-attributes KeyName=<key-name>
   ```

## Initialize a Trino session for Iceberg
<a name="emr-iceberg-initialize-trino"></a>

To initialize a Trino session, run the following command.

```
trino-cli --catalog iceberg
```

## Write to an Iceberg table
<a name="emr-iceberg-write-to-table-trino"></a>

Create and write to your table with the following SQL commands.

```
trino> SHOW SCHEMAS;
trino> CREATE TABLE default.iceberg_table (
            id int,
            data varchar,
            category varchar)
       WITH (
            format = 'PARQUET',
            partitioning = ARRAY['category', 'bucket(id, 16)'],
            location = 's3://amzn-s3-demo-bucket/<prefix>')
          
trino> INSERT INTO default.iceberg_table VALUES (1,'a','c1'), (2,'b','c2'), (3,'c','c3');
```

## Read from a table for Iceberg
<a name="emr-iceberg-read-from-table-trino"></a>

To read from your Iceberg table, run the following command.

```
trino> SELECT * from default.iceberg_table;
```

## Considerations for using Iceberg with Trino
<a name="trino-considerations"></a>
+ Amazon EMR 6.5 does not offer Trino Iceberg Catalog support for Iceberg natively. Trino needs Iceberg v0.11, so we recommend launching an Amazon EMR cluster for Trino separate from the Spark cluster and including Iceberg v0.11 on that cluster.
+ When using AWS Glue as a catalog for Iceberg, make sure that the database in which you are creating a table exists in AWS Glue. If you are using services such as AWS Lake Formation and you're unable to load the catalog, make sure you have proper access to the service to execute the command.
+ Iceberg Glue integration does not work with the Redshift Managed Storage catalog.

# Use an Iceberg cluster with Flink
<a name="emr-iceberg-use-flink-cluster"></a>

Starting with Amazon EMR version 6.9.0, you can use Iceberg with a Flink cluster without the setup steps required when using the open source Iceberg Flink Integration.

## Creating an Iceberg cluster
<a name="creating-iceberg-cluster"></a>

You can create a cluster with Iceberg installed using the AWS Management Console, the AWS CLI, or the Amazon EMR API. In this tutorial, you use the AWS CLI to work with Iceberg on an Amazon EMR cluster. To use the console to create a cluster with Iceberg installed, follow the steps in [Build an Apache Iceberg data lake using Amazon Athena, Amazon EMR, and AWS Glue](https://aws.amazon.com/blogs/big-data/build-an-apache-iceberg-data-lake-using-amazon-athena-amazon-emr-and-aws-glue/).

To use Iceberg on Amazon EMR with the AWS CLI, first create a cluster with the following steps. For information on specifying the Iceberg classification using the AWS CLI, see [Supply a configuration using the AWS CLI when you create a cluster](emr-configure-apps-create-cluster.md#emr-configure-apps-create-cluster-cli) or [Supply a configuration using the Java SDK when you create a cluster](emr-configure-apps-create-cluster.md#emr-configure-apps-create-cluster-sdk). Create a file called `configurations.json` with the following content:

```
[{
"Classification":"iceberg-defaults",
    "Properties":{"iceberg.enabled":"true"}
}]
```

Next, create a cluster with the following configuration, replacing the example Amazon S3 bucket path and subnet ID with your own values:

```
aws emr create-cluster --release-label emr-6.9.0 \
--applications Name=Flink \
--configurations file://iceberg_configurations.json \
--region us-east-1 \
--name My_flink_Iceberg_Cluster \
--log-uri s3://amzn-s3-demo-bucket/ \
--instance-type m5.xlarge \
--instance-count 2 \
--service-role EMR_DefaultRole \ 
--ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef
```

Alternatively, you can create an Amazon EMR 6.9.0 cluster with a Flink application in it and use the file `/usr/share/aws/iceberg/lib/iceberg-flink-runtime.jar` as a JAR dependency in a Flink job.

## Using the Flink SQL Client
<a name="using-flink-sql-client"></a>

The SQL Client script is located under `/usr/lib/flink/bin`. You can run the script with the following command:

```
flink-yarn-session -d # starting the Flink YARN Session in detached mode
./sql-client.sh
```

This launches a Flink SQL Shell.

## Flink examples
<a name="flink-examples"></a>

### Create an Iceberg table
<a name="create-iceberg-table"></a>

**Flink SQL**

```
CREATE CATALOG glue_catalog WITH (
   'type'='iceberg',
   'warehouse'='<WAREHOUSE>',
   'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',
    'io-impl'='org.apache.iceberg.aws.s3.S3FileIO'
 );

USE CATALOG  glue_catalog;

CREATE DATABASE IF NOT EXISTS <DB>;

USE <DB>;

CREATE TABLE IF NOT EXISTS `glue_catalog`.`<DB>`.`sample` (id int, data string);
```

**Table API**

```
EnvironmentSettings settings =
                EnvironmentSettings.newInstance().inBatchMode().build();

TableEnvironment tEnv = TableEnvironment.create(settings);

String warehouse = "<WAREHOUSE>";
String db = "<DB>";

tEnv.executeSql(
                "CREATE CATALOG glue_catalog WITH (\n"
                        + "   'type'='iceberg',\n"
                        + "   'warehouse'='"
                        + warehouse
                        + "',\n"
                        + "   'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',\n"
                        + "   'io-impl'='org.apache.iceberg.aws.s3.S3FileIO'\n"
                        + " );");

tEnv.executeSql("USE CATALOG  glue_catalog;");
tEnv.executeSql("CREATE DATABASE IF NOT EXISTS " + db + ";");
tEnv.executeSql("USE " + db + ";");
tEnv.executeSql(
        "CREATE TABLE `glue_catalog`.`" + db + "`.`sample` (id bigint, data string);");
```

### Write to an Iceberg table
<a name="write-to-iceberg-table"></a>

**Flink SQL**

```
INSERT INTO `glue_catalog`.`<DB>`.`sample` values (1, 'a'),(2,'b'),(3,'c');
```

**Table API**

```
tEnv.executeSql(
        "INSERT INTO `glue_catalog`.`"
                + db
                + "`.`sample` values (1, 'a'),(2,'b'),(3,'c');");
```

**Datastream API**

```
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

String db = "<DB Name>";

String warehouse = "<Warehouse Path>";

GenericRowData rowData1 = new GenericRowData(2);
rowData1.setField(0, 1L);
rowData1.setField(1, StringData.fromString("a"));

DataStream<RowData> input = env.fromElements(rowData1);

Map<String, String> props = new HashMap<();
props.put("type", "iceberg");
props.put("warehouse", warehouse);
props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO");

CatalogLoader glueCatlogLoader =
        CatalogLoader.custom(
                "glue",
                props,
                new Configuration(),
                "org.apache.iceberg.aws.glue.GlueCatalog");

TableLoader tableLoader =
        TableLoader.fromCatalog(glueCatlogLoader, TableIdentifier.of(db, "sample"));

DataStreamSink<Void> dataStreamSink =
        FlinkSink.forRowData(input).tableLoader(tableLoader).append();

env.execute("Datastream Write");
```

### Read from an Iceberg table
<a name="read-from-iceberg-table"></a>

**Flink SQL**

```
SELECT * FROM `glue_catalog`.`<DB>`.`sample`;
```

**Table API**

```
Table result = tEnv.sqlQuery("select * from `glue_catalog`.`" + db + "`.`sample`;");
```

**Datastream API**

```
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

String db = "<DB Name>";

String warehouse = "<Warehouse Path>";

Map<String, String> props = new HashMap<>();
props.put("type", "iceberg");
props.put("warehouse", warehouse);
props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO");

CatalogLoader glueCatlogLoader =
        CatalogLoader.custom(
                "glue",
                props,
                new Configuration(),
                "org.apache.iceberg.aws.glue.GlueCatalog");
                
TableLoader tableLoader =
        TableLoader.fromCatalog(glueCatlogLoader, TableIdentifier.of(db, "sample"));

DataStream<RowData> batch =
                FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(false).build();

batch.print().name("print-sink");
```

## Using the Hive catalog
<a name="using-hive-catalog"></a>

Make sure the Flink and Hive dependencies are resolved as described in [Configure Flink with Hive Metastore and Glue Catalog](flink-configure.md#flink-configure-hive).

## Running a Flink Job
<a name="running-flink-job"></a>

One way to submit a job to Flink is to use a per job Flink YARN session. This can be launched with the following command:

```
sudo flink run -m yarn-cluster -p 4 -yjm 1024m -ytm 4096m $JAR_FILE_NAME
```

## Considerations for using Iceberg with Flink
<a name="flink-considerations"></a>
+ When using AWS Glue as a catalog for Iceberg, make sure the database in which you are creating a table exists in AWS Glue. If you are using services such as AWS Lake Formation and you're unable to load the catalog, make sure you have proper access to the service to execute the command.
+ Iceberg Glue integration does not work with the Redshift Managed Storage catalog.

# Use an Iceberg cluster with Hive
<a name="emr-iceberg-use-hive-cluster"></a>

With Amazon EMR releases 6.9.0 and higher, you can use Iceberg with a Hive cluster without having to perform the setup steps that are required for Open Source Iceberg Hive Integration. For Amazon EMR versions 6.8.0 and earlier, you can use a bootstrap action to install `iceberg-hive-runtime` jar for configuring Hive for Iceberg support.

Amazon EMR 6.9.0 includes all features for [Hive 3.1.3 integration with Iceberg 0.14.1](https://iceberg.apache.org/releases/#0140-release) and also includes Amazon EMR added features such as auto selection of supported execution engines at runtime (Amazon EMR on EKS 6.9.0).

## Create an Iceberg cluster
<a name="create-iceberg-cluster"></a>

You can create a cluster with Iceberg installed using the AWS Management Console, the AWS CLI or the Amazon EMR API. In this tutorial, you use the AWS CLI to work with Iceberg on an Amazon EMR cluster. To use the console to create a cluster with Iceberg installed, follow the steps in [ Build an Iceberg data lake using Amazon Athena, Amazon EMR, and AWS Glue](https://aws.amazon.com/blogs/big-data/build-an-apache-iceberg-data-lake-using-amazon-athena-amazon-emr-and-aws-glue/).

To use Iceberg on Amazon EMR with the AWS CLI, first create a cluster using the steps below. For information on specifying the Iceberg classification using the AWS CLI or the Java SDK, see [Supply a configuration using the AWS CLI when you create a cluster](emr-configure-apps-create-cluster.md#emr-configure-apps-create-cluster-cli) or [Supply a configuration using the Java SDK when you create a cluster](emr-configure-apps-create-cluster.md#emr-configure-apps-create-cluster-sdk). Create a file named `configurations.json` with the following content:

```
[{
    "Classification":"iceberg-defaults",
    "Properties":{"iceberg.enabled":"true"}
}]
```

Next, create a cluster with the following configuration, replacing the example Amazon S3 bucket path and the subnet ID with your own:

```
aws emr create-cluster --release-label emr-6.9.0 \
--applications Name=Hive \
--configurations file://iceberg_configurations.json \
--region us-east-1 \
--name My_hive_Iceberg_Cluster \
--log-uri s3://amzn-s3-demo-bucket/ \
--instance-type m5.xlarge \
--instance-count 2 \
--service-role EMR_DefaultRole \ 
--ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef
```

A Hive Iceberg cluster does the following things:
+ Loads the Iceberg Hive runtime jar in Hive and enables Iceberg-related configuration for the Hive engine.
+ Enables Amazon EMR Hive’s dynamic execution engine selection to prevent users from setting supported execution engine for Iceberg compatibility.

**Note**  
Hive Iceberg clusters do not currently support AWS Glue Data Catalog. The default Iceberg catalog is `HiveCatalog`, which corresponds to the metastore configured for the Hive environment. For more information on catalog management, see [Using HCatalog](https://cwiki.apache.org/confluence/display/Hive/HCatalog+UsingHCat#HCatalogUsingHCat-UsingHCatalog) in the [Apache Hive documentation](https://cwiki.apache.org/confluence/display/HIVE).

## Feature support
<a name="feature-support"></a>

Amazon EMR 6.9.0 supports Hive 3.1.3 and Iceberg 0.14.1. The feature support is limited to Iceberg-compatible features for Hive 3.1.2 and 3.1.3. The following commands are supported:
+ With Amazon EMR releases 6.9.0 to 6.12.x, you must include the `libfb303` jar in the Hive `auxlib` directory. Use the following command to include it:

  ```
  sudo /usr/bin/ln -sf /usr/lib/hive/lib/libfb303-*.jar /usr/lib/hive/auxlib/libfb303.jar
  ```

  With Amazon EMR releases 6.13 and higher, the `libfb303` jar is automatically symlinked to the Hive `auxlib` directory. 
+ **Creating a table**
  + **Non-partitioned table** – External tables in Hive can be created by providing the storage handler as follows:

    ```
    CREATE EXTERNAL TABLE x (i int) STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
    ```
  + **Partitioned table** – External partitioned tables in Hive can be created as follows:

    ```
    CREATE EXTERNAL TABLE x (i int) PARTITIONED BY (j int) STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
    ```
**Note**  
The `STORED AS` file format of ORC/AVRO/PARQUET is not supported in Hive 3. The default and only option is Parquet.
+ **Dropping a table** – The `DROP TABLE` command is used to drop tables, such as in the following example:

  ```
  DROP TABLE [IF EXISTS] table_name [PURGE];
  ```
+ **Reading a table** – `SELECT` statements can be used to read Iceberg tables in Hive, such as in the following example. Supported execution engines are MR and Tez.

  ```
  SELECT * FROM table_name
  ```

  For information on Hive’s select syntax, see [LanguageManual Select](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Select). For information on select statements with Iceberg tables in Hive, see [Apache Iceberg Select](https://iceberg.apache.org/docs/latest/hive/#select).
+ **Inserting into a table** – HiveQL‘s `INSERT INTO` statement works on Iceberg tables with support for the Map Reduce execution engine only. Amazon EMR users don’t need to explicitly set the execution engine because Amazon EMR Hive selects the engine for Iceberg Tables at runtime. 
  + **Single table insert into** – Example:

    ```
    INSERT INTO table_name VALUES ('a', 1);
    INSERT INTO table_name SELECT...;
    ```
  + **Multi-table insert into** – Non-atomic multi-table insert into statements are supported. Example:

    ```
    FROM source
     INSERT INTO table_1 SELECT a, b
     INSERT INTO table_2 SELECT c,d;
    ```

Starting with Amazon EMR 7.3.0, Hive with Iceberg supports the AWS Glue Data Catalog as a metastore. To use the AWS Glue Data Catalog as the metastore, set the following property.

```
SET iceberg.catalog.<catalog_name>.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog;
```

Alternatively, you can also set the following property.

```
SET iceberg.catalog.<catalog_name>.type=glue;
```

You can then create a table using the following example.

```
CREATE EXTERNAL TABLE table_name (col1 type1, col2 type2,..)
ROW FORMAT SERDE 'org.apache.iceberg.mr.hive.HiveIcebergSerDe'
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
location '<location>'
TBLPROPERTIES ('table_type'='iceberg', 'iceberg.catalog'='<catalog_name>');
```

## Considerations for using Iceberg with Hive
<a name="hive-considerations"></a>
+ Iceberg supports the following query types:
  + Create table
  + Drop table
  + Insert into table
  + Read table
+ Only the MR (MapReduce) execution engine is supported for DML (data manipulation language) operations, and MR is deprecated in Hive 3.1.3.
+ For Amazon EMR prior to 7.3.0, AWS Glue Data Catalog is not currently supported for Iceberg with Hive.
+ Error handling is insufficiently robust. In cases of misconfiguration, inserts into queries might complete successfully. However, failure to update metadata can result in data loss.
+ Iceberg Glue integration does not work with the Redshift Managed Storage catalog.