Spark Connect のサポート
Spark Connect は Apache Spark 向けのクライアントサーバーアーキテクチャであり、アプリケーションクライアントを Spark クラスターのドライバープロセスから切り離すことで、サポートされているクライアントから Spark へのリモート接続を可能にします。Spark Connect では、開発中に好みの IDE/クライアントからインタラクティブデバッグを直接実行することもできます。
Apache Spark バージョン 3.5 以降のリリースバージョンでは、Athena は GetSessionEndpoint API を使用してアクセスできる AWS エンドポイントとして Spark Connect をサポートしています。
API/CLI の例(GetSessionEndpoint)
GetSessionEndpoint API を使用して、インタラクティブセッションの Spark Connect エンドポイントを取得できます。
aws athena get-session-endpoint \ --region "REGION" \ --session-id "SESSION_ID"
この API は、そのセッションの Spark Connect エンドポイント URL を返します。
{ "EndpointUrl": "ENDPOINT_URL", "AuthToken": "AUTH_TOKEN", "AuthTokenExpirationTime": "AUTH_TOKEN_EXPIRY_TIME" }
セルフマネージドクライアントからの接続
セルフマネージドクライアントから Athena Spark インタラクティブセッションに接続できます。
前提条件
Spark 3.5.6 および AWS SDK for Python 用の pyspark-connect クライアントをインストールします。
pip install --user pyspark[connect]==3.5.6 pip install --user boto3
以下に示すのは、リクエストをセッションエンドポイントに直接送信するための Python スクリプトのサンプルです。
import boto3 import time from pyspark.sql import SparkSession client = boto3.client('athena', region_name='<REGION>') # start the session response = client.start_session( WorkGroup='<WORKGROUP_NAME>', EngineConfiguration={} ) # wait for the session endpoint to be ready time.sleep(5) response = client.get_session_endpoint(SessionId=session_id) # construct the authenticated remote url authtoken=response['AuthToken'] endpoint_url=response['EndpointUrl'] endpoint_url=endpoint_url.replace("https", "sc")+":443/;use_ssl=true;" url_with_headers = ( f"{endpoint_url}" f"x-aws-proxy-auth={authtoken}" ) # start the Spark session start_time = time.time() spark = SparkSession.builder\ .remote(url_with_headers)\ .getOrCreate() spark.version # # Enter your spark code here # # stop the Spark session spark.stop()
以下に示すのは、セッションのライブ Spark UI または Spark History Server にアクセスするための Python スクリプトの例です。
Region='<REGION>' WorkGroupName='<WORKGROUP_NAME>' SessionId='<SESSION_ID>' Partition='aws' Account='<ACCOUNT_NUMBER>' SessionARN=f"arn:{Partition}:athena:{Region}:{Account}:workgroup/{WorkGroupName}/session/{SessionId}" # invoke the API to get the live UI/persistence UI for a session response = client.get_resource_dashboard( ResourceARN=SessionARN ) response['Url']