

# AWS Glue 스키마 레지스트리
<a name="schema-registry"></a>

**참고**  
AWS Glue Schema Registry가 중동(UAE) 리전의 AWS Glue 콘솔에서 지원되지 않습니다.

AWS Glue 스키마 레지스트리를 사용하면 데이터 스트림 스키마를 중앙에서 검색, 제어 및 발전시킬 수 있습니다. *스키마*는 데이터 레코드의 구조와 포맷을 정의합니다. AWS Glue 스키마 레지스트리를 사용하면 Apache Kafka, [Amazon Managed Streaming for Apache Kafka](https://aws.amazon.com/msk/), [Amazon Kinesis Data Streams](https://aws.amazon.com/kinesis/data-streams/), [Apache Flink용 Amazon Managed Service for Apache Flink](https://aws.amazon.com/kinesis/data-analytics/), [AWS Lambda](https://aws.amazon.com/lambda/)와의 편리한 통합을 사용하여 데이터 스트리밍 애플리케이션에서 스키마를 관리하고 적용할 수 있습니다.

스키마 레지스트리는 AVRO(v1.11.4) 데이터 형식, [Everit 라이브러리](https://github.com/everit-org/json-schema)를 사용한 JSON 스키마 검증이 포함된 스키마에 대한 [JSON 스키마 포맷](https://json-schema.org/)을 사용하는 JSON 데이터 형식(사양 Draft-04, Draft-06 및 Draft-07), `extensions` 또는 `groups`에 대한 지원이 없는 프로토콜 버퍼(Protobuf) 버전 proto2 및 proto3, 기타 데이터 형식과 언어를 제공할 예정인 Java 언어 지원 등을 지원합니다. 지원되는 기능에는 호환성, 메타데이터를 통한 스키마 소싱, 스키마 자동 등록, IAM 호환성, 저장 및 데이터 전송을 줄이기 위한 선택적 ZLIB 압축이 포함됩니다. 스키마 레지스트리는 서버리스이며 무료입니다.

생산자와 소비자 간의 데이터 포맷 계약으로 스키마를 사용하면 데이터 거버넌스가 개선되고 데이터 품질이 향상되며 데이터 소비자가 호환 가능한 업스트림 변경 사항에 탄력적으로 대처할 수 있습니다.

스키마 레지스트리를 사용하면 직렬화 및 역직렬화를 위해 서로 다른 시스템이 스키마를 공유할 수 있습니다. 예를 들어 데이터 생산자와 소비자가 있다고 가정합니다. 생산자는 데이터를 게시할 때 스키마를 알고 있습니다. Schema Registry는 Amazon MSK 또는 Apache Kafka와 같은 특정 시스템용 serializer 및 deserializer를 제공합니다.

 자세한 내용은 [스키마 레지스트리 작동 방식](schema-registry-works.md) 섹션을 참조하세요.

**Topics**
+ [스키마](#schema-registry-schemas)
+ [레지스트리](#schema-registry-registries)
+ [스키마 버전 관리 및 호환성](#schema-registry-compatibility)
+ [오픈 소스 Serde 라이브러리](#schema-registry-serde-libraries)
+ [Schema Registry의 할당량](#schema-registry-quotas)
+ [스키마 레지스트리 작동 방식](schema-registry-works.md)
+ [스키마 레지스트리 시작하기](schema-registry-gs.md)

## 스키마
<a name="schema-registry-schemas"></a>

*스키마*는 데이터 레코드의 구조와 포맷을 정의합니다. 스키마는 신뢰할 수 있는 데이터 게시, 소비 또는 저장을 위한 버전 지정 사양입니다.

Avro에 대한 이 예제 스키마에서 포맷과 구조는 레이아웃 및 필드 이름으로 정의되고 필드 이름 포맷은 데이터 유형(예: `string`, `int`)으로 정의됩니다.

```
{
    "type": "record",
    "namespace": "ABC_Organization",
    "name": "Employee",
    "fields": [
        {
            "name": "Name",
            "type": "string"
        },
        {
            "name": "Age",
            "type": "int"
        },
        {
            "name": "address",
            "type": {
                "type": "record",
                "name": "addressRecord",
                "fields": [
                    {
                        "name": "street",
                        "type": "string"
                    },
                    {
                        "name": "zipcode",
                        "type": "int" 
                    }
                ]
            }
        }
    ]
}
```

이 JSON용 JSON 스키마 초안-07 예에서 포맷은 [JSON 스키마 조직](https://json-schema.org/)에서 정의합니다.

```
{
	"$id": "https://example.com/person.schema.json",
	"$schema": "http://json-schema.org/draft-07/schema#",
	"title": "Person",
	"type": "object",
	"properties": {
		"firstName": {
			"type": "string",
			"description": "The person's first name."
		},
		"lastName": {
			"type": "string",
			"description": "The person's last name."
		},
		"age": {
			"description": "Age in years which must be equal to or greater than zero.",
			"type": "integer",
			"minimum": 0
		}
	}
}
```

Protobuf에 대한 이 예제에서는 [프로토콜 버퍼 언어 버전 2(proto2)](https://developers.google.com/protocol-buffers/docs/reference/proto2-spec)로 형식을 정의합니다.

```
syntax = "proto2";

package tutorial;

option java_multiple_files = true;
option java_package = "com.example.tutorial.protos";
option java_outer_classname = "AddressBookProtos";

message Person {
  optional string name = 1;
  optional int32 id = 2;
  optional string email = 3;

  enum PhoneType {
    MOBILE = 0;
    HOME = 1;
    WORK = 2;
  }

  message PhoneNumber {
    optional string number = 1;
    optional PhoneType type = 2 [default = HOME];
  }

  repeated PhoneNumber phones = 4;
}

message AddressBook {
  repeated Person people = 1;
}
```

## 레지스트리
<a name="schema-registry-registries"></a>

*레지스트리*는 스키마의 논리적 컨테이너입니다. 레지스트리를 사용하면 스키마를 구성하고 애플리케이션에 대한 액세스 제어를 관리할 수 있습니다. 레지스트리에는 레지스트리 내의 스키마 작업에 대한 다양한 액세스 권한을 구성하고 설정할 수 있는 Amazon 리소스 이름(ARN)이 있습니다.

기본 레지스트리를 사용하거나 필요한 만큼 새 레지스트리를 생성할 수 있습니다.


**AWS Glue Schema Registry 계층 구조**  

|  | 
| --- |
|  [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ko_kr/glue/latest/dg/schema-registry.html)  | 
|  [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ko_kr/glue/latest/dg/schema-registry.html)  | 
|  [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ko_kr/glue/latest/dg/schema-registry.html)  | 
|  [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ko_kr/glue/latest/dg/schema-registry.html)  | 

## 스키마 버전 관리 및 호환성
<a name="schema-registry-compatibility"></a>

각 스키마에는 여러 버전이 있을 수 있습니다. 버전 관리는 스키마에 적용되는 호환성 규칙에 의해 관리됩니다. 새 스키마 버전 등록 요청은 성공하기 전에 Schema Registry에서 이 규칙에 대해 검사합니다.

체크포인트로 표시된 스키마 버전은 스키마의 새 버전 등록 호환성을 결정하는 데 사용됩니다. 스키마가 처음 생성되면 기본 체크포인트가 첫 번째 버전이 됩니다. 스키마가 더 많은 버전으로 발전함에 따라 CLI/SDK를 사용하여 일련의 제약 조건을 준수하는 `UpdateSchema` API를 사용하는 스키마 버전으로 체크포인트를 변경할 수 있습니다. 콘솔에서 스키마 정의 또는 호환성 모드를 편집하면 기본적으로 체크포인트가 최신 버전으로 변경됩니다.

호환성 모드를 사용하면 시간이 지남에 따라 스키마가 어떻게 발전할 수 있는지 여부를 제어할 수 있습니다. 이러한 모드는 데이터를 생성하고 소비하는 애플리케이션 간의 계약을 형성합니다. 새 버전의 스키마가 레지스트리에 제출되면 스키마 이름에 적용된 호환성 규칙을 사용하여 새 버전을 수락할 수 있는지 여부를 결정합니다. NONE, DISABLED, BACKWARD, BACKWARD\$1ALL, FORWARD, FORWARD\$1ALL, FULL, FULL\$1ALL의 8가지 호환 모드가 있습니다.

Avro 데이터 포맷에서 필드는 선택 사항이거나 필수일 수 있습니다. 선택적 필드는 `Type`이 null을 포함하는 필드입니다. 필수 필드에는 `Type`으로 null이 없습니다.

Protobuf 데이터 형식에서 필드가 proto2 구문에서는 선택 사항(반복 포함)이거나 필수일 수 있지만 proto3 구문에서는 모든 필드가 선택 사항입니다(반복 포함). 모든 호환성 규칙은 프로토콜 버퍼 사양에 대한 이해와 [Google 프로토콜 버퍼 설명서](https://developers.google.com/protocol-buffers/docs/overview#updating)의 지침에 따라 결정됩니다.
+ *NONE*: 호환 모드가 적용되지 않습니다. 개발 시나리오에서 또는 스키마에 적용할 호환성 모드를 모르는 경우 이 선택 사항을 사용할 수 있습니다. 추가된 모든 새 버전은 호환성 검사를 거치지 않고 수락됩니다.
+ *DISABLED*: 이 호환성 선택 항목은 특정 스키마에 대한 버전 관리를 방지합니다. 새 버전을 추가할 수 없습니다.
+ *BACKWARD*: 이 호환성 선택 항목은 소비자가 현재 및 이전 스키마 버전을 모두 읽을 수 있도록 하므로 권장됩니다. 이 선택 항목을 사용하여 필드를 삭제하거나 선택적 필드를 추가할 때 이전 스키마 버전과의 호환성을 확인할 수 있습니다. BACKWARD의 일반적인 사용 사례는 애플리케이션이 가장 최근 스키마에 대해 생성된 경우입니다.

**AVRO**  
예를 들어 이름(필수), 성(필수), 이메일(필수) 및 전화번호(선택 사항)로 정의된 스키마가 있다고 가정합니다.

  다음 스키마 버전에서 필수 이메일 필드를 제거하면 성공적으로 등록됩니다. BACKWARD 호환성을 위해서는 소비자가 현재 및 이전 스키마 버전을 읽을 수 있어야 합니다. 이전 메시지의 추가 이메일 필드가 무시되므로 소비자는 새 스키마를 읽을 수 있습니다.

  필수 필드를 추가하는 제안된 새 스키마 버전이 있는 경우(예: 우편 번호) 이는 BACKWARD 호환성으로 성공적으로 등록되지 않습니다. 새 버전의 소비자는 필수 우편 번호 필드가 누락되어 스키마 변경 전에 이전 메시지를 읽을 수 없습니다. 그러나 새 스키마에서 우편 번호 필드가 선택 사항으로 설정된 경우 소비자가 선택적 우편 번호 필드 없이 이전 스키마를 읽을 수 있으므로 제안된 버전이 성공적으로 등록됩니다.

**JSON**  
예를 들어 이름(선택 사항), 성(선택 사항), 이메일(선택 사항) 및 전화 번호(선택 사항)로 정의된 스키마 버전이 있다고 가정합니다.

  다음 스키마 버전이 선택적 전화번호 속성을 추가하는 경우 원래 스키마 버전이 `additionalProperties` 필드를 false로 설정하여 추가 속성을 허용하지 않는 한 성공적으로 등록됩니다. BACKWARD 호환성을 위해서는 소비자가 현재 및 이전 스키마 버전을 읽을 수 있어야 합니다. 소비자는 전화번호 속성이 존재하지 않는 원래 스키마로 생성된 데이터를 읽을 수 있습니다.

  선택적 전화 번호 속성을 추가하는 제안된 새 스키마 버전이 있는 경우 원래 스키마 버전이 `additionalProperties` 필드를 true로 설정하면(즉, 추가 속성을 허용할 때) BACKWARD 호환성으로 성공적으로 등록되지 않습니다. 새 버전의 소비자는 다른 유형(예: 숫자 대신 문자열)의 전화번호 속성이 있는 데이터를 읽을 수 없으므로 스키마가 변경되기 전에 이전 메시지를 읽을 수 없습니다.

**PROTOBUF**  
예를 들어 proto2 구문에서 `first name`(필수), `last name`(필수), `email`(필수), `phone number`(선택) 필드를 포함하는 Message `Person`으로 정의된 스키마 버전이 있다고 가정합니다.

  AVRO 시나리오에서처럼 다음 스키마 버전에서 필수 `email` 필드를 제거하면 성공적으로 등록됩니다. BACKWARD 호환성을 위해서는 소비자가 현재 및 이전 스키마 버전을 읽을 수 있어야 합니다. 이전 메시지의 추가 `email` 필드가 무시되므로 소비자는 새 스키마를 읽을 수 있습니다.

  필수 항목(예: `zip code`)을 추가하는 제안된 새 스키마 버전이 있는 경우, 이는 이전 버전과의 호환성으로 성공적으로 등록되지 않습니다. 새 버전의 소비자는 필수 `zip code` 필드가 누락되어 스키마 변경 전에 이전 메시지를 읽을 수 없습니다. 그러나 새 스키마에서 `zip code` 필드가 선택 사항으로 설정된 경우 소비자가 선택적 `zip code` 필드 없이 이전 스키마를 읽을 수 있으므로 제안된 버전이 성공적으로 등록됩니다.

  gRPC 사용 사례의 경우 새 RPC 서비스 또는 RPC 메서드 추가는 이전 버전과 호환되는 변경 사항입니다. 예를 들어 RPC 서비스 `MyService`에서 두 개의 RPC 메서드 `Foo` 및 `Bar`를 사용하여 정의한 스키마 버전이 있다고 가정합니다.

  다음 스키마 버전에서 `Baz`라는 새 RPC 메서드를 추가하는 경우 성공적으로 등록됩니다. 새로 추가된 RPC 메서드 `Baz`가 선택 사항이므로 소비자는 이전 버전과의 호환성에 따라 원래 스키마로 생성된 데이터를 읽을 수 있습니다.

  기존 RPC 메서드 `Foo`를 제거하는 제안된 새 스키마 버전이 있는 경우, 이는 이전 버전과의 호환성으로 성공적으로 등록되지 않습니다. 새 버전의 소비자는 gRPC 애플리케이션에서 RPC 메서드 `Foo`가 없는 데이터를 이해하거나 읽을 수 없으므로 스키마 변경 전에 이전 메시지를 읽을 수 없습니다.
+ *BACKWARD\$1ALL*: 이 호환성 선택을 통해 소비자는 현재 및 모든 이전 스키마 버전을 모두 읽을 수 있습니다. 이 선택 항목을 사용하여 필드를 삭제하거나 선택적 필드를 추가할 때 모든 이전 스키마 버전과의 호환성을 확인할 수 있습니다.
+ *FORWARD*: 이 호환성 선택을 통해 소비자는 현재 및 후속 스키마 버전을 모두 읽을 수 있지만 반드시 이후 버전은 아닙니다. 이 선택 항목 사용하여 필드를 추가하거나 선택적 필드를 삭제할 때 마지막 스키마 버전과의 호환성을 확인할 수 있습니다. FORWARD의 일반적인 사용 사례는 애플리케이션이 이전 스키마에 대해 생성되었고 더 최신 스키마를 처리할 수 있어야 하는 경우입니다.

**AVRO**  
예를 들어 이름(필수), 성(필수), 이메일(선택 사항)로 정의된 스키마 버전이 있다고 가정합니다.

  필수 항목(예: 전화번호)을 추가하는 새 스키마 버전이 있는 경우 성공적으로 등록됩니다. FORWARD 호환성을 위해서는 소비자가 이전 버전을 사용하여 새 스키마로 생성된 데이터를 읽을 수 있어야 합니다.

  필수 이름 필드를 삭제하는 제안된 스키마 버전이 있는 경우 FORWARD 호환성으로 성공적으로 등록되지 않습니다. 이전 버전의 소비자는 필수 이름 필드가 누락되어 제안된 스키마를 읽을 수 없습니다. 그러나 이름 필드가 원래 선택 사항인 경우 제안된 새 스키마는 선택 사항인 이름 필드가 없는 새 스키마를 기반으로 소비자가 데이터를 읽을 수 있으므로 성공적으로 등록됩니다.

**JSON**  
예를 들어 이름(선택 사항), 성(선택 사항), 이메일(선택 사항) 및 전화 번호(선택 사항)로 정의된 스키마 버전이 있다고 가정합니다.

  선택적 전화 번호 속성을 제거하는 새 스키마 버전이 있는 경우 새 스키마 버전에서 `additionalProperties` 필드를 false로 설정하여 추가 속성을 허용하지 않는 한 성공적으로 등록됩니다. FORWARD 호환성을 위해서는 소비자가 이전 버전을 사용하여 새 스키마로 생성된 데이터를 읽을 수 있어야 합니다.

  선택적 전화번호 속성을 삭제하는 제안된 스키마 버전이 있는 경우 새 스키마 버전이 `additionalProperties` 필드를 true로 설정하면 즉, 추가 속성을 허용할 때 FORWARD 호환성으로 성공적으로 등록되지 않습니다. 이전 버전의 소비자는 다른 유형의 전화번호 속성(예: 숫자 대신 문자열)을 가질 수 있으므로 제안된 스키마를 읽을 수 없습니다.

**PROTOBUF**  
예를 들어 proto2 구문에서 `first name`(필수), `last name`(필수), `email`(선택 사항) 필드를 포함하는 Message `Person`으로 정의된 스키마 버전이 있다고 가정합니다.

  AVRO 시나리오처럼 필수 항목(예: `phone number`)을 추가하는 새 스키마 버전이 있는 경우 성공적으로 등록됩니다. FORWARD 호환성을 위해서는 소비자가 이전 버전을 사용하여 새 스키마로 생성된 데이터를 읽을 수 있어야 합니다.

  필수 `first name` 필드를 삭제하는 제안된 스키마 버전이 있는 경우 FORWARD 호환성으로 성공적으로 등록되지 않습니다. 이전 버전의 소비자는 필수 `first name` 필드가 누락되어 제안된 스키마를 읽을 수 없습니다. 그러나 `first name` 필드가 원래 선택 사항인 경우 제안된 새 스키마는 선택 사항인 `first name` 필드가 없는 새 스키마를 기반으로 소비자가 데이터를 읽을 수 있으므로 성공적으로 등록됩니다.

  gRPC 사용 사례의 경우 RPC 서비스 또는 RPC 메서드 제거는 다음 버전과 호환되는 변경 사항입니다. 예를 들어 RPC 서비스 `MyService`에서 두 개의 RPC 메서드 `Foo` 및 `Bar`를 사용하여 정의한 스키마 버전이 있다고 가정합니다.

  다음 스키마 버전에서 `Foo`라는 기존 RPC 메서드를 삭제하는 경우 소비자가 이전 버전을 사용하여 새 스키마로 생성된 데이터를 읽을 수 있으므로 FORWARD 호환성에 따라 성공적으로 등록됩니다. `Baz`라는 RPC 메서드를 추가하는 제안된 새 스키마 버전이 있는 경우 FORWARD 호환성으로 성공적으로 등록되지 않습니다. 이전 버전의 소비자는 `Baz`라는 RPC 메서드가 누락되어 제안된 스키마를 읽을 수 없습니다.
+ *FORWARD\$1ALL*: 이 호환성 선택을 통해 소비자는 새로 등록된 스키마의 생산자가 작성한 데이터를 읽을 수 있습니다. 필드를 추가하거나 선택적 필드를 삭제하고 모든 이전 스키마 버전과의 호환성을 확인해야 할 때 이 선택 사항을 사용할 수 있습니다.
+ *FULL*: 이 호환성 선택을 통해 소비자는 이전 또는 다음 버전의 스키마를 사용하여 생산자가 작성한 데이터를 읽을 수 있지만 이전 또는 이후 버전은 아닙니다. 이 선택 항목 사용하여 선택적 필드를 추가하거나 제거할 때 마지막 스키마 버전과의 호환성을 확인할 수 있습니다.
+ *FULL\$1ALL*: 이 호환성 선택을 통해 소비자는 모든 이전 스키마 버전을 사용하여 생산자가 작성한 데이터를 읽을 수 있습니다. 이 선택 항목을 사용하여 선택적 필드를 추가하거나 제거할 때 모든 이전 스키마 버전과의 호환성을 확인할 수 있습니다.

## 오픈 소스 Serde 라이브러리
<a name="schema-registry-serde-libraries"></a>

AWS는 데이터 직렬화 및 역직렬화를 위한 프레임워크로 오픈 소스 Serde 라이브러리를 제공합니다. 이러한 라이브러리의 오픈 소스 설계를 통해 일반적인 오픈 소스 애플리케이션 및 프레임워크가 프로젝트에서 이러한 라이브러리를 지원할 수 있습니다.

Serde 라이브러리의 작동 방식에 대한 자세한 내용은 [스키마 레지스트리 작동 방식](schema-registry-works.md) 섹션을 참조하세요.

## Schema Registry의 할당량
<a name="schema-registry-quotas"></a>

AWS에서 제한이라고도 하는 할당량은 AWS 계정의 리소스, 작업, 항목의 최댓값입니다. 다음은 AWS Glue의 Schema Registry에 대한 소프트 제한입니다.

**스키마 버전 메타데이터 키-값 페어**  
AWS 리전별로 SchemaVersion당 최대 10개의 키-값 페어가 있을 수 있습니다.

[QuerySchemaVersionMetadata 작업(Python: query\$1schema\$1version\$1metadata)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-QuerySchemaVersionMetadata) 또는 [PutSchemaVersionMetadata 작업(Python: put\$1schema\$1version\$1metadata)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-PutSchemaVersionMetadata) API를 사용하여 키-값 메타데이터 페어를 보거나 설정할 수 있습니다.

다음은 AWS Glue의 Schema Registry에 대한 하드 제한입니다.

**레지스트리**  
이 계정의 경우 AWS 리전당 최대 100개의 레지스트리를 보유할 수 있습니다.

**SchemaVersion**  
이 계정에 대해 AWS 리전당 최대 10,000개의 스키마 버전을 보유할 수 있습니다.

각 새 스키마는 새 스키마 버전을 생성하므로 각 스키마에 버전이 하나만 있는 경우 이론상 지역별로 계정당 최대 10,000개의 스키마를 보유할 수 있습니다.

**스키마 페이로드**  
스키마 페이로드의 크기 제한은 170KB입니다.

# 스키마 레지스트리 작동 방식
<a name="schema-registry-works"></a>

이 섹션에서는 스키마 레지스트리의 직렬화 및 역직렬화 프로세스의 작동 방식에 대해 설명합니다.

1. 스키마 등록: 스키마가 레지스트리에 이미 존재하지 않는 경우 대상 이름과 동일한 스키마 이름(예: test\$1topic, test\$1stream, prod\$1firehose)으로 스키마를 등록하거나 생산자가 스키마에 대한 사용자 정의 이름을 제공할 수 있습니다. 생산자는 또한 소스: msk\$1kafka\$1topic\$1A와 같은 메타데이터로 스키마에 키-값 페어를 추가하거나 스키마 생성 시 스키마에 AWS 태그를 적용할 수 있습니다. 스키마가 등록되면 Schema Registry는 스키마 버전 ID를 serializer에 반환합니다. 스키마가 존재하지만 serializer가 존재하지 않는 새 버전을 사용하는 경우 Schema Registry는 스키마 참조를 호환성 규칙으로 확인하여 새 버전을 새 버전으로 등록하기 전에 새 버전이 호환되는지 확인합니다.

   스키마를 등록하는 방법에는 수동 등록과 자동 등록의 두 가지가 있습니다. AWS Glue 콘솔이나 CLI/SDK를 통해 수동으로 스키마를 등록할 수 있습니다.

   serializer 설정에서 자동 등록이 켜져 있으면 스키마의 자동 등록이 수행됩니다. 생산자 구성에 `REGISTRY_NAME`이 제공되지 않으면 자동 등록은 기본 레지스트리(default-registry) 아래에 새 스키마 버전을 등록합니다. 자동 등록 속성 지정에 대한 자세한 내용은 [SerDe 라이브러리 설치](schema-registry-gs-serde.md) 섹션을 참조하세요.

1. Serializer는 스키마에 대해 데이터 레코드를 검증합니다. 데이터를 생성하는 애플리케이션이 해당 스키마를 등록한 경우 Schema Registry serializer는 애플리케이션이 생성하는 레코드가 등록된 스키마와 일치하는 필드 및 데이터 유형으로 구성되었는지 확인합니다. 레코드의 스키마가 등록된 스키마와 일치하지 않는 경우 serializer는 예외를 반환하고 애플리케이션은 레코드를 대상으로 전달하지 못합니다.

   스키마가 없고 생산자 구성을 통해 스키마 이름이 제공되지 않은 경우 스키마는 주제 이름(Apache Kafka 또는 Amazon MSK의 경우) 또는 스트림 이름(Kinesis Data Streams의 경우)과 동일한 이름으로 생성됩니다.

   모든 레코드에는 스키마 정의와 데이터가 있습니다. 스키마 정의는 Schema Registry의 기존 스키마 및 버전에 대해 쿼리됩니다.

   기본적으로 생산자는 등록된 스키마의 스키마 정의 및 스키마 버전 ID를 캐시합니다. 레코드의 스키마 버전 정의가 캐시에서 사용 가능한 것과 일치하지 않는 경우 생산자는 Schema Registry를 사용하여 스키마를 검증하려고 시도합니다. 스키마 버전이 유효하면 해당 버전 ID와 정의가 생산자에서 로컬로 캐시됩니다.

   [SerDe 라이브러리 설치](schema-registry-gs-serde.md)의 3단계에서 선택적 생산자 속성 내에서 기본 캐시 기간(24시간)을 조정할 수 있습니다.

1. 레코드 직렬화 및 전달: 레코드가 스키마를 준수하는 경우 serializer는 각 레코드를 스키마 버전 ID로 장식하고 선택한 데이터 포맷(AVRO, JSON, Protobuf 또는 다른 포맷 곧 제공 예정)을 기반으로 레코드를 직렬화하고 레코드를 압축(선택적 생산자 구성), 목적지로 전달합니다.

1. 소비자는 데이터를 역직렬화합니다. 이 데이터를 읽는 소비자는 레코드 페이로드에서 스키마 버전 ID를 구문 분석하는 Schema Registry deserializer 라이브러리를 사용합니다.

1. Deserializer는 Schema Registry에서 스키마를 요청할 수 있습니다. Deserializer가 특정 스키마 버전 ID를 가진 레코드를 처음 본 경우에는 스키마 버전 ID를 사용하여 deserializer가 Schema Registry에서 스키마를 요청하고 소비자에서 로컬로 스키마를 캐시합니다. Schema Registry가 레코드를 역직렬화할 수 없는 경우 소비자는 레코드에서 데이터를 기록하고 계속 진행하거나 애플리케이션을 중지할 수 있습니다.

1. Deserializer는 스키마를 사용하여 레코드를 역직렬화합니다. Deserializer가 Schema Registry에서 스키마 버전 ID를 검색할 때 deserializer는 레코드를 압축 해제하고(생산자가 보낸 레코드가 압축된 경우) 스키마를 사용하여 레코드를 역직렬화합니다. 이제 애플리케이션이 레코드를 처리합니다.

**참고**  
암호화: 클라이언트는 HTTPS를 통한 TLS 암호화를 사용하여 전송 중 데이터를 암호화하는 API 호출을 통해 Schema Registry와 통신합니다. Schema Registry에 저장된 스키마는 항상 서비스 관리형 AWS Key Management Service(AWS KMS) 키를 사용하여 저장 시 암호화됩니다.

**참고**  
사용자 권한 부여: 스키마 레지스트리는 자격 증명 기반 IAM 정책을 지원합니다.

# 스키마 레지스트리 시작하기
<a name="schema-registry-gs"></a>

다음 섹션은 Schema Registry를 설정하고 사용하는 방법에 대한 개요 및 설명입니다. 스키마 레지스트리 개념 및 구성 요소에 대한 자세한 내용은 [AWS Glue 스키마 레지스트리](schema-registry.md) 섹션을 참조하세요.

**Topics**
+ [SerDe 라이브러리 설치](schema-registry-gs-serde.md)
+ [AWS Glue Schema Registry와 통합](schema-registry-integrations.md)
+ [서드 파티 Schema Registry에서 AWS Glue Schema Registry로 마이그레이션](schema-registry-integrations-migration.md)

# SerDe 라이브러리 설치
<a name="schema-registry-gs-serde"></a>

SerDe 라이브러리는 데이터 직렬화 및 역직렬화를 위한 프레임워크를 제공합니다.

데이터를 생성하는 애플리케이션("serializer"로 통칭)을 위한 오픈 소스 serializer를 설치합니다. Serializer는 직렬화, 압축 및 Schema Registry와의 상호 작용을 처리합니다. Serializer는 Amazon MSK와 같은 Schema Registry 호환 대상에 기록되는 레코드에서 스키마를 자동으로 추출합니다. 마찬가지로 데이터를 사용하는 애플리케이션에 오픈 소스 deserializer를 설치합니다.

# Java 구현
<a name="schema-registry-gs-serde-java"></a>

**참고**  
전제 조건: 다음 단계를 완료하기 전에 Amazon Managed Streaming for Apache Kafka(Amazon MSK) 또는 Apache Kafka 클러스터가 실행 중이어야 합니다. 생산자와 소비자는 Java 8 이상에서 실행 중이어야 합니다.

생산자와 소비자에 라이브러리를 설치하려면

1. 생산자와 소비자의 pom.xml 파일 내에서 아래 코드를 통해 이 종속성을 추가합니다.

   ```
   <dependency>
       <groupId>software.amazon.glue</groupId>
       <artifactId>schema-registry-serde</artifactId>
       <version>1.1.5</version>
   </dependency>
   ```

   또는 [AWS Glue Schema Registry Github 리포지토리](https://github.com/awslabs/aws-glue-schema-registry)를 복제할 수 있습니다.

1. 다음 필수 속성으로 생산자를 설정합니다.

   ```
   props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // Can replace StringSerializer.class.getName()) with any other key serializer that you may use
   props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName());
   props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2");
   properties.put(AWSSchemaRegistryConstants.DATA_FORMAT, "JSON"); // OR "AVRO"
   ```

   기존 스키마가 없으면 자동 등록을 설정해야 합니다(다음 단계). 적용하려는 스키마가 있는 경우 "my-schema"를 스키마 이름으로 바꿉니다. 또한 스키마 자동 등록이 꺼져 있는 경우 "registry-name"을 제공해야 합니다. 스키마가 "default-registry" 아래에 생성된 경우 레지스트리 이름을 생략할 수 있습니다.

1. (선택 사항) 이러한 선택적 생산자 속성을 설정합니다. 자세한 속성 설명은 [ReadMe 파일](https://github.com/awslabs/aws-glue-schema-registry/blob/master/README.md)을 참조하세요.

   ```
   props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, "true"); // If not passed, uses "false"
   props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "my-schema"); // If not passed, uses transport name (topic name in case of Kafka, or stream name in case of Kinesis Data Streams)
   props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "my-registry"); // If not passed, uses "default-registry"
   props.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); // If not passed, uses 86400000 (24 Hours)
   props.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); // default value is 200
   props.put(AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, Compatibility.FULL); // Pass a compatibility mode. If not passed, uses Compatibility.BACKWARD
   props.put(AWSSchemaRegistryConstants.DESCRIPTION, "This registry is used for several purposes."); // If not passed, constructs a description
   props.put(AWSSchemaRegistryConstants.COMPRESSION_TYPE, AWSSchemaRegistryConstants.COMPRESSION.ZLIB); // If not passed, records are sent uncompressed
   ```

   자동 등록은 기본 레지스트리("default-registry") 아래에 스키마 버전을 등록합니다. 이전 단계에서 `SCHEMA_NAME`을 지정하지 않으면 주제 이름이 `SCHEMA_NAME`으로 유추됩니다.

   호환성 모드에 대한 자세한 내용은 [스키마 버전 관리 및 호환성](schema-registry.md#schema-registry-compatibility) 섹션을 참조하세요.

1. 다음 필수 속성으로 소비자를 설정합니다.

   ```
   props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
   props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());
   props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2"); // Pass an AWS 리전
   props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName()); // Only required for AVRO data format
   ```

1. (선택 사항) 이러한 선택적 소비자 속성을 설정합니다. 자세한 속성 설명은 [ReadMe 파일](https://github.com/awslabs/aws-glue-schema-registry/blob/master/README.md)을 참조하세요.

   ```
   properties.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); // If not passed, uses 86400000
   props.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); // default value is 200
   props.put(AWSSchemaRegistryConstants.SECONDARY_DESERIALIZER, "com.amazonaws.services.schemaregistry.deserializers.external.ThirdPartyDeserializer"); // For migration fall back scenario
   ```

# C\$1 구현
<a name="schema-registry-gs-serde-csharp"></a>

**참고**  
전제 조건: 다음 단계를 완료하기 전에 Amazon Managed Streaming for Apache Kafka(Amazon MSK) 또는 Apache Kafka 클러스터가 실행 중이어야 합니다. 생산자와 소비자는 .NET 8.0 이상에서 실행 중이어야 합니다.

## 설치
<a name="schema-registry-gs-serde-csharp-install"></a>

C\$1 애플리케이션의 경우 다음 방법 중 하나를 사용하여 AWS Glue Schema Registry SerDe NuGet 패키지를 설치합니다.

**.NET CLI:**  
다음 명령을 실행하여 패키지 설치:

```
dotnet add package Aws.Glue.SchemaRegistry --version 1.0.0-<rid>
```

여기서 `<rid>`는 `1.0.0-linux-x64`, `1.0.0-linux-musl-x64` 또는 `1.0.0-linux-arm64`

**PackageReference(.csproj 파일):**  
프로젝트 파일에 다음을 추가:

```
<PackageReference Include="Aws.Glue.SchemaRegistry" Version="1.0.0-<rid>" />
```

여기서 `<rid>`는 `1.0.0-linux-x64`, `1.0.0-linux-musl-x64` 또는 `1.0.0-linux-arm64`

## 구성 파일 설정
<a name="schema-registry-gs-serde-csharp-config"></a>

필요한 설정을 사용하여 구성 속성 파일(예: `gsr-config.properties`) 생성:

**최소 구성:**  
다음 예제에서는 최소 구성 샘플을 보여줍니다.

```
region=us-east-1
registry.name=default-registry
dataFormat=AVRO
schemaAutoRegistrationEnabled=true
```

## Kafka SerDes에 C\$1 Glue Schema 클라이언트 라이브러리 사용
<a name="schema-registry-gs-serde-csharp-kafka"></a>

**샘플 Serializer 사용:**  
다음 예제에서는 Serializer 사용 방법을 보여줍니다.

```
private static readonly string PROTOBUF_CONFIG_PATH = "<PATH_TO_CONFIG_FILE>";
var protobufSerializer = new GlueSchemaRegistryKafkaSerializer(PROTOBUF_CONFIG_PATH);
var serialized = protobufSerializer.Serialize(message, message.Descriptor.FullName);
// send serialized bytes to Kafka using producer.Produce(serialized)
```

**샘플 Deserializer 사용:**  
다음 예제에서는 Deserializer 사용 방법을 보여줍니다.

```
private static readonly string PROTOBUF_CONFIG_PATH = "<PATH_TO_CONFIG_FILE>";
var dataConfig = new GlueSchemaRegistryDataFormatConfiguration(
    new Dictionary<string, dynamic>
    {
        {
            GlueSchemaRegistryConstants.ProtobufMessageDescriptor, message.Descriptor
        }
    }
);
var protobufDeserializer = new GlueSchemaRegistryKafkaDeserializer(PROTOBUF_CONFIG_PATH, dataConfig);

// read message from Kafka using serialized = consumer.Consume()
var deserializedObject = protobufDeserializer.Deserialize(message.Descriptor.FullName, serialized);
```

## KafkaFlow for SerDes에 C\$1 Glue Schema 클라이언트 라이브러리 사용
<a name="schema-registry-gs-serde-csharp-kafkaflow"></a>

**샘플 Serializer 사용:**  
다음 예제에서는 Serializer를 사용하여 KafkaFlow를 구성하는 방법을 보여줍니다.

```
services.AddKafka(kafka => kafka
    .UseConsoleLog()
    .AddCluster(cluster => cluster
        .WithBrokers(new[] { "localhost:9092" })
        .AddProducer<CustomerProducer>(producer => producer
            .DefaultTopic("customer-events")
            .AddMiddlewares(m => m
                .AddSerializer<GlueSchemaRegistryKafkaFlowProtobufSerializer<Customer>>(
                    () => new GlueSchemaRegistryKafkaFlowProtobufSerializer<Customer>("config/gsr-config.properties")
                )
            )
        )
    )
);
```

**샘플 Deserializer 사용:**  
다음 예제에서는 Deserializer를 사용하여 KafkaFlow를 구성하는 방법을 보여줍니다.

```
.AddConsumer(consumer => consumer
    .Topic("customer-events")
    .WithGroupId("customer-group")
    .WithBufferSize(100)
    .WithWorkersCount(10)
    .AddMiddlewares(middlewares => middlewares
        .AddDeserializer<GlueSchemaRegistryKafkaFlowProtobufDeserializer<Customer>>(
            () => new GlueSchemaRegistryKafkaFlowProtobufDeserializer<Customer>("config/gsr-config.properties")
        )
        .AddTypedHandlers(h => h.AddHandler<CustomerHandler>())
    )
)
```

## 선택적 생산자 속성
<a name="schema-registry-gs-serde-csharp-optional"></a>

추가 선택적 속성을 사용하여 구성 파일을 확장할 수 있습니다.

```
# Auto-registration (if not passed, uses "false")
schemaAutoRegistrationEnabled=true

# Schema name (if not passed, uses topic name)
schema.name=my-schema

# Registry name (if not passed, uses "default-registry")
registry.name=my-registry

# Cache settings
cacheTimeToLiveMillis=86400000
cacheSize=200

# Compatibility mode (if not passed, uses BACKWARD)
compatibility=FULL

# Registry description
description=This registry is used for several purposes.

# Compression (if not passed, records are sent uncompressed)
compressionType=ZLIB
```

## 지원되는 날짜 형식
<a name="schema-registry-gs-serde-supported-formats"></a>

Java 및 C\$1 구현 모두 동일한 데이터 형식을 지원합니다.
+ *AVRO*: Apache Avro 이진 형식
+ *JSON*: JSON 스키마 형식
+ *PROTOBUF*: 프로토콜 버퍼 형식

## 참고
<a name="schema-registry-gs-serde-csharp-notes"></a>
+ 라이브러리를 시작하려면 [https://www.nuget.org/packages/AWS.Glue.SchemaRegistry](https://www.nuget.org/packages/AWS.Glue.SchemaRegistry) 페이지 방문
+ 소스 코드: [https://github.com/awslabs/aws-glue-schema-registry](https://github.com/awslabs/aws-glue-schema-registry)

# 레지스트리 생성
<a name="schema-registry-gs3"></a>

기본 레지스트리를 사용하거나 AWS Glue API 또는 AWS Glue 콘솔을 사용하여 필요한 만큼 새 레지스트리를 생성할 수 있습니다.

**AWS Glue API**  
다음 단계를 사용하여 AWS Glue API로 이 태스크를 수행할 수 있습니다.

AWS Glue Schema Registry API에 AWS CLI를 사용하려면 AWS CLI를 최신 버전으로 업데이트해야 합니다.

 새 레지스트리를 추가하려면 [CreateRegistry 작업(Python: create\$1registry)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-CreateRegistry) API를 사용합니다. 생성할 레지스트리의 이름으로 `RegistryName`을 지정합니다. 최대 길이는 255이며 문자, 숫자, 하이픈, 밑줄, 달러 기호 또는 해시 표시만 포함합니다.

[URI 주소 여러 줄 문자열 패턴](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-common.html#aws-glue-api-common-_string-patterns)과 일치하는 2048바이트 이하의 문자열로 `Description`을 지정합니다.

선택적으로 키-값 페어의 맵 배열로 레지스트리에 대해 하나 이상의 `Tags`를 지정합니다.

```
aws glue create-registry --registry-name registryName1 --description description
```

레지스트리가 생성되면 API 응답의 `RegistryArn`에서 볼 수 있는 Amazon 리소스 이름(ARN)이 할당됩니다. 이제 레지스트리를 생성했으므로 해당 레지스트리에 대해 하나 이상의 스키마를 생성합니다.

**AWS Glue 콘솔**  
AWS Glue 콘솔에서 새 레지스트리를 추가하려면

1. AWS Management Console에 로그인하여 [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\)에서 AWS Glue 콘솔을 엽니다.

1. 탐색 창의 [**데이터 카탈로그(Data catalog)**]에서 [**스키마 레지스트리(Schema registries)**]를 선택합니다.

1. [**레지스트리 추가(Add registry)**]를 선택합니다.

1. 문자, 숫자, 하이픈 또는 밑줄로 구성된 레지스트리의 [**레지스트리 이름(Registry name)**]을 입력합니다. 이 이름은 변경할 수 없습니다.

1. 레지스트리에 대한 [**설명(Description)**](선택 사항)을 입력합니다.

1. 선택적으로 레지스트리에 하나 이상의 태그를 적용합니다. [**새 태그 추가(Add new tag)**]를 선택하고 [**태그 키(Tag key)**]와 [**태그 값(Tag value)**](선택 사항)을 지정합니다.

1. [**레지스트리 추가(Add registry)**]를 선택합니다.

![\[레지스트리 생성의 예.\]](http://docs.aws.amazon.com/ko_kr/glue/latest/dg/images/schema_reg_create_registry.png)


레지스트리가 생성되면 [**스키마 레지스트리(Schema registries)**]의 목록에서 레지스트리를 선택하여 볼 수 있는 Amazon 리소스 이름(ARN)이 할당됩니다. 이제 레지스트리를 생성했으므로 해당 레지스트리에 대해 하나 이상의 스키마를 생성합니다.

# JSON에 대한 특정 레코드(JAVA POJO) 처리
<a name="schema-registry-gs-json-java-pojo"></a>

POJO(Plain Old Java Object)를 사용하고 객체를 레코드로 전달할 수 있습니다. 이는 AVRO에서 특정 레코드의 개념과 유사합니다. [mbknor-jackson-jsonschema](https://github.com/mbknor/mbknor-jackson-jsonSchema)는 전달된 POJO에 대한 JSON 스키마를 생성할 수 있습니다. 이 라이브러리는 JSON 스키마에 추가 정보를 삽입할 수도 있습니다.

AWS Glue Schema Registry 라이브러리는 스키마에 삽입된 "className" 필드를 사용하여 완전히 분류된 클래스 이름을 제공합니다. "className" 필드는 deserializer에서 해당 클래스의 객체로 역직렬화하는 데 사용됩니다.

```
 Example class :

@JsonSchemaDescription("This is a car")
@JsonSchemaTitle("Simple Car Schema")
@Builder
@AllArgsConstructor
@EqualsAndHashCode
// Fully qualified class name to be added to an additionally injected property
// called className for deserializer to determine which class to deserialize
// the bytes into
@JsonSchemaInject(
        strings = {@JsonSchemaString(path = "className",
                value = "com.amazonaws.services.schemaregistry.integrationtests.generators.Car")}
)
// List of annotations to help infer JSON Schema are defined by https://github.com/mbknor/mbknor-jackson-jsonSchema
public class Car {
    @JsonProperty(required = true)
    private String make;

    @JsonProperty(required = true)
    private String model;

    @JsonSchemaDefault("true")
    @JsonProperty
    public boolean used;

    @JsonSchemaInject(ints = {@JsonSchemaInt(path = "multipleOf", value = 1000)})
    @Max(200000)
    @JsonProperty
    private int miles;

    @Min(2000)
    @JsonProperty
    private int year;

    @JsonProperty
    private Date purchaseDate;

    @JsonProperty
    @JsonFormat(shape = JsonFormat.Shape.NUMBER)
    private Date listedDate;

    @JsonProperty
    private String[] owners;

    @JsonProperty
    private Collection<Float> serviceChecks;

    // Empty constructor is required by Jackson to deserialize bytes
    // into an Object of this class
    public Car() {}
}
```

# 스키마 생성
<a name="schema-registry-gs4"></a>

AWS Glue API 또는 AWS Glue 콘솔을 사용하여 스키마를 생성할 수 있습니다.

**AWS Glue API**  
다음 단계를 사용하여 AWS Glue API로 이 태스크를 수행할 수 있습니다.

새 스키마를 추가하려면 [CreateSchema 작업(Python: create\$1schema)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-CreateSchema) API를 사용합니다.

스키마에 대한 레지스트리를 나타내려면 `RegistryId` 구조를 지정합니다. 또는 기본 레지스트리를 사용하려면 `RegistryId`를 생략합니다.

문자, 숫자, 하이픈, 밑줄로 구성된 `SchemaName`을 지정하고 `DataFormat`은 **AVRO** 또는 **JSON**으로 지정합니다. `DataFormat`은 스키마에 한 번 설정되면 변경할 수 없습니다.

`Compatibility` 모드를 지정합니다.
+ [*뒤로(권장)(Backward (recommended))*] - 소비자가 현재 버전과 이전 버전을 모두 읽을 수 있습니다.
+ [*모두 뒤로(Backward all)*] - 소비자가 현재 및 모든 이전 버전을 읽을 수 있습니다.
+ [*앞으로(Forward)*] - 소비자가 현재 및 후속 버전을 모두 읽을 수 있습니다.
+ [*모두 앞으로(Forward all)*] - 소비자가 현재 버전과 모든 후속 버전을 읽을 수 있습니다.
+ [*전체(Full)*] - 뒤로 및 앞으로의 조합입니다.
+ [*모두 전체(Full all)*] - 모두 뒤로 및 모두 앞으로의 조합입니다.
+ [*없음(None)*] - 호환성 확인이 수행되지 않습니다.
+ [*사용 중지됨(Disabled)*] - 이 스키마에 대한 버전 관리를 방지합니다.

선택적으로 스키마에 `Tags`를 지정합니다.

Avro, JSON 또는 Protobuf 데이터 포맷으로 스키마를 정의하려면 `SchemaDefinition`을(를) 지정합니다. 예제를 참조하세요.

Avro 데이터 포맷의 경우:

```
aws glue create-schema --registry-id RegistryName="registryName1" --schema-name testschema --compatibility NONE --data-format AVRO --schema-definition "{\"type\": \"record\", \"name\": \"r1\", \"fields\": [ {\"name\": \"f1\", \"type\": \"int\"}, {\"name\": \"f2\", \"type\": \"string\"} ]}"
```

```
aws glue create-schema --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName1" --schema-name testschema --compatibility NONE --data-format AVRO  --schema-definition "{\"type\": \"record\", \"name\": \"r1\", \"fields\": [ {\"name\": \"f1\", \"type\": \"int\"}, {\"name\": \"f2\", \"type\": \"string\"} ]}"
```

JSON 데이터 포맷의 경우:

```
aws glue create-schema --registry-id RegistryName="registryName" --schema-name testSchemaJson --compatibility NONE --data-format JSON --schema-definition "{\"$schema\": \"http://json-schema.org/draft-07/schema#\",\"type\":\"object\",\"properties\":{\"f1\":{\"type\":\"string\"}}}"
```

```
aws glue create-schema --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName" --schema-name testSchemaJson --compatibility NONE --data-format JSON --schema-definition "{\"$schema\": \"http://json-schema.org/draft-07/schema#\",\"type\":\"object\",\"properties\":{\"f1\":{\"type\":\"string\"}}}"
```

Protobuf 데이터 형식의 경우:

```
aws glue create-schema --registry-id RegistryName="registryName" --schema-name testSchemaProtobuf --compatibility NONE --data-format PROTOBUF --schema-definition "syntax = \"proto2\";package org.test;message Basic { optional int32 basic = 1;}"
```

```
aws glue create-schema --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName" --schema-name testSchemaProtobuf --compatibility NONE --data-format PROTOBUF --schema-definition "syntax = \"proto2\";package org.test;message Basic { optional int32 basic = 1;}"
```

**AWS Glue 콘솔**  
AWS Glue 콘솔을 사용하여 새 스키마를 추가하려면

1. AWS 관리 콘솔에 로그인하여 [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\)에서 AWS Glue 콘솔을 엽니다.

1. 탐색 창의 [**데이터 카탈로그(Data catalog)**]에서 [**스키마(Schemas)**]를 선택합니다.

1. [**스키마 추가(Add schema)**]를 선택합니다.

1. 문자, 숫자, 하이픈, 밑줄, 달러 기호 또는 해시마크로 구성된 [**스키마 이름(Schema name)**]을 입력합니다. 이 이름은 변경할 수 없습니다.

1. 드롭다운 메뉴에서 스키마가 저장될 [**레지스트리(Registry)**]를 선택합니다. 상위 레지스트리는 생성 후에 변경할 수 없습니다.

1. [**데이터 포맷(Data format)**]을 [*Apache Avro*] 또는 [*JSON*]으로 둡니다. 이 포맷은 이 스키마의 모든 버전에 적용됩니다.

1. [**호환성 모드(Compatibility mode)**]를 선택합니다.
   + [*뒤로(권장)(Backward (recommended))*] - 수신자가 현재 버전과 이전 버전을 모두 읽을 수 있습니다.
   + [*모두 뒤로(Backward All)*] - 수신자가 현재 및 모든 이전 버전을 읽을 수 있습니다.
   + [*앞으로(Forward)*] - 발신자가 현재 버전과 이전 버전을 모두 쓸 수 있습니다.
   + [*모두 앞으로(Forward All)*] - 발신자가 현재 버전과 모든 이전 버전을 모두 쓸 수 있습니다.
   + [*전체(Full)*] - 뒤로 및 앞으로의 조합입니다.
   + [*모두 전체(Full All)*] - 모두 뒤로 및 모두 앞으로의 조합입니다.
   + [*없음(None)*] - 호환성 확인이 수행되지 않습니다.
   + [*사용 중지됨(Disabled)*] - 이 스키마에 대한 버전 관리를 방지합니다.

1. 최대 250자의 레지스트리에 대한 선택적 [**설명(Description)**]을 입력합니다.  
![\[스키마 생성의 예.\]](http://docs.aws.amazon.com/ko_kr/glue/latest/dg/images/schema_reg_create_schema.png)

1. 선택적으로 스키마에 하나 이상의 태그를 적용합니다. [**새 태그 추가(Add new tag)**]를 선택하고 [**태그 키(Tag key)**]와 [**태그 값(Tag value)**](선택 사항)을 지정합니다.

1. [**첫 번째 스키마 버전(First schema version)**] 상자에 초기 스키마를 입력하거나 붙여넣습니다.

   Avro 포맷은 [Avro 데이터 포맷 작업](#schema-registry-avro) 섹션을 참조하세요.

   JSON 포맷은 [JSON 데이터 포맷 작업](#schema-registry-json) 섹션을 참조하세요.

1. 선택적으로 [**메타데이터 추가(Add metadata)**]를 선택하여 스키마 버전에 주석을 달거나 분류할 버전 메타데이터를 추가합니다.

1. [**스키마 및 버전 생성(Create schema and version)**]을 선택합니다.

![\[스키마 생성의 예.\]](http://docs.aws.amazon.com/ko_kr/glue/latest/dg/images/schema_reg_create_schema2.png)


스키마가 생성되고 [**스키마(Schemas)**] 아래의 목록에 나타납니다.

## Avro 데이터 포맷 작업
<a name="schema-registry-avro"></a>

Avro는 데이터 직렬화 및 데이터 교환 서비스를 제공합니다. Avro는 데이터 정의를 JSON 포맷으로 저장하므로 읽고 해석하기 쉽습니다. 데이터 자체는 바이너리 포맷으로 저장됩니다.

Apache Avro 스키마 정의에 대한 자세한 내용은 [Apache Avro 사양](http://avro.apache.org/docs/current/spec.html)을 참조하세요.

## JSON 데이터 포맷 작업
<a name="schema-registry-json"></a>

데이터는 JSON 포맷으로 직렬화 할 수 있습니다. [JSON 스키마 포맷](https://json-schema.org/)은 JSON 스키마 포맷의 표준을 정의합니다.

# 스키마 또는 레지스트리 업데이트
<a name="schema-registry-gs5"></a>

생성되면 스키마, 스키마 버전 또는 레지스트리를 편집할 수 있습니다.

## 레지스트리 업데이트
<a name="schema-registry-gs5a"></a>

AWS Glue API 또는 AWS Glue 콘솔을 사용하여 레지스트리를 업데이트할 수 있습니다. 기존 레지스트리의 이름은 편집할 수 없습니다. 레지스트리에 대한 설명을 편집할 수 있습니다.

**AWS Glue API**  
기존 레지스트리를 업데이트하려면[UpdateRegistry 작업(Python: update\$1registry)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-UpdateRegistry) API를 사용합니다.

업데이트할 레지스트리를 나타내는 `RegistryId` 구조를 지정합니다. 레지스트리에 대한 설명을 변경하려면 `Description`을 전달합니다.

```
aws glue update-registry --description updatedDescription --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName1"
```

**AWS Glue 콘솔**  
AWS Glue 콘솔을 사용하여 레지스트리를 업데이트하려면

1. AWS Management Console에 로그인하여 [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\)에서 AWS Glue 콘솔을 엽니다.

1. 탐색 창의 [**데이터 카탈로그(Data catalog)**]에서 [**스키마 레지스트리(Schema registries)**]를 선택합니다.

1. 해당 확인란을 선택하여 레지스트리 목록에서 레지스트리를 선택합니다.

1. [**작업(Action)**] 메뉴에서 [**레지스트리 편집(Edit registry)**]을 선택합니다.

# 스키마 업데이트
<a name="schema-registry-gs5b"></a>

스키마에 대한 설명 또는 호환성 설정을 업데이트할 수 있습니다.

기존 스키마를 업데이트하려면[UpdateSchema 작업(Python: update\$1schema)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-UpdateSchema) API를 사용합니다.

업데이트할 스키마를 나타내는 `SchemaId` 구조를 지정합니다. `VersionNumber` 또는 `Compatibility` 중 하나가 제공되어야 합니다.

코드 예제 11:

```
aws glue update-schema --description testDescription --schema-id SchemaName="testSchema1",RegistryName="registryName1" --schema-version-number LatestVersion=true --compatibility NONE
```

```
aws glue update-schema --description testDescription --schema-id SchemaArn="arn:aws:glue:us-east-2:901234567890:schema/registryName1/testSchema1" --schema-version-number LatestVersion=true --compatibility NONE
```

# 스키마 버전 추가
<a name="schema-registry-gs5c"></a>

스키마 버전을 추가할 때 버전을 비교하여 새 스키마가 허용되는지 확인해야 합니다.

기존 스키마에 새 버전을 추가하려면 [RegisterSchemaVersion 작업(Python: register\$1schema\$1version)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-RegisterSchemaVersion) API를 사용합니다.

버전을 추가하려는 스키마를 나타내려면 `SchemaId` 구조를 지정하고 스키마를 정의하려면 `SchemaDefinition`을 지정합니다.

코드 예제 12:

```
aws glue register-schema-version --schema-definition "{\"type\": \"record\", \"name\": \"r1\", \"fields\": [ {\"name\": \"f1\", \"type\": \"int\"}, {\"name\": \"f2\", \"type\": \"string\"} ]}" --schema-id SchemaArn="arn:aws:glue:us-east-1:901234567890:schema/registryName/testschema"
```

```
aws glue register-schema-version --schema-definition "{\"type\": \"record\", \"name\": \"r1\", \"fields\": [ {\"name\": \"f1\", \"type\": \"int\"}, {\"name\": \"f2\", \"type\": \"string\"} ]}" --schema-id SchemaName="testschema",RegistryName="testregistry"
```

1. AWS Management Console에 로그인하여 [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\)에서 AWS Glue 콘솔을 엽니다.

1. 탐색 창의 [**데이터 카탈로그(Data catalog)**]에서 [**스키마(Schemas)**]를 선택합니다.

1. 해당 확인란을 선택하여 스키마 목록에서 스키마를 선택합니다.

1. 확인란을 선택하여 목록에서 하나 이상의 스키마를 선택합니다.

1. [**작업(Action)**] 메뉴에서 [**새 버전 등록(Register new version)**]을 선택합니다.

1. [**새 버전(New version)**] 상자에 새 스키마를 입력하거나 붙여 넣습니다.

1. [**이전 버전과 비교(Compare with previous version)**]를 선택하여 이전 스키마 버전과의 차이점을 확인합니다.

1. 선택적으로 [**메타데이터 추가(Add metadata)**]를 선택하여 스키마 버전에 주석을 달거나 분류할 버전 메타데이터를 추가합니다. [**키(Key)**]와 [**값(Value)**](선택 사항)을 입력합니다.

1. [**버전 등록(Register version)**]을 선택합니다.

![\[스키마 버전 추가\]](http://docs.aws.amazon.com/ko_kr/glue/latest/dg/images/schema_reg_add_schema_version.png)


스키마 버전이 버전 목록에 나타납니다. 버전이 호환 모드를 변경한 경우 버전이 체크포인트로 표시됩니다.

## 스키마 버전 비교의 예
<a name="schema-registry-gs5c1"></a>

[**이전 버전과 비교(Compare with previous version)**]를 선택하면 이전 버전과 새 버전이 함께 표시됩니다. 변경된 정보는 다음과 같이 강조 표시됩니다.
+ *노란색*: 변경된 정보를 나타냅니다.
+ *녹색*: 최신 버전에 추가된 내용을 나타냅니다.
+ *빨간색*: 최신 버전에서 제거된 콘텐츠를 나타냅니다.

이전 버전과 비교할 수도 있습니다.

![\[스키마 버전 비교의 예입니다.\]](http://docs.aws.amazon.com/ko_kr/glue/latest/dg/images/schema_reg_version_comparison.png)


# 스키마 또는 레지스트리 삭제
<a name="schema-registry-gs7"></a>

스키마, 스키마 버전 또는 레지스트리 삭제는 취소할 수 없는 영구적인 작업입니다.

## 스키마 삭제
<a name="schema-registry-gs7a"></a>

AWS Management Console 또는 [DeleteSchema 작업(Python: delete\$1schema)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteSchema) API를 사용하여 레지스트리 내에서 더 이상 사용되지 않는 스키마를 삭제할 수 있습니다.

하나 이상의 스키마 삭제는 실행 취소할 수 없는 영구 작업입니다. 스키마가 이제 필요 없는지 확인합니다.

레지스트리에서 스키마를 삭제하려면 [DeleteSchema 작업(Python: delete\$1schema)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteSchema) API를 호출하여 스키마를 식별하는 `SchemaId` 구조를 지정합니다.

예:

```
aws glue delete-schema --schema-id SchemaArn="arn:aws:glue:us-east-2:901234567890:schema/registryName1/schemaname"
```

```
aws glue delete-schema --schema-id SchemaName="TestSchema6-deleteschemabyname",RegistryName="default-registry"
```

**AWS Glue 콘솔**  
AWS Glue 콘솔에서 스키마를 삭제하려면

1. AWS Management Console에 로그인하여 [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\)에서 AWS Glue 콘솔을 엽니다.

1. 탐색 창의 [**데이터 카탈로그(Data catalog)**]에서 [**스키마 레지스트리(Schema registries)**]를 선택합니다.

1. 레지스트리 목록에서 스키마가 포함된 레지스트리를 선택합니다.

1. 확인란을 선택하여 목록에서 하나 이상의 스키마를 선택합니다.

1. [**작업(Action)**] 메뉴에서 [**스키마 삭제(Delete schema)**]를 선택합니다.

1. 필드에 **Delete** 텍스트를 입력하여 삭제를 확인합니다.

1. **삭제**를 선택합니다.

지정한 스키마가 레지스트리에서 삭제됩니다.

## 스키마 버전 삭제
<a name="schema-registry-gs7b"></a>

스키마가 레지스트리에 누적되면 AWS Management Console 또는 [DeleteSchemaVersions 작업(Python: delete\$1schema\$1versions)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteSchemaVersions) API를 사용하여 원치 않는 스키마 버전을 삭제할 수 있습니다. 하나 이상의 스키마 버전 삭제는 실행 취소할 수 없는 영구 작업입니다. 스키마 버전이 이제 필요 없는지 확인합니다.

스키마 버전을 삭제할 때 다음 제약 조건에 유의합니다.
+ 체크포인트된 버전은 삭제할 수 없습니다.
+ 연속 버전의 범위는 25개를 초과할 수 없습니다.
+ 최신 스키마 버전은 보류 상태가 아니어야 합니다.

`SchemaId` 구조를 지정하여 스키마를 식별하고 `Versions`를 삭제할 버전 범위로 지정합니다. 버전 또는 버전 범위 지정에 대한 자세한 내용은 [DeleteRegistry 작업(Python: delete\$1registry)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteRegistry) 섹션을 참조하세요. 지정한 스키마 버전이 레지스트리에서 삭제됩니다.

이 호출 후 [ListSchemaVersions 작업(Python: list\$1schema\$1versions)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-ListSchemaVersions) API를 호출하면 삭제된 버전의 상태가 나열됩니다.

예:

```
aws glue delete-schema-versions --schema-id SchemaName="TestSchema6",RegistryName="default-registry" --versions "1-1"
```

```
aws glue delete-schema-versions --schema-id SchemaArn="arn:aws:glue:us-east-2:901234567890:schema/default-registry/TestSchema6-NON-Existent" --versions "1-1"
```

1. AWS Management Console에 로그인하여 [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\)에서 AWS Glue 콘솔을 엽니다.

1. 탐색 창의 [**데이터 카탈로그(Data catalog)**]에서 [**스키마 레지스트리(Schema registries)**]를 선택합니다.

1. 레지스트리 목록에서 스키마가 포함된 레지스트리를 선택합니다.

1. 확인란을 선택하여 목록에서 하나 이상의 스키마를 선택합니다.

1. [**작업(Action)**] 메뉴에서 [**스키마 삭제(Delete schema)**]를 선택합니다.

1. 필드에 **Delete** 텍스트를 입력하여 삭제를 확인합니다.

1. **삭제**를 선택합니다.

지정한 스키마 버전이 레지스트리에서 삭제됩니다.

# 레지스트리 삭제
<a name="schema-registry-gs7c"></a>

포함된 스키마가 해당 레지스트리 아래에 더 이상 구성되지 않아야 하는 경우 레지스트리를 삭제할 수 있습니다. 이러한 스키마를 다른 레지스트리에 재할당해야 합니다.

하나 이상의 레지스트리 삭제는 실행 취소할 수 없는 영구 작업입니다. 레지스트리가 이제 필요 없는지 확인합니다.

AWS CLI를 사용하여 기본 레지스트리를 삭제할 수 있습니다.

**AWS Glue API**  
스키마와 모든 해당 버전을 포함한 전체 레지스트리를 삭제하려면 [DeleteRegistry 작업(Python: delete\$1registry)](aws-glue-api-schema-registry-api.md#aws-glue-api-schema-registry-api-DeleteRegistry) API를 호출합니다. `RegistryId` 구조를 지정하여 레지스트리를 식별합니다.

예:

```
aws glue delete-registry --registry-id RegistryArn="arn:aws:glue:us-east-2:901234567890:registry/registryName1"
```

```
aws glue delete-registry --registry-id RegistryName="TestRegistry-deletebyname"
```

삭제 작업의 상태를 얻으려면 비동기 호출 후 `GetRegistry` API를 호출할 수 있습니다.

**AWS Glue 콘솔**  
AWS Glue 콘솔에서 레지스트리를 삭제하려면

1. AWS Management Console에 로그인하여 [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\)에서 AWS Glue 콘솔을 엽니다.

1. 탐색 창의 [**데이터 카탈로그(Data catalog)**]에서 [**스키마 레지스트리(Schema registries)**]를 선택합니다.

1. 확인란을 선택하여 목록에서 레지스트리를 선택합니다.

1. [**작업(Action)**] 메뉴에서 [**레지스트리 삭제(Delete registry)**]를 선택합니다.

1. 필드에 **Delete** 텍스트를 입력하여 삭제를 확인합니다.

1. **삭제**를 선택합니다.

선택한 레지스트리가 AWS Glue에서 삭제됩니다.

## Serializer에 대한 IAM 예
<a name="schema-registry-gs1"></a>

**참고**  
AWS 관리형 정책은 일반 사용 사례에서 필요한 권한을 부여합니다. 관리형 정책으로 스키마 레지스트리 관리에 대한 자세한 내용은 [AWS Glue에 대한 AWS 관리형(미리 정의된) 정책](security-iam-awsmanpol.md#access-policy-examples-aws-managed) 섹션을 참조하세요.

Serializer의 경우 지정된 스키마 정의에 대해 `schemaVersionId`를 찾는 기능을 제공하려면 아래와 유사한 최소 정책을 만들어야 합니다. 레지스트리의 스키마를 읽으려면 레지스트리에 대한 읽기 권한이 있어야 합니다. `Resource` 절을 사용하여 읽을 수 있는 레지스트리를 제한할 수 있습니다.

코드 예제 13:

```
{
    "Sid" : "GetSchemaByDefinition",
    "Effect" : "Allow",
    "Action" :
	[
        "glue:GetSchemaByDefinition"
    ],
        "Resource" : ["arn:aws:glue:us-east-2:012345678:registry/registryname-1",
                      "arn:aws:glue:us-east-2:012345678:schema/registryname-1/schemaname-1",
                      "arn:aws:glue:us-east-2:012345678:schema/registryname-1/schemaname-2"
                     ]
}
```

또한 다음과 같은 추가 메서드를 포함하여 생산자가 새 스키마 및 버전을 생성하도록 허용할 수도 있습니다. 참고로 레지스트리 내부의 스키마를 추가/제거/발전시키기 위해 레지스트리를 검사할 수 있어야 합니다. `Resource` 절을 사용하여 검사할 수 있는 레지스트리를 제한할 수 있습니다.

코드 예제 14:

```
{
    "Sid" : "RegisterSchemaWithMetadata",
    "Effect" : "Allow",
    "Action" :
	[
        "glue:GetSchemaByDefinition",
        "glue:CreateSchema",
        "glue:RegisterSchemaVersion",
        "glue:PutSchemaVersionMetadata",
    ],
    "Resource" : ["arn:aws:glue:aws-region:123456789012:registry/registryname-1",
                  "arn:aws:glue:aws-region:123456789012:schema/registryname-1/schemaname-1",
                  "arn:aws:glue:aws-region:123456789012:schema/registryname-1/schemaname-2"
                 ]
}
```

## Deserializer에 대한 IAM 예
<a name="schema-registry-gs1b"></a>

Deserializer(소비자 측)의 경우 deserializer가 역직렬화를 위해 Schema Registry에서 스키마를 가져올 수 있도록 아래와 유사한 정책을 생성해야 합니다. 참고로 레지스트리 내부의 스키마를 가져오기 위해 레지스트리를 검사할 수 있어야 합니다.

코드 예제 15:

```
{
    "Sid" : "GetSchemaVersion",
    "Effect" : "Allow",
    "Action" :
	[
        "glue:GetSchemaVersion"
    ],
    "Resource" : ["*"]
}
```

## AWS PrivateLink를 사용한 프라이빗 연결
<a name="schema-registry-gs-private"></a>

AWS PrivateLink를 사용하여 AWS Glue에 대한 인터페이스 VPC 엔드포인트를 정의하여 데이터 생산자의 VPC를 AWS Glue에 연결할 수 있습니다. VPC 엔드포인트를 사용하는 경우 VPC와 AWS Glue 사이의 통신은 모두 AWS 네트워크에서 수행됩니다. 자세한 내용은 [VPC 엔드포인트와 함께 AWS Glue 사용](https://docs.aws.amazon.com/glue/latest/dg/vpc-endpoint.html)을 참조하세요.

# Amazon CloudWatch 지표에 액세스
<a name="schema-registry-gs-monitoring"></a>

Amazon CloudWatch 지표는 CloudWatch 프리 티어의 일부로 제공됩니다. CloudWatch 콘솔에서 이러한 지표에 액세스할 수 있습니다. API 수준 지표에는 CreateSchema(성공 및 대기 시간), GetSchemaByDefinition, (성공 및 대기 시간), GetSchemaVersion(성공 및 대기 시간), RegisterSchemaVersion(성공 및 대기 시간), PutSchemaVersionMetadata(성공 및 대기 시간)가 포함됩니다. 리소스 수준 지표에는 Registry.ThrottledByLimit, SchemaVersion.ThrottledByLimit, SchemaVersion.Size가 포함됩니다.

# 스키마 레지스트리용 샘플 CloudFormation 템플릿
<a name="schema-registry-integrations-cfn"></a>

다음은 CloudFormation에서 Schema Registry 리소스를 생성하기 위한 샘플 템플릿입니다. 계정에 이 스택을 생성하려면 위 템플릿을 파일 `SampleTemplate.yaml`에 복사하고 다음 명령을 실행합니다.

```
aws cloudformation create-stack --stack-name ABCSchemaRegistryStack --template-body "'cat SampleTemplate.yaml'"
```

이 예에서는 `AWS::Glue::Registry`를 사용하여 레지스트리를 생성하고, `AWS::Glue::Schema`를 사용하여 스키마를 생성하고, `AWS::Glue::SchemaVersion`을 사용하여 스키마 버전을 생성하고, `AWS::Glue::SchemaVersionMetadata`를 사용하여 스키마 버전 메타데이터를 채웁니다.

```
Description: "A sample CloudFormation template for creating Schema Registry resources."
Resources:
  ABCRegistry:
    Type: "AWS::Glue::Registry"
    Properties:
      Name: "ABCSchemaRegistry"
      Description: "ABC Corp. Schema Registry"
      Tags:
        Project: "Foo"
  ABCSchema:
    Type: "AWS::Glue::Schema"
    Properties:
      Registry:
        Arn: !Ref ABCRegistry
      Name: "TestSchema"
      Compatibility: "NONE"
      DataFormat: "AVRO"
      SchemaDefinition: >
        {"namespace":"foo.avro","type":"record","name":"user","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":"int"}]}
      Tags:
        Project: "Foo"
  SecondSchemaVersion:
    Type: "AWS::Glue::SchemaVersion"
    Properties:
      Schema:
        SchemaArn: !Ref ABCSchema
      SchemaDefinition: >
        {"namespace":"foo.avro","type":"record","name":"user","fields":[{"name":"status","type":"string", "default":"ON"}, {"name":"name","type":"string"},{"name":"favorite_number","type":"int"}]}
  FirstSchemaVersionMetadata:
    Type: "AWS::Glue::SchemaVersionMetadata"
    Properties:
      SchemaVersionId: !GetAtt ABCSchema.InitialSchemaVersionId
      Key: "Application"
      Value: "Kinesis"
  SecondSchemaVersionMetadata:
    Type: "AWS::Glue::SchemaVersionMetadata"
    Properties:
      SchemaVersionId: !Ref SecondSchemaVersion
      Key: "Application"
      Value: "Kinesis"
```

# AWS Glue Schema Registry와 통합
<a name="schema-registry-integrations"></a>

이 섹션에서는 AWS Glue 스키마 레지스트리와의 통합에 대해 설명합니다. 이 섹션의 예에서는 AVRO 데이터 포맷의 스키마를 보여줍니다. JSON 데이터 포맷의 스키마를 포함한 더 많은 예제는 [AWS Glue Schema Registry 오픈 소스 리포지토리](https://github.com/awslabs/aws-glue-schema-registry)의 통합 테스트 및 ReadMe 정보를 참조하세요.

**Topics**
+ [사용 사례: Amazon MSK 또는 Apache Kafka에 Schema Registry 연결](#schema-registry-integrations-amazon-msk)
+ [사용 사례: AWS Glue Schema Registry와 Amazon Kinesis Data Streams 통합](#schema-registry-integrations-kds)
+ [사용 사례: Amazon Managed Service for Apache Flink](#schema-registry-integrations-kinesis-data-analytics-apache-flink)
+ [사용 사례: AWS Lambda와 통합](#schema-registry-integrations-aws-lambda)
+ [사용 사례: AWS Glue Data Catalog](#schema-registry-integrations-aws-glue-data-catalog)
+ [사용 사례: AWS Glue 스트리밍](#schema-registry-integrations-aws-glue-streaming)
+ [사용 사례: Apache Kafka Streams](#schema-registry-integrations-apache-kafka-streams)

## 사용 사례: Amazon MSK 또는 Apache Kafka에 Schema Registry 연결
<a name="schema-registry-integrations-amazon-msk"></a>

Apache Kafka 주제에 데이터를 쓰고 있다고 가정하고 다음 단계에 따라 시작할 수 있습니다.

1. 하나 이상의 주제로 Amazon Managed Streaming for Apache Kafka(Amazon MSK) 또는 Apache Kafka 클러스터를 생성합니다. Amazon MSK 클러스터를 생성하는 경우 AWS Management Console을 사용할 수 있습니다. *Amazon Managed Streaming for Apache Kafka Developer Guide*의 [Getting Started Using Amazon MSK](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html)에서 설명하는 지침을 따릅니다.

1. 위의 [SerDe 라이브러리 설치](schema-registry-gs-serde.md) 단계를 따릅니다.

1. 스키마 레지스트리, 스키마 또는 스키마 버전을 생성하려면 이 문서의 [스키마 레지스트리 시작하기](schema-registry-gs.md) 섹션에 있는 지침을 따르세요.

1. 생산자와 소비자가 Schema Registry를 사용하여 Amazon MSK 또는 Apache Kafka 주제에서 레코드를 쓰고 읽도록 시작합니다. 예제 생산자 및 소비자 코드는 Serde 라이브러리의 [ReadMe 파일](https://github.com/awslabs/aws-glue-schema-registry/blob/master/README.md)에서 찾을 수 있습니다. 생산자의 Schema Registry 라이브러리는 자동으로 레코드를 직렬화하고 스키마 버전 ID로 레코드를 장식합니다.

1. 이 레코드의 스키마가 입력되었거나 자동 등록이 설정되어 있으면 Schema Registry에 스키마가 등록됩니다.

1. AWS Glue Schema Registry 라이브러리를 사용하여 Amazon MSK 또는 Apache Kafka 주제에서 읽는 소비자는 Schema Registry에서 스키마를 자동으로 조회합니다.

## 사용 사례: AWS Glue Schema Registry와 Amazon Kinesis Data Streams 통합
<a name="schema-registry-integrations-kds"></a>

이 통합을 위해서는 기존 Amazon Kinesis 데이터 스트림이 있어야 합니다. 자세한 내용은 *Amazon Kinesis Data Streams Developer Guide*의 [Getting Started with Amazon Kinesis Data Streams](https://docs.aws.amazon.com/streams/latest/dev/getting-started.html)를 참조하세요.

Kinesis 데이터 스트림의 데이터와 상호 작용할 수 있는 두 가지 방법이 있습니다.
+ Java의 Kinesis Producer Library(KPL) 및 Kinesis Client Library(KCL) 라이브러리를 통해 다국어 지원은 제공되지 않습니다.
+ AWS SDK for Java에서 사용 가능한 `PutRecords`, `PutRecord` 및 `GetRecords` Kinesis Data Streams API를 통해.

현재 KPL/KCL 라이브러리를 사용하는 경우 해당 방법을 계속 사용하는 것이 좋습니다. 예제와 같이 Schema Registry가 통합된 업데이트된 KCL 및 KPL 버전이 있습니다. 그렇지 않으면 AWS Glue KDS API를 직접 사용하는 경우 샘플 코드를 사용하여 Schema Registry를 활용할 수 있습니다.

Schema Registry 통합은 KPL v0.14.2 이상 및 KCL v2.3 이상에서만 사용할 수 있습니다. JSON 데이터 포맷과 Schema Registry 통합은 KPL v0.14.8 이상 및 KCL v2.3.6 이상에서 사용할 수 있습니다.

### Kinesis SDK V2를 사용하여 데이터와 상호 작용
<a name="schema-registry-integrations-kds-sdk-v2"></a>

이 섹션에서는 Kinesis SDK V2를 사용한 Kinesis와의 상호 작용에 대해 설명합니다.

```
// Example JSON Record, you can construct a AVRO record also
private static final JsonDataWithSchema record = JsonDataWithSchema.builder(schemaString, payloadString);
private static final DataFormat dataFormat = DataFormat.JSON;

//Configurations for Schema Registry
GlueSchemaRegistryConfiguration gsrConfig = new GlueSchemaRegistryConfiguration("us-east-1");

GlueSchemaRegistrySerializer glueSchemaRegistrySerializer =
        new GlueSchemaRegistrySerializerImpl(awsCredentialsProvider, gsrConfig);
GlueSchemaRegistryDataFormatSerializer dataFormatSerializer =
        new GlueSchemaRegistrySerializerFactory().getInstance(dataFormat, gsrConfig);

Schema gsrSchema =
        new Schema(dataFormatSerializer.getSchemaDefinition(record), dataFormat.name(), "MySchema");

byte[] serializedBytes = dataFormatSerializer.serialize(record);

byte[] gsrEncodedBytes = glueSchemaRegistrySerializer.encode(streamName, gsrSchema, serializedBytes);

PutRecordRequest putRecordRequest = PutRecordRequest.builder()
        .streamName(streamName)
        .partitionKey("partitionKey")
        .data(SdkBytes.fromByteArray(gsrEncodedBytes))
        .build();
shardId = kinesisClient.putRecord(putRecordRequest)
        .get()
        .shardId();

GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer = new GlueSchemaRegistryDeserializerImpl(awsCredentialsProvider, gsrConfig);

GlueSchemaRegistryDataFormatDeserializer gsrDataFormatDeserializer =
        glueSchemaRegistryDeserializerFactory.getInstance(dataFormat, gsrConfig);

GetShardIteratorRequest getShardIteratorRequest = GetShardIteratorRequest.builder()
        .streamName(streamName)
        .shardId(shardId)
        .shardIteratorType(ShardIteratorType.TRIM_HORIZON)
        .build();

String shardIterator = kinesisClient.getShardIterator(getShardIteratorRequest)
        .get()
        .shardIterator();

GetRecordsRequest getRecordRequest = GetRecordsRequest.builder()
        .shardIterator(shardIterator)
        .build();
GetRecordsResponse recordsResponse = kinesisClient.getRecords(getRecordRequest)
        .get();

List<Object> consumerRecords = new ArrayList<>();
List<Record> recordsFromKinesis = recordsResponse.records();

for (int i = 0; i < recordsFromKinesis.size(); i++) {
    byte[] consumedBytes = recordsFromKinesis.get(i)
            .data()
            .asByteArray();

    Schema gsrSchema = glueSchemaRegistryDeserializer.getSchema(consumedBytes);
    Object decodedRecord = gsrDataFormatDeserializer.deserialize(ByteBuffer.wrap(consumedBytes),
                                                                    gsrSchema.getSchemaDefinition());
    consumerRecords.add(decodedRecord);
}
```

### KPL/KCL 라이브러리를 사용하여 데이터와 상호 작용
<a name="schema-registry-integrations-kds-libraries"></a>

이 섹션에서는 KPL/KCL 라이브러리를 사용하여 Kinesis Data Streams를 Schema Registry와 통합하는 방법을 설명합니다. KPL/KCL 사용에 대한 자세한 내용은 *Amazon Kinesis Data Streams Developer Guide*의 [Developing Producers Using the Amazon Kinesis Producer Library](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html)를 참조하세요.

#### KPL에서 Schema Registry 설정
<a name="schema-registry-integrations-kds-libraries-kpl"></a>

1. AWS Glue Schema Registry에서 작성된 데이터, 데이터 포맷 및 스키마 이름에 대한 스키마 정의를 정의합니다.

1. 필요에 따라 `GlueSchemaRegistryConfiguration` 객체를 구성합니다.

1. `addUserRecord API`로 스키마 객체를 전달합니다.

   ```
   private static final String SCHEMA_DEFINITION = "{"namespace": "example.avro",\n"
   + " "type": "record",\n"
   + " "name": "User",\n"
   + " "fields": [\n"
   + " {"name": "name", "type": "string"},\n"
   + " {"name": "favorite_number", "type": ["int", "null"]},\n"
   + " {"name": "favorite_color", "type": ["string", "null"]}\n"
   + " ]\n"
   + "}";
   
   KinesisProducerConfiguration config = new KinesisProducerConfiguration();
   config.setRegion("us-west-1")
   
   //[Optional] configuration for Schema Registry.
   
   GlueSchemaRegistryConfiguration schemaRegistryConfig =
   new GlueSchemaRegistryConfiguration("us-west-1");
   
   schemaRegistryConfig.setCompression(true);
   
   config.setGlueSchemaRegistryConfiguration(schemaRegistryConfig);
   
   ///Optional configuration ends.
   
   final KinesisProducer producer =
         new KinesisProducer(config);
   
   final ByteBuffer data = getDataToSend();
   
   com.amazonaws.services.schemaregistry.common.Schema gsrSchema =
       new Schema(SCHEMA_DEFINITION, DataFormat.AVRO.toString(), "demoSchema");
   
   ListenableFuture<UserRecordResult> f = producer.addUserRecord(
   config.getStreamName(), TIMESTAMP, Utils.randomExplicitHashKey(), data, gsrSchema);
   
   private static ByteBuffer getDataToSend() {
         org.apache.avro.Schema avroSchema =
           new org.apache.avro.Schema.Parser().parse(SCHEMA_DEFINITION);
   
         GenericRecord user = new GenericData.Record(avroSchema);
         user.put("name", "Emily");
         user.put("favorite_number", 32);
         user.put("favorite_color", "green");
   
         ByteArrayOutputStream outBytes = new ByteArrayOutputStream();
         Encoder encoder = EncoderFactory.get().directBinaryEncoder(outBytes, null);
         new GenericDatumWriter<>(avroSchema).write(user, encoder);
         encoder.flush();
         return ByteBuffer.wrap(outBytes.toByteArray());
    }
   ```

#### Kinesis 클라이언트 라이브러리 설정
<a name="schema-registry-integrations-kds-libraries-kcl"></a>

Java로 Kinesis Client Library 소비자를 개발합니다. 자세한 내용은 *Amazon Kinesis Data Streams Developer Guide*의 [Developing a Kinesis Client Library Consumer in Java](https://docs.aws.amazon.com/streams/latest/dev/kcl2-standard-consumer-java-example.html)를 참조하세요.

1. `GlueSchemaRegistryConfiguration` 객체를 전달하여 `GlueSchemaRegistryDeserializer`의 인스턴스를 생성합니다.

1. `retrievalConfig.glueSchemaRegistryDeserializer`에 `GlueSchemaRegistryDeserializer`를 전달합니다.

1. `kinesisClientRecord.getSchema()`를 호출하여 수신 메시지의 스키마에 액세스합니다.

   ```
   GlueSchemaRegistryConfiguration schemaRegistryConfig =
       new GlueSchemaRegistryConfiguration(this.region.toString());
   
    GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer =
       new GlueSchemaRegistryDeserializerImpl(DefaultCredentialsProvider.builder().build(), schemaRegistryConfig);
   
    RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient));
    retrievalConfig.glueSchemaRegistryDeserializer(glueSchemaRegistryDeserializer);
   
     Scheduler scheduler = new Scheduler(
               configsBuilder.checkpointConfig(),
               configsBuilder.coordinatorConfig(),
               configsBuilder.leaseManagementConfig(),
               configsBuilder.lifecycleConfig(),
               configsBuilder.metricsConfig(),
               configsBuilder.processorConfig(),
               retrievalConfig
           );
   
    public void processRecords(ProcessRecordsInput processRecordsInput) {
               MDC.put(SHARD_ID_MDC_KEY, shardId);
               try {
                   log.info("Processing {} record(s)",
                   processRecordsInput.records().size());
                   processRecordsInput.records()
                   .forEach(
                       r ->
                           log.info("Processed record pk: {} -- Seq: {} : data {} with schema: {}",
                           r.partitionKey(), r.sequenceNumber(), recordToAvroObj(r).toString(), r.getSchema()));
               } catch (Throwable t) {
                   log.error("Caught throwable while processing records. Aborting.");
                   Runtime.getRuntime().halt(1);
               } finally {
                   MDC.remove(SHARD_ID_MDC_KEY);
               }
    }
   
    private GenericRecord recordToAvroObj(KinesisClientRecord r) {
       byte[] data = new byte[r.data().remaining()];
       r.data().get(data, 0, data.length);
       org.apache.avro.Schema schema = new org.apache.avro.Schema.Parser().parse(r.schema().getSchemaDefinition());
       DatumReader datumReader = new GenericDatumReader<>(schema);
   
       BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(data, 0, data.length, null);
       return (GenericRecord) datumReader.read(null, binaryDecoder);
    }
   ```

#### Kinesis Data Streams API를 사용하여 데이터와 상호 작용
<a name="schema-registry-integrations-kds-apis"></a>

이 섹션에서는 Kinesis Data Streams API를 사용하여 Kinesis Data Streams를 Schema Registry와 통합하는 방법을 설명합니다.

1. 다음 Maven 종속성을 업데이트합니다.

   ```
   <dependencyManagement>
           <dependencies>
               <dependency>
                   <groupId>com.amazonaws</groupId>
                   <artifactId>aws-java-sdk-bom</artifactId>
                   <version>1.11.884</version>
                   <type>pom</type>
                   <scope>import</scope>
               </dependency>
           </dependencies>
       </dependencyManagement>
   
       <dependencies>
           <dependency>
               <groupId>com.amazonaws</groupId>
               <artifactId>aws-java-sdk-kinesis</artifactId>
           </dependency>
   
           <dependency>
               <groupId>software.amazon.glue</groupId>
               <artifactId>schema-registry-serde</artifactId>
               <version>1.1.5</version>
           </dependency>
   
           <dependency>
               <groupId>com.fasterxml.jackson.dataformat</groupId>
               <artifactId>jackson-dataformat-cbor</artifactId>
               <version>2.11.3</version>
           </dependency>
       </dependencies>
   ```

1. 생산자에서 Kinesis Data Streams의 `PutRecords` 또는 `PutRecord` API를 사용하여 스키마 헤더 정보를 추가합니다.

   ```
   //The following lines add a Schema Header to the record
           com.amazonaws.services.schemaregistry.common.Schema awsSchema =
               new com.amazonaws.services.schemaregistry.common.Schema(schemaDefinition, DataFormat.AVRO.name(),
                   schemaName);
           GlueSchemaRegistrySerializerImpl glueSchemaRegistrySerializer =
               new GlueSchemaRegistrySerializerImpl(DefaultCredentialsProvider.builder().build(), new GlueSchemaRegistryConfiguration(getConfigs()));
           byte[] recordWithSchemaHeader =
               glueSchemaRegistrySerializer.encode(streamName, awsSchema, recordAsBytes);
   ```

1. 생산자에서 `PutRecords` 또는 `PutRecord` API를 사용하여 레코드를 데이터 스트림에 넣습니다.

1. 소비자의 헤더에서 스키마 레코드를 제거하고 Avro 스키마 레코드를 직렬화합니다.

   ```
   //The following lines remove Schema Header from record
           GlueSchemaRegistryDeserializerImpl glueSchemaRegistryDeserializer =
               new GlueSchemaRegistryDeserializerImpl(DefaultCredentialsProvider.builder().build(), getConfigs());
           byte[] recordWithSchemaHeaderBytes = new byte[recordWithSchemaHeader.remaining()];
           recordWithSchemaHeader.get(recordWithSchemaHeaderBytes, 0, recordWithSchemaHeaderBytes.length);
           com.amazonaws.services.schemaregistry.common.Schema awsSchema =
               glueSchemaRegistryDeserializer.getSchema(recordWithSchemaHeaderBytes);
           byte[] record = glueSchemaRegistryDeserializer.getData(recordWithSchemaHeaderBytes);
   
           //The following lines serialize an AVRO schema record
           if (DataFormat.AVRO.name().equals(awsSchema.getDataFormat())) {
               Schema avroSchema = new org.apache.avro.Schema.Parser().parse(awsSchema.getSchemaDefinition());
               Object genericRecord = convertBytesToRecord(avroSchema, record);
               System.out.println(genericRecord);
           }
   ```

#### Kinesis Data Streams API를 사용하여 데이터와 상호 작용
<a name="schema-registry-integrations-kds-apis-reference"></a>

다음은 `PutRecords` 및 `GetRecords` API 사용을 위한 예제 코드입니다.

```
//Full sample code
import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializerImpl;
import com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistrySerializerImpl;
import com.amazonaws.services.schemaregistry.utils.AVROUtils;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.services.glue.model.DataFormat;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;


public class PutAndGetExampleWithEncodedData {
    static final String regionName = "us-east-2";
    static final String streamName = "testStream1";
    static final String schemaName = "User-Topic";
    static final String AVRO_USER_SCHEMA_FILE = "src/main/resources/user.avsc";
    KinesisApi kinesisApi = new KinesisApi();

    void runSampleForPutRecord() throws IOException {
        Object testRecord = getTestRecord();
        byte[] recordAsBytes = convertRecordToBytes(testRecord);
        String schemaDefinition = AVROUtils.getInstance().getSchemaDefinition(testRecord);

        //The following lines add a Schema Header to a record
        com.amazonaws.services.schemaregistry.common.Schema awsSchema =
            new com.amazonaws.services.schemaregistry.common.Schema(schemaDefinition, DataFormat.AVRO.name(),
                schemaName);
        GlueSchemaRegistrySerializerImpl glueSchemaRegistrySerializer =
            new GlueSchemaRegistrySerializerImpl(DefaultCredentialsProvider.builder().build(), new GlueSchemaRegistryConfiguration(regionName));
        byte[] recordWithSchemaHeader =
            glueSchemaRegistrySerializer.encode(streamName, awsSchema, recordAsBytes);

        //Use PutRecords api to pass a list of records
        kinesisApi.putRecords(Collections.singletonList(recordWithSchemaHeader), streamName, regionName);

        //OR
        //Use PutRecord api to pass single record
        //kinesisApi.putRecord(recordWithSchemaHeader, streamName, regionName);
    }

    byte[] runSampleForGetRecord() throws IOException {
        ByteBuffer recordWithSchemaHeader = kinesisApi.getRecords(streamName, regionName);

        //The following lines remove the schema registry header
        GlueSchemaRegistryDeserializerImpl glueSchemaRegistryDeserializer =
            new GlueSchemaRegistryDeserializerImpl(DefaultCredentialsProvider.builder().build(), new GlueSchemaRegistryConfiguration(regionName));
        byte[] recordWithSchemaHeaderBytes = new byte[recordWithSchemaHeader.remaining()];
        recordWithSchemaHeader.get(recordWithSchemaHeaderBytes, 0, recordWithSchemaHeaderBytes.length);

        com.amazonaws.services.schemaregistry.common.Schema awsSchema =
            glueSchemaRegistryDeserializer.getSchema(recordWithSchemaHeaderBytes);

        byte[] record = glueSchemaRegistryDeserializer.getData(recordWithSchemaHeaderBytes);

        //The following lines serialize an AVRO schema record
        if (DataFormat.AVRO.name().equals(awsSchema.getDataFormat())) {
            Schema avroSchema = new org.apache.avro.Schema.Parser().parse(awsSchema.getSchemaDefinition());
            Object genericRecord = convertBytesToRecord(avroSchema, record);
            System.out.println(genericRecord);
        }

        return record;
    }

    private byte[] convertRecordToBytes(final Object record) throws IOException {
        ByteArrayOutputStream recordAsBytes = new ByteArrayOutputStream();
        Encoder encoder = EncoderFactory.get().directBinaryEncoder(recordAsBytes, null);
        GenericDatumWriter datumWriter = new GenericDatumWriter<>(AVROUtils.getInstance().getSchema(record));
        datumWriter.write(record, encoder);
        encoder.flush();
        return recordAsBytes.toByteArray();
    }

    private GenericRecord convertBytesToRecord(Schema avroSchema, byte[] record) throws IOException {
        final GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(avroSchema);
        Decoder decoder = DecoderFactory.get().binaryDecoder(record, null);
        GenericRecord genericRecord = datumReader.read(null, decoder);
        return genericRecord;
    }

    private Map<String, String> getMetadata() {
        Map<String, String> metadata = new HashMap<>();
        metadata.put("event-source-1", "topic1");
        metadata.put("event-source-2", "topic2");
        metadata.put("event-source-3", "topic3");
        metadata.put("event-source-4", "topic4");
        metadata.put("event-source-5", "topic5");
        return metadata;
    }

    private GlueSchemaRegistryConfiguration getConfigs() {
        GlueSchemaRegistryConfiguration configs = new GlueSchemaRegistryConfiguration(regionName);
        configs.setSchemaName(schemaName);
        configs.setAutoRegistration(true);
        configs.setMetadata(getMetadata());
        return configs;
    }

    private Object getTestRecord() throws IOException {
        GenericRecord genericRecord;
        Schema.Parser parser = new Schema.Parser();
        Schema avroSchema = parser.parse(new File(AVRO_USER_SCHEMA_FILE));

        genericRecord = new GenericData.Record(avroSchema);
        genericRecord.put("name", "testName");
        genericRecord.put("favorite_number", 99);
        genericRecord.put("favorite_color", "red");

        return genericRecord;
    }
}
```

## 사용 사례: Amazon Managed Service for Apache Flink
<a name="schema-registry-integrations-kinesis-data-analytics-apache-flink"></a>

Apache Flink는 무제한 및 제한 데이터 스트림에 대한 상태 저장 계산에 널리 사용되는 오픈 소스 프레임워크 및 분산 처리 엔진입니다. Amazon Managed Service for Apache Flink는 스트리밍 데이터를 처리하는 Apache Flink 애플리케이션을 구축 및 관리할 수 있는 완전관리형 AWS 서비스입니다.

오픈 소스 Apache Flink는 다양한 소스와 싱크를 제공합니다. 예를 들어 사전 정의된 데이터 원본에는 파일, 디렉터리 및 소켓에서 읽기와 컬렉션 및 반복기에서 데이터 수집이 포함됩니다. Apache Flink DataStream 커넥터는 Apache Flink가 소스 및/또는 싱크로 Apache Kafka 또는 Kinesis와 같은 다양한 서드 파티 시스템과 인터페이스할 수 있는 코드를 제공합니다.

자세한 내용은 [Amazon Kinesis Data Analytics Developer Guide](https://docs.aws.amazon.com/kinesisanalytics/latest/java/what-is.html)를 참조하세요.

### Apache Flink Kafka 커넥터
<a name="schema-registry-integrations-kafka-connector"></a>

Apache Flink는 정확히 1회 보장으로 Kafka 주제에서 데이터를 읽고 쓰기 위한 Apache Kafka 데이터 스트림 커넥터를 제공합니다. Flink의 Kafka 소비자인 `FlinkKafkaConsumer`는 하나 이상의 Kafka 주제에서 읽을 수 있는 액세스를 제공합니다. Apache Flink의 Kafka Producer인 `FlinkKafkaProducer`를 사용하면 하나 이상의 Kafka 주제에 대한 레코드 스트림을 작성할 수 있습니다. 자세한 내용은 [Apache Kafka 커넥터](https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html)를 참조하세요.

### Apache Flink Kinesis 스트림 커넥터
<a name="schema-registry-integrations-kinesis-connector"></a>

Kinesis 데이터 스트림 커넥터는 Amazon Kinesis Data Streams에 대한 액세스를 제공합니다. `FlinkKinesisConsumer`는 동일한 AWS 서비스 리전 내에서 여러 Kinesis 스트림을 구독하는 정확히 1회 병렬 스트리밍 데이터 원본이며 작업이 실행되는 동안 스트림의 다시 샤딩을 투명하게 처리할 수 있습니다. 소비자의 각 하위 태스크는 여러 Kinesis 샤드에서 데이터 레코드를 가져오는 일을 담당합니다. 각 하위 태스크에서 가져온 샤드 수는 샤드가 다히고 Kinesis에서 생성됨에 따라 변경됩니다. `FlinkKinesisProducer`는 Kinesis Producer Library(KPL)를 사용하여 Apache Flink 스트림의 데이터를 Kinesis 스트림에 넣습니다. 자세한 내용은 [Amazon Kinesis Streams 커넥터](https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kinesis.html)를 참조하세요.

자세한 내용은 [AWS Glue 스키마 Github 리포지토리](https://github.com/awslabs/aws-glue-schema-registry)를 참조하세요.

### Apache Flink와의 통합
<a name="schema-registry-integrations-apache-flink-integrate"></a>

Schema Registry와 함께 제공되는 SerDes 라이브러리는 Apache Flink와 통합됩니다. Apache Flink를 사용하려면 Apache Flink 커넥터에 연결할 수 있는 `GlueSchemaRegistryAvroSerializationSchema` 및 `GlueSchemaRegistryAvroDeserializationSchema`라는 [https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java](https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java) 및 [https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java](https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java) 인터페이스를 구현해야 합니다.

### Apache Flink 애플리케이션에 AWS Glue Schema Registry 종속성 추가
<a name="schema-registry-integrations-kinesis-data-analytics-dependencies"></a>

Apache Flink 애플리케이션에서 AWS Glue Schema Registry에 대한 통합 종속성을 설정하려면

1. 종속 프로그램을 `pom.xml` 파일에 추가합니다.

   ```
   <dependency>
       <groupId>software.amazon.glue</groupId>
       <artifactId>schema-registry-flink-serde</artifactId>
       <version>1.0.0</version>
   </dependency>
   ```

#### pache Flink와 Kafka 또는 Amazon MSK 통합
<a name="schema-registry-integrations-kda-integrate-msk"></a>

Kafka를 소스나 싱크로 사용하여 Apache Flink용 Managed Service for Apache Flink를 사용할 수 있습니다.

**소스형 Kafka**  
다음 다이어그램에서는 Kafka를 소스로 사용하여 Kinesis Data Streams를 Apache Flink용 Managed Service for Apache Flink와 통합하는 방법을 보여줍니다.

![\[소스형 Kafka입니다.\]](http://docs.aws.amazon.com/ko_kr/glue/latest/dg/images/gsr-kafka-source.png)


**싱크로 Kafka**  
다음 다이어그램에서는 Kafka를 싱크로 사용하여 Kinesis Data Streams를 Apache Flink용 Managed Service for Apache Flink와 통합하는 방법을 보여줍니다.

![\[싱크로 Kafka.\]](http://docs.aws.amazon.com/ko_kr/glue/latest/dg/images/gsr-kafka-sink.png)


Kafka를 소스 또는 싱크로 사용하여 Kafka(또는 Amazon MSK)를 Apache Flink용 Managed Service for Apache Flink와 통합하려면 아래와 같이 코드를 변경합니다. 굵게 표시된 코드 블록을 유사한 섹션의 해당 코드에 추가합니다.

Kafka가 소스인 경우 deserializer 코드를 사용합니다(블록 2). Kafka가 싱크인 경우 serializer 코드(블록 3)를 사용합니다.

```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

String topic = "topic";
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

// block 1
Map<String, Object> configs = new HashMap<>();
configs.put(AWSSchemaRegistryConstants.AWS_REGION, "aws-region");
configs.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
configs.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());

FlinkKafkaConsumer<GenericRecord> consumer = new FlinkKafkaConsumer<>(
    topic,
    // block 2
    GlueSchemaRegistryAvroDeserializationSchema.forGeneric(schema, configs),
    properties);

FlinkKafkaProducer<GenericRecord> producer = new FlinkKafkaProducer<>(
    topic,
    // block 3
    GlueSchemaRegistryAvroSerializationSchema.forGeneric(schema, topic, configs),
    properties);

DataStream<GenericRecord> stream = env.addSource(consumer);
stream.addSink(producer);
env.execute();
```

#### Apache Flink와 Kinesis Data Streams 통합
<a name="schema-registry-integrations-integrate-kds"></a>

Kinesis Data Streams를 소스 또는 싱크로 사용하여 Apache Flink용 Managed Service for Apache Flink를 사용할 수 있습니다.

**소스로 Kinesis Data Streams**  
다음 다이어그램에서는 Kinesis Data Streams를 소스로 사용하여 Kinesis Data Streams를 Apache Flink용 Managed Service for Apache Flink와 통합하는 방법을 보여줍니다.

![\[소스로 Kinesis Data Streams.\]](http://docs.aws.amazon.com/ko_kr/glue/latest/dg/images/gsr-kinesis-source.png)


**싱크로 Kinesis Data Streams**  
다음 다이어그램에서는 Kinesis Data Streams를 싱크로 사용하여 Kinesis Data Streams를 Apache Flink용 Managed Service for Apache Flink와 통합하는 방법을 보여줍니다.

![\[싱크로 Kinesis Data Streams.\]](http://docs.aws.amazon.com/ko_kr/glue/latest/dg/images/gsr-kinesis-sink.png)


Kinesis Data Streams를 소스 또는 싱크로 사용하여 Kinesis Data Streams를 Apache Flink용 Managed Service for Apache Flink와 통합하려면 아래와 같이 코드를 변경합니다. 굵게 표시된 코드 블록을 유사한 섹션의 해당 코드에 추가합니다.

Kinesis Data Streams가 소스인 경우 deserializer 코드(블록 2)를 사용합니다. Kinesis Data Streams가 싱크인 경우 serializer 코드(블록 3)를 사용합니다.

```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

String streamName = "stream";
Properties consumerConfig = new Properties();
consumerConfig.put(AWSConfigConstants.AWS_REGION, "aws-region");
consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

// block 1
Map<String, Object> configs = new HashMap<>();
configs.put(AWSSchemaRegistryConstants.AWS_REGION, "aws-region");
configs.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
configs.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());

FlinkKinesisConsumer<GenericRecord> consumer = new FlinkKinesisConsumer<>(
    streamName,
    // block 2
    GlueSchemaRegistryAvroDeserializationSchema.forGeneric(schema, configs),
    properties);

FlinkKinesisProducer<GenericRecord> producer = new FlinkKinesisProducer<>(
    // block 3
    GlueSchemaRegistryAvroSerializationSchema.forGeneric(schema, topic, configs),
    properties);
producer.setDefaultStream(streamName);
producer.setDefaultPartition("0");

DataStream<GenericRecord> stream = env.addSource(consumer);
stream.addSink(producer);
env.execute();
```

## 사용 사례: AWS Lambda와 통합
<a name="schema-registry-integrations-aws-lambda"></a>

AWS Lambda 함수를 Apache Kafka/Amazon MSK 소비자로 사용하고 AWS Glue Schema Registry를 사용하여 Avro 인코딩 메시지를 역직렬화하려면 [MSK Labs 페이지](https://amazonmsk-labs.workshop.aws/en/msklambda/gsrschemareg.html)를 방문합니다.

## 사용 사례: AWS Glue Data Catalog
<a name="schema-registry-integrations-aws-glue-data-catalog"></a>

AWS Glue 테이블은 수동으로 지정하거나 AWS Glue Schema Registry를 참조하여 지정할 수 있는 스키마를 지원합니다. Schema Registry는 데이터 카탈로그와 통합되어 데이터 카탈로그에서 AWS Glue 테이블 또는 파티션을 생성하거나 업데이트할 때 Schema Registry에 저장된 스키마를 선택적으로 사용할 수 있습니다. Schema Registry에서 스키마 정의를 식별하려면 최소한 스키마가 속한 스키마의 ARN을 알아야 합니다. 스키마 정의를 포함하는 스키마의 스키마 버전은 해당 UUID 또는 버전 번호로 참조할 수 있습니다. 버전 번호나 UUID를 몰라도 조회할 수 있는 "최신" 버전이라는 하나의 스키마 버전이 항상 있습니다.

`CreateTable` 또는 `UpdateTable` 작업을 호출할 때 Schema Registry의 기존 스키마에 `SchemaReference`가 있을 수 있는 `StorageDescriptor`를 포함하는 `TableInput` 구조를 전달합니다. 마찬가지로 `GetTable` 또는 `GetPartition` API를 호출할 때 응답에 스키마와 `SchemaReference`가 포함될 수 있습니다. 스키마 참조를 사용하여 테이블 또는 파티션이 생성되면 데이터 카탈로그는 이 스키마 참조에 대한 스키마를 가져오려고 시도합니다. Schema Registry에서 스키마를 찾을 수 없는 경우 `GetTable` 응답에서 빈 스키마를 반환합니다. 그렇지 않으면 응답에 스키마와 스키마 참조가 모두 포함됩니다.

AWS Glue 콘솔에서 작업을 수행할 수도 있습니다.

이러한 작업을 수행하고 스키마 정보를 생성, 업데이트 또는 보려면 `GetSchemaVersion` API에 대한 권한을 제공하는 호출 사용자에게 IAM 역할을 부여해야 합니다.

### 테이블 추가 또는 테이블에 대한 스키마 업데이트
<a name="schema-registry-integrations-aws-glue-data-catalog-table"></a>

기존 스키마에서 새 테이블을 추가하면 테이블이 특정 스키마 버전에 바인딩됩니다. 새 스키마 버전이 등록되면 AWS Glue 콘솔의 테이블 보기 페이지에서 또는 [UpdateTable 작업(Python: update\$1table)](aws-glue-api-catalog-tables.md#aws-glue-api-catalog-tables-UpdateTable) API를 사용하여 이 테이블 정의를 업데이트할 수 있습니다.

#### 기존 스키마에서 테이블 추가
<a name="schema-registry-integrations-aws-glue-data-catalog-table-existing"></a>

AWS Glue 콘솔 또는 `CreateTable` API를 사용하여 레지스트리의 스키마 버전에서 AWS Glue 테이블을 생성할 수 있습니다.

**AWS Glue API**  
`CreateTable` API를 호출할 때 Schema Registry의 기존 스키마에 `SchemaReference`가 있고 `StorageDescriptor`가 포함된 `TableInput`을 전달합니다.

**AWS Glue 콘솔**  
AWS Glue 콘솔에서 테이블을 생성하려면

1. AWS Management Console에 로그인하여 [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\)에서 AWS Glue 콘솔을 엽니다.

1. 탐색 창의 [**데이터 카탈로그(Data catalog)**]에서 [**테이블(Tables)**]을 선택합니다.

1. [**테이블 추가(Add Tables)**] 메뉴에서 [**기존 스키마에서 테이블 추가(Add table from existing schema)**]를 선택합니다.

1. AWS Glue 개발자 안내서에 따라 테이블 속성 및 데이터 스토어를 구성합니다.

1. [**Glue 스키마 선택(Choose a Glue schema)**] 페이지에서 스키마가 있는 [**레지스트리(Registry)**]를 선택합니다.

1. [**스키마 이름(Schema name)**]을 선택하고 적용할 스키마의 [**버전(Version)**]을 선택합니다.

1. 스키마 미리 보기를 검토하고 [**다음(Next)**]을 선택합니다.

1. 테이블을 검토하고 생성합니다.

테이블에 적용된 스키마 및 버전은 테이블 목록의 [**Glue 스키마(Glue schema)**] 열에 나타납니다. 테이블을 보고 자세한 세부 정보를 확인할 수 있습니다.

#### 테이블에 대한 스키마 업데이트
<a name="schema-registry-integrations-aws-glue-data-catalog-table-updating"></a>

새 스키마 버전을 사용할 수 있게 되면 [UpdateTable 작업(Python: update\$1table)](aws-glue-api-catalog-tables.md#aws-glue-api-catalog-tables-UpdateTable) API 또는 AWS Glue 콘솔을 사용하여 테이블의 스키마를 업데이트할 수 있습니다.

**중요**  
AWS Glue 스키마가 수동으로 지정된 기존 테이블의 스키마를 업데이트할 때 Schema Registry에서 참조하는 새 스키마가 호환되지 않을 수 있습니다. 이로 인해 작업이 실패할 수 있습니다.

**AWS Glue API**  
`UpdateTable` API를 호출할 때 Schema Registry의 기존 스키마에 `SchemaReference`가 있고 `StorageDescriptor`가 포함된 `TableInput`을 전달합니다.

**AWS Glue 콘솔**  
AWS Glue 콘솔에서 테이블에 대한 스키마를 업데이트하려면

1. AWS Management Console에 로그인하여 [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue\)에서 AWS Glue 콘솔을 엽니다.

1. 탐색 창의 [**데이터 카탈로그(Data catalog)**]에서 [**테이블(Tables)**]을 선택합니다.

1. 테이블 목록에서 테이블을 봅니다.

1. 새 버전에 대해 알려주는 상자에서 [**스키마 업데이트(Update schema)**]를 클릭합니다.

1. 현재 스키마와 새 스키마의 차이점을 검토합니다.

1. [**모든 스키마 차이점 표시(Show all schema differences)**]를 선택 더 많은 세부 정보를 볼 수 있습니다.

1. [**테이블 저장(Save table)**]을 선택하여 새 버전을 수락합니다.

## 사용 사례: AWS Glue 스트리밍
<a name="schema-registry-integrations-aws-glue-streaming"></a>

AWS Glue 스트리밍은 출력 싱크에 쓰기 전에 스트리밍 소스의 데이터를 소비하고 ETL 작업을 수행합니다. 입력 스트리밍 소스는 데이터 테이블을 사용하여 지정하거나 소스 구성을 지정하여 직접 지정할 수 있습니다.

AWS Glue 스트리밍은 에 AWS Glue 스키마 레지스트리에 있는 스키마를 사용하여 생성된 스트리밍 소스에 대한 데이터 카탈로그 테이블을 지원합니다. AWS Glue 스키마 레지스트리에서 스키마를 생성하고 이 스키마를 사용하는 스트리밍 소스가 있는 AWS Glue 테이블을 생성할 수 있습니다. 이 AWS Glue 테이블을 입력 스트림의 데이터를 역직렬화하기 위한 AWS Glue 스트리밍 작업에 대한 입력으로 사용할 수 있습니다.

여기서 유의해야 할 점은 AWS Glue 스키마 레지스트리의 스키마가 변경되면 AWS Glue 스트리밍 작업을 다시 시작해야 스키마의 변경 사항이 반영된다는 것입니다.

## 사용 사례: Apache Kafka Streams
<a name="schema-registry-integrations-apache-kafka-streams"></a>

Apache Kafka Streams API는 Apache Kafka에 저장된 데이터를 처리하고 분석하기 위한 클라이언트 라이브러리입니다. 이 섹션에서는 데이터 스트리밍 애플리케이션에서 스키마를 관리하고 시행할 수 있는 AWS Glue Schema Registry와 Apache Kafka Streams의 통합에 대해 설명합니다. Apache Kafka Streams에 대한 자세한 내용은 [Apache Kafka Streams](https://kafka.apache.org/documentation/streams/)를 참조하세요.

### SerDes 라이브러리와의 통합
<a name="schema-registry-integrations-apache-kafka-streams-integrate"></a>

Streams 애플리케이션을 구성할 수 있는 `GlueSchemaRegistryKafkaStreamsSerde` 클래스가 있습니다.

#### Kafka Streams 애플리케이션 예 코드
<a name="schema-registry-integrations-apache-kafka-streams-application"></a>

Apache Kafka Streams 애플리케이션 내에서 AWS Glue Schema Registry를 사용하려면

1. Kafka Streams 애플리케이션을 구성합니다.

   ```
   final Properties props = new Properties();
       props.put(StreamsConfig.APPLICATION_ID_CONFIG, "avro-streams");
       props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
       props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
       props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
       props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, AWSKafkaAvroSerDe.class.getName());
       props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
   
       props.put(AWSSchemaRegistryConstants.AWS_REGION, "aws-region");
       props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
       props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
   	props.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.AVRO.name());
   ```

1. 주제 avro-input에서 스트림을 생성합니다.

   ```
   StreamsBuilder builder = new StreamsBuilder();
   final KStream<String, GenericRecord> source = builder.stream("avro-input");
   ```

1. 데이터 레코드를 처리합니다(이 예에서는 favorite\$1color 값이 pink이거나 양의 값이 15인 레코드를 필터링함).

   ```
   final KStream<String, GenericRecord> result = source
       .filter((key, value) -> !"pink".equals(String.valueOf(value.get("favorite_color"))));
       .filter((key, value) -> !"15.0".equals(String.valueOf(value.get("amount"))));
   ```

1. avro-output 주제에 결과를 다시 씁니다.

   ```
   result.to("avro-output");
   ```

1. Apache Kafka Streams 애플리케이션을 시작합니다.

   ```
   KafkaStreams streams = new KafkaStreams(builder.build(), props);
   streams.start();
   ```

#### 구현 결과
<a name="schema-registry-integrations-apache-kafka-streams-results"></a>

이 결과는 3단계에서 favorite\$1color "pink" 또는 값 "15.0"으로 필터링된 레코드의 필터링 프로세스를 보여줍니다.

필터링 전 레코드:

```
{"name": "Sansa", "favorite_number": 99, "favorite_color": "white"}
{"name": "Harry", "favorite_number": 10, "favorite_color": "black"}
{"name": "Hermione", "favorite_number": 1, "favorite_color": "red"}
{"name": "Ron", "favorite_number": 0, "favorite_color": "pink"}
{"name": "Jay", "favorite_number": 0, "favorite_color": "pink"}

{"id": "commute_1","amount": 3.5}
{"id": "grocery_1","amount": 25.5}
{"id": "entertainment_1","amount": 19.2}
{"id": "entertainment_2","amount": 105}
	{"id": "commute_1","amount": 15}
```

필터링 후 레코드:

```
{"name": "Sansa", "favorite_number": 99, "favorite_color": "white"}
{"name": "Harry", "favorite_number": 10, "favorite_color": "black"}
{"name": "Hermione", "favorite_number": 1, "favorite_color": "red"}
{"name": "Ron", "favorite_number": 0, "favorite_color": "pink"}

{"id": "commute_1","amount": 3.5}
{"id": "grocery_1","amount": 25.5}
{"id": "entertainment_1","amount": 19.2}
{"id": "entertainment_2","amount": 105}
```

### 사용 사례: Apache Kafka Connect
<a name="schema-registry-integrations-apache-kafka-connect"></a>

Apache Kafka Connect와 AWS Glue Schema Registry의 통합으로 커넥터에서 스키마 정보를 가져올 수 있습니다. Apache Kafka 변환기는 Apache Kafka 내의 데이터 포맷과 이를 Apache Kafka Connect 데이터로 변환하는 방법을 지정합니다. 모든 Apache Kafka Connect 사용자는 Apache Kafka에서 로드하거나 Apache Kafka에 저장할 때 데이터를 원하는 포맷을 기반으로 이러한 변환기를 구성해야 합니다. 이러한 방식으로 Apache Kafka Connect 데이터를 AWS Glue Schema Registry(예: Avro)에서 사용되는 유형으로 변환하고 serializer를 사용하여 스키마를 등록하고 직렬화를 수행하는 변환기를 정의할 수 있습니다. 그런 다음 변환기는 deserializer를 사용하여 Apache Kafka에서 수신한 데이터를 역직렬화하고 이를 다시 Apache Kafka Connect 데이터로 변환할 수도 있습니다. 다음은 예제 워크플로 다이어그램입니다.

![\[Apache Kafka Connect 워크플로.\]](http://docs.aws.amazon.com/ko_kr/glue/latest/dg/images/schema_reg_int_kafka_connect.png)


1. [AWS Glue Schema Registry용 Github 리포지토리](https://github.com/awslabs/aws-glue-schema-registry)를 복제하여 `aws-glue-schema-registry` 프로젝트를 설치합니다.

   ```
   git clone git@github.com:awslabs/aws-glue-schema-registry.git
   cd aws-glue-schema-registry
   mvn clean install
   mvn dependency:copy-dependencies
   ```

1. *독립 실행형* 모드에서 Apache Kafka Connect를 사용하려는 경우 이 단계에 대한 아래 지침을 사용하여 **connect-standalone.properties**를 업데이트합니다. *분산* 모드에서 Apache Kafka Connect를 사용하려는 경우 동일한 지침을 사용하여 **connect-avro-distributed.properties**를 업데이트합니다.

   1. Apache Kafka 연결 속성 파일에도 다음 속성을 추가합니다.

      ```
      key.converter.region=aws-region
      value.converter.region=aws-region
      key.converter.schemaAutoRegistrationEnabled=true
      value.converter.schemaAutoRegistrationEnabled=true
      key.converter.avroRecordType=GENERIC_RECORD
      value.converter.avroRecordType=GENERIC_RECORD
      ```

   1. **kafka-run-class.sh** 아래의 [**시작 모드(Launch mode)**] 섹션에 아래 명령을 추가합니다.

      ```
      -cp $CLASSPATH:"<your AWS GlueSchema Registry base directory>/target/dependency/*"
      ```

1. **kafka-run-class.sh** 아래의 [**시작 모드(Launch mode)**] 섹션에 아래 명령을 추가합니다.

   ```
   -cp $CLASSPATH:"<your AWS GlueSchema Registry base directory>/target/dependency/*" 
   ```

   형식은 다음과 같아야 합니다.

   ```
   # Launch mode
   if [ "x$DAEMON_MODE" = "xtrue" ]; then
     nohup "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH:"/Users/johndoe/aws-glue-schema-registry/target/dependency/*" $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
   else
     exec "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH:"/Users/johndoe/aws-glue-schema-registry/target/dependency/*" $KAFKA_OPTS "$@"
   fi
   ```

1. bash를 사용하는 경우 아래 명령을 실행하여 bash\$1profile에 CLASSPATH를 설정합니다. 다른 셸의 경우 그에 따라 환경을 업데이트합니다.

   ```
   echo 'export GSR_LIB_BASE_DIR=<>' >>~/.bash_profile
   echo 'export GSR_LIB_VERSION=1.0.0' >>~/.bash_profile
   echo 'export KAFKA_HOME=<your Apache Kafka installation directory>' >>~/.bash_profile
   echo 'export CLASSPATH=$CLASSPATH:$GSR_LIB_BASE_DIR/avro-kafkaconnect-converter/target/schema-registry-kafkaconnect-converter-$GSR_LIB_VERSION.jar:$GSR_LIB_BASE_DIR/common/target/schema-registry-common-$GSR_LIB_VERSION.jar:$GSR_LIB_BASE_DIR/avro-serializer-deserializer/target/schema-registry-serde-$GSR_LIB_VERSION.jar' >>~/.bash_profile
   source ~/.bash_profile
   ```

1. (선택 사항) 간단한 파일 소스로 테스트하려면 파일 소스 커넥터를 복제합니다.

   ```
   git clone https://github.com/mmolimar/kafka-connect-fs.git
   cd kafka-connect-fs/
   ```

   1. 소스 커넥터 구성에서 데이터 포맷을 Avro로, 파일 리더를 `AvroFileReader`으로 편집하고 읽고 있는 파일 경로에서 예제 Avro 객체를 업데이트합니다. 예:

      ```
      vim config/kafka-connect-fs.properties
      ```

      ```
      fs.uris=<path to a sample avro object>
      policy.regexp=^.*\.avro$
      file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.AvroFileReader
      ```

   1. 소스 커넥터를 설치합니다.

      ```
      mvn clean package
      echo "export CLASSPATH=\$CLASSPATH:\"\$(find target/ -type f -name '*.jar'| grep '\-package' | tr '\n' ':')\"" >>~/.bash_profile
      source ~/.bash_profile
      ```

   1. `<your Apache Kafka installation directory>/config/connect-file-sink.properties` 아래의 싱크 속성을 업데이트하고 주제 이름과 출력 파일 이름을 업데이트합니다.

      ```
      file=<output file full path>
      topics=<my topic>
      ```

1. 소스 커넥터를 시작합니다 (이 예에서는 파일 소스 커넥터임).

   ```
   $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/kafka-connect-fs.properties
   ```

1. 싱크 커넥터를 실행합니다(이 예에서는 파일 싱크 커넥터임).

   ```
   $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-file-sink.properties
   ```

   Kafka Connect 사용 예는 [AWS Glue Schema Registry용 Github 리포지토리](https://github.com/awslabs/aws-glue-schema-registry/tree/master/integration-tests)의 integration-tests 폴더에 있는 run-local-tests.sh 스크립트를 참조하세요.

# 서드 파티 Schema Registry에서 AWS Glue Schema Registry로 마이그레이션
<a name="schema-registry-integrations-migration"></a>

서드 파티 Schema Registry에서 AWS Glue Schema Registry로의 마이그레이션은 기존의 현재 서드 파티 Schema Registry에 종속됩니다. Apache Kafka 주제에 서드 파티 스키마 레지스트리를 사용하여 전송된 레코드가 있는 경우 소비자는 해당 레코드를 역직렬화하기 위해 서드 파티 스키마 레지스트리가 필요합니다. `AWSKafkaAvroDeserializer`는 서드 파티 deserializer를 가리키고 해당 레코드를 역직렬화하는 데 사용되는 보조 deserializer 클래스를 지정하는 기능을 제공합니다.

서드 파티 스키마의 사용 중지에는 두 가지 기준이 있습니다. 첫째, 서드 파티 스키마 레지스트리를 사용하는 Apache Kafka 주제의 레코드가 소비자에게 더 이상 필요하지 않은 경우에만 사용 중지가 발생할 수 있습니다. 둘째, 해당 주제에 대해 지정된 보존 기간에 따라 Apache Kafka 주제에서 만료로 사용 중지가 발생할 수 있습니다. 영구 보존이 있는 주제가 있는 경우 AWS Glue Schema Registry로 계속 마이그레이션할 수 있지만 서드 파티 Schema Registry를 사용 중지할 수는 없습니다. 해결 방법으로 애플리케이션 또는 Mirror Maker 2를 사용하여 현재 주제에서 읽고 AWS Glue Schema Registry를 사용하여 새 주제를 생성할 수 있습니다.

서드 파티 Schema Registry에서 AWS Glue Schema Registry로 마이그레이션하려면

1. AWS Glue Schema Registry에서 레지스트리를 생성하거나 기본 레지스트리를 사용합니다.

1. 소비자를 중지합니다. AWS Glue Schema Registry를 기본 deserializer로 포함하고 서드 파티 Schema Registry를 보조로 포함하도록 수정합니다.
   + 소비자 속성을 설정합니다. 이 예에서 secondary\$1deserializer는 다른 deserializer로 설정됩니다. 동작은 다음과 같습니다. 소비자는 Amazon MSK에서 레코드를 검색하고 먼저 `AWSKafkaAvroDeserializer`를 사용하려고 시도합니다. AWS Glue Schema Registry 스키마에 대한 Avro 스키마 ID가 포함된 매직 바이트를 읽을 수 없는 경우 `AWSKafkaAvroDeserializer`는 secondary\$1deserializer에서 제공되는 deserializer 클래스를 사용하려고 시도합니다. 보조 deserializer와 관련된 속성도 아래와 같이 schema\$1registry\$1url\$1config 및 specific\$1avro\$1reader\$1config와 같은 소비자 속성에 제공해야 합니다.

     ```
     consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
     consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AWSKafkaAvroDeserializer.class.getName());
     consumerProps.setProperty(AWSSchemaRegistryConstants.AWS_REGION, KafkaClickstreamConsumer.gsrRegion);
     consumerProps.setProperty(AWSSchemaRegistryConstants.SECONDARY_DESERIALIZER, KafkaAvroDeserializer.class.getName());
     consumerProps.setProperty(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "URL for third-party schema registry");
     consumerProps.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
     ```

1. 소비자를 다시 시작합니다.

1. 생산자를 중지하고 생산자가 AWS Glue Schema Registry를 가리키도록 합니다.

   1. 생산자 속성을 설정합니다. 이 예에서 생산자는 기본 레지스트리 및 자동 등록 스키마 버전을 사용합니다.

      ```
      producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
      producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AWSKafkaAvroSerializer.class.getName());
      producerProps.setProperty(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2");
      producerProps.setProperty(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.SPECIFIC_RECORD.getName());
      producerProps.setProperty(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, "true");
      ```

1. (선택 사항) 기존 스키마 및 스키마 버전을 현재 서드 파티 Schema Registry에서 AWS Glue Schema Registry로, AWS Glue Schema Registry의 기본 레지스트리 또는 AWS Glue Schema Registry의 기본이 아닌 특정 레지스트리로 수동으로 이동합니다. 이는 JSON 포맷의 서드 파티 Schema Registry에서 스키마를 내보내고 AWS Management Console 또는 AWS CLI을 사용하여 AWS Glue Schema Registry에서 새 스키마를 생성하여 수행할 수 있습니다.

    이 단계는 AWS CLI 및 AWS Management Console을 사용하여 새로 생성된 스키마 버전에 대해 이전 스키마 버전과의 호환성 검사를 사용해야 하거나 생산자가 스키마 버전 자동 등록이 설정된 상태로 새 스키마로 메시지를 보낼 때 중요할 수 있습니다.

1. 생산자를 시작합니다.