기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.
Managed Service for Apache Flink에서 싱크를 사용하여 데이터 쓰기
애플리케이션 코드에서는 Kinesis Data Streams 및 DynamoDB 같은 AWS 서비스를 포함한 외부 시스템에 데이터를 쓰기 위해 어떤 Apache Flink 싱크
Apache Flink는 파일과 소켓용 싱크를 제공하며 사용자 지정 싱크를 구현할 수도 있습니다. 지원되는 여러 싱크 중 다음 싱크가 자주 사용됩니다.
Kinesis 데이터 스트림 사용
Apache Flink는 Apache Flink 설명서에서 Kinesis Data Streams 커넥터
입력 및 출력에 Kinesis 데이터 스트림을 사용하는 애플리케이션의 예는 자습서: Managed Service for Apache Flink에서 DataStream API 사용 시작하기 섹션을 참조하세요.
Apache Kafka 및 Amazon Managed Streaming for Apache Kafka(MSK) 사용
Apache Flink Kafka 커넥터
Amazon S3 사용
Amazon S3 버킷에 객체를 쓰는 데 Apache Flink StreamingFileSink를 사용할 수 있습니다.
S3에 객체를 쓰는 방법에 대한 예는 예: Amazon S3 버킷에 쓰기 섹션을 참조하세요.
Firehose 사용
FlinkKinesisFirehoseProducer는 Firehose 서비스를 사용하여 애플리케이션 출력을 저장하기 위한 안정적이고 확장 가능한 Apache Flink 싱크입니다. 이 섹션에서는 Maven 프로젝트를 설정하여 FlinkKinesisFirehoseProducer를 생성하고 사용하는 방법을 설명합니다.
FlinkKinesisFirehoseProducer 생성
다음 코드 예는 FlinkKinesisFirehoseProducer를 생성하는 방법을 보여 줍니다.
Properties outputProperties = new Properties(); outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties);
FlinkKinesisFirehoseProducer 코드 예
다음 코드 예제는 FlinkKinesisFirehoseProducer를 생성 및 구성하고 Apache Flink 데이터 스트림에서 Firehose 서비스로 데이터를 전송하는 방법을 보여줍니다.
package com.amazonaws.services.kinesisanalytics; import com.amazonaws.services.kinesisanalytics.flink.connectors.config.ProducerConfigConstants; import com.amazonaws.services.kinesisanalytics.flink.connectors.producer.FlinkKinesisFirehoseProducer; import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; import java.io.IOException; import java.util.Map; import java.util.Properties; public class StreamingJob { private static final String region = "us-east-1"; private static final String inputStreamName = "ExampleInputStream"; private static final String outputStreamName = "ExampleOutputStream"; private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) { Properties inputProperties = new Properties(); inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties)); } private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env) throws IOException { Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), applicationProperties.get("ConsumerConfigProperties"))); } private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromStaticConfig() { /* * com.amazonaws.services.kinesisanalytics.flink.connectors.config. * ProducerConfigConstants * lists of all of the properties that firehose sink can be configured with. */ Properties outputProperties = new Properties(); outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties); ProducerConfigConstants config = new ProducerConfigConstants(); return sink; } private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromApplicationProperties() throws IOException { /* * com.amazonaws.services.kinesisanalytics.flink.connectors.config. * ProducerConfigConstants * lists of all of the properties that firehose sink can be configured with. */ Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), applicationProperties.get("ProducerConfigProperties")); return sink; } public static void main(String[] args) throws Exception { // set up the streaming execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); /* * if you would like to use runtime configuration properties, uncomment the * lines below * DataStream<String> input = createSourceFromApplicationProperties(env); */ DataStream<String> input = createSourceFromStaticConfig(env); // Kinesis Firehose sink input.addSink(createFirehoseSinkFromStaticConfig()); // If you would like to use runtime configuration properties, uncomment the // lines below // input.addSink(createFirehoseSinkFromApplicationProperties()); env.execute("Flink Streaming Java API Skeleton"); } }
Firehose 싱크를 사용하는 방법에 대한 전체 자습서는 예제: Firehose에 쓰기 섹션을 참조하세요.