本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用 PySpark 分析模板中的参数
参数允许在作业提交时提供不同的值,从而提高 PySpark 分析模板的灵活性。参数可通过传递给入口点函数的上下文对象进行访问。
注意
参数是用户提供的字符串,可以包含任意内容。
-
查看代码以确保参数得到安全处理,以防止分析中出现意外行为。
-
无论提交时提供什么参数值,设计参数处理都要安全运行。
访问参数
参数可在context['analysisParameters']字典中找到。所有参数值均为字符串。
例安全访问参数
def entrypoint(context): # Access parameters from context parameters = context['analysisParameters'] threshold = parameters['threshold'] table_name = parameters['table_name'] # Continue with analysis using parameters spark = context['sparkSession'] input_df = context['referencedTables'][table_name] # Convert threshold value threshold_val = int(threshold) # Use parameter in DataFrame operation filtered_df = input_df.filter(input_df.amount > threshold_val) return { "results": { "output": filtered_df } }
参数安全的最佳实践
警告
参数是用户提供的字符串,可以包含任意内容。您必须安全地处理参数,以防分析代码中存在安全漏洞。
要避免的不安全参数处理模式:
-
将参数作为代码执行 — 切勿
exec()在参数值上使用 oeval()r# UNSAFE - Don't do this eval(parameters['expression']) # Can execute arbitrary code -
SQL 字符串插值 — 切勿将参数直接连接到 SQL 字符串中
# UNSAFE - Don't do this sql = f"SELECT * FROM table WHERE column = '{parameters['value']}'" # SQL injection risk -
不安全的文件路径操作 — 未经验证,切勿在文件系统操作中直接使用参数
# UNSAFE - Don't do this file_path = f"/data/{parameters['filename']}" # Path traversal risk
安全的参数处理模式:
-
在 DataFrame 操作中使用参数 — Spark DataFrames 可以安全地处理参数值
# SAFE - Use parameters in DataFrame operations threshold = int(parameters['threshold']) filtered_df = input_df.filter(input_df.value > threshold) -
验证参数值-使用前检查参数是否符合预期格式
# SAFE - Validate parameters before use def validate_date(date_str): try: from datetime import datetime datetime.strptime(date_str, '%Y-%m-%d') return True except ValueError: return False date_param = parameters['date_filter'] or '2024-01-01' if not validate_date(date_param): raise ValueError(f"Invalid date format: {date_param}") -
对参数值使用许可名单-如果可能,请根据已知的正确值验证参数
# SAFE - Use allowlists allowed_columns = ['column1', 'column2', 'column3'] column_param = parameters['column_name'] if column_param not in allowed_columns: raise ValueError(f"Invalid column: {column_param}") -
带错误处理的类型转换-将字符串参数安全地转换为预期类型
# SAFE - Convert with error handling try: batch_size = int(parameters['batch_size'] or '1000') if batch_size <= 0 or batch_size > 10000: raise ValueError(f"Batch size must be between 1 and 10000") except ValueError as e: print(f"Invalid parameter: {e}") raise
重要
请记住,当作业运行器提供不同的值时,参数会绕过代码审查。无论提供什么参数值,都要设计出可以安全运行的参数处理。
完整参数示例
例在 PySpark 脚本中安全地使用参数
def entrypoint(context): try: # Access Spark session and tables spark = context['sparkSession'] input_table = context['referencedTables']['sales_data'] # Access parameters - fail fast if analysisParameters missing parameters = context['analysisParameters'] # Validate and convert numeric parameter (handles empty strings with default) try: threshold = int(parameters['threshold'] or '100') if threshold <= 0: raise ValueError("Threshold must be positive") except (ValueError, TypeError) as e: print(f"Invalid threshold parameter: {e}") raise # Validate date parameter (handles empty strings with default) date_filter = parameters['start_date'] or '2024-01-01' from datetime import datetime try: datetime.strptime(date_filter, '%Y-%m-%d') except ValueError: raise ValueError(f"Invalid date format: {date_filter}") # Use parameters safely in DataFrame operations filtered_df = input_table.filter( (input_table.amount > threshold) & (input_table.date >= date_filter) ) result_df = filtered_df.groupBy("category").agg( {"amount": "sum"} ) return { "results": { "filtered_results": result_df } } except Exception as e: print(f"Error in analysis: {str(e)}") raise