

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

# 사용자 정의 함수 최적화
<a name="optimize-user-defined-functions"></a>

PySpark의 사용자 정의 함수(UDF) 및 `RDD.map`은 종종 성능을 크게 저하시킬 수 있습니다. Spark의 기본 Scala 구현에서 Python 코드를 정확하게 나타내는 데 필요한 오버헤드 때문입니다.

다음 다이어그램은 의 아키텍처입니다. PySpark를 사용하는 경우 Spark 드라이버는 Py4j 라이브러리를 사용하여 Python에서 Java 메서드를 직접 호출합니다. Spark SQL 또는 DataFrame 기본 제공 함수를 직접 호출할 때 함수는 최적화된 실행 계획을 사용하여 각 실행기의 JVM에서 실행되므로 Python과 Scala 간에 성능 차이가 거의 없습니다.



![\[Spark 컨텍스트는 Py4J를 사용하여 Spark 드라이버에 연결하고 드라이버는 워커 노드에 연결합니다.\]](http://docs.aws.amazon.com/ko_kr/prescriptive-guidance/latest/tuning-aws-glue-for-apache-spark/images/worker-nodes.png)


`map/ mapPartitions/ udf` 사용 등 자체 Python 로직을 사용하는 경우 태스크는 Python 런타임 환경에서 실행됩니다. 두 환경을 관리하면 오버헤드 비용이 발생합니다. 또한 JVM 런타임 환경의 기본 제공 함수에서 사용할 수 있도록 메모리의 데이터를 변환해야 합니다. *Pickle*은 JVM과 Python 런타임 간 교환에 기본적으로 사용되는 직렬화 형식입니다. 그러나 이 직렬화 및 역직렬화 비용은 매우 높으므로 Java 또는 Scala로 작성된 UDF는 Python UDF보다 더 빠릅니다.

PySpark에서 직렬화 및 역직렬화 오버헤드를 방지하려면 다음을 고려합니다.
+ **기본 제공 Spark SQL 함수 사용** - 자체 UDF 또는 맵 함수를 Spark SQL 또는 DataFrame 기본 제공 함수로 대체하는 방법을 고려합니다. Spark SQL 또는 DataFrame 기본 제공 함수를 실행할 때 태스크는 각 실행기의 JVM에서 처리되므로 Python과 Scala 간에 성능 차이가 거의 없습니다.
+ **Scala 또는 Java에서 UDF 구현** - JVM에서 실행되므로 Java 또는 Scala로 작성된 UDF를 사용하는 방법을 고려합니다.
+ **벡터화된 워크로드에 Apache Arrow 기반 UDF 사용** - Arrow 기반 UDF를 사용하는 방법을 고려합니다. 이 기능을 벡터화된 UDF(Pandas UDF)라고도 합니다. [Apache Arrow](https://arrow.apache.org/)는 JVM과 Python 프로세스 간에 데이터를 효율적으로 전송하는 데 사용할 AWS Glue 수 있는 언어에 구애받지 않는 인 메모리 데이터 형식입니다. 이는 현재 Pandas 또는 NumPy 데이터로 작업하는 Python 사용자에게 가장 유용합니다.

  Arrow는 열 형식(벡터화된 형식)입니다. 이 기능의 사용은 자동이 아니며 최대한 활용하고 호환성을 보장하기 위해 구성 또는 코드를 약간 변경해야 할 수 있습니다. 자세한 내용과 제한 사항은 [Apache Arrow in PySpark](https://spark.apache.org/docs/latest/api/python/tutorial/sql/arrow_pandas.html)를 참조하세요.

  다음 예제에서는 표준 Python, 벡터화된 UDF 및 Spark SQL에서 기본 증분 UDF를 비교합니다.

## 표준 Python UDF
<a name="python-udf"></a>

예제 시간은 3.20(초)입니다.

**예제 코드**

```
# DataSet
df = spark.range(10000000).selectExpr("id AS a","id AS b")

# UDF Example
def plus(a,b):
    return a+b
spark.udf.register("plus",plus)

df.selectExpr("count(plus(a,b))").collect()
```

**실행 계획**

```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[count(pythonUDF0#124)])
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#580]
+- HashAggregate(keys=[], functions=[partial_count(pythonUDF0#124)])
+- Project [pythonUDF0#124]
+- BatchEvalPython [plus(a#116L, b#117L)], [pythonUDF0#124]
+- Project [id#114L AS a#116L, id#114L AS b#117L]
+- Range (0, 10000000, step=1, splits=16)
```

## 벡터화된 UDF
<a name="vectorized-udf"></a>

예제 시간은 0.59(초)입니다.

벡터화된 UDF는 이전 UDF 예제보다 5배 빠릅니다. `Physical Plan`을 확인하면 `ArrowEvalPython`를 볼 수 있으며, 이는 이 애플리케이션이 Apache Arrow에 의해 벡터화되었음을 의미합니다. 벡터화된 UDF를 활성화하려면 코드에서 `spark.sql.execution.arrow.pyspark.enabled = true`를 지정해야 합니다.

**예제 코드**

```
# Vectorized UDF
from pyspark.sql.types import LongType
from pyspark.sql.functions import count, pandas_udf

# Enable Apache Arrow Support
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# DataSet
df = spark.range(10000000).selectExpr("id AS a","id AS b")

# Annotate pandas_udf to use Vectorized UDF
@pandas_udf(LongType())
def pandas_plus(a,b):
    return a+b
spark.udf.register("pandas_plus",pandas_plus)

df.selectExpr("count(pandas_plus(a,b))").collect()
```

**실행 계획**

```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[count(pythonUDF0#1082L)], output=[count(pandas_plus(a, b))#1080L])
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#5985]
+- HashAggregate(keys=[], functions=[partial_count(pythonUDF0#1082L)], output=[count#1084L])
+- Project [pythonUDF0#1082L]
+- ArrowEvalPython [pandas_plus(a#1074L, b#1075L)], [pythonUDF0#1082L], 200
+- Project [id#1072L AS a#1074L, id#1072L AS b#1075L]
+- Range (0, 10000000, step=1, splits=16)
```

## Spark SQL
<a name="spark-sql"></a>

예제 시간은 0.087(초)입니다.

Python 런타임 없이 각 실행기의 JVM에서 태스크가 실행되므로 Spark SQL은 벡터화된 UDF보다 훨씬 빠릅니다. UDF를 기본 제공 함수로 교체할 수 있는 경우 교체하는 것이 좋습니다.

**예제 코드**

```
df.createOrReplaceTempView("test")
spark.sql("select count(a+b) from test").collect()
```

## 빅 데이터에 panda 사용
<a name="pandas"></a>

이미 [pandas](https://pandas.pydata.org/docs/)에 익숙하고 빅 데이터에 Spark를 사용하려는 경우 Spark. AWS Glue 4.0 이상에서 pandas API를 사용할 수 있습니다. 시작하기 위해 공식 노트북 [Quickstart: Pandas API on Spark](https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_ps.html)를 사용할 수 있습니다. 자세한 내용은 [PySpark 설명서](https://spark.apache.org/docs/latest/api/python/index.html)를 참조하세요.