

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

# Oracle 외부 테이블을 Amazon Aurora PostgreSQL 호환으로 마이그레이션
<a name="migrate-oracle-external-tables-to-amazon-aurora-postgresql-compatible"></a>

*anuradha chintha, Rakesh Raghav, Amazon Web Services*

## 요약
<a name="migrate-oracle-external-tables-to-amazon-aurora-postgresql-compatible-summary"></a>

외부 테이블을 통해 Oracle은 데이터베이스 외부에 플랫 파일로 저장된 데이터를 쿼리할 수 있습니다. ORACLE\$1LOADER 드라이버를 사용하면 SQL\$1Loader 유틸리티로 로드할 수 있는 모든 형식으로 저장된 모든 데이터에 액세스할 수 있습니다. 외부 테이블에서는 데이터 조작 언어(DML)를 사용할 수 없지만 쿼리, 조인 및 정렬 작업에는 외부 테이블을 사용할 수 있습니다.

Amazon Aurora PostgreSQL 호환 버전은 Oracle의 외부 테이블과 유사한 기능을 제공하지 않습니다. 대신 현대화를 통해 기능 요구 사항을 충족하고 경제적인 확장 가능한 솔루션을 개발해야 합니다.

이 패턴은 `aws_s3` 확장 프로그램을 사용하여 다양한 유형의 Oracle 외부 테이블을 Amazon Web Services(AWS) 클라우드의 Aurora PostgreSQL 호환 버전으로 마이그레이션하는 단계를 제공합니다.

프로덕션 환경에 구현하기 전에 이 솔루션을 철저하게 테스트하는 것이 좋습니다.

## 사전 조건 및 제한 사항
<a name="migrate-oracle-external-tables-to-amazon-aurora-postgresql-compatible-prereqs"></a>

**사전 조건 **
+ 활성 상태의 AWS 계정
+ AWS Command Line Interface(AWS CLI)
+ 사용 가능한 Aurora PostgreSQL 호환 데이터베이스 인스턴스입니다.
+ 외부 테이블이 있는 온프레미스 Oracle 데이터베이스
+ pg.Client API
+ 데이터 파일 

**제한 사항 **
+ 이 패턴은 Oracle 외부 테이블을 대체하는 기능을 제공하지 않습니다. 하지만 데이터베이스 현대화 목표를 달성하기 위해 단계와 샘플 코드를 더욱 개선할 수 있습니다.
+ `aws_s3` 내보내기 및 가져오기 함수에 구분자로 전달되는 문자를 파일에 포함하지 않습니다.

**제품 버전**
+ Amazon S3에서 PostgreSQL용 RDS로 가져오려면 데이터베이스에서 PostgreSQL 버전 10.7 이상을 실행 중이어야 합니다.

## 아키텍처
<a name="migrate-oracle-external-tables-to-amazon-aurora-postgresql-compatible-architecture"></a>

**소스 기술 스택  **
+ Oracle

**소스 아키텍처**

