Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Bekerja dengan parameter dalam templat PySpark analisis
Parameter meningkatkan fleksibilitas templat PySpark analisis Anda dengan memungkinkan nilai yang berbeda diberikan pada waktu pengiriman pekerjaan. Parameter dapat diakses melalui objek konteks yang diteruskan ke fungsi entrypoint Anda.
catatan
Parameter adalah string yang disediakan pengguna yang dapat berisi konten arbitrer.
-
Tinjau kode untuk memastikan parameter ditangani dengan aman untuk mencegah perilaku tak terduga dalam analisis Anda.
-
Desain penanganan parameter untuk bekerja dengan aman terlepas dari nilai parameter apa yang disediakan pada waktu pengiriman.
Mengakses parameter
Parameter tersedia di context['analysisParameters'] kamus. Semua nilai parameter adalah string.
contoh Mengakses parameter dengan aman
def entrypoint(context): # Access parameters from context parameters = context['analysisParameters'] threshold = parameters['threshold'] table_name = parameters['table_name'] # Continue with analysis using parameters spark = context['sparkSession'] input_df = context['referencedTables'][table_name] # Convert threshold value threshold_val = int(threshold) # Use parameter in DataFrame operation filtered_df = input_df.filter(input_df.amount > threshold_val) return { "results": { "output": filtered_df } }
Praktik terbaik keamanan parameter
Awas
Parameter adalah string yang disediakan pengguna yang dapat berisi konten arbitrer. Anda harus menangani parameter dengan aman untuk mencegah kerentanan keamanan dalam kode analisis Anda.
Pola penanganan parameter yang tidak aman untuk dihindari:
-
Menjalankan parameter sebagai kode - Jangan pernah menggunakan
eval()atauexec()pada nilai parameter# UNSAFE - Don't do this eval(parameters['expression']) # Can execute arbitrary code -
Interpolasi string SQL - Jangan pernah menggabungkan parameter langsung ke string SQL
# UNSAFE - Don't do this sql = f"SELECT * FROM table WHERE column = '{parameters['value']}'" # SQL injection risk -
Operasi jalur file yang tidak aman - Jangan pernah menggunakan parameter secara langsung dalam operasi sistem file tanpa validasi
# UNSAFE - Don't do this file_path = f"/data/{parameters['filename']}" # Path traversal risk
Pola penanganan parameter aman:
-
Gunakan parameter dalam DataFrame operasi — Spark DataFrames menangani nilai parameter dengan aman
# SAFE - Use parameters in DataFrame operations threshold = int(parameters['threshold']) filtered_df = input_df.filter(input_df.value > threshold) -
Validasi nilai parameter - Periksa apakah parameter memenuhi format yang diharapkan sebelum digunakan
# SAFE - Validate parameters before use def validate_date(date_str): try: from datetime import datetime datetime.strptime(date_str, '%Y-%m-%d') return True except ValueError: return False date_param = parameters['date_filter'] or '2024-01-01' if not validate_date(date_param): raise ValueError(f"Invalid date format: {date_param}") -
Gunakan daftar yang diizinkan untuk nilai parameter - Jika memungkinkan, validasi parameter terhadap nilai baik yang diketahui
# SAFE - Use allowlists allowed_columns = ['column1', 'column2', 'column3'] column_param = parameters['column_name'] if column_param not in allowed_columns: raise ValueError(f"Invalid column: {column_param}") -
Konversi tipe dengan penanganan kesalahan - Konversi parameter string ke tipe yang diharapkan dengan aman
# SAFE - Convert with error handling try: batch_size = int(parameters['batch_size'] or '1000') if batch_size <= 0 or batch_size > 10000: raise ValueError(f"Batch size must be between 1 and 10000") except ValueError as e: print(f"Invalid parameter: {e}") raise
penting
Ingat bahwa parameter melewati tinjauan kode saat pelari pekerjaan memberikan nilai yang berbeda. Rancang penanganan parameter Anda agar bekerja dengan aman terlepas dari nilai parameter apa yang disediakan.
Contoh parameter lengkap
contoh Menggunakan parameter dengan aman dalam PySpark skrip
def entrypoint(context): try: # Access Spark session and tables spark = context['sparkSession'] input_table = context['referencedTables']['sales_data'] # Access parameters - fail fast if analysisParameters missing parameters = context['analysisParameters'] # Validate and convert numeric parameter (handles empty strings with default) try: threshold = int(parameters['threshold'] or '100') if threshold <= 0: raise ValueError("Threshold must be positive") except (ValueError, TypeError) as e: print(f"Invalid threshold parameter: {e}") raise # Validate date parameter (handles empty strings with default) date_filter = parameters['start_date'] or '2024-01-01' from datetime import datetime try: datetime.strptime(date_filter, '%Y-%m-%d') except ValueError: raise ValueError(f"Invalid date format: {date_filter}") # Use parameters safely in DataFrame operations filtered_df = input_table.filter( (input_table.amount > threshold) & (input_table.date >= date_filter) ) result_df = filtered_df.groupBy("category").agg( {"amount": "sum"} ) return { "results": { "filtered_results": result_df } } except Exception as e: print(f"Error in analysis: {str(e)}") raise