

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 將檔案和 Python 程式庫匯入 Athena for Spark
<a name="notebooks-import-files-libraries"></a>

本文件提供了如何將檔案和 Python 程式庫匯入 Amazon Athena for Apache Spark 的範例。

## 考量事項與限制
<a name="notebooks-import-files-libraries-considerations-limitations"></a>
+ **Python 版本** – 目前，Athena for Spark 使用 Python 版本 3.9.16。請注意，Python 套件對 Python 次要版本敏感。
+ **Athena for Spark 架構** – Athena for Spark 在 ARM64 架構上使用 Amazon Linux 2。請注意，某些 Python 程式庫不會為此架構散發二進位檔。
+ **二進位共用物件 (SOS)** – 因為 SparkContext [addPyFile](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.addPyFile.html) 方法不會偵測二進位共用物件，因此無法在 Athena 中使用它來新增依賴於共用物件的 Python 套件。
+ **彈性分散式資料集 (RDD)** – 不支援 [RDD](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html)。
+ **資料框架** – 不支援 PySpark [DataFrame.foreach](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.foreach.html) 方法。

## 範例
<a name="notebooks-import-files-libraries-examples"></a>

這些範例使用下列慣例。
+ 預留位置 Amazon S3 位置 `s3://amzn-s3-demo-bucket`。將其替換成您自己的 S3 儲存貯體位置。
+ 從 Unix Shell 執行的所有程式碼區塊顯示為 *directory\$1name* `$`。例如，目錄 `/tmp` 中的命令 `ls` 及其輸出如下所示：

  ```
  /tmp $ ls
  ```

  **輸出**

  ```
  file1 file2
  ```

## 匯入用於計算的文字檔案
<a name="notebooks-import-files-libraries-importing-text-files"></a>

本節中的範例說明如何在 Athena for Spark 的筆記本中匯入用於計算的文字檔案。

### 將檔案寫入本機暫時目錄後，再將其新增至筆記本
<a name="notebooks-import-files-libraries-adding-a-file-to-a-notebook-temporary-directory"></a>

下列範例會顯示如何將檔案寫入本機暫存目錄、將其新增至筆記本，然後進行測試。

```
import os
from pyspark import SparkFiles
tempdir = '/tmp/'
path = os.path.join(tempdir, "test.txt")
with open(path, "w") as testFile:
    _ = testFile.write("5")
sc.addFile(path)

def func(iterator):
    with open(SparkFiles.get("test.txt")) as testFile:
        fileVal = int(testFile.readline())
        return [x * fileVal for x in iterator]

#Test the file
from pyspark.sql.functions import udf
from pyspark.sql.functions import col

udf_with_import = udf(func)
df = spark.createDataFrame([(1, "a"), (2, "b")])
df.withColumn("col", udf_with_import(col('_2'))).show()
```

**輸出**

```
Calculation completed.
+---+---+-------+
| _1| _2|    col|
+---+---+-------+
|  1|  a|[aaaaa]|
|  2|  b|[bbbbb]|
+---+---+-------+
```

### 從 Amazon S3 匯入檔案
<a name="notebooks-import-files-libraries-importing-a-file-from-s3"></a>

下列範例會示範如何將檔案從 Amazon S3 匯入筆記本中，然後進行測試。

**若要將檔案從 Amazon S3 匯入筆記本**

1. 建立一個名為 `test.txt` 的檔案，其中具有包含值 `5` 的單行。

1. 將檔案新增到 Amazon S3 中的儲存貯體。此範例使用位置 `s3://amzn-s3-demo-bucket`。

1. 使用以下程式碼，將檔案匯入您的筆記本並測試檔案。

   ```
   from pyspark import SparkFiles
   sc.addFile('s3://amzn-s3-demo-bucket/test.txt')
   
   def func(iterator):
      with open(SparkFiles.get("test.txt")) as testFile:
          fileVal = int(testFile.readline())
          return [x * fileVal for x in iterator]
          
   #Test the file
   from pyspark.sql.functions import udf
   from pyspark.sql.functions import col
   
   udf_with_import = udf(func)
   df = spark.createDataFrame([(1, "a"), (2, "b")])
   df.withColumn("col", udf_with_import(col('_2'))).show()
   ```

   **輸出**

   ```
   Calculation completed.
   +---+---+-------+
   | _1| _2|    col|
   +---+---+-------+
   |  1|  a|[aaaaa]|
   |  2|  b|[bbbbb]|
   +---+---+-------+
   ```

