PySpark 分析テンプレートでのパラメータの操作 - AWS Clean Rooms

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

PySpark 分析テンプレートでのパラメータの操作

パラメータは、ジョブの送信時に異なる値を指定できるようにすることで、PySpark 分析テンプレートの柔軟性を高めます。パラメータは、エントリポイント関数に渡されるコンテキストオブジェクトからアクセスできます。

注記

パラメータは、任意のコンテンツを含めることができるユーザー提供の文字列です。

  • コードを確認して、分析で予期しない動作が発生しないように、パラメータが安全に処理されていることを確認します。

  • 送信時に提供されるパラメータ値に関係なく、安全に機能するようにパラメータ処理を設計します。

パラメータへのアクセス

パラメータはcontext['analysisParameters']ディクショナリで使用できます。すべてのパラメータ値は文字列です。

例パラメータへの安全なアクセス
def entrypoint(context): # Access parameters from context parameters = context['analysisParameters'] threshold = parameters['threshold'] table_name = parameters['table_name'] # Continue with analysis using parameters spark = context['sparkSession'] input_df = context['referencedTables'][table_name] # Convert threshold value threshold_val = int(threshold) # Use parameter in DataFrame operation filtered_df = input_df.filter(input_df.amount > threshold_val) return { "results": { "output": filtered_df } }

パラメータのセキュリティベストプラクティス

警告

パラメータは、任意のコンテンツを含めることができるユーザー提供の文字列です。分析コードのセキュリティの脆弱性を防ぐために、パラメータを安全に処理する必要があります。

以下を回避するための安全でないパラメータ処理パターン:

  • パラメータをコードとして実行する – パラメータ値exec()eval()または を使用しないでください

    # UNSAFE - Don't do this eval(parameters['expression']) # Can execute arbitrary code
  • SQL 文字列補間 – パラメータを SQL 文字列に直接連結しないでください

    # UNSAFE - Don't do this sql = f"SELECT * FROM table WHERE column = '{parameters['value']}'" # SQL injection risk
  • 安全でないファイルパスオペレーション – 検証なしでファイルシステムオペレーションでパラメータを直接使用しない

    # UNSAFE - Don't do this file_path = f"/data/{parameters['filename']}" # Path traversal risk

安全なパラメータ処理パターン:

  • DataFrame オペレーションでパラメータを使用する – Spark DataFrames はパラメータ値を安全に処理します

    # SAFE - Use parameters in DataFrame operations threshold = int(parameters['threshold']) filtered_df = input_df.filter(input_df.value > threshold)
  • パラメータ値を検証する – 使用前に、パラメータが予想される形式を満たしていることを確認します。

    # SAFE - Validate parameters before use def validate_date(date_str): try: from datetime import datetime datetime.strptime(date_str, '%Y-%m-%d') return True except ValueError: return False date_param = parameters['date_filter'] or '2024-01-01' if not validate_date(date_param): raise ValueError(f"Invalid date format: {date_param}")
  • パラメータ値に許可リストを使用する – 可能であれば、既知の正常な値と照らし合わせてパラメータを検証します。

    # SAFE - Use allowlists allowed_columns = ['column1', 'column2', 'column3'] column_param = parameters['column_name'] if column_param not in allowed_columns: raise ValueError(f"Invalid column: {column_param}")
  • エラー処理による型変換 — 文字列パラメータを予期される型に安全に変換する

    # SAFE - Convert with error handling try: batch_size = int(parameters['batch_size'] or '1000') if batch_size <= 0 or batch_size > 10000: raise ValueError(f"Batch size must be between 1 and 10000") except ValueError as e: print(f"Invalid parameter: {e}") raise
重要

ジョブランナーが異なる値を提供する場合、パラメータはコードレビューをバイパスすることに注意してください。提供されるパラメータ値に関係なく、安全に機能するようにパラメータ処理を設計します。

完全なパラメータの例

例 PySpark スクリプトでのパラメータの安全な使用
def entrypoint(context): try: # Access Spark session and tables spark = context['sparkSession'] input_table = context['referencedTables']['sales_data'] # Access parameters - fail fast if analysisParameters missing parameters = context['analysisParameters'] # Validate and convert numeric parameter (handles empty strings with default) try: threshold = int(parameters['threshold'] or '100') if threshold <= 0: raise ValueError("Threshold must be positive") except (ValueError, TypeError) as e: print(f"Invalid threshold parameter: {e}") raise # Validate date parameter (handles empty strings with default) date_filter = parameters['start_date'] or '2024-01-01' from datetime import datetime try: datetime.strptime(date_filter, '%Y-%m-%d') except ValueError: raise ValueError(f"Invalid date format: {date_filter}") # Use parameters safely in DataFrame operations filtered_df = input_table.filter( (input_table.amount > threshold) & (input_table.date >= date_filter) ) result_df = filtered_df.groupBy("category").agg( {"amount": "sum"} ) return { "results": { "filtered_results": result_df } } except Exception as e: print(f"Error in analysis: {str(e)}") raise