

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 建立使用者指令碼
<a name="create-user-script"></a>

使用者指令碼必須包含進入點函數 （也就是處理常式）。您可以使用任何有效的 Python 檔案名稱來命名使用者指令碼檔案。

下列程序說明如何建立使用者指令碼來定義 PySpark 分析的核心功能。

**先決條件**
+ PySpark 1.0 （對應至 Python 3.11 和 Spark 3.5.3)
+ Amazon S3 中的資料集只能在您定義的 Spark 工作階段中讀取為已設定的資料表關聯。
+ 您的程式碼無法直接呼叫 Amazon S3 和 AWS Glue
+ 您的程式碼無法進行網路呼叫

**建立使用者指令碼**

1. 開啟您選擇的文字編輯器或整合式開發環境 (IDE)。

   您可以使用支援 Python 檔案的任何文字編輯器或 IDE （例如 Visual Studio Code、PyCharm 或 Notepad\$1\$1)。

1. 使用您選擇的名稱建立新的 Python 檔案 （例如 **my\$1analysis.py**)。

1. 定義接受內容物件參數的進入點函數。

   ```
   def entrypoint(context)
   ```

   `context` 物件參數是一種字典，可讓您存取重要的 Spark 元件、參考的資料表和分析參數。它包含以下內容：

   透過 存取 Spark 工作階段 `context['sparkSession']`

   透過 參考的資料表 `context['referencedTables']`

   透過 分析參數 `context['analysisParameters']`（如果在範本中定義參數）

1. 定義進入點函數的結果：

   ```
   return results
   ```

   `results` 必須將包含檔案名稱結果字典的物件傳回至輸出 DataFrame。
**注意**  
AWS Clean Rooms 會自動將 DataFrame 物件寫入結果接收器的 S3 儲存貯體。

1. 您現在已準備好：

   1. 將此使用者指令碼儲存在 S3 中。如需詳細資訊，請參閱[在 S3 中存放使用者指令碼和虛擬環境](store-artifacts-in-s3.md)。

   1. 建立選用的虛擬環境，以支援使用者指令碼所需的任何其他程式庫。如需詳細資訊，請參閱[建立虛擬環境 （選用）](create-virtual-environment.md)。

**Example 範例 1**  

```
# File name: my_analysis.py

def entrypoint(context):
    try:
        # Access Spark session
        spark = context['sparkSession']

        # Access input tables
        input_table1 = context['referencedTables']['table1_name']
        input_table2 = context['referencedTables']['table2_name']

        # Example data processing operations
        output_df1 = input_table1.select("column1", "column2")
        output_df2 = input_table2.join(input_table1, "join_key")
        output_df3 = input_table1.groupBy("category").count()
    
        # Return results - each key creates a separate output folder
        return {
            "results": {
                "output1": output_df1,        # Creates output1/ folder
                "output2": output_df2,        # Creates output2/ folder
                "analysis_summary": output_df3 # Creates analysis_summary/ folder
            }
        }
   
    except Exception as e:
        print(f"Error in main function: {str(e)}")
        raise e
```
此範例的資料夾結構如下：  

```
analysis_results/
│
├── output1/ # Basic selected columns
│ ├── part-00000.parquet
│ └── _SUCCESS
│
├── output2/ # Joined data
│ ├── part-00000.parquet
│ └── _SUCCESS
│
└── analysis_summary/ # Aggregated results
├── part-00000.parquet
└── _SUCCESS
```

**Example 範例 2**  

```
def entrypoint(context):
    try:
        # Get DataFrames from context
        emp_df = context['referencedTables']['employees']
        dept_df = context['referencedTables']['departments']

        # Apply Transformations
        emp_dept_df = emp_df.join(
            dept_df,
            on="dept_id",
            how="left"
        ).select(
            "emp_id",
            "name",
            "salary",
            "dept_name"
        )

        # Return Dataframes
        return {
            "results": {
                "outputTable": emp_dept_df
            }
        }

    except Exception as e:
        print(f"Error in entrypoint function: {str(e)}")
        raise e
```