

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 在 PySpark 分析範本中使用參數
<a name="pyspark-parameter-handling"></a>

參數允許在任務提交時提供不同的值，以增加 PySpark 分析範本的彈性。您可以透過傳遞至進入點函數的內容物件來存取參數。

**注意**  
參數是使用者提供的字串，可包含任意內容。  
檢閱程式碼以確保安全地處理參數，以防止分析中的意外行為。
無論提交時提供哪些參數值，設計參數處理都能安全地運作。

## 存取參數
<a name="accessing-parameters"></a>

參數可在 `context['analysisParameters']` 字典中使用。所有參數值都是字串。

**Example 安全地存取參數**  

```
def entrypoint(context):
    # Access parameters from context
    parameters = context['analysisParameters']
    threshold = parameters['threshold']
    table_name = parameters['table_name']
    
    # Continue with analysis using parameters
    spark = context['sparkSession']
    input_df = context['referencedTables'][table_name]
    
    # Convert threshold value
    threshold_val = int(threshold)
    
    # Use parameter in DataFrame operation
    filtered_df = input_df.filter(input_df.amount > threshold_val)
    
    return {
        "results": {
            "output": filtered_df
        }
    }
```

## 參數安全最佳實務
<a name="parameter-security-best-practices"></a>

**警告**  
參數是使用者提供的字串，可包含任意內容。您必須安全地處理參數，以防止分析程式碼中的安全漏洞。

**要避免的不安全參數處理模式：**
+ **以程式碼形式執行參數** – 切勿在參數值`exec()`上使用 `eval()`或

  ```
  # UNSAFE - Don't do this
  eval(parameters['expression'])  # Can execute arbitrary code
  ```
+ **SQL 字串插補** – 切勿將參數直接串連至 SQL 字串

  ```
  # UNSAFE - Don't do this
  sql = f"SELECT * FROM table WHERE column = '{parameters['value']}'"  # SQL injection risk
  ```
+ **不安全的檔案路徑操作** – 請勿在未驗證的情況下直接在檔案系統操作中使用參數

  ```
  # UNSAFE - Don't do this
  file_path = f"/data/{parameters['filename']}"  # Path traversal risk
  ```

**安全參數處理模式：**
+ **在 DataFrame 操作中使用參數** – Spark DataFrames 安全地處理參數值

  ```
  # SAFE - Use parameters in DataFrame operations
  threshold = int(parameters['threshold'])
  filtered_df = input_df.filter(input_df.value > threshold)
  ```
+ **驗證參數值** – 在使用前檢查參數是否符合預期的格式

  ```
  # SAFE - Validate parameters before use
  def validate_date(date_str):
      try:
          from datetime import datetime
          datetime.strptime(date_str, '%Y-%m-%d')
          return True
      except ValueError:
          return False
  
  date_param = parameters['date_filter'] or '2024-01-01'
  if not validate_date(date_param):
      raise ValueError(f"Invalid date format: {date_param}")
  ```
+ **使用參數值的允許清單** – 如果可能，請根據已知的良好值驗證參數

  ```
  # SAFE - Use allowlists
  allowed_columns = ['column1', 'column2', 'column3']
  column_param = parameters['column_name']
  if column_param not in allowed_columns:
      raise ValueError(f"Invalid column: {column_param}")
  ```
+ **使用錯誤處理進行類型轉換** – 將字串參數安全地轉換為預期類型

  ```
  # SAFE - Convert with error handling
  try:
      batch_size = int(parameters['batch_size'] or '1000')
      if batch_size <= 0 or batch_size > 10000:
          raise ValueError(f"Batch size must be between 1 and 10000")
  except ValueError as e:
      print(f"Invalid parameter: {e}")
      raise
  ```

**重要**  
請記住，當任務執行器提供不同的值時，參數會略過程式碼檢閱。設計您的參數處理方式，無論提供哪些參數值，都能安全地運作。

## 完成參數範例
<a name="parameter-examples"></a>

**Example 在 PySpark 指令碼中安全地使用參數**  

```
def entrypoint(context):
    try:
        # Access Spark session and tables
        spark = context['sparkSession']
        input_table = context['referencedTables']['sales_data']
        
        # Access parameters - fail fast if analysisParameters missing
        parameters = context['analysisParameters']
        
        # Validate and convert numeric parameter (handles empty strings with default)
        try:
            threshold = int(parameters['threshold'] or '100')
            if threshold <= 0:
                raise ValueError("Threshold must be positive")
        except (ValueError, TypeError) as e:
            print(f"Invalid threshold parameter: {e}")
            raise
        
        # Validate date parameter (handles empty strings with default)
        date_filter = parameters['start_date'] or '2024-01-01'
        from datetime import datetime
        try:
            datetime.strptime(date_filter, '%Y-%m-%d')
        except ValueError:
            raise ValueError(f"Invalid date format: {date_filter}")
        
        # Use parameters safely in DataFrame operations
        filtered_df = input_table.filter(
            (input_table.amount > threshold) &
            (input_table.date >= date_filter)
        )
        
        result_df = filtered_df.groupBy("category").agg(
            {"amount": "sum"}
        )
        
        return {
            "results": {
                "filtered_results": result_df
            }
        }
    
    except Exception as e:
        print(f"Error in analysis: {str(e)}")
        raise
```