

# ジョブのブックマークを使用する
<a name="programming-etl-connect-bookmarks"></a>

Spark 用の AWS Glue はジョブのブックマークを使用して処理済みのデータを追跡します。ジョブのブックマーク機能の概要とサポート内容については、「[ジョブのブックマークを使用した処理済みデータの追跡](monitor-continuations.md)」を参照してください。ブックマークを使用して AWS Glue ジョブをプログラミングすると、ビジュアルジョブでは実現できない柔軟性が得られます。
+  JDBC から読み込む場合、AWS Glue スクリプトのブックマークキーとして使用する列を指定できます。
+  各メソッド呼び出しに適用する `transformation_ctx` を選択できます。

常に適切に設定されたパラメータでスクリプトの先頭に `job.init` を呼び出して、末尾に `job.commit` を呼び出します。これら 2 つの関数は、ブックマークサービスを初期化し、サービスの状態変化を更新します。ブックマークは呼び出さないと機能しません。

## ブックマークキーの指定
<a name="programming-etl-connect-bookmarks-columns"></a>

JDBC ワークフローの場合、ブックマークはキーフィールドの値をブックマークされた値と比較することで、ジョブがどの行を読み込んたかを追跡します。これは Amazon S3 ワークフローには必要ありませんし、適用可能でもありません。ビジュアルエディタを使用せずに AWS Glue スクリプトを記述する場合、どの列をブックマークで追跡するかを指定できます。複数の列を指定することもできます。ユーザー定義のブックマークキーを指定する場合、値の順序にギャップがあってもかまいません。

**警告**  
ユーザー定義のブックマークキーを使用する場合、キーはそれぞれ厳密に一定間隔で増減する必要があります。複合キーに追加のフィールドを選択しても、「マイナーバージョン」や「リビジョン番号」などの概念のフィールドは、値がデータセット全体で再利用されるため、この基準を満たしません。

`jobBookmarkKeys` および `jobBookmarkKeysSortOrder` は以下の方法で指定できます。
+ `create_dynamic_frame.from_catalog` — `additional_options` を使用する。
+ `create_dynamic_frame.from_options` — `connection_options` を使用する。

## 変換コンテキスト
<a name="monitor-continuations-implement-context"></a>

AWS Glue PySpark の動的フレームの多くのメソッドには、`transformation_ctx` というオプションのパラメータが含まれています。このパラメータは ETL 演算子インスタンスの一意の識別子です。`transformation_ctx` パラメータは指定された演算子に対するジョブのブックマーク内の状態情報を識別するために使用されます。具体的には、AWS Glue では `transformation_ctx` を使用してブックマーク状態に対するキーにインデックスを付けます。

**警告**  
`transformation_ctx` は、スクリプト内の特定のソースのブックマーク状態を検索するためのキーとして機能します。ブックマークを正しく機能させるためには、ソースと関連する `transformation_ctx` の一貫性を常に保つ必要があります。ソースプロパティを変更したり、`transformation_ctx` の名前を変更したりすると、前のブックマークが無効になり、タイムスタンプベースのフィルタリングで正しい結果が得られない場合があります。

ジョブのブックマークが正しく機能するように、ジョブのブックマークのパラメータを有効にし、`transformation_ctx` パラメータを設定します。`transformation_ctx` パラメータを渡さない場合、メソッドで使用されている動的フレームやテーブルに対してジョブのブックマークは有効になりません。例えば、ETL ジョブで 2 つの Amazon S3 ソースを読み取って結合する場合、ブックマークを有効にするメソッドに対してのみ `transformation_ctx` パラメータを渡すことができます。1 つのジョブについてジョブのブックマークをリセットした場合、`transformation_ctx` の使用には関係なく、このジョブに関連付けられているすべての変換がリセットされます。

`DynamicFrameReader` クラスの詳細については、「[DynamicFrameReader クラス](aws-glue-api-crawler-pyspark-extensions-dynamic-frame-reader.md)」を参照してください。PySpark 拡張の詳細については、「[AWS Glue PySpark 拡張機能リファレンス](aws-glue-programming-python-extensions.md)」を参照してください 

## 例
<a name="monitor-continuations-implement-examples"></a>

**Example**  
Amazon S3 データソース用に生成されたスクリプトの例を次に示します。ジョブのブックマークを使用するために必要なスクリプトの部分は、斜体で表示されています。これらの要素の詳細については、[GlueContext クラス](aws-glue-api-crawler-pyspark-extensions-glue-context.md) API および [DynamicFrameWriter クラス](aws-glue-api-crawler-pyspark-extensions-dynamic-frame-writer.md) API を参照してください。  

```
# Sample Script
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database = "database",
    table_name = "relatedqueries_csv",
    transformation_ctx = "datasource0"
)

applymapping1 = ApplyMapping.apply(
    frame = datasource0,
    mappings = [("col0", "string", "name", "string"), ("col1", "string", "number", "string")],
    transformation_ctx = "applymapping1"
)

datasink2 = glueContext.write_dynamic_frame.from_options(
    frame = applymapping1,
    connection_type = "s3",
    connection_options = {"path": "s3://input_path"},
    format = "json",
    transformation_ctx = "datasink2"
)


job.commit()
```

**Example**  
JDBC ソース用に生成されたスクリプトの例を以下に示します。ソーステーブルは、`empno` 列がプライマリキーである従業員テーブルです。デフォルトでは、ブックマークキーが指定されていない場合、ジョブはブックマークキーとしてシーケンシャルプライマリキーを使用しますが、`empno` は必ずしもシーケンシャルではなく、値にギャップがある可能性があるため、デフォルトのブックマークキーとして適切ではありません。したがって、スクリプトは `empno` を明示的にブックマークキーとして指定します。コードのその部分は、斜体で表示されます。  

```
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database = "hr",
    table_name = "emp",
    transformation_ctx = "datasource0",
    additional_options = {"jobBookmarkKeys":["empno"],"jobBookmarkKeysSortOrder":"asc"}
)

applymapping1 = ApplyMapping.apply(
    frame = datasource0,
    mappings = [("ename", "string", "ename", "string"), ("hrly_rate", "decimal(38,0)", "hrly_rate", "decimal(38,0)"), ("comm", "decimal(7,2)", "comm", "decimal(7,2)"), ("hiredate", "timestamp", "hiredate", "timestamp"), ("empno", "decimal(5,0)", "empno", "decimal(5,0)"), ("mgr", "decimal(5,0)", "mgr", "decimal(5,0)"), ("photo", "string", "photo", "string"), ("job", "string", "job", "string"), ("deptno", "decimal(3,0)", "deptno", "decimal(3,0)"), ("ssn", "decimal(9,0)", "ssn", "decimal(9,0)"), ("sal", "decimal(7,2)", "sal", "decimal(7,2)")],
    transformation_ctx = "applymapping1"
)

datasink2 = glueContext.write_dynamic_frame.from_options(
    frame = applymapping1,
    connection_type = "s3",
    connection_options = {"path": "s3://hr/employees"},
    format = "csv",
    transformation_ctx = "datasink2"
)

job.commit()
```