

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 创建用户脚本
<a name="create-user-script"></a>

用户脚本必须包含入口点函数（换句话说，处理程序）。您可以使用任何有效的 Python 文件名来命名用户脚本文件。

以下过程介绍如何创建用户脚本来定义 PySpark 分析的核心功能。

**先决条件**
+ PySpark 1.0（对应于 Python 3.11 和 Spark 3.5.3）
+ Amazon S3 中的数据集只能在您定义的 Spark 会话中作为已配置的表关联进行读取。
+ 你的代码无法直接调用 Amazon S3 而且 AWS Glue
+ 你的代码无法进行网络调用

**创建用户脚本**

1. 打开您选择的文本编辑器或集成开发环境 (IDE)。

   你可以使用任何支持 Python 文件的文本编辑器或 IDE（例如 Visual Studio Code 或 Notepad\$1\$1）。 PyCharm

1. 使用您选择的名称创建一个新的 Python 文件（例如，**my\$1analysis.py**）。

1. 定义一个接受上下文对象参数的入口点函数。

   ```
   def entrypoint(context)
   ```

   `context`对象参数是一个字典，用于访问基本的 Spark 组件、引用的表和分析参数。其中包含：

   通过 Spark 访问会话 `context['sparkSession']`

   通过以下方式引用了表 `context['referencedTables']`

   通过分析参数`context['analysisParameters']`（如果参数是在模板中定义的）

1. 定义入口点函数的结果：

   ```
   return results
   ```

   `results`必须向输出 DataFrame返回一个包含文件名结果字典的对象。
**注意**  
AWS Clean Rooms 自动将 DataFrame 对象写入结果接收器的 S3 存储桶。

1. 您现在已准备好执行以下操作：

   1. 将此用户脚本存储在 S3 中。有关更多信息，请参阅 [在 S3 中存储用户脚本和虚拟环境](store-artifacts-in-s3.md)。

   1. 创建可选的虚拟环境以支持用户脚本所需的任何其他库。有关更多信息，请参阅 [创建虚拟环境（可选）](create-virtual-environment.md)。

**Example 示例 1**  

```
# File name: my_analysis.py

def entrypoint(context):
    try:
        # Access Spark session
        spark = context['sparkSession']

        # Access input tables
        input_table1 = context['referencedTables']['table1_name']
        input_table2 = context['referencedTables']['table2_name']

        # Example data processing operations
        output_df1 = input_table1.select("column1", "column2")
        output_df2 = input_table2.join(input_table1, "join_key")
        output_df3 = input_table1.groupBy("category").count()
    
        # Return results - each key creates a separate output folder
        return {
            "results": {
                "output1": output_df1,        # Creates output1/ folder
                "output2": output_df2,        # Creates output2/ folder
                "analysis_summary": output_df3 # Creates analysis_summary/ folder
            }
        }
   
    except Exception as e:
        print(f"Error in main function: {str(e)}")
        raise e
```
此示例的文件夹结构如下所示：  

```
analysis_results/
│
├── output1/ # Basic selected columns
│ ├── part-00000.parquet
│ └── _SUCCESS
│
├── output2/ # Joined data
│ ├── part-00000.parquet
│ └── _SUCCESS
│
└── analysis_summary/ # Aggregated results
├── part-00000.parquet
└── _SUCCESS
```

**Example 示例 2**  

```
def entrypoint(context):
    try:
        # Get DataFrames from context
        emp_df = context['referencedTables']['employees']
        dept_df = context['referencedTables']['departments']

        # Apply Transformations
        emp_dept_df = emp_df.join(
            dept_df,
            on="dept_id",
            how="left"
        ).select(
            "emp_id",
            "name",
            "salary",
            "dept_name"
        )

        # Return Dataframes
        return {
            "results": {
                "outputTable": emp_dept_df
            }
        }

    except Exception as e:
        print(f"Error in entrypoint function: {str(e)}")
        raise e
```