## 新增 Python 檔案
<a name="notebooks-import-files-libraries-adding-python-files"></a>

本節中的範例說明如何將 Python 檔案和程式庫新增到 Athena 中的 Spark 筆記本。

### 新增 Python 檔案並註冊 UDF
<a name="notebooks-import-files-libraries-adding-python-files-and-registering-a-udf"></a>

下列範例會示範如何將 Python 檔案從 Amazon S3 匯入您的筆記本，並註冊 UDF。

**若要將 Python 檔案新增至您的筆記本並註冊 UDF**

1. 使用您自己的 Amazon S3 位置，建立包含下列內容的檔案 `s3://amzn-s3-demo-bucket/file1.py`：

   ```
   def xyz(input):
       return 'xyz  - udf ' + str(input);
   ```

1. 在同一個 S3 位置中，建立包含下列內容的檔案 `s3://amzn-s3-demo-bucket/file2.py`：

   ```
   from file1 import xyz
   def uvw(input):
       return 'uvw -> ' + xyz(input);
   ```

1. 在 Athena for Spark 筆記本中，執行下列命令。

   ```
   sc.addPyFile('s3://amzn-s3-demo-bucket/file1.py')
   sc.addPyFile('s3://amzn-s3-demo-bucket/file2.py')
   
   def func(iterator):
       from file2 import uvw
       return [uvw(x) for x in iterator]
   
   from pyspark.sql.functions import udf
   from pyspark.sql.functions import col
   
   udf_with_import = udf(func)
   
   df = spark.createDataFrame([(1, "a"), (2, "b")])
   
   df.withColumn("col", udf_with_import(col('_2'))).show(10)
   ```

   **輸出**

   ```
   Calculation started (calculation_id=1ec09e01-3dec-a096-00ea-57289cdb8ce7) in (session=c8c09e00-6f20-41e5-98bd-4024913d6cee). Checking calculation status...
   Calculation completed.
   +---+---+--------------------+
   | _1| _2|                 col|
   +---+---+--------------------+
   | 1 |  a|[uvw -> xyz - ud... |
   | 2 |  b|[uvw -> xyz - ud... |
   +---+---+--------------------+
   ```

### 匯入 Python .zip 檔案
<a name="notebooks-import-files-libraries-importing-a-python-zip-file"></a>

您可以使用 Python `addPyFile` 和 `import` 方法將 Python .zip 檔案匯入您的筆記本。

**注意**  
您匯入至 Athena Spark 的 `.zip` 檔案可能只包含 Python 套件。例如，不支援包含 C 型檔案的套件。

**若要將 Python `.zip` 檔案匯入您的筆記本**

1. 在您的本機電腦上的桌面目錄中 (如 `\tmp`)，建立名為 `moduletest` 的目錄。

1. 在 `moduletest` 目錄中，建立名為 `hello.py` 的檔案，內含下列內容：

   ```
   def hi(input):
       return 'hi ' + str(input);
   ```

1. 在同一個目錄中，新增名稱為 `__init__.py` 的空檔案。

   如果您列出目錄內容，則其現在應如下所示。

   ```
   /tmp $ ls moduletest
   __init__.py       hello.py
   ```

1. 使用 `zip` 命令將兩個模組檔案放入名為 `moduletest.zip` 的檔案。

   ```
   moduletest $ zip -r9 ../moduletest.zip *
   ```

1. 將 `.zip` 檔案上傳至 Amazon S3 中的儲存貯體。

1. 使用以下程式碼，將 Python`.zip` 檔案匯入您的筆記本。

   ```
   sc.addPyFile('s3://amzn-s3-demo-bucket/moduletest.zip')
   
   from moduletest.hello import hi
   
   from pyspark.sql.functions import udf
   from pyspark.sql.functions import col
   
   hi_udf = udf(hi)
   
   df = spark.createDataFrame([(1, "a"), (2, "b")])
   
   df.withColumn("col", hi_udf(col('_2'))).show()
   ```

   **輸出**

   ```
   Calculation started (calculation_id=6ec09e8c-6fe0-4547-5f1b-6b01adb2242c) in (session=dcc09e8c-3f80-9cdc-bfc5-7effa1686b76). Checking calculation status...
   Calculation completed.
   +---+---+----+
   | _1| _2| col|
   +---+---+----+
   |  1|  a|hi a|
   |  2|  b|hi b|
   +---+---+----+
   ```

