

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# ユーザー定義関数を最適化する
<a name="optimize-user-defined-functions"></a>

PySpark のユーザー定義関数 (UDF) と `RDD.map` は、パフォーマンスを大幅に低下させることがあります。これは、Spark の基盤となる Scala 実装で Python コードを正確に表現するために必要なオーバーヘッドが原因です。

PySpark ジョブのアーキテクチャを次の図に示します。PySpark を使用すると、Spark ドライバーは Py4j ライブラリを使用して Python から Java メソッドを呼び出します。Spark SQL または DataFrame の組み込み関数を呼び出す場合、関数は最適化された実行計画を使用して各エグゼキュターの JVM 上で実行されるため、Python と Scala の間でパフォーマンスの差はほとんどありません。



![\[Spark コンテキストは Py4J を使用して Spark ドライバーに接続し、ドライバーはワーカーノードに接続します。\]](http://docs.aws.amazon.com/ja_jp/prescriptive-guidance/latest/tuning-aws-glue-for-apache-spark/images/worker-nodes.png)


`map/ mapPartitions/ udf` の使用などを使った独自の Python ロジックを使用すると、タスクは Python ランタイム環境で実行されます。2 つの環境を管理することで、オーバーヘッドコストが発生します。さらに、JVM ランタイム環境の組み込み関数で使用するには、メモリ上のデータを変換する必要があります。*Pickle* は、JVM と Python のランタイム間でデータをやり取りする際に、デフォルトで使用されるシリアル化形式です。ただし、このシリアル化と逆シリアル化のコストは非常に高いため、Java または Scala で記述された UDF の方が、Python の UDF よりも高速です。

PySpark におけるシリアル化と逆シリアル化のオーバーヘッドを回避するには、次の点を検討してください。
+ **Spark SQL の組み込み関数を使用する** – 独自の UDF やマップ関数を Spark SQL または DataFrame の組み込み関数に置き換えることを検討してください。Spark SQL または DataFrame の組み込み関数を実行する場合、タスクは各エグゼキュターの JVM 上で処理されるため、Python と Scala の間でパフォーマンスの差はほとんどありません。
+ **UDF を Scala または Java で実装する** – Java または Scala で記述された UDF は JVM 上で実行されるため、それらの言語による実装を検討してください。
+ **ベクトル化されたワークロードには Apache Arrow ベースの UDF を使用する** – Arrow ベースの UDF の使用を検討してください。この機能は、ベクトル化された UDF (Pandas UDF) とも呼ばれます。[Apache Arrow](https://arrow.apache.org/) は、JVM と Python プロセス間でデータを効率的に転送するために AWS Glue が使用できる言語に依存しないインメモリデータ形式です。これは現在、Pandas や NumPy のデータを扱う Python ユーザーにとって最も有益です。

  Arrow は列指向 (ベクトル化) 形式です。その使用は自動ではなく、最大限に活用したり互換性を確保したりするには、設定やコードにわずかな変更が必要になる可能性があります。詳細と制限については、「[Apache Arrow in PySpark](https://spark.apache.org/docs/latest/api/python/tutorial/sql/arrow_pandas.html)」を参照してください。

  次の例では、標準の Python、ベクトル化された UDF、Spark SQL の基本的な増分 UDF を比較します。

## 標準の Python UDF
<a name="python-udf"></a>

サンプルの時間 3.20 (秒) です。

**コードの例**

```
# DataSet
df = spark.range(10000000).selectExpr("id AS a","id AS b")

# UDF Example
def plus(a,b):
    return a+b
spark.udf.register("plus",plus)

df.selectExpr("count(plus(a,b))").collect()
```

**実行計画**

```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[count(pythonUDF0#124)])
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#580]
+- HashAggregate(keys=[], functions=[partial_count(pythonUDF0#124)])
+- Project [pythonUDF0#124]
+- BatchEvalPython [plus(a#116L, b#117L)], [pythonUDF0#124]
+- Project [id#114L AS a#116L, id#114L AS b#117L]
+- Range (0, 10000000, step=1, splits=16)
```

## ベクトル化された UDF
<a name="vectorized-udf"></a>

サンプルの時間 0.59 (秒) です。

ベクトル化された UDF は、前の UDF のサンプルと比べて 5 倍高速です。`Physical Plan` を確認すると、`ArrowEvalPython` が表示されており、このアプリケーションが Apache Arrow によってベクトル化されていることがわかります。ベクトル化された UDF を有効にするには、コードで `spark.sql.execution.arrow.pyspark.enabled = true` を指定する必要があります。

**コードの例**

```
# Vectorized UDF
from pyspark.sql.types import LongType
from pyspark.sql.functions import count, pandas_udf

# Enable Apache Arrow Support
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# DataSet
df = spark.range(10000000).selectExpr("id AS a","id AS b")

# Annotate pandas_udf to use Vectorized UDF
@pandas_udf(LongType())
def pandas_plus(a,b):
    return a+b
spark.udf.register("pandas_plus",pandas_plus)

df.selectExpr("count(pandas_plus(a,b))").collect()
```

**実行計画**

```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[count(pythonUDF0#1082L)], output=[count(pandas_plus(a, b))#1080L])
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#5985]
+- HashAggregate(keys=[], functions=[partial_count(pythonUDF0#1082L)], output=[count#1084L])
+- Project [pythonUDF0#1082L]
+- ArrowEvalPython [pandas_plus(a#1074L, b#1075L)], [pythonUDF0#1082L], 200
+- Project [id#1072L AS a#1074L, id#1072L AS b#1075L]
+- Range (0, 10000000, step=1, splits=16)
```

## Spark SQL
<a name="spark-sql"></a>

サンプルの時間 0.087 (秒) です。

タスクが Python ランタイムを介さずに各エグゼキュターの JVM 上で実行されるため、Spark SQL はベクトル化された UDF よりもはるかに高速です。UDF を組み込み関数に置き換えられる場合は、そうすることをお勧めします。

**コードの例**

```
df.createOrReplaceTempView("test")
spark.sql("select count(a+b) from test").collect()
```

## ビッグデータに pandas を使用する
<a name="pandas"></a>

[pandas](https://pandas.pydata.org/docs/) にすでに精通していて、ビッグデータに Spark を使用する場合は、Spark. AWS Glue 4.0 で pandas API を使用できます。開始するには、公式ノートブック「[Quickstart: Pandas API on Spark](https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_ps.html)」を使用できます。詳細については、[PySpark のドキュメント](https://spark.apache.org/docs/latest/api/python/index.html)を参照してください。