

# Spark Connect 支持
<a name="notebooks-spark-connect"></a>

Spark Connect 是 Apache Spark 的客户端-服务器架构，它将应用程序客户端与 Spark 集群的驱动程序进程分离，从而允许支持的客户端远程连接到 Spark。Spark Connect 还支持在开发过程中直接从您最喜欢的 IDE/客户端进行交互式调试。

从 Apache Spark 3.5 发行版起，Athena 支持 Spark Connect 作为可通过 `GetSessionEndpoint` API 访问的 AWS 端点。

## API/CLI 示例（GetSessionEndpoint）
<a name="notebooks-spark-connect-api-examples"></a>

您可以使用 `GetSessionEndpoint` API 获取交互式会话的 Spark Connect 端点。

```
aws athena get-session-endpoint \
  --region "REGION" \
  --session-id "SESSION_ID"
```

此 API 将返回该会话的 Spark Connect 端点 URL。

```
{
  "EndpointUrl": "ENDPOINT_URL",
  "AuthToken": "AUTH_TOKEN",
  "AuthTokenExpirationTime": "AUTH_TOKEN_EXPIRY_TIME"
}
```

## 从自主管理型客户端连接
<a name="notebooks-spark-connect-self-managed"></a>

您可以从自主管理型客户端连接到 Athena Spark 交互式会话。

### 先决条件
<a name="notebooks-spark-connect-prerequisites"></a>

安装适用于 Spark 3.5.6 的 pyspark-connect 客户端和适用于 Python 的 AWS SDK。

```
pip install --user pyspark[connect]==3.5.6
pip install --user boto3
```

下面是一个直接向会话端点发送请求的 Python 脚本示例：

```
import boto3
import time
from pyspark.sql import SparkSession

client = boto3.client('athena', region_name='<REGION>')

# start the session
response = client.start_session(
    WorkGroup='<WORKGROUP_NAME>',
    EngineConfiguration={}
)

# wait for the session endpoint to be ready
time.sleep(5)
response = client.get_session_endpoint(SessionId=session_id)

# construct the authenticated remote url
authtoken=response['AuthToken']
endpoint_url=response['EndpointUrl']
endpoint_url=endpoint_url.replace("https", "sc")+":443/;use_ssl=true;"
url_with_headers = (
    f"{endpoint_url}"
    f"x-aws-proxy-auth={authtoken}"
)

# start the Spark session
start_time = time.time()
spark = SparkSession.builder\
    .remote(url_with_headers)\
    .getOrCreate()
 
spark.version 

#
# Enter your spark code here
#

# stop the Spark session
spark.stop()
```

以下是访问会话的实时 Spark UI 或 Spark History Server 的 Python 脚本示例：

```
Region='<REGION>'
WorkGroupName='<WORKGROUP_NAME>'
SessionId='<SESSION_ID>'
Partition='aws'
Account='<ACCOUNT_NUMBER>'

SessionARN=f"arn:{Partition}:athena:{Region}:{Account}:workgroup/{WorkGroupName}/session/{SessionId}"

# invoke the API to get the live UI/persistence UI for a session
response = client.get_resource_dashboard(
    ResourceARN=SessionARN
)
response['Url']
```