### 將 Python 程式庫的兩個版本匯入為單獨模組
<a name="notebooks-import-files-libraries-importing-two-library-versions"></a>

下列程式碼範例說明如何從 Amazon S3 中的某個位置新增和匯入兩個不同版本的 Python 程式庫做為兩個獨立的模組。程式碼會從 S3 新增每個程式庫檔案、匯入檔案，然後列印程式庫版本以驗證匯入。

```
sc.addPyFile('s3://amzn-s3-demo-bucket/python-third-party-libs-test/simplejson_v3_15.zip')
sc.addPyFile('s3://amzn-s3-demo-bucket/python-third-party-libs-test/simplejson_v3_17_6.zip')

import simplejson_v3_15
print(simplejson_v3_15.__version__)
```

**輸出**

```
3.15.0
```

```
import simplejson_v3_17_6
print(simplejson_v3_17_6.__version__)
```

**輸出**

```
3.17.6
```

### 從 PyPI 匯入 Python .zip 檔案
<a name="notebooks-import-files-libraries-importing-a-python-zip-file-from-a-github-project"></a>

此範例使用 `pip` 命令從 [Python Package Index (PyPI)](https://pypi.org/) 下載 [bpabel/piglatin](https://github.com/bpabel/piglatin) 專案的 Python .zip 檔案。

**若要從 PyPI 匯入 Python .zip 檔案**

1. 在本機桌面上，使用下列命令建立名為 `testpiglatin` 的目錄並建立虛擬環境。

   ```
   /tmp $ mkdir testpiglatin
   /tmp $ cd testpiglatin
   testpiglatin $ virtualenv .
   ```

   **輸出**

   ```
   created virtual environment CPython3.9.6.final.0-64 in 410ms
   creator CPython3Posix(dest=/private/tmp/testpiglatin, clear=False, no_vcs_ignore=False, global=False)
   seeder FromAppData(download=False, pip=bundle, setuptools=bundle, wheel=bundle, via=copy, app_data_dir=/Users/user1/Library/Application Support/virtualenv)
   added seed packages: pip==22.0.4, setuptools==62.1.0, wheel==0.37.1
   activators BashActivator,CShellActivator,FishActivator,NushellActivator,PowerShellActivator,PythonActivator
   ```

1. 建立名為 `unpacked` 的子目錄，以啟動項目。

   ```
   testpiglatin $ mkdir unpacked
   ```

1. 使用 `pip` 命令，將專案安裝到 `unpacked` 目錄。

   ```
   testpiglatin $ bin/pip install -t $PWD/unpacked piglatin
   ```

   **輸出**

   ```
   Collecting piglatin
   Using cached piglatin-1.0.6-py2.py3-none-any.whl (3.1 kB)
   Installing collected packages: piglatin
   Successfully installed piglatin-1.0.6
   ```

1. 檢查目錄的內容。

   ```
   testpiglatin $ ls
   ```

   **輸出**

   ```
   bin lib pyvenv.cfg unpacked
   ```

1. 變更為 `unpacked` 目錄並顯示內容。

   ```
   testpiglatin $ cd unpacked
   unpacked $ ls
   ```

   **輸出**

   ```
   piglatin piglatin-1.0.6.dist-info
   ```

1. 使用 `zip` 命令，將 piglatin 專案的內容放入名為 `library.zip` 的檔案。

   ```
   unpacked $ zip -r9 ../library.zip *
   ```

   **輸出**

   ```
   adding: piglatin/ (stored 0%)
   adding: piglatin/__init__.py (deflated 56%)
   adding: piglatin/__pycache__/ (stored 0%)
   adding: piglatin/__pycache__/__init__.cpython-39.pyc (deflated 31%)
   adding: piglatin-1.0.6.dist-info/ (stored 0%)
   adding: piglatin-1.0.6.dist-info/RECORD (deflated 39%)
   adding: piglatin-1.0.6.dist-info/LICENSE (deflated 41%)
   adding: piglatin-1.0.6.dist-info/WHEEL (deflated 15%)
   adding: piglatin-1.0.6.dist-info/REQUESTED (stored 0%)
   adding: piglatin-1.0.6.dist-info/INSTALLER (stored 0%)
   adding: piglatin-1.0.6.dist-info/METADATA (deflated 48%)
   ```

1. (選用) 使用以下命令在本機測試匯入。

   1. 將 Python 路徑設定為 `library.zip` 檔案位置並啟動 Python。

      ```
      /home $ PYTHONPATH=/tmp/testpiglatin/library.zip 
      /home $ python3
      ```

      **輸出**

      ```
      Python 3.9.6 (default, Jun 29 2021, 06:20:32)
      [Clang 12.0.0 (clang-1200.0.32.29)] on darwin
      Type "help", "copyright", "credits" or "license" for more information.
      ```

   1. 匯入程式庫並執行測試命令。

      ```
      >>> import piglatin
      >>> piglatin.translate('hello')
      ```

      **輸出**

      ```
      'ello-hay'
      ```

1. 使用如下命令，從 Amazon S3 新增 `.zip` 檔案，將檔案匯入 Athena 中的筆記本，然後對其進行測試。

   ```
   sc.addPyFile('s3://amzn-s3-demo-bucket/library.zip')
   
   import piglatin
   piglatin.translate('hello')
   
   from pyspark.sql.functions import udf
   from pyspark.sql.functions import col
   
   hi_udf = udf(piglatin.translate)
   
   df = spark.createDataFrame([(1, "hello"), (2, "world")])
   
   df.withColumn("col", hi_udf(col('_2'))).show()
   ```

   **輸出**

   ```
   Calculation started (calculation_id=e2c0a06e-f45d-d96d-9b8c-ff6a58b2a525) in (session=82c0a06d-d60e-8c66-5d12-23bcd55a6457). Checking calculation status...
   Calculation completed.
   +---+-----+--------+
   | _1|   _2|     col|
   +---+-----+--------+
   |  1|hello|ello-hay|
   |  2|world|orld-way|
   +---+-----+--------+
   ```

### 從具有相依性的 PyPI 匯入 Python .zip 檔案
<a name="notebooks-import-files-libraries-importing-a-python-zip-file-with-dependencies"></a>

此範例從 PyPI 匯入 [md2gemini](https://github.com/makeworld-the-better-one/md2gemini) 套件，其中該套件會將 Markdown 格式的文字轉換為 [Gemini](https://gemini.circumlunar.space/) 文字格式。套件具有以下[相依性](https://libraries.io/pypi/md2gemini)：

```
cjkwrap
mistune
wcwidth
```

**若要匯入具有相依性的 Python .zip 檔案**

1. 在本機電腦上，使用下列命令建立名為 `testmd2gemini` 的目錄並建立虛擬環境。

   ```
   /tmp $ mkdir testmd2gemini
   /tmp $ cd testmd2gemini
   testmd2gemini$ virtualenv .
   ```

1. 建立名為 `unpacked` 的子目錄，以啟動項目。

   ```
   testmd2gemini $ mkdir unpacked
   ```

1. 使用 `pip` 命令，將專案安裝到 `unpacked` 目錄。

   ```
   /testmd2gemini $ bin/pip install -t $PWD/unpacked md2gemini
   ```

   **輸出**

   ```
   Collecting md2gemini
     Downloading md2gemini-1.9.0-py3-none-any.whl (31 kB)
   Collecting wcwidth
     Downloading wcwidth-0.2.5-py2.py3-none-any.whl (30 kB)
   Collecting mistune<3,>=2.0.0
     Downloading mistune-2.0.2-py2.py3-none-any.whl (24 kB)
   Collecting cjkwrap
     Downloading CJKwrap-2.2-py2.py3-none-any.whl (4.3 kB)
   Installing collected packages: wcwidth, mistune, cjkwrap, md2gemini
   Successfully installed cjkwrap-2.2 md2gemini-1.9.0 mistune-2.0.2 wcwidth-0.2.5
   ...
   ```

1. 變更為 `unpacked` 目錄並檢查內容。

   ```
   testmd2gemini $ cd unpacked
   unpacked $ ls -lah
   ```

   **輸出**

   ```
   total 16
   drwxr-xr-x  13 user1  wheel   416B Jun  7 18:43 .
   drwxr-xr-x   8 user1  wheel   256B Jun  7 18:44 ..
   drwxr-xr-x   9 user1  staff   288B Jun  7 18:43 CJKwrap-2.2.dist-info
   drwxr-xr-x   3 user1  staff    96B Jun  7 18:43 __pycache__
   drwxr-xr-x   3 user1  staff    96B Jun  7 18:43 bin
   -rw-r--r--   1 user1  staff   5.0K Jun  7 18:43 cjkwrap.py
   drwxr-xr-x   7 user1  staff   224B Jun  7 18:43 md2gemini
   drwxr-xr-x  10 user1  staff   320B Jun  7 18:43 md2gemini-1.9.0.dist-info
   drwxr-xr-x  12 user1  staff   384B Jun  7 18:43 mistune
   drwxr-xr-x   8 user1  staff   256B Jun  7 18:43 mistune-2.0.2.dist-info
   drwxr-xr-x  16 user1  staff   512B Jun  7 18:43 tests
   drwxr-xr-x  10 user1  staff   320B Jun  7 18:43 wcwidth
   drwxr-xr-x   9 user1  staff   288B Jun  7 18:43 wcwidth-0.2.5.dist-info
   ```

1. 使用 `zip` 命令，將 md2gemini 專案的內容放入名為 `md2gemini.zip` 的檔案。

   ```
   unpacked $ zip -r9 ../md2gemini *
   ```

   **輸出**

   ```
     adding: CJKwrap-2.2.dist-info/ (stored 0%)
     adding: CJKwrap-2.2.dist-info/RECORD (deflated 37%)
     ....
     adding: wcwidth-0.2.5.dist-info/INSTALLER (stored 0%)
     adding: wcwidth-0.2.5.dist-info/METADATA (deflated 62%)
   ```

1. (選用) 使用下列指令來測試程式庫是否可在您的本機電腦上運作。

   1. 將 Python 路徑設定為 `md2gemini.zip` 檔案位置並啟動 Python。

      ```
      /home $ PYTHONPATH=/tmp/testmd2gemini/md2gemini.zip 
      /home python3
      ```

   1. 匯入程式庫並執行測試。

      ```
      >>> from md2gemini import md2gemini
      >>> print(md2gemini('[abc](https://abc.def)'))
      ```

      **輸出**

      ```
      https://abc.def abc
      ```

1. 使用以下命令，從 Amazon S3 新增 `.zip` 檔案，將檔案匯入 Athena 中的筆記本，然後執行 UDF 測試。

   ```
   # (non udf test)
   sc.addPyFile('s3://amzn-s3-demo-bucket/md2gemini.zip')
   from md2gemini import md2gemini
   print(md2gemini('[abc](https://abc.def)'))
   ```

   **輸出**

   ```
   Calculation started (calculation_id=0ac0a082-6c3f-5a8f-eb6e-f8e9a5f9bc44) in (session=36c0a082-5338-3755-9f41-0cc954c55b35). Checking calculation status...
   Calculation completed.
   => https://abc.def (https://abc.def/) abc
   ```

1. 使用下列命令執行 UDF 測試。

   ```
   # (udf test)
   
   from pyspark.sql.functions import udf
   from pyspark.sql.functions import col
   from md2gemini import md2gemini
   
   
   hi_udf = udf(md2gemini)
   df = spark.createDataFrame([(1, "[first website](https://abc.def)"), (2, "[second website](https://aws.com)")])
   df.withColumn("col", hi_udf(col('_2'))).show()
   ```

   **輸出**

   ```
   Calculation started (calculation_id=60c0a082-f04d-41c1-a10d-d5d365ef5157) in (session=36c0a082-5338-3755-9f41-0cc954c55b35). Checking calculation status...
   Calculation completed.
   +---+--------------------+--------------------+
   | _1|                  _2|                 col|
   +---+--------------------+--------------------+
   |  1|[first website](h...|=> https://abc.de...|
   |  2|[second website](...|=> https://aws.co...|
   +---+--------------------+--------------------+
   ```