![\[온프레미스 Oracle 데이터베이스의 디렉터리 및 테이블로 이동하는 데이터 파일의 다이어그램입니다.\]](http://docs.aws.amazon.com/ko_kr/prescriptive-guidance/latest/patterns/images/pattern-img/555e69af-36fc-4ff5-b66c-af22b4cf262a/images/3fbc507d-b0fa-4e05-b999-043dc7327ed7.png)


**대상 기술 스택  **
+ Amazon Aurora PostgreSQL 호환
+ Amazon CloudWatch()
+ AWS Lambda
+ AWS Secrets Manager
+ Amazon Simple Notification Service(SNS)
+ Amazon Simple Storage Service(Amazon S3)

**대상 아키텍처**

다음은 솔루션의 고급 표현을 보이는 다이어그램입니다.

![\[다이어그램 다음에 설명이 있습니다.\]](http://docs.aws.amazon.com/ko_kr/prescriptive-guidance/latest/patterns/images/pattern-img/555e69af-36fc-4ff5-b66c-af22b4cf262a/images/5421540e-d2e3-4361-89cc-d8415fcb21fd.png)


1. S3 버킷에 파일이 업로드됩니다.

1. Lambda 함수가 시작됩니다.

1. Lambda 함수는 DB 함수 직접 호출을 시작합니다.

1. Secrets Manager는 데이터베이스 액세스를 위한 보안 인증 정보를 제공합니다.

1. DB 함수에 따라 SNS 경보가 생성됩니다.

**자동화 및 규모 조정**

외부 테이블의 추가 또는 변경은 메타데이터 유지 관리를 통해 처리할 수 있습니다.

## 도구
<a name="migrate-oracle-external-tables-to-amazon-aurora-postgresql-compatible-tools"></a>
+ [Amazon Aurora PostgreSQL 호환](https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/CHAP_AuroraOverview.html)–Amazon Aurora PostgreSQL 호환 버전은 완전 관리형이며 PostgreSQL과 호환되고 ACID를 준수하는 관계형 데이터베이스 엔진으로 하이엔드 상용 데이터베이스의 속도와 신뢰성에 오픈 소스 데이터베이스의 비용 효율성을 결합합니다.
+ [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-welcome.html)–AWS Command Line Interface(AWS CLI)는 AWS 서비스를 관리하는 통합 도구입니다. 도구 하나만 다운로드하여 구성하면 여러 AWS 서비스를 명령줄에서 관리하고 스크립트를 통해 자동화할 수 있습니다.
+ [Amazon CloudWatch](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/WhatIsCloudWatch.html)–Amazon CloudWatch는 Amazon S3 리소스 및 사용률을 모니터링합니다.
+ [AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/welcome.html) – AWS Lambda는 서버를 프로비저닝하거나 관리하지 않고도 코드를 실행하고, 워크로드를 인식하는 클러스터 확장 로직을 생성하고, 이벤트 통합을 유지 관리하거나 런타임을 관리할 수 있도록 지원하는 서버리스 컴퓨팅 서비스입니다. 이 패턴에서 Lambda는 파일이 Amazon S3에 업로드될 때마다 데이터베이스 함수를 실행합니다.
+ [AWS Secrets Manager](https://docs.aws.amazon.com/secretsmanager/latest/userguide/intro.html)-AWS Secrets Manager는 보안 인증 정보 저장 및 검색을 위한 서비스입니다. Secrets Manager를 사용하면 보안 암호를 프로그래밍 방식으로 검색하도록 Secrets Manager에 API 직접 호출을 보내서 암호를 포함한 하드 코딩된 보안 인증 정보를 바꿀 수 있습니다.
+ [Amazon S3](https://docs.aws.amazon.com/AmazonS3/latest/userguide/Welcome.html)–Amazon Simple Storage Service(S3)는 Aurora PostgreSQL 호환 클러스터에서 사용하고 송수신할 파일을 수신하고 저장하는 스토리지 계층을 제공합니다.
+ [aws\$1s3](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/PostgreSQL.Procedural.Importing.html#aws_s3.table_import_from_s3)-`aws_s3` 확장은 Amazon S3와 Aurora PostgreSQL 호환을 통합합니다.
+ [Amazon SNS](https://docs.aws.amazon.com/sns/latest/dg/welcome.html)-Amazon Simple Notification Service(SNS)는 게시자와 클라이언트 간에 메시지를 전달 또는 전송하는 것을 조정하고 관리합니다. 이 패턴에서는 Amazon SNS를 사용하여 알림을 전송합니다.

**코드**

파일이 S3 버킷에 배치될 때마다 처리 애플리케이션 또는 Lambda 함수에서 DB 함수를 생성하고 직접 호출해야 합니다. 자세한 내용은 코드(첨부)를 참조하세요.

## 에픽
<a name="migrate-oracle-external-tables-to-amazon-aurora-postgresql-compatible-epics"></a>

### 외부 파일 생성
<a name="create-an-external-file"></a>


| 작업 | 설명 | 필요한 기술 | 
| --- | --- | --- | 
| 외부 파일을 소스 데이터베이스에 추가합니다. | 외부 파일을 생성하여 `oracle` 디렉터리로 이동합니다. | DBA | 

### 대성 구성 (Aurora PostgreSQL 호환)
<a name="configure-the-target-aurora-postgresql-compatible"></a>


| 작업 | 설명 | 필요한 기술 | 
| --- | --- | --- | 
| Aurora PostgreSQL 데이터베이스를 생성합니다. | Amazon Aurora PostgreSQL 호환 클러스터에서 DB 인스턴스를 생성합니다. | DBA | 
| 스키마, aws\$1s3 확장 프로그램, 테이블을 생성합니다. | *추가 정보* 섹션의 `ext_tbl_scripts`에 있는 코드를 사용합니다. 테이블은 실제 테이블, 스테이징 테이블, 오류 및 로그 테이블, 메타테이블을 포함합니다. | DBA, 개발자 | 
| DB 함수를 생성합니다. | DB 함수를 생성하려면 *추가 정보* 섹션의 `load_external_table_latest` 함수 아래에 있는 코드를 사용합니다. | DBA, 개발자 | 

### Lambda 함수 생성 및 구성
<a name="create-and-configure-the-lambda-function"></a>


| 작업 | 설명 | 필요한 기술 | 
| --- | --- | --- | 
| 역할을 생성합니다. | Amazon S3 및 Amazon Relational Database Service(Amazon RDS)에 액세스할 수 있는 권한이 있는 역할을 생성합니다. 이 역할은 패턴 실행을 위해 Lambda에 할당됩니다. | DBA | 
| Lambda 함수를 생성합니다. | Amazon S3에서 파일 이름(예: `file_key = info.get('object', {}).get('key')`)을 읽고 파일 이름을 입력 파라미터로 사용하여 DB 함수(예: `curs.callproc("load_external_tables", [file_key])`)를 직접 호출하는 Lambda 함수를 생성합니다.함수 호출 결과에 따라 SNS 알림이 시작됩니다 (예: `client.publish(TopicArn='arn:',Message='fileloadsuccess',Subject='fileloadsuccess')`).비즈니스 요구 사항에 따라 필요한 경우 추가 코드를 사용하여 Lambda 함수를 생성할 수 있습니다. 자세한 내용은 [Lambda 설명서](https://docs.aws.amazon.com/lambda/latest/dg/welcome.html).를 참조하세요. | DBA | 
| S3 버킷 이벤트 트리거를 구성합니다. | S3 버킷의 모든 객체 생성 이벤트에 대해 Lambda 함수를 호출하는 메커니즘을 구성합니다. | DBA | 
| 보안 암호를 생성합니다. | Secrets Manager를 사용하여 데이터베이스 보안 인증 정보의 보안 암호 이름을 생성합니다. Lambda 함수로 시크릿을 전달합니다. | DBA | 
| Lambda 지원 파일을 업로드합니다. | Lambda 지원 패키지와 Aurora PostgreSQL 호환 버전에 연결하기 위해 첨부된 Python 스크립트가 포함된.zip 파일을 업로드합니다. Python 코드는 데이터베이스에서 만든 함수를 호출합니다. | DBA | 
| SNS 주제를 생성합니다. | SNS 주제를 만들어 데이터 로드의 성공 또는 실패에 대한 메일을 보냅니다. | DBA | 

### Amazon S3와 통합 추가
<a name="add-integration-with-amazon-s3"></a>


| 작업 | 설명 | 필요한 기술 | 
| --- | --- | --- | 
| S3 버킷을 생성합니다. | Amazon S3 콘솔에서 선행 슬래시가 없는 고유한 이름의 S3 버킷을 생성합니다. S3 버킷 이름은 전역 수준에서 고유하며, 네임스페이스는 모든 AWS 계정이 공유합니다. | DBA | 
| IAM 정책을 생성합니다. | AWS Identity and Access Management(IAM) 정책을 생성하려면 *추가 정보* 섹션의 `s3bucketpolicy_for_import`에 있는 코드를 사용합니다. | DBA | 
| 역할을 생성합니다. | Aurora PostgreSQL 호환 버전을 위한 두 개의 역할을 가져오기에 대한 역할 하나와 내보내기에 대한 역할 하나로 생성합니다. 역할에 해당하는 정책을 할당합니다. | DBA | 
| Aurora PostgreSQL 호환 클러스터에 역할을 연결합니다. | **역할 관리**에서 Aurora PostgreSQL 클러스터에 역할 가져오기 및 내보내기를 연결합니다. | DBA | 
| Aurora PostgreSQL 호환 버전에 대한 지원 객체를 생성합니다. | 테이블 스크립트의 경우 *추가 정보* 섹션의 `ext_tbl_scripts`에 있는 코드를 사용합니다.사용자 지정 함수의 경우 *추가 정보* 섹션의 `load_external_Table_latest`에 있는 코드를 사용합니다. | DBA | 

### 테스트 파일 처리
<a name="process-a-test-file"></a>


| 작업 | 설명 | 필요한 기술 | 
| --- | --- | --- | 
| S3 버킷에 파일을 업로드합니다. | 테스트 파일을 S3 버킷에 업로드하려면 AWS CLI에서 콘솔이나 다음 명령을 사용합니다. <pre>aws s3 cp /Users/Desktop/ukpost/exttbl/"testing files"/aps s3://s3importtest/inputext/aps</pre>파일이 업로드되는 즉시 버킷 이벤트가 Lambda 함수를 시작합니다. 이 함수는 Aurora PostgreSQL 호환 함수를 실행합니다. | DBA | 
| 데이터와 로그 및 오류 파일을 확인합니다. | Aurora PostgreSQL 호환 함수는 파일을 메인 테이블에 로드하고 S3 버킷에 `.log` 및 `.bad` 파일을 생성합니다. | DBA | 
| 솔루션을 모니터링합니다. | Amazon CloudWatch 콘솔에서 Lambda 함수를 모니터링합니다. | DBA | 

## 관련 리소스
<a name="migrate-oracle-external-tables-to-amazon-aurora-postgresql-compatible-resources"></a>
+ [Amazon S3 통합](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/oracle-s3-integration.html)
+ [Amazon S3](https://aws.amazon.com/s3/)
+ [Amazon Aurora PostgreSQL 호환 버전 사용](https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/Aurora.AuroraPostgreSQL.html)
+ [Lambda](https://aws.amazon.com/lambda/)
+ [Amazon CloudWatch](https://aws.amazon.com/cloudwatch/)()
+ [AWS Secrets Manager](https://aws.amazon.com/secrets-manager/)
+ [SNS 알림 설정](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/US_SetupSNS.html)

## 추가 정보
<a name="migrate-oracle-external-tables-to-amazon-aurora-postgresql-compatible-additional"></a>

**ext\$1table\$1scripts**

```
CREATE EXTENSION aws_s3 CASCADE;
CREATE TABLE IF NOT EXISTS meta_EXTERNAL_TABLE
(
    table_name_stg character varying(100) ,
    table_name character varying(100)  ,
    col_list character varying(1000)  ,
    data_type character varying(100)  ,
    col_order numeric,
    start_pos numeric,
    end_pos numeric,
    no_position character varying(100)  ,
    date_mask character varying(100)  ,
    delimeter character(1)  ,
    directory character varying(100)  ,
    file_name character varying(100)  ,
    header_exist character varying(5)
);
CREATE TABLE IF NOT EXISTS ext_tbl_stg
(
    col1 text
);
CREATE TABLE IF NOT EXISTS error_table
(
    error_details text,
    file_name character varying(100),
    processed_time timestamp without time zone
);
CREATE TABLE IF NOT EXISTS log_table
(
    file_name character varying(50) COLLATE pg_catalog."default",
    processed_date timestamp without time zone,
    tot_rec_count numeric,
    proc_rec_count numeric,
    error_rec_count numeric
);
sample insert scripts of meta data:
INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'source_filename', 'character varying', 2, 8, 27, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO');
INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'record_type_identifier', 'character varying', 3, 28, 30, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO');
INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'fad_code', 'numeric', 4, 31, 36, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO');
INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'session_sequence_number', 'numeric', 5, 37, 42, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO');
INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'transaction_sequence_number', 'numeric', 6, 43, 48, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO');
```

**s3bucketpolicy\$1for import**

```
---Import role policy
--Create an IAM policy to allow, Get,  and list actions on S3 bucket
 {
    "Version": "2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "s3import",
            "Action": [
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Effect": "Allow",
            "Resource": [
                "arn:aws:s3:::s3importtest",
                "arn:aws:s3:::s3importtest/*"
            ]
        }
    ]
}
--Export Role policy
--Create an IAM policy to allow, put,  and list actions on S3 bucket
{
    "Version": "2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "s3export",
            "Action": [
                "S3:PutObject",
                "s3:ListBucket"
            ],
            "Effect": "Allow",
            "Resource": [
                "arn:aws:s3:::s3importtest/*"
            ]
        }
    ]
}
```

**샘플 DB 함수 load\$1external\$1tables\$1latest**

```
CREATE OR REPLACE FUNCTION public.load_external_tables(pi_filename text)
 RETURNS character varying
 LANGUAGE plpgsql
AS $function$
/* Loading data from S3 bucket into a APG table */
DECLARE
 v_final_sql TEXT;
 pi_ext_table TEXT;
 r refCURSOR;
 v_sqlerrm text;
 v_chunk numeric;
 i integer;
 v_col_list TEXT;
 v_postion_list CHARACTER VARYING(1000);
 v_len  integer;
 v_delim varchar;
 v_file_name CHARACTER VARYING(1000);
 v_directory CHARACTER VARYING(1000);
 v_table_name_stg CHARACTER VARYING(1000);
 v_sql_col TEXT;
 v_sql TEXT;
 v_sql1 TEXT;
 v_sql2 TEXT;
 v_sql3 TEXT;
 v_cnt integer;
 v_sql_dynamic TEXT;
 v_sql_ins TEXT;
 proc_rec_COUNT integer;
 error_rec_COUNT integer;
 tot_rec_COUNT integer;
 v_rec_val integer;
 rec record;
 v_col_cnt integer;
 kv record;
 v_val text;
 v_header text;
 j integer;
 ERCODE VARCHAR(5);
 v_region text;
 cr CURSOR FOR
 SELECT distinct DELIMETER,
   FILE_NAME,
   DIRECTORY
 FROM  meta_EXTERNAL_TABLE
 WHERE table_name = pi_ext_table
   AND DELIMETER IS NOT NULL;


 cr1 CURSOR FOR
   SELECT   col_list,
   data_type,
   start_pos,
   END_pos,
   concat_ws('',' ',TABLE_NAME_STG) as TABLE_NAME_STG,
   no_position,date_mask
 FROM  meta_EXTERNAL_TABLE
 WHERE table_name = pi_ext_table
 order by col_order asc;
cr2 cursor FOR
SELECT  distinct table_name,table_name_stg
   FROM  meta_EXTERNAL_TABLE
   WHERE upper(file_name) = upper(pi_filename);


BEGIN
 -- PERFORM utl_file_utility.init();
   v_region := 'us-east-1';
   /* find tab details from file name */


   --DELETE FROM  ERROR_TABLE WHERE file_name= pi_filename;
  -- DELETE FROM  log_table WHERE file_name= pi_filename;


 BEGIN


   SELECT distinct table_name,table_name_stg INTO strict pi_ext_table,v_table_name_stg
   FROM  meta_EXTERNAL_TABLE
   WHERE upper(file_name) = upper(pi_filename);
 EXCEPTION
   WHEN NO_DATA_FOUND THEN
    raise notice 'error 1,%',sqlerrm;
    pi_ext_table := null;
    v_table_name_stg := null;
      RAISE USING errcode = 'NTFIP' ;
    when others then
        raise notice 'error others,%',sqlerrm;
 END;
 j :=1 ;
  
for rec in  cr2
 LOOP




  pi_ext_table     := rec.table_name;
  v_table_name_stg := rec.table_name_stg;
  v_col_list := null;


 IF pi_ext_table IS NOT NULL
  THEN
    --EXECUTE concat_ws('','truncate table  ' ,pi_ext_table) ;
   EXECUTE concat_ws('','truncate table  ' ,v_table_name_stg) ;




       SELECT distinct DELIMETER INTO STRICT v_delim
       FROM  meta_EXTERNAL_TABLE
       WHERE table_name = pi_ext_table;


       IF v_delim IS NOT NULL THEN
     SELECT distinct DELIMETER,
       FILE_NAME,
       DIRECTORY ,
       concat_ws('',' ',table_name_stg),
       case  header_exist when 'YES' then 'CSV HEADER' else 'CSV' end as header_exist
     INTO STRICT v_delim,v_file_name,v_directory,v_table_name_stg,v_header
     FROM  meta_EXTERNAL_TABLE
     WHERE table_name = pi_ext_table
       AND DELIMETER IS NOT NULL;


     IF    upper(v_delim) = 'CSV'
     THEN
       v_sql := concat_ws('','SELECT aws_s3.table_import_FROM_s3 ( ''',
       v_table_name_stg,''','''',
       ''DELIMITER '''','''' CSV HEADER QUOTE ''''"'''''', aws_commons.create_s3_uri ( ''',
       v_directory,''',''',v_file_name,''', ''',v_region,'''))');
       ELSE
       v_sql := concat_ws('','SELECT aws_s3.table_import_FROM_s3(''',
           v_table_name_stg, ''','''', ''DELIMITER AS ''''^''''',''',','
          aws_commons.create_s3_uri
           ( ''',v_directory, ''',''',
           v_file_name, ''',',
            '''',v_region,''')
          )');
          raise notice 'v_sql , %',v_sql;
       begin
        EXECUTE  v_sql;
       EXCEPTION
         WHEN OTHERS THEN
           raise notice 'error 1';
         RAISE USING errcode = 'S3IMP' ;
       END;


       select count(col_list) INTO v_col_cnt
       from  meta_EXTERNAL_TABLE where table_name = pi_ext_table;






        -- raise notice 'v_sql 2, %',concat_ws('','update ',v_table_name_stg, ' set col1 = col1||''',v_delim,'''');


       execute concat_ws('','update ',v_table_name_stg, ' set col1 = col1||''',v_delim,'''');




       i :=1;
       FOR rec in cr1
       loop
       v_sql1 := concat_ws('',v_sql1,'split_part(col1,''',v_delim,''',', i,')',' as ',rec.col_list,',');
       v_sql2 := concat_ws('',v_sql2,rec.col_list,',');
   --    v_sql3 := concat_ws('',v_sql3,'rec.',rec.col_list,'::',rec.data_type,',');


       case
         WHEN upper(rec.data_type) = 'NUMERIC'
         THEN v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0
                THEN null
                 ELSE
                 coalesce((trim(split_part(col1,''',v_delim,''',', i,')))::NUMERIC,0)::',rec.data_type,' END as ',rec.col_list,',') ;
         WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask = 'YYYYMMDD'
         THEN v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0
                THEN null
                 ELSE
                 to_date(coalesce((trim(split_part(col1,''',v_delim,''',', i,'))),''99990101''),''YYYYMMDD'')::',rec.data_type,' END as ',rec.col_list,',');
         WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask =  'MM/DD/YYYY hh24:mi:ss'
         THEN v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0
                THEN null
                 ELSE
                 to_date(coalesce((trim(split_part(col1,''',v_delim,''',', i,'))),''01/01/9999 0024:00:00''),''MM/DD/YYYY hh24:mi:ss'')::',rec.data_type,' END as ',rec.col_list,',');
          ELSE
        v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0
                THEN null
                 ELSE
                  coalesce((trim(split_part(col1,''',v_delim,''',', i,'))),'''')::',rec.data_type,' END as ',rec.col_list,',') ;
       END case;


       i :=i+1;
       end loop;


         -- raise notice 'v_sql 3, %',v_sql3;


       SELECT trim(trailing ' ' FROM v_sql1) INTO v_sql1;
       SELECT trim(trailing ',' FROM v_sql1) INTO v_sql1;


       SELECT trim(trailing ' ' FROM v_sql2) INTO v_sql2;
       SELECT trim(trailing ',' FROM v_sql2) INTO v_sql2;


       SELECT trim(trailing ' ' FROM v_sql3) INTO v_sql3;
       SELECT trim(trailing ',' FROM v_sql3) INTO v_sql3;


       END IF;
      raise notice 'v_delim , %',v_delim;


     EXECUTE concat_ws('','SELECT COUNT(*) FROM ',v_table_name_stg)  INTO v_cnt;


    raise notice 'stg cnt , %',v_cnt;


    /* if upper(v_delim) = 'CSV' then
       v_sql_ins := concat_ws('', ' SELECT * from ' ,v_table_name_stg );
     else
      -- v_sql_ins := concat_ws('',' SELECT ',v_sql1,'  from (select col1 from ' ,v_table_name_stg , ')sub ');
       v_sql_ins := concat_ws('',' SELECT ',v_sql3,'  from (select col1 from ' ,v_table_name_stg , ')sub ');
       END IF;*/


v_chunk := v_cnt/100;




for i in 1..101
loop
     BEGIN
    -- raise notice 'v_sql , %',v_sql;
       -- raise notice 'Chunk number , %',i;
       v_sql_ins := concat_ws('',' SELECT ',v_sql3,'  from (select col1 from ' ,v_table_name_stg , ' offset ',v_chunk*(i-1), ' limit ',v_chunk,') sub ');


     v_sql := concat_ws('','insert into  ', pi_ext_table ,' ', v_sql_ins);
     -- raise notice 'select statement , %',v_sql_ins;
          -- v_sql := null;
     -- EXECUTE concat_ws('','insert into  ', pi_ext_table ,' ', v_sql_ins, 'offset ',v_chunk*(i-1), ' limit ',v_chunk );
     --v_sql := concat_ws('','insert into  ', pi_ext_table ,' ', v_sql_ins );


     -- raise notice 'insert statement , %',v_sql;


    raise NOTICE 'CHUNK START %',v_chunk*(i-1);
   raise NOTICE 'CHUNK END %',v_chunk;


     EXECUTE v_sql;


  EXCEPTION
       WHEN OTHERS THEN
       -- v_sql_ins := concat_ws('',' SELECT ',v_sql1, '  from (select col1 from ' ,v_table_name_stg , ' )sub ');
         -- raise notice 'Chunk number for cursor , %',i;


    raise NOTICE 'Cursor - CHUNK START %',v_chunk*(i-1);
   raise NOTICE 'Cursor -  CHUNK END %',v_chunk;
         v_sql_ins := concat_ws('',' SELECT ',v_sql3, '  from (select col1 from ' ,v_table_name_stg , ' )sub ');


         v_final_sql := REPLACE (v_sql_ins, ''''::text, ''''''::text);
        -- raise notice 'v_final_sql %',v_final_sql;
         v_sql :=concat_ws('','do $a$ declare  r refcursor;v_sql text; i numeric;v_conname text;  v_typ  ',pi_ext_table,'[]; v_rec  ','record',';
           begin






           open r for execute ''select col1 from ',v_table_name_stg ,'  offset ',v_chunk*(i-1), ' limit ',v_chunk,''';
           loop
           begin
           fetch r into v_rec;
           EXIT WHEN NOT FOUND;




           v_sql := concat_ws('''',''insert into  ',pi_ext_table,' SELECT ',REPLACE (v_sql3, ''''::text, ''''''::text) , '  from ( select '''''',v_rec.col1,'''''' as col1) v'');
            execute v_sql;


           exception
            when others then
          v_sql := ''INSERT INTO  ERROR_TABLE VALUES (concat_ws('''''''',''''Error Name: '''',$$''||SQLERRM||''$$,''''Error State: '''',''''''||SQLSTATE||'''''',''''record : '''',$$''||v_rec.col1||''$$),'''''||pi_filename||''''',now())'';


               execute v_sql;
             continue;
           end ;
           end loop;
           close r;
           exception
           when others then
         raise;
           end ; $a$');
      -- raise notice ' inside excp v_sql %',v_sql;
          execute v_sql;
      --  raise notice 'v_sql %',v_sql;
       END;
  END LOOP;
     ELSE


     SELECT distinct DELIMETER,FILE_NAME,DIRECTORY ,concat_ws('',' ',table_name_stg),
       case  header_exist when 'YES' then 'CSV HEADER' else 'CSV' end as header_exist
       INTO STRICT v_delim,v_file_name,v_directory,v_table_name_stg,v_header
     FROM  meta_EXTERNAL_TABLE
     WHERE table_name = pi_ext_table                  ;
     v_sql := concat_ws('','SELECT aws_s3.table_import_FROM_s3(''',
       v_table_name_stg, ''','''', ''DELIMITER AS ''''#'''' ',v_header,' '',','
      aws_commons.create_s3_uri
       ( ''',v_directory, ''',''',
       v_file_name, ''',',
        '''',v_region,''')
      )');
         EXECUTE  v_sql;


     FOR rec in cr1
     LOOP


      IF rec.start_pos IS NULL AND rec.END_pos IS NULL AND rec.no_position = 'recnum'
      THEN
        v_rec_val := 1;
      ELSE


       case
         WHEN upper(rec.data_type) = 'NUMERIC'
         THEN v_sql1 := concat_ws('',' case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0
                THEN null
                 ELSE
                 coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1)))::NUMERIC,0)::',rec.data_type,' END as ',rec.col_list,',') ;
         WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask = 'YYYYMMDD'
         THEN v_sql1 := concat_ws('','case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0
                THEN null
                 ELSE
                 to_date(coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))),''99990101''),''YYYYMMDD'')::',rec.data_type,' END as ',rec.col_list,',');
         WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask = 'YYYYMMDDHH24MISS'
         THEN v_sql1 := concat_ws('','case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0
                THEN null
                 ELSE
                 to_date(coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))),''9999010100240000''),''YYYYMMDDHH24MISS'')::',rec.data_type,' END as ',rec.col_list,',');
          ELSE
        v_sql1 := concat_ws('',' case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0
                THEN null
                 ELSE
                  coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))),'''')::',rec.data_type,' END as ',rec.col_list,',') ;
       END case;


      END IF;
      v_col_list := concat_ws('',v_col_list ,v_sql1);
     END LOOP;




           SELECT trim(trailing ' ' FROM v_col_list) INTO v_col_list;
           SELECT trim(trailing ',' FROM v_col_list) INTO v_col_list;


           v_sql_col   :=  concat_ws('',trim(trailing ',' FROM v_col_list) , ' FROM  ',v_table_name_stg,' WHERE col1 IS NOT NULL AND length(col1)>0 ');




           v_sql_dynamic := v_sql_col;


           EXECUTE  concat_ws('','SELECT COUNT(*) FROM ',v_table_name_stg) INTO v_cnt;




         IF v_rec_val = 1 THEN
             v_sql_ins := concat_ws('',' select row_number() over(order by ctid) as line_number ,' ,v_sql_dynamic) ;


         ELSE
               v_sql_ins := concat_ws('',' SELECT' ,v_sql_dynamic) ;
           END IF;


     BEGIN
       EXECUTE concat_ws('','insert into  ', pi_ext_table ,' ', v_sql_ins);
           EXCEPTION
              WHEN OTHERS THEN
          IF v_rec_val = 1 THEN
                  v_final_sql := ' select row_number() over(order by ctid) as line_number ,col1 from ';
                ELSE
                 v_final_sql := ' SELECT col1 from';
               END IF;
       v_sql :=concat_ws('','do $a$ declare  r refcursor;v_rec_val numeric := ',coalesce(v_rec_val,0),';line_number numeric; col1 text; v_typ  ',pi_ext_table,'[]; v_rec  ',pi_ext_table,';
             begin
             open r for execute ''',v_final_sql, ' ',v_table_name_stg,' WHERE col1 IS NOT NULL AND length(col1)>0 '' ;
             loop
             begin
             if   v_rec_val = 1 then
             fetch r into line_number,col1;
             else
             fetch r into col1;
             end if;


             EXIT WHEN NOT FOUND;
              if v_rec_val = 1 then
              select line_number,',trim(trailing ',' FROM v_col_list) ,' into v_rec;
              else
                select ',trim(trailing ',' FROM v_col_list) ,' into v_rec;
              end if;


             insert into  ',pi_ext_table,' select v_rec.*;
              exception
              when others then
               INSERT INTO  ERROR_TABLE VALUES (concat_ws('''',''Error Name: '',SQLERRM,''Error State: '',SQLSTATE,''record : '',v_rec),''',pi_filename,''',now());
               continue;
              end ;
               end loop;
             close r;
              exception
              when others then
              raise;
              end ; $a$');
         execute v_sql;


     END;


         END IF;


   EXECUTE concat_ws('','SELECT COUNT(*) FROM  ' ,pi_ext_table)   INTO proc_rec_COUNT;


   EXECUTE concat_ws('','SELECT COUNT(*) FROM  error_table WHERE file_name =''',pi_filename,''' and processed_time::date = clock_timestamp()::date')  INTO error_rec_COUNT;


   EXECUTE concat_ws('','SELECT COUNT(*) FROM ',v_table_name_stg)   INTO tot_rec_COUNT;


   INSERT INTO  log_table values(pi_filename,now(),tot_rec_COUNT,proc_rec_COUNT, error_rec_COUNT);


   raise notice 'v_directory, %',v_directory;


   raise notice 'pi_filename, %',pi_filename;


   raise notice 'v_region, %',v_region;


  perform aws_s3.query_export_to_s3('SELECT replace(trim(substring(error_details,position(''('' in error_details)+1),'')''),'','','';''),file_name,processed_time FROM  error_table WHERE file_name = '''||pi_filename||'''',
   aws_commons.create_s3_uri(v_directory, pi_filename||'.bad', v_region),
   options :='FORmat csv, header, delimiter $$,$$'
   );


raise notice 'v_directory, %',v_directory;


   raise notice 'pi_filename, %',pi_filename;


   raise notice 'v_region, %',v_region;


  perform aws_s3.query_export_to_s3('SELECT * FROM  log_table WHERE file_name = '''||pi_filename||'''',
   aws_commons.create_s3_uri(v_directory, pi_filename||'.log', v_region),
   options :='FORmat csv, header, delimiter $$,$$'
   );




   END IF;
 j := j+1;
 END LOOP;


       RETURN 'OK';
EXCEPTION
    WHEN  OTHERS THEN
  raise notice 'error %',sqlerrm;
   ERCODE=SQLSTATE;
   IF ERCODE = 'NTFIP' THEN
     v_sqlerrm := concat_Ws('',sqlerrm,'No data for the filename');
   ELSIF ERCODE = 'S3IMP' THEN
    v_sqlerrm := concat_Ws('',sqlerrm,'Error While exporting the file from S3');
   ELSE
      v_sqlerrm := sqlerrm;
   END IF;


 select distinct directory into v_directory from  meta_EXTERNAL_TABLE;




 raise notice 'exc v_directory, %',v_directory;


   raise notice 'exc pi_filename, %',pi_filename;


   raise notice 'exc v_region, %',v_region;


  perform aws_s3.query_export_to_s3('SELECT * FROM  error_table WHERE file_name = '''||pi_filename||'''',
   aws_commons.create_s3_uri(v_directory, pi_filename||'.bad', v_region),
   options :='FORmat csv, header, delimiter $$,$$'
   );
    RETURN null;
END;
$function$
```