There are more AWS SDK examples available in the AWS Doc SDK Examples GitHub repo.
Use AddJobFlowSteps with an AWS SDK
The following code examples show how to use AddJobFlowSteps.
- Python
- SDK for Python (Boto3)
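Note
There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.

Add a Spark step, which is run by the cluster as soon as it is added.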
import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


def add_step(cluster_id, name, script_uri, script_args, emr_client):
    """
    Adds a job step to the specified cluster. This example adds a Spark
    step, which is run by the cluster as soon as it is added.

    :param cluster_id: The ID of the cluster.
    :param name: The name of the step.
    :param script_uri: The URI where the Python script is stored.
    :param script_args: Arguments to pass to the Python script.
    :param emr_client: The Boto3 EMR client object.
    :return: The ID of the newly added step.
    """
    try:
        response = emr_client.add_job_flow_steps(
            JobFlowId=cluster_id,
            Steps=[
                {
                    "Name": name,
                    "ActionOnFailure": "CONTINUE",
                    "HadoopJarStep": {
                        "Jar": "command-runner.jar",
                        "Args": [
                            "spark-submit",
                            "--deploy-mode",
                            "cluster",
                            script_uri,
                            *script_args,
                        ],
                    },
                }
            ],
        )
        step_id = response["StepIds"][0]
        logger.info("Started step with ID %s", step_id)
    except ClientError:
        logger.exception("Couldn't start step %s with URI %s.", name, script_uri)
        raise
    else:
        return step_id

Run an Amazon EMR File System (EMRFS) command as a job step on a cluster. This can be used to automate EMRFS commands on a cluster instead of running them manually over an SSH connection.
import boto3
from botocore.exceptions import ClientError


def add_emrfs_step(command, bucket_url, cluster_id, emr_client):
    """
    Add an EMRFS command as a job flow step to an existing cluster.

    :param command: The EMRFS command to run.
    :param bucket_url: The URL of a bucket that contains tracking metadata.
    :param cluster_id: The ID of the cluster to update.
    :param emr_client: The Boto3 Amazon EMR client object.
    :return: The ID of the added job flow step. Status can be tracked by calling
             the emr_client.describe_step() function.
    """
    job_flow_step = {
        "Name": "Example EMRFS Command Step",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": ["/usr/bin/emrfs", command, bucket_url],
        },
    }

    try:
        response = emr_client.add_job_flow_steps(
            JobFlowId=cluster_id, Steps=[job_flow_step]
        )
        step_id = response["StepIds"][0]
        print(f"Added step {step_id} to cluster {cluster_id}.")
    except ClientError:
        print(f"Couldn't add a step to cluster {cluster_id}.")
        raise
    else:
        return step_id


def usage_demo():
    emr_client = boto3.client("emr")
    # Assumes the first waiting cluster has EMRFS enabled and has created metadata
    # with the default name of 'EmrFSMetadata'.
    cluster = emr_client.list_clusters(ClusterStates=["WAITING"])["Clusters"][0]
    add_emrfs_step(
        "sync", "s3://elasticmapreduce/samples/cloudfront", cluster["Id"], emr_client
    )


if __name__ == "__main__":
    usage_demo()
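The docstring above notes that step status can be tracked with `emr_client.describe_step()`. One way to do that is a simple polling loop. This is a minimal sketch, not part of the AWS sample: `wait_for_step`, `poll_seconds`, and `max_polls` are hypothetical names chosen for illustration, and the loop assumes the response carries the state at `Step.Status.State`.

```python
import time

# Step states after which Amazon EMR reports no further progress.
TERMINAL_STATES = {"COMPLETED", "CANCELLED", "FAILED", "INTERRUPTED"}


def wait_for_step(cluster_id, step_id, emr_client, poll_seconds=30, max_polls=40):
    """
    Hypothetical helper: polls describe_step until the step reaches a
    terminal state, then returns that state.

    :param cluster_id: The ID of the cluster that runs the step.
    :param step_id: The ID returned by add_job_flow_steps.
    :param emr_client: The Boto3 Amazon EMR client object.
    :param poll_seconds: Seconds to sleep between polls (an assumed default).
    :param max_polls: Maximum number of polls before giving up (an assumed default).
    :return: The terminal state of the step.
    """
    state = None
    for _ in range(max_polls):
        response = emr_client.describe_step(ClusterId=cluster_id, StepId=step_id)
        state = response["Step"]["Status"]["State"]
        if state in TERMINAL_STATES:
            return state
        time.sleep(poll_seconds)
    raise TimeoutError(
        f"Step {step_id} is still in state {state} after {max_polls} polls."
    )
```

A caller could pass the step ID returned by `add_emrfs_step` directly to this helper. Boto3 also provides a built-in `step_complete` waiter through `emr_client.get_waiter("step_complete")`, which may be preferable when only successful completion matters.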
For API details, see AddJobFlowSteps in AWS SDK for Python (Boto3) API Reference.
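Both Python examples pass the same step shape to `add_job_flow_steps`: a name, an `ActionOnFailure` value, and a `HadoopJarStep` that runs `command-runner.jar` with a list of arguments. As a rough sketch, the Spark variant can be factored into a small builder so the dictionary can be inspected before any call is made. `build_spark_step` and `VALID_ACTIONS` are hypothetical names introduced here and are not part of the AWS sample.

```python
# ActionOnFailure values accepted by the Amazon EMR API.
VALID_ACTIONS = {"TERMINATE_JOB_FLOW", "TERMINATE_CLUSTER", "CANCEL_AND_WAIT", "CONTINUE"}


def build_spark_step(name, script_uri, script_args=(), action_on_failure="CONTINUE"):
    """
    Hypothetical helper: builds one entry for the Steps list of
    add_job_flow_steps that runs a script with spark-submit in cluster mode.
    """
    if action_on_failure not in VALID_ACTIONS:
        raise ValueError(f"Unsupported ActionOnFailure: {action_on_failure}")
    return {
        "Name": name,
        "ActionOnFailure": action_on_failure,
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            # The script URI comes first, then its own arguments in order.
            "Args": ["spark-submit", "--deploy-mode", "cluster", script_uri, *script_args],
        },
    }
```

The returned dictionary would then be passed as `Steps=[build_spark_step(...)]` together with `JobFlowId`, as in the `add_step` function above.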
- SAP ABAP
- SDK for SAP ABAP
Note
There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.

    TRY.
        " Build args list for Spark submit
        DATA lt_args TYPE /aws1/cl_emrxmlstringlist_w=>tt_xmlstringlist.
        APPEND NEW /aws1/cl_emrxmlstringlist_w( 'spark-submit' ) TO lt_args.
        APPEND NEW /aws1/cl_emrxmlstringlist_w( '--deploy-mode' ) TO lt_args.
        APPEND NEW /aws1/cl_emrxmlstringlist_w( 'cluster' ) TO lt_args.
        APPEND NEW /aws1/cl_emrxmlstringlist_w( iv_script_uri ) TO lt_args.
        APPEND LINES OF it_script_args TO lt_args.

        " Create step configuration
        DATA(lo_hadoop_jar_step) = NEW /aws1/cl_emrhadoopjarstepcfg(
          iv_jar = 'command-runner.jar'
          it_args = lt_args
        ).

        DATA(lo_step_config) = NEW /aws1/cl_emrstepconfig(
          iv_name = iv_name
          iv_actiononfailure = 'CONTINUE'
          io_hadoopjarstep = lo_hadoop_jar_step
        ).

        DATA lt_steps TYPE /aws1/cl_emrstepconfig=>tt_stepconfiglist.
        APPEND lo_step_config TO lt_steps.

        DATA(lo_result) = lo_emr->addjobflowsteps(
          iv_jobflowid = iv_cluster_id
          it_steps = lt_steps
        ).

        " Get first step ID
        DATA(lt_step_ids) = lo_result->get_stepids( ).
        READ TABLE lt_step_ids INDEX 1 INTO DATA(lo_step_id_obj).
        IF sy-subrc = 0.
          ov_step_id = lo_step_id_obj->get_value( ).
          MESSAGE |Step added with ID { ov_step_id }| TYPE 'I'.
        ENDIF.
      CATCH /aws1/cx_emrinternalservererr INTO DATA(lo_internal_error).
        DATA(lv_error) = lo_internal_error->if_message~get_text( ).
        MESSAGE lv_error TYPE 'E'.
    ENDTRY.
For API details, see AddJobFlowSteps in AWS SDK for SAP ABAP API reference.