

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Oracle 外部テーブルを Amazon Aurora PostgreSQL 互換に移行
<a name="migrate-oracle-external-tables-to-amazon-aurora-postgresql-compatible"></a>

*Amazon Web Services、anuradha chintha、Rakesh Raghav*

## 概要
<a name="migrate-oracle-external-tables-to-amazon-aurora-postgresql-compatible-summary"></a>

外部テーブルにより、Oracle がデータベースの外部にあるフラットファイルに保存されているデータをクエリできます。ORACLE\$1LOADER ドライバーを使用して、SQL\$1Loader ユーティリティでロードできるあらゆる形式で保存されているデータにアクセスできます。外部テーブルではデータ操作言語 (DML) を使用できませんが、クエリ、結合、ソート操作には外部テーブルを使用できます。

Amazon Aurora PostgreSQL 互換エディションには、Oracle の外部テーブルと類似する機能はありません。代わりに、モダナイゼーションを使用して、機能要件を満たし、かつ質素なスケーラブルなソリューションを開発する必要があります。

このパターンでは、`aws_s3` 拡張を使用して、さまざまなタイプの Oracle 外部テーブルをAmazon Web Services (AWS) クラウド上の Aurora PostgreSQL 互換エディションに移行する手順を示しています。

実稼働環境に実装する前に、このソリューションを徹底的にテストすることを推奨します。

## 前提条件と制限事項
<a name="migrate-oracle-external-tables-to-amazon-aurora-postgresql-compatible-prereqs"></a>

**前提条件**
+ アクティブなAWS アカウント
+ AWS コマンドラインインターフェイス (AWS CLI)
+ 使用可能な Aurora PostgreSQL 互換データベースインスタンス。
+ 外部テーブルのあるオンプレミスの Oracle データベース
+ PG. クライアント API
+ データファイル 

**機能制限 **
+ このパターンには Oracle 外部テーブルの置き換えの機能がありません。ただし、手順とサンプルコードをさらに拡張して、データベースのモダナイゼーション目標を達成することができます。
+ ファイルには、`aws_s3` エクスポート関数とインポート関数で区切り文字として渡される文字を含んではなりません。

**製品バージョン**
+ Amazon S3 から PostgreSQL の RDS にインポートするには、データベースで PostgreSQL バージョン 10.7 以降を実行する必要があります。

## アーキテクチャ
<a name="migrate-oracle-external-tables-to-amazon-aurora-postgresql-compatible-architecture"></a>

**ソーステクノロジースタック**
+ Oracle

**ソースアーキテクチャ**

