View a markdown version of this page

在 PySpark 分析範本中使用參數 - AWS Clean Rooms

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

在 PySpark 分析範本中使用參數

參數允許在任務提交時提供不同的值,以增加 PySpark 分析範本的彈性。您可以透過傳遞至進入點函數的內容物件來存取參數。

注意

參數是使用者提供的字串,可包含任意內容。

  • 檢閱程式碼以確保安全地處理參數,以防止分析中的意外行為。

  • 無論提交時提供哪些參數值,設計參數處理都能安全地運作。

存取參數

參數可在 context['analysisParameters'] 字典中使用。所有參數值都是字串。

範例安全地存取參數
def entrypoint(context): # Access parameters from context parameters = context['analysisParameters'] threshold = parameters['threshold'] table_name = parameters['table_name'] # Continue with analysis using parameters spark = context['sparkSession'] input_df = context['referencedTables'][table_name] # Convert threshold value threshold_val = int(threshold) # Use parameter in DataFrame operation filtered_df = input_df.filter(input_df.amount > threshold_val) return { "results": { "output": filtered_df } }

參數安全最佳實務

警告

參數是使用者提供的字串,可包含任意內容。您必須安全地處理參數,以防止分析程式碼中的安全漏洞。

要避免的不安全參數處理模式:

  • 以程式碼形式執行參數 – 切勿在參數值exec()上使用 eval()

    # UNSAFE - Don't do this eval(parameters['expression']) # Can execute arbitrary code
  • SQL 字串插補 – 切勿將參數直接串連至 SQL 字串

    # UNSAFE - Don't do this sql = f"SELECT * FROM table WHERE column = '{parameters['value']}'" # SQL injection risk
  • 不安全的檔案路徑操作 – 請勿在未驗證的情況下直接在檔案系統操作中使用參數

    # UNSAFE - Don't do this file_path = f"/data/{parameters['filename']}" # Path traversal risk

安全參數處理模式:

  • 在 DataFrame 操作中使用參數 – Spark DataFrames 安全地處理參數值

    # SAFE - Use parameters in DataFrame operations threshold = int(parameters['threshold']) filtered_df = input_df.filter(input_df.value > threshold)
  • 驗證參數值 – 在使用前檢查參數是否符合預期的格式

    # SAFE - Validate parameters before use def validate_date(date_str): try: from datetime import datetime datetime.strptime(date_str, '%Y-%m-%d') return True except ValueError: return False date_param = parameters['date_filter'] or '2024-01-01' if not validate_date(date_param): raise ValueError(f"Invalid date format: {date_param}")
  • 使用參數值的允許清單 – 如果可能,請根據已知的良好值驗證參數

    # SAFE - Use allowlists allowed_columns = ['column1', 'column2', 'column3'] column_param = parameters['column_name'] if column_param not in allowed_columns: raise ValueError(f"Invalid column: {column_param}")
  • 使用錯誤處理進行類型轉換 – 將字串參數安全地轉換為預期類型

    # SAFE - Convert with error handling try: batch_size = int(parameters['batch_size'] or '1000') if batch_size <= 0 or batch_size > 10000: raise ValueError(f"Batch size must be between 1 and 10000") except ValueError as e: print(f"Invalid parameter: {e}") raise
重要

請記住,當任務執行器提供不同的值時,參數會略過程式碼檢閱。設計您的參數處理方式,無論提供哪些參數值,都能安全地運作。

完成參數範例

範例在 PySpark 指令碼中安全地使用參數
def entrypoint(context): try: # Access Spark session and tables spark = context['sparkSession'] input_table = context['referencedTables']['sales_data'] # Access parameters - fail fast if analysisParameters missing parameters = context['analysisParameters'] # Validate and convert numeric parameter (handles empty strings with default) try: threshold = int(parameters['threshold'] or '100') if threshold <= 0: raise ValueError("Threshold must be positive") except (ValueError, TypeError) as e: print(f"Invalid threshold parameter: {e}") raise # Validate date parameter (handles empty strings with default) date_filter = parameters['start_date'] or '2024-01-01' from datetime import datetime try: datetime.strptime(date_filter, '%Y-%m-%d') except ValueError: raise ValueError(f"Invalid date format: {date_filter}") # Use parameters safely in DataFrame operations filtered_df = input_table.filter( (input_table.amount > threshold) & (input_table.date >= date_filter) ) result_df = filtered_df.groupBy("category").agg( {"amount": "sum"} ) return { "results": { "filtered_results": result_df } } except Exception as e: print(f"Error in analysis: {str(e)}") raise