

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

# Crear un script de usuario
<a name="create-user-script"></a>

El script de usuario debe contener una función de punto de entrada (en otras palabras, un controlador). Puede asignar un nombre a su archivo de script de usuario con cualquier nombre de archivo de Python válido.

El siguiente procedimiento describe cómo crear un script de usuario para definir la funcionalidad principal del PySpark análisis.

**Requisitos previos**
+ PySpark 1.0 (corresponde a Python 3.11 y Spark 3.5.3)
+ Los conjuntos de datos de Amazon S3 solo se pueden leer como asociaciones de tablas configuradas en la sesión de Spark que defina. 
+ Su código no puede llamar directamente a Amazon S3 y AWS Glue
+ Su código no puede realizar llamadas de red

**Para crear un script de usuario**

1. Abra el editor de texto o el entorno de desarrollo integrado (IDE) de su elección.

   Puede usar cualquier editor de texto o IDE (como Visual Studio Code o Notepad\+\+) que admita archivos de Python. PyCharm

1. Cree un nuevo archivo de Python con el nombre que desee (por ejemplo,**my\_analysis.py**).

1. Defina una función de punto de entrada que acepte un parámetro de objeto de contexto.

   ```
   def entrypoint(context)
   ```

   El parámetro de `context` objeto es un diccionario que proporciona acceso a los componentes esenciales de Spark, a las tablas referenciadas y a los parámetros de análisis. Contiene:

   Acceda a la sesión de Spark mediante `context['sparkSession']`

   Tablas referenciadas mediante `context['referencedTables']`

   Parámetros de análisis mediante `context['analysisParameters']` (si los parámetros están definidos en la plantilla)

1. Defina los resultados de la función de punto de entrada: 

   ```
   return results
   ```

   `results`Debe devolver un objeto que contenga un diccionario de resultados con nombres de archivos a una salida. DataFrame
**nota**  
AWS Clean Rooms escribe automáticamente los DataFrame objetos en el depósito S3 del receptor de resultados.

1. Ya puede hacer lo siguiente: 

   1. Guarde este script de usuario en S3. Para obtener más información, consulte [Almacenamiento de un script de usuario y un entorno virtual en S3](store-artifacts-in-s3.md).

   1. Cree el entorno virtual opcional para admitir cualquier biblioteca adicional que necesite su script de usuario. Para obtener más información, consulte [Crear un entorno virtual (opcional)](create-virtual-environment.md).

**Example Ejemplo 1**  

```
# File name: my_analysis.py

def entrypoint(context):
    try:
        # Access Spark session
        spark = context['sparkSession']

        # Access input tables
        input_table1 = context['referencedTables']['table1_name']
        input_table2 = context['referencedTables']['table2_name']

        # Example data processing operations
        output_df1 = input_table1.select("column1", "column2")
        output_df2 = input_table2.join(input_table1, "join_key")
        output_df3 = input_table1.groupBy("category").count()
    
        # Return results - each key creates a separate output folder
        return {
            "results": {
                "output1": output_df1,        # Creates output1/ folder
                "output2": output_df2,        # Creates output2/ folder
                "analysis_summary": output_df3 # Creates analysis_summary/ folder
            }
        }
   
    except Exception as e:
        print(f"Error in main function: {str(e)}")
        raise e
```
La estructura de carpetas de este ejemplo es la siguiente:   

```
analysis_results/
│
├── output1/ # Basic selected columns
│ ├── part-00000.parquet
│ └── _SUCCESS
│
├── output2/ # Joined data
│ ├── part-00000.parquet
│ └── _SUCCESS
│
└── analysis_summary/ # Aggregated results
├── part-00000.parquet
└── _SUCCESS
```

**Example Ejemplo 2**  

```
def entrypoint(context):
    try:
        # Get DataFrames from context
        emp_df = context['referencedTables']['employees']
        dept_df = context['referencedTables']['departments']

        # Apply Transformations
        emp_dept_df = emp_df.join(
            dept_df,
            on="dept_id",
            how="left"
        ).select(
            "emp_id",
            "name",
            "salary",
            "dept_name"
        )

        # Return Dataframes
        return {
            "results": {
                "outputTable": emp_dept_df
            }
        }

    except Exception as e:
        print(f"Error in entrypoint function: {str(e)}")
        raise e
```