![\[オンプレミスの Oracle データベースのディレクトリとテーブルに送られるデータファイルの図表。\]](http://docs.aws.amazon.com/ja_jp/prescriptive-guidance/latest/patterns/images/pattern-img/555e69af-36fc-4ff5-b66c-af22b4cf262a/images/3fbc507d-b0fa-4e05-b999-043dc7327ed7.png)


**ターゲットテクノロジースタック**
+ Amazon Aurora PostgreSQL- 互換
+ Amazon CloudWatch
+ AWS Lambda
+ AWS Secrets Manager
+ Amazon Simple Notiﬁcation Service (Amazon SNS)
+ Amazon Simple Storage Service (Amazon S3)

**ターゲットアーキテクチャ **

以下の図表は、このソリューションの概要を示しています。

![\[図表の後に説明があります。\]](http://docs.aws.amazon.com/ja_jp/prescriptive-guidance/latest/patterns/images/pattern-img/555e69af-36fc-4ff5-b66c-af22b4cf262a/images/5421540e-d2e3-4361-89cc-d8415fcb21fd.png)


1. ファイルが S3 バケットにアップロードされます。

1. Lambda 関数が初期化されます。

1. Lambda 関数は DB 関数呼び出しを開始します。

1. Secrets Manager は、データベースにアクセスするための認証情報を提供します。

1. DB 関数によって、SNS アラームが作成されます。

**自動化とスケール**

外部テーブルへの追加や変更は、メタデータのメンテナンスで処理できます。

## ツール
<a name="migrate-oracle-external-tables-to-amazon-aurora-postgresql-compatible-tools"></a>
+ 「[Amazon Aurora PostgreSQL-互換](https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/CHAP_AuroraOverview.html)」 — Amazon Aurora PostgreSQL 互換エディションは、フルマネージド型で PostgreSQL 互換で、ACID 準拠のリレーショナルデータベースエンジンです。ハイエンドの商用データベースのスピードと信頼性を、オープンソースデータベースの高いコスト効率を経験できます。
+ 「[AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-welcome.html)」 — AWS コマンドラインインターフェイス (AWS CLI)は、AWS のサービスを管理するための統合ツールです。ダウンロードおよび設定用の１つのツールのみを使用して、コマンドラインから複数の AWS サービスを制御し、スクリプトを使用してこれらを自動化することができます。
+ 「[Amazon CloudWatch](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/WhatIsCloudWatch.html)」 — Amazon CloudWatch は Amazon S3 のリソースと利用状況を監視します。
+ 「[AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/welcome.html)」 — AWS Lambdaは、サーバーのプロビジョニングや管理、ワークロードに対応したクラスタースケーリングロジックの作成、イベント統合の維持、あるいはランタイムの管理などを行うことなくコードを実行でき、サーバーレスコンピューティングサービスです。このパターンでは、ファイルが Amazon S3 にアップロードされるたびに、Lambda がデータベース関数を実行します。
+ 「[AWS Secrets Manager](https://docs.aws.amazon.com/secretsmanager/latest/userguide/intro.html)」 — AWS Secrets Manager は、認証情報を保存および取得するためのサービスです。Secrets Manager を使用して、コードにハードコードされた認証情報 (パスワードを含む) を Secrets Manager への API コールに置き換えて、シークレットをプログラムで取得できます。
+ 「[Amazon S3](https://docs.aws.amazon.com/AmazonS3/latest/userguide/Welcome.html)」 — Amazon Simple Storage Service (Amazon S3) は、Aurora PostgreSQL 互換クラスターとの間で消費および送信するファイルを受信および保存するためのストレージレイヤーを提供します。
+ 「[aws\$1s3](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/PostgreSQL.Procedural.Importing.html#aws_s3.table_import_from_s3)」 — `aws_s3` の拡張は Amazon S3 と Aurora PostgreSQL 互換を統合します。
+ 「[Amazon SNS](https://docs.aws.amazon.com/sns/latest/dg/welcome.html)」 — Amazon Simple Notiﬁcation Service (Amazon SNS)は、パブリッシャーやクライアントの間のメッセージ配信や送信を調整および管理します。このパターンでは、Amazon SNS を使用して通知を送信します。

**Code **

ファイルを S3 バケットに配置するたびに、DB 関数を作成して処理アプリケーションまたは Lambda 関数から呼び出す必要があります。詳細については、コード (添付) を参照してください。

## エピック
<a name="migrate-oracle-external-tables-to-amazon-aurora-postgresql-compatible-epics"></a>

### 外部ファイルの作成
<a name="create-an-external-file"></a>


| タスク | 説明 | 必要なスキル | 
| --- | --- | --- | 
| 外部ファイルをソースデータベースに追加します。 | 外部ファイルを作成し、`oracle` ディレクトリに移動します。 | DBA | 

### ターゲットの設定 (Aurora PostgreSQL 互換の) 設定
<a name="configure-the-target-aurora-postgresql-compatible"></a>


| タスク | 説明 | 必要なスキル | 
| --- | --- | --- | 
| Aurora PostgreSQL データベースを作成します。 | Amazon Aurora PostgreSQL 互換クラスターに DB インスタンスを作成します。 | DBA | 
| スキーマ、aws\$1s3 エクステンション、テーブルを作成します。 | 「*追加情報*」セクションの `ext_tbl_scripts` にあるコードを使用します。テーブルには、実際のテーブル、ステージングテーブル、エラーとログテーブル、およびメタテーブルが含まれます。 | DBA、開発者 | 
| DB関数を作成します。 | DB 関数を作成するには、*追加情報*セクションの `load_external_table_latest` 関数の下のコードを使用します。 | DBA、開発者 | 

### Lambda 関数を作成して設定する
<a name="create-and-configure-the-lambda-function"></a>


| タスク | 説明 | 必要なスキル | 
| --- | --- | --- | 
| ロールを作成します。 | Amazon S3 と Amazon Relational Database Service　(Amazon RDS) にアクセスする権限を持つロールを作成します。このロールは、パターンを実行するための Lambda に割り当てられます。 | DBA | 
| Lambda 関数の作成 | Amazon S3 (例: `file_key = info.get('object', {}).get('key')`)からファイル名を読み取り、そのファイル名を入力パラメータとして DB 関数 (例、`curs.callproc("load_external_tables", [file_key])`) を呼び出す Lambda 関数を作成します。関数呼び出しの結果に応じて、SNS 通知が開始されます (例、`client.publish(TopicArn='arn:',Message='fileloadsuccess',Subject='fileloadsuccess')` )。ビジネスニーズに基づいて、必要に応じて追加のコードを使用して Lambda 関数を作成できます。詳細については、Lambda の「[ドキュメント](https://docs.aws.amazon.com/lambda/latest/dg/welcome.html)」 を参照してください。 | DBA | 
| S3 バケットイベントトリガーを設定します。 | S3 バケットのすべてのオブジェクト作成イベントで Lambda 関数を呼び出すメカニズムを設定します。 | DBA | 
| シークレットを作成します。 | Secrets Manager を使用して、データベース認証情報のシークレット名を作成します。Lambda 関数にシークレットを渡します。 | DBA | 
| Lambda サポートファイルをアップロードします。 | Lambda サポートパッケージを Aurora PostgreSQL 互換に接続するための添付の Python スクリプトを含む.zip ファイルをアップロードします。Python コードは、データベースで作成した関数を呼び出します。 | DBA | 
| SNS トピックを作成します。 | SNS トピックを作成して、データロードの成功または失敗のメールを送信します。 | DBA | 

### Amazon S3 との統合を追加する
<a name="add-integration-with-amazon-s3"></a>


| タスク | 説明 | 必要なスキル | 
| --- | --- | --- | 
| S3 バケットを作成する。 | Amazon S3 コンソールで、先頭にスラッシュを含まない一意の名前で S3 バケットを作成します。S3 バケット名はグローバルに一意であり、名前空間はすべての AWS アカウントによって共有されます。 | DBA | 
| IAM ポリシーを作成します。 | AWS 識別とアクセス管理(IAM) ポリシーを作成するには、*追加情報*セクションの `s3bucketpolicy_for_import` の配下のコードを使用します。 | DBA | 
| ロールを作成します。 | Aurora PostgreSQL 互換用に 2 つのロールを作成します。1 つはインポート用で、もう 1 つはエクスポート用です。対応するポリシーをロールに割り当てます。 | DBA | 
| ロールを Aurora PostgreSQL 互換クラスターにアタッチします。 | **ロールの管理**で、Aurora PostgreSQL クラスターにインポートロールとエクスポートロールをアタッチします。 | DBA | 
| Aurora PostgreSQL 互換のサポートオブジェクトを作成します。 | テーブルスクリプトについては、*追加情報*セクション `ext_tbl_scripts` の配下にあるコードを使用します。カスタム関数については、*追加情報*セクションの `load_external_Table_latest` の配下にあるコードを使用します。 | DBA | 

### テストファイルを処理
<a name="process-a-test-file"></a>


| タスク | 説明 | 必要なスキル | 
| --- | --- | --- | 
| S3 バケットにファイルをアップロードします。 | テストファイルを S3 バケットにアップロードするには、コンソールまたは AWS CLIにある以下のコマンドを使用します。 <pre>aws s3 cp /Users/Desktop/ukpost/exttbl/"testing files"/aps s3://s3importtest/inputext/aps</pre>ファイルがアップロードされるとすぐに、バケットイベントによって Lambda 関数が開始されますが、それによりAurora PostgreSQL 互換関数が実行されます。 | DBA | 
| データ、ログ、エラーファイルを確認してください。 | Aurora PostgreSQL 互換関数はファイルをメインテーブルに読み込み、S3 バケットに `.log` と `.bad` のファイルを作成します。 | DBA | 
| ソリューションを監視します。 | Amazon CloudWatch コンソールで、Lambda 関数をモニタリングします。 | DBA | 

## 関連リソース
<a name="migrate-oracle-external-tables-to-amazon-aurora-postgresql-compatible-resources"></a>
+ 「[Amazon S3 統合](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/oracle-s3-integration.html)」
+ 「[Amazon S3](https://aws.amazon.com/s3/)」
+ 「[Amazon Aurora PostgreSQL-互換エディションと連携](https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/Aurora.AuroraPostgreSQL.html)」
+ 「[AWS Lambda](https://aws.amazon.com/lambda/)」
+ 「[Amazon CloudWatch](https://aws.amazon.com/cloudwatch/)」
+ 「[AWS Secrets Manager](https://aws.amazon.com/secrets-manager/)」
+ 「[Amazon SNS 通知のセットアップ](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/US_SetupSNS.html)」

## 追加情報
<a name="migrate-oracle-external-tables-to-amazon-aurora-postgresql-compatible-additional"></a>

**ext\$1table\$1scripts**

```
CREATE EXTENSION aws_s3 CASCADE;
CREATE TABLE IF NOT EXISTS meta_EXTERNAL_TABLE
(
    table_name_stg character varying(100) ,
    table_name character varying(100)  ,
    col_list character varying(1000)  ,
    data_type character varying(100)  ,
    col_order numeric,
    start_pos numeric,
    end_pos numeric,
    no_position character varying(100)  ,
    date_mask character varying(100)  ,
    delimeter character(1)  ,
    directory character varying(100)  ,
    file_name character varying(100)  ,
    header_exist character varying(5)
);
CREATE TABLE IF NOT EXISTS ext_tbl_stg
(
    col1 text
);
CREATE TABLE IF NOT EXISTS error_table
(
    error_details text,
    file_name character varying(100),
    processed_time timestamp without time zone
);
CREATE TABLE IF NOT EXISTS log_table
(
    file_name character varying(50) COLLATE pg_catalog."default",
    processed_date timestamp without time zone,
    tot_rec_count numeric,
    proc_rec_count numeric,
    error_rec_count numeric
);
sample insert scripts of meta data:
INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'source_filename', 'character varying', 2, 8, 27, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO');
INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'record_type_identifier', 'character varying', 3, 28, 30, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO');
INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'fad_code', 'numeric', 4, 31, 36, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO');
INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'session_sequence_number', 'numeric', 5, 37, 42, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO');
INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'transaction_sequence_number', 'numeric', 6, 43, 48, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO');
```

**s3bucketpolicy\$1for import**

```
---Import role policy
--Create an IAM policy to allow, Get,  and list actions on S3 bucket
 {
    "Version": "2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "s3import",
            "Action": [
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Effect": "Allow",
            "Resource": [
                "arn:aws:s3:::s3importtest",
                "arn:aws:s3:::s3importtest/*"
            ]
        }
    ]
}
--Export Role policy
--Create an IAM policy to allow, put,  and list actions on S3 bucket
{
    "Version": "2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "s3export",
            "Action": [
                "S3:PutObject",
                "s3:ListBucket"
            ],
            "Effect": "Allow",
            "Resource": [
                "arn:aws:s3:::s3importtest/*"
            ]
        }
    ]
}
```

**Sample DB function load\$1external\$1tables\$1latest**

```
CREATE OR REPLACE FUNCTION public.load_external_tables(pi_filename text)
 RETURNS character varying
 LANGUAGE plpgsql
AS $function$
/* Loading data from S3 bucket into a APG table */
DECLARE
 v_final_sql TEXT;
 pi_ext_table TEXT;
 r refCURSOR;
 v_sqlerrm text;
 v_chunk numeric;
 i integer;
 v_col_list TEXT;
 v_postion_list CHARACTER VARYING(1000);
 v_len  integer;
 v_delim varchar;
 v_file_name CHARACTER VARYING(1000);
 v_directory CHARACTER VARYING(1000);
 v_table_name_stg CHARACTER VARYING(1000);
 v_sql_col TEXT;
 v_sql TEXT;
 v_sql1 TEXT;
 v_sql2 TEXT;
 v_sql3 TEXT;
 v_cnt integer;
 v_sql_dynamic TEXT;
 v_sql_ins TEXT;
 proc_rec_COUNT integer;
 error_rec_COUNT integer;
 tot_rec_COUNT integer;
 v_rec_val integer;
 rec record;
 v_col_cnt integer;
 kv record;
 v_val text;
 v_header text;
 j integer;
 ERCODE VARCHAR(5);
 v_region text;
 cr CURSOR FOR
 SELECT distinct DELIMETER,
   FILE_NAME,
   DIRECTORY
 FROM  meta_EXTERNAL_TABLE
 WHERE table_name = pi_ext_table
   AND DELIMETER IS NOT NULL;


 cr1 CURSOR FOR
   SELECT   col_list,
   data_type,
   start_pos,
   END_pos,
   concat_ws('',' ',TABLE_NAME_STG) as TABLE_NAME_STG,
   no_position,date_mask
 FROM  meta_EXTERNAL_TABLE
 WHERE table_name = pi_ext_table
 order by col_order asc;
cr2 cursor FOR
SELECT  distinct table_name,table_name_stg
   FROM  meta_EXTERNAL_TABLE
   WHERE upper(file_name) = upper(pi_filename);


BEGIN
 -- PERFORM utl_file_utility.init();
   v_region := 'us-east-1';
   /* find tab details from file name */


   --DELETE FROM  ERROR_TABLE WHERE file_name= pi_filename;
  -- DELETE FROM  log_table WHERE file_name= pi_filename;


 BEGIN


   SELECT distinct table_name,table_name_stg INTO strict pi_ext_table,v_table_name_stg
   FROM  meta_EXTERNAL_TABLE
   WHERE upper(file_name) = upper(pi_filename);
 EXCEPTION
   WHEN NO_DATA_FOUND THEN
    raise notice 'error 1,%',sqlerrm;
    pi_ext_table := null;
    v_table_name_stg := null;
      RAISE USING errcode = 'NTFIP' ;
    when others then
        raise notice 'error others,%',sqlerrm;
 END;
 j :=1 ;
  
for rec in  cr2
 LOOP




  pi_ext_table     := rec.table_name;
  v_table_name_stg := rec.table_name_stg;
  v_col_list := null;


 IF pi_ext_table IS NOT NULL
  THEN
    --EXECUTE concat_ws('','truncate table  ' ,pi_ext_table) ;
   EXECUTE concat_ws('','truncate table  ' ,v_table_name_stg) ;




       SELECT distinct DELIMETER INTO STRICT v_delim
       FROM  meta_EXTERNAL_TABLE
       WHERE table_name = pi_ext_table;


       IF v_delim IS NOT NULL THEN
     SELECT distinct DELIMETER,
       FILE_NAME,
       DIRECTORY ,
       concat_ws('',' ',table_name_stg),
       case  header_exist when 'YES' then 'CSV HEADER' else 'CSV' end as header_exist
     INTO STRICT v_delim,v_file_name,v_directory,v_table_name_stg,v_header
     FROM  meta_EXTERNAL_TABLE
     WHERE table_name = pi_ext_table
       AND DELIMETER IS NOT NULL;


     IF    upper(v_delim) = 'CSV'
     THEN
       v_sql := concat_ws('','SELECT aws_s3.table_import_FROM_s3 ( ''',
       v_table_name_stg,''','''',
       ''DELIMITER '''','''' CSV HEADER QUOTE ''''"'''''', aws_commons.create_s3_uri ( ''',
       v_directory,''',''',v_file_name,''', ''',v_region,'''))');
       ELSE
       v_sql := concat_ws('','SELECT aws_s3.table_import_FROM_s3(''',
           v_table_name_stg, ''','''', ''DELIMITER AS ''''^''''',''',','
          aws_commons.create_s3_uri
           ( ''',v_directory, ''',''',
           v_file_name, ''',',
            '''',v_region,''')
          )');
          raise notice 'v_sql , %',v_sql;
       begin
        EXECUTE  v_sql;
       EXCEPTION
         WHEN OTHERS THEN
           raise notice 'error 1';
         RAISE USING errcode = 'S3IMP' ;
       END;


       select count(col_list) INTO v_col_cnt
       from  meta_EXTERNAL_TABLE where table_name = pi_ext_table;






        -- raise notice 'v_sql 2, %',concat_ws('','update ',v_table_name_stg, ' set col1 = col1||''',v_delim,'''');


       execute concat_ws('','update ',v_table_name_stg, ' set col1 = col1||''',v_delim,'''');




       i :=1;
       FOR rec in cr1
       loop
       v_sql1 := concat_ws('',v_sql1,'split_part(col1,''',v_delim,''',', i,')',' as ',rec.col_list,',');
       v_sql2 := concat_ws('',v_sql2,rec.col_list,',');
   --    v_sql3 := concat_ws('',v_sql3,'rec.',rec.col_list,'::',rec.data_type,',');


       case
         WHEN upper(rec.data_type) = 'NUMERIC'
         THEN v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0
                THEN null
                 ELSE
                 coalesce((trim(split_part(col1,''',v_delim,''',', i,')))::NUMERIC,0)::',rec.data_type,' END as ',rec.col_list,',') ;
         WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask = 'YYYYMMDD'
         THEN v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0
                THEN null
                 ELSE
                 to_date(coalesce((trim(split_part(col1,''',v_delim,''',', i,'))),''99990101''),''YYYYMMDD'')::',rec.data_type,' END as ',rec.col_list,',');
         WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask =  'MM/DD/YYYY hh24:mi:ss'
         THEN v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0
                THEN null
                 ELSE
                 to_date(coalesce((trim(split_part(col1,''',v_delim,''',', i,'))),''01/01/9999 0024:00:00''),''MM/DD/YYYY hh24:mi:ss'')::',rec.data_type,' END as ',rec.col_list,',');
          ELSE
        v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0
                THEN null
                 ELSE
                  coalesce((trim(split_part(col1,''',v_delim,''',', i,'))),'''')::',rec.data_type,' END as ',rec.col_list,',') ;
       END case;


       i :=i+1;
       end loop;


         -- raise notice 'v_sql 3, %',v_sql3;


       SELECT trim(trailing ' ' FROM v_sql1) INTO v_sql1;
       SELECT trim(trailing ',' FROM v_sql1) INTO v_sql1;


       SELECT trim(trailing ' ' FROM v_sql2) INTO v_sql2;
       SELECT trim(trailing ',' FROM v_sql2) INTO v_sql2;


       SELECT trim(trailing ' ' FROM v_sql3) INTO v_sql3;
       SELECT trim(trailing ',' FROM v_sql3) INTO v_sql3;


       END IF;
      raise notice 'v_delim , %',v_delim;


     EXECUTE concat_ws('','SELECT COUNT(*) FROM ',v_table_name_stg)  INTO v_cnt;


    raise notice 'stg cnt , %',v_cnt;


    /* if upper(v_delim) = 'CSV' then
       v_sql_ins := concat_ws('', ' SELECT * from ' ,v_table_name_stg );
     else
      -- v_sql_ins := concat_ws('',' SELECT ',v_sql1,'  from (select col1 from ' ,v_table_name_stg , ')sub ');
       v_sql_ins := concat_ws('',' SELECT ',v_sql3,'  from (select col1 from ' ,v_table_name_stg , ')sub ');
       END IF;*/


v_chunk := v_cnt/100;




for i in 1..101
loop
     BEGIN
    -- raise notice 'v_sql , %',v_sql;
       -- raise notice 'Chunk number , %',i;
       v_sql_ins := concat_ws('',' SELECT ',v_sql3,'  from (select col1 from ' ,v_table_name_stg , ' offset ',v_chunk*(i-1), ' limit ',v_chunk,') sub ');


     v_sql := concat_ws('','insert into  ', pi_ext_table ,' ', v_sql_ins);
     -- raise notice 'select statement , %',v_sql_ins;
          -- v_sql := null;
     -- EXECUTE concat_ws('','insert into  ', pi_ext_table ,' ', v_sql_ins, 'offset ',v_chunk*(i-1), ' limit ',v_chunk );
     --v_sql := concat_ws('','insert into  ', pi_ext_table ,' ', v_sql_ins );


     -- raise notice 'insert statement , %',v_sql;


    raise NOTICE 'CHUNK START %',v_chunk*(i-1);
   raise NOTICE 'CHUNK END %',v_chunk;


     EXECUTE v_sql;


  EXCEPTION
       WHEN OTHERS THEN
       -- v_sql_ins := concat_ws('',' SELECT ',v_sql1, '  from (select col1 from ' ,v_table_name_stg , ' )sub ');
         -- raise notice 'Chunk number for cursor , %',i;


    raise NOTICE 'Cursor - CHUNK START %',v_chunk*(i-1);
   raise NOTICE 'Cursor -  CHUNK END %',v_chunk;
         v_sql_ins := concat_ws('',' SELECT ',v_sql3, '  from (select col1 from ' ,v_table_name_stg , ' )sub ');


         v_final_sql := REPLACE (v_sql_ins, ''''::text, ''''''::text);
        -- raise notice 'v_final_sql %',v_final_sql;
         v_sql :=concat_ws('','do $a$ declare  r refcursor;v_sql text; i numeric;v_conname text;  v_typ  ',pi_ext_table,'[]; v_rec  ','record',';
           begin






           open r for execute ''select col1 from ',v_table_name_stg ,'  offset ',v_chunk*(i-1), ' limit ',v_chunk,''';
           loop
           begin
           fetch r into v_rec;
           EXIT WHEN NOT FOUND;




           v_sql := concat_ws('''',''insert into  ',pi_ext_table,' SELECT ',REPLACE (v_sql3, ''''::text, ''''''::text) , '  from ( select '''''',v_rec.col1,'''''' as col1) v'');
            execute v_sql;


           exception
            when others then
          v_sql := ''INSERT INTO  ERROR_TABLE VALUES (concat_ws('''''''',''''Error Name: '''',$$''||SQLERRM||''$$,''''Error State: '''',''''''||SQLSTATE||'''''',''''record : '''',$$''||v_rec.col1||''$$),'''''||pi_filename||''''',now())'';


               execute v_sql;
             continue;
           end ;
           end loop;
           close r;
           exception
           when others then
         raise;
           end ; $a$');
      -- raise notice ' inside excp v_sql %',v_sql;
          execute v_sql;
      --  raise notice 'v_sql %',v_sql;
       END;
  END LOOP;
     ELSE


     SELECT distinct DELIMETER,FILE_NAME,DIRECTORY ,concat_ws('',' ',table_name_stg),
       case  header_exist when 'YES' then 'CSV HEADER' else 'CSV' end as header_exist
       INTO STRICT v_delim,v_file_name,v_directory,v_table_name_stg,v_header
     FROM  meta_EXTERNAL_TABLE
     WHERE table_name = pi_ext_table                  ;
     v_sql := concat_ws('','SELECT aws_s3.table_import_FROM_s3(''',
       v_table_name_stg, ''','''', ''DELIMITER AS ''''#'''' ',v_header,' '',','
      aws_commons.create_s3_uri
       ( ''',v_directory, ''',''',
       v_file_name, ''',',
        '''',v_region,''')
      )');
         EXECUTE  v_sql;


     FOR rec in cr1
     LOOP


      IF rec.start_pos IS NULL AND rec.END_pos IS NULL AND rec.no_position = 'recnum'
      THEN
        v_rec_val := 1;
      ELSE


       case
         WHEN upper(rec.data_type) = 'NUMERIC'
         THEN v_sql1 := concat_ws('',' case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0
                THEN null
                 ELSE
                 coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1)))::NUMERIC,0)::',rec.data_type,' END as ',rec.col_list,',') ;
         WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask = 'YYYYMMDD'
         THEN v_sql1 := concat_ws('','case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0
                THEN null
                 ELSE
                 to_date(coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))),''99990101''),''YYYYMMDD'')::',rec.data_type,' END as ',rec.col_list,',');
         WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask = 'YYYYMMDDHH24MISS'
         THEN v_sql1 := concat_ws('','case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0
                THEN null
                 ELSE
                 to_date(coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))),''9999010100240000''),''YYYYMMDDHH24MISS'')::',rec.data_type,' END as ',rec.col_list,',');
          ELSE
        v_sql1 := concat_ws('',' case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0
                THEN null
                 ELSE
                  coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))),'''')::',rec.data_type,' END as ',rec.col_list,',') ;
       END case;


      END IF;
      v_col_list := concat_ws('',v_col_list ,v_sql1);
     END LOOP;




           SELECT trim(trailing ' ' FROM v_col_list) INTO v_col_list;
           SELECT trim(trailing ',' FROM v_col_list) INTO v_col_list;


           v_sql_col   :=  concat_ws('',trim(trailing ',' FROM v_col_list) , ' FROM  ',v_table_name_stg,' WHERE col1 IS NOT NULL AND length(col1)>0 ');




           v_sql_dynamic := v_sql_col;


           EXECUTE  concat_ws('','SELECT COUNT(*) FROM ',v_table_name_stg) INTO v_cnt;




         IF v_rec_val = 1 THEN
             v_sql_ins := concat_ws('',' select row_number() over(order by ctid) as line_number ,' ,v_sql_dynamic) ;


         ELSE
               v_sql_ins := concat_ws('',' SELECT' ,v_sql_dynamic) ;
           END IF;


     BEGIN
       EXECUTE concat_ws('','insert into  ', pi_ext_table ,' ', v_sql_ins);
           EXCEPTION
              WHEN OTHERS THEN
          IF v_rec_val = 1 THEN
                  v_final_sql := ' select row_number() over(order by ctid) as line_number ,col1 from ';
                ELSE
                 v_final_sql := ' SELECT col1 from';
               END IF;
       v_sql :=concat_ws('','do $a$ declare  r refcursor;v_rec_val numeric := ',coalesce(v_rec_val,0),';line_number numeric; col1 text; v_typ  ',pi_ext_table,'[]; v_rec  ',pi_ext_table,';
             begin
             open r for execute ''',v_final_sql, ' ',v_table_name_stg,' WHERE col1 IS NOT NULL AND length(col1)>0 '' ;
             loop
             begin
             if   v_rec_val = 1 then
             fetch r into line_number,col1;
             else
             fetch r into col1;
             end if;


             EXIT WHEN NOT FOUND;
              if v_rec_val = 1 then
              select line_number,',trim(trailing ',' FROM v_col_list) ,' into v_rec;
              else
                select ',trim(trailing ',' FROM v_col_list) ,' into v_rec;
              end if;


             insert into  ',pi_ext_table,' select v_rec.*;
              exception
              when others then
               INSERT INTO  ERROR_TABLE VALUES (concat_ws('''',''Error Name: '',SQLERRM,''Error State: '',SQLSTATE,''record : '',v_rec),''',pi_filename,''',now());
               continue;
              end ;
               end loop;
             close r;
              exception
              when others then
              raise;
              end ; $a$');
         execute v_sql;


     END;


         END IF;


   EXECUTE concat_ws('','SELECT COUNT(*) FROM  ' ,pi_ext_table)   INTO proc_rec_COUNT;


   EXECUTE concat_ws('','SELECT COUNT(*) FROM  error_table WHERE file_name =''',pi_filename,''' and processed_time::date = clock_timestamp()::date')  INTO error_rec_COUNT;


   EXECUTE concat_ws('','SELECT COUNT(*) FROM ',v_table_name_stg)   INTO tot_rec_COUNT;


   INSERT INTO  log_table values(pi_filename,now(),tot_rec_COUNT,proc_rec_COUNT, error_rec_COUNT);


   raise notice 'v_directory, %',v_directory;


   raise notice 'pi_filename, %',pi_filename;


   raise notice 'v_region, %',v_region;


  perform aws_s3.query_export_to_s3('SELECT replace(trim(substring(error_details,position(''('' in error_details)+1),'')''),'','','';''),file_name,processed_time FROM  error_table WHERE file_name = '''||pi_filename||'''',
   aws_commons.create_s3_uri(v_directory, pi_filename||'.bad', v_region),
   options :='FORmat csv, header, delimiter $$,$$'
   );


raise notice 'v_directory, %',v_directory;


   raise notice 'pi_filename, %',pi_filename;


   raise notice 'v_region, %',v_region;


  perform aws_s3.query_export_to_s3('SELECT * FROM  log_table WHERE file_name = '''||pi_filename||'''',
   aws_commons.create_s3_uri(v_directory, pi_filename||'.log', v_region),
   options :='FORmat csv, header, delimiter $$,$$'
   );




   END IF;
 j := j+1;
 END LOOP;


       RETURN 'OK';
EXCEPTION
    WHEN  OTHERS THEN
  raise notice 'error %',sqlerrm;
   ERCODE=SQLSTATE;
   IF ERCODE = 'NTFIP' THEN
     v_sqlerrm := concat_Ws('',sqlerrm,'No data for the filename');
   ELSIF ERCODE = 'S3IMP' THEN
    v_sqlerrm := concat_Ws('',sqlerrm,'Error While exporting the file from S3');
   ELSE
      v_sqlerrm := sqlerrm;
   END IF;


 select distinct directory into v_directory from  meta_EXTERNAL_TABLE;




 raise notice 'exc v_directory, %',v_directory;


   raise notice 'exc pi_filename, %',pi_filename;


   raise notice 'exc v_region, %',v_region;


  perform aws_s3.query_export_to_s3('SELECT * FROM  error_table WHERE file_name = '''||pi_filename||'''',
   aws_commons.create_s3_uri(v_directory, pi_filename||'.bad', v_region),
   options :='FORmat csv, header, delimiter $$,$$'
   );
    RETURN null;
END;
$function$
```