

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 將 Oracle 外部資料表遷移至 Amazon Aurora PostgreSQL 相容
<a name="migrate-oracle-external-tables-to-amazon-aurora-postgresql-compatible"></a>

*anuradha chintha 和 Rakesh Raghav，Amazon Web Services*

## 總結
<a name="migrate-oracle-external-tables-to-amazon-aurora-postgresql-compatible-summary"></a>

外部資料表可讓 Oracle 查詢以一般檔案存放在資料庫外部的資料。您可以使用 ORACLE\$1LOADER 驅動程式來存取以 SQL\$1Loader 公用程式可載入的任何格式儲存的任何資料。您無法在外部資料表上使用資料處理語言 (DML)，但您可以使用外部資料表進行查詢、聯結和排序操作。

Amazon Aurora PostgreSQL 相容版本不提供類似於 Oracle 中外部資料表的功能。反之，您必須使用現代化來開發符合功能需求的可擴展解決方案，而且是無法的。

此模式提供使用 `aws_s3`擴充功能，將不同類型的 Oracle 外部資料表遷移至 Amazon Web Services (AWS) 雲端上的 Aurora PostgreSQL 相容版本的步驟。

我們建議在生產環境中實作此解決方案之前，先徹底測試此解決方案。

## 先決條件和限制
<a name="migrate-oracle-external-tables-to-amazon-aurora-postgresql-compatible-prereqs"></a>

**先決條件**
+ 作用中的 AWS 帳戶
+ AWS 命令列界面 (AWS CLI)
+ 可用的 Aurora PostgreSQL 相容資料庫執行個體。
+ 具有外部資料表的現場部署 Oracle 資料庫
+ pg.Client API
+ 資料檔案 

**限制**
+ 此模式不提供可取代 Oracle 外部資料表的功能。不過，您可以進一步增強步驟和範本程式碼，以實現資料庫現代化目標。
+ 檔案不應包含在`aws_s3`匯出和匯入函數中以分隔符號傳遞的字元。

**產品版本**
+ 若要從 Amazon S3 匯入 RDS for PostgreSQL，資料庫必須執行 PostgreSQL 10.7 版或更新版本。

## Architecture
<a name="migrate-oracle-external-tables-to-amazon-aurora-postgresql-compatible-architecture"></a>

**來源技術堆疊**
+ Oracle

**來源架構**

![\[前往內部部署 Oracle 資料庫中目錄和資料表的資料檔案圖表。\]](http://docs.aws.amazon.com/zh_tw/prescriptive-guidance/latest/patterns/images/pattern-img/555e69af-36fc-4ff5-b66c-af22b4cf262a/images/3fbc507d-b0fa-4e05-b999-043dc7327ed7.png)


**目標技術堆疊**
+ Amazon Aurora PostgreSQL 相容
+ Amazon CloudWatch
+ AWS Lambda
+ AWS Secrets Manager
+ Amazon Simple Notification Service (Amazon SNS)
+ Amazon Simple Storage Service (Amazon S3)

**目標架構**

下圖顯示解決方案的高階表示法。

![\[描述在圖表後面。\]](http://docs.aws.amazon.com/zh_tw/prescriptive-guidance/latest/patterns/images/pattern-img/555e69af-36fc-4ff5-b66c-af22b4cf262a/images/5421540e-d2e3-4361-89cc-d8415fcb21fd.png)


1. 檔案會上傳至 S3 儲存貯體。

1. Lambda 函數已啟動。

1. Lambda 函數會啟動資料庫函數呼叫。

1. Secrets Manager 提供資料庫存取的登入資料。

1. 根據資料庫函數，會建立 SNS 警示。

**自動化和擴展**

外部資料表的任何新增或變更都可以使用中繼資料維護來處理。

## 工具
<a name="migrate-oracle-external-tables-to-amazon-aurora-postgresql-compatible-tools"></a>
+ [Amazon Aurora PostgreSQL 相容](https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/CHAP_AuroraOverview.html) – Amazon Aurora PostgreSQL 相容版本是全受管、PostgreSQL 相容且 ACID 相容的關係資料庫引擎，結合了高階商業資料庫的速度和可靠性，以及開放原始碼資料庫的成本效益。
+ [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-welcome.html) – AWS Command Line Interface (AWS CLI) 是管理 AWS 服務的統一工具。只需下載和設定一個工具，您就可以從命令列控制多個 AWS 服務，並透過指令碼將其自動化。
+ [Amazon CloudWatch](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/WhatIsCloudWatch.html) – Amazon CloudWatch 會監控 Amazon S3 資源和使用率。
+ [AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/welcome.html) – AWS Lambda 是一種無伺服器運算服務，支援執行程式碼，無需佈建或管理伺服器、建立工作負載感知叢集擴展邏輯、維護事件整合，或管理執行時間。在此模式中，每當檔案上傳至 Amazon S3 時，Lambda 都會執行資料庫函數。
+ [AWS Secrets Manager](https://docs.aws.amazon.com/secretsmanager/latest/userguide/intro.html) – AWS Secrets Manager 是一種用於憑證儲存和擷取的服務。使用 Secrets Manager，您可以使用以程式設計方式呼叫 Secrets Manager 擷取秘密的 API，取代程式碼中的硬式編碼登入資料，包括密碼。
+ [Amazon S3](https://docs.aws.amazon.com/AmazonS3/latest/userguide/Welcome.html) – Amazon Simple Storage Service (Amazon S3) 提供儲存層來接收和存放檔案，以供取用和往返 Aurora PostgreSQL 相容叢集傳輸。
+ [aws\$1s3](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/PostgreSQL.Procedural.Importing.html#aws_s3.table_import_from_s3) – `aws_s3`延伸模組整合 Amazon S3 和 Aurora PostgreSQL 相容。
+ [Amazon SNS](https://docs.aws.amazon.com/sns/latest/dg/welcome.html) – Amazon Simple Notification Service (Amazon SNS) 會協調和管理發佈者和用戶端之間的訊息傳遞或傳送。在此模式中，Amazon SNS 用於傳送通知。

**Code**

每當檔案放入 S3 儲存貯體時，都必須從處理應用程式或 Lambda 函數建立和呼叫資料庫函數。如需詳細資訊，請參閱 程式碼 （已連接）。

## 史詩
<a name="migrate-oracle-external-tables-to-amazon-aurora-postgresql-compatible-epics"></a>

### 建立外部檔案
<a name="create-an-external-file"></a>


| 任務 | Description | 所需的技能 | 
| --- | --- | --- | 
| 將外部檔案新增至來源資料庫。 | 建立外部檔案，並將其移至 `oracle`目錄。 | DBA | 

### 設定目標 (Aurora PostgreSQL 相容）
<a name="configure-the-target-aurora-postgresql-compatible"></a>


| 任務 | Description | 所需的技能 | 
| --- | --- | --- | 
| 建立 Aurora PostgreSQL 資料庫。 | 在 Amazon Aurora PostgreSQL 相容叢集中建立資料庫執行個體。 | DBA | 
| 建立結構描述、aws\$1s3 延伸模組和資料表。 | 在*其他資訊*區段`ext_tbl_scripts`中使用 下的程式碼。資料表包括實際資料表、預備資料表、錯誤和日誌資料表，以及可轉移。 | DBA、開發人員 | 
| 建立 資料庫函數。 | 若要建立資料庫函數，請使用*其他資訊*區段中函數下的`load_external_table_latest`程式碼。 | DBA、開發人員 | 

### 建立並設定 Lambda 函式
<a name="create-and-configure-the-lambda-function"></a>


| 任務 | Description | 所需的技能 | 
| --- | --- | --- | 
| 建立角色。 | 建立具有存取 Amazon S3 和 Amazon Relational Database Service (Amazon RDS) 許可的角色。此角色將指派給 Lambda 以執行模式。 | DBA | 
| 建立 Lambda 函數。 | 建立從 Amazon S3 讀取檔案名稱的 Lambda 函數 （例如 `file_key = info.get('object', {}).get('key')`)，並使用檔案名稱做為輸入參數來呼叫資料庫函數 （例如 `curs.callproc("load_external_tables", [file_key])`)。根據函數呼叫結果，將會啟動 SNS 通知 （例如 `client.publish(TopicArn='arn:',Message='fileloadsuccess',Subject='fileloadsuccess')`)。根據您的業務需求，您可以視需要建立具有額外程式碼的 Lambda 函數。如需詳細資訊，請參閱 [Lambda 文件](https://docs.aws.amazon.com/lambda/latest/dg/welcome.html)。 | DBA | 
| 設定 S3 儲存貯體事件觸發。 | 設定機制，針對 S3 儲存貯體中的所有物件建立事件呼叫 Lambda 函數。 | DBA | 
| 建立秘密。 | 使用 Secrets Manager 建立資料庫登入資料的秘密名稱。在 Lambda 函數中傳遞秘密。 | DBA | 
| 上傳 Lambda 支援檔案。 | 上傳 .zip 檔案，其中包含 Lambda 支援套件和連接的 Python 指令碼，以連線至 Aurora PostgreSQL 相容。Python 程式碼會呼叫您在資料庫中建立的 函數。 | DBA | 
| 建立 SNS 主題。 | 建立 SNS 主題以傳送郵件，確保資料載入成功或失敗。 | DBA | 

### 新增與 Amazon S3 的整合
<a name="add-integration-with-amazon-s3"></a>


| 任務 | Description | 所需的技能 | 
| --- | --- | --- | 
| 建立 S3 儲存貯體。 | 在 Amazon S3 主控台上，使用不包含正斜線的唯一名稱建立 S3 儲存貯體。S3 儲存貯體名稱全域唯一，且命名空間由所有 AWS 帳戶共用。 | DBA | 
| 建立 IAM 政策。 | 若要建立 AWS Identity and Access Management (IAM) 政策，請使用*其他資訊*區段`s3bucketpolicy_for_import`中 下的程式碼。 | DBA | 
| 建立角色。 | 為 Aurora PostgreSQL 相容建立兩個角色，一個用於匯入的角色，另一個用於匯出的角色。將對應的政策指派給角色。 | DBA | 
| 將角色連接至 Aurora PostgreSQL 相容叢集。 | 在**管理角色**下，將匯入和匯出角色連接至 Aurora PostgreSQL 叢集。 | DBA | 
| 為 Aurora PostgreSQL 相容建立支援物件。 | 對於資料表指令碼，請在*其他資訊*區段`ext_tbl_scripts`中使用 下的程式碼。對於自訂函數，請在*其他資訊*區段`load_external_Table_latest`中使用 下的程式碼。 | DBA | 

### 處理測試檔案
<a name="process-a-test-file"></a>


| 任務 | Description | 所需的技能 | 
| --- | --- | --- | 
| 將檔案上傳至 S3 儲存貯體。 | 若要將測試檔案上傳至 S3 儲存貯體，請使用主控台或在 AWS CLI 中使用下列命令。 <pre>aws s3 cp /Users/Desktop/ukpost/exttbl/"testing files"/aps s3://s3importtest/inputext/aps</pre>一旦上傳檔案，儲存貯體事件就會啟動 Lambda 函數，執行 Aurora PostgreSQL 相容函數。 | DBA | 
| 檢查資料以及日誌和錯誤檔案。 | Aurora PostgreSQL 相容函數會將檔案載入主資料表，並在 S3 儲存貯體中建立 `.log`和 `.bad` 檔案。 | DBA | 
| 監控解決方案。 | 在 Amazon CloudWatch 主控台中，監控 Lambda 函數。 | DBA | 

## 相關資源
<a name="migrate-oracle-external-tables-to-amazon-aurora-postgresql-compatible-resources"></a>
+ [Amazon S3 整合](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/oracle-s3-integration.html)
+ [Amazon S3](https://aws.amazon.com/s3/)
+ [使用 Amazon Aurora PostgreSQL 相容版本](https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/Aurora.AuroraPostgreSQL.html)
+ [AWS Lambda](https://aws.amazon.com/lambda/)
+ [Amazon CloudWatch](https://aws.amazon.com/cloudwatch/)
+ [AWS Secrets Manager](https://aws.amazon.com/secrets-manager/)
+ [設定 Amazon SNS 通知](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/US_SetupSNS.html)

## 其他資訊
<a name="migrate-oracle-external-tables-to-amazon-aurora-postgresql-compatible-additional"></a>

**ext\$1table\$1scripts**

```
CREATE EXTENSION aws_s3 CASCADE;
CREATE TABLE IF NOT EXISTS meta_EXTERNAL_TABLE
(
    table_name_stg character varying(100) ,
    table_name character varying(100)  ,
    col_list character varying(1000)  ,
    data_type character varying(100)  ,
    col_order numeric,
    start_pos numeric,
    end_pos numeric,
    no_position character varying(100)  ,
    date_mask character varying(100)  ,
    delimeter character(1)  ,
    directory character varying(100)  ,
    file_name character varying(100)  ,
    header_exist character varying(5)
);
CREATE TABLE IF NOT EXISTS ext_tbl_stg
(
    col1 text
);
CREATE TABLE IF NOT EXISTS error_table
(
    error_details text,
    file_name character varying(100),
    processed_time timestamp without time zone
);
CREATE TABLE IF NOT EXISTS log_table
(
    file_name character varying(50) COLLATE pg_catalog."default",
    processed_date timestamp without time zone,
    tot_rec_count numeric,
    proc_rec_count numeric,
    error_rec_count numeric
);
sample insert scripts of meta data:
INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'source_filename', 'character varying', 2, 8, 27, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO');
INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'record_type_identifier', 'character varying', 3, 28, 30, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO');
INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'fad_code', 'numeric', 4, 31, 36, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO');
INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'session_sequence_number', 'numeric', 5, 37, 42, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO');
INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'transaction_sequence_number', 'numeric', 6, 43, 48, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO');
```

**s3bucketpolicy\$1for 匯入**

```
---Import role policy
--Create an IAM policy to allow, Get,  and list actions on S3 bucket
 {
    "Version": "2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "s3import",
            "Action": [
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Effect": "Allow",
            "Resource": [
                "arn:aws:s3:::s3importtest",
                "arn:aws:s3:::s3importtest/*"
            ]
        }
    ]
}
--Export Role policy
--Create an IAM policy to allow, put,  and list actions on S3 bucket
{
    "Version": "2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "s3export",
            "Action": [
                "S3:PutObject",
                "s3:ListBucket"
            ],
            "Effect": "Allow",
            "Resource": [
                "arn:aws:s3:::s3importtest/*"
            ]
        }
    ]
}
```

**資料庫函數 load\$1external\$1tables\$1latest 範例**

```
CREATE OR REPLACE FUNCTION public.load_external_tables(pi_filename text)
 RETURNS character varying
 LANGUAGE plpgsql
AS $function$
/* Loading data from S3 bucket into a APG table */
DECLARE
 v_final_sql TEXT;
 pi_ext_table TEXT;
 r refCURSOR;
 v_sqlerrm text;
 v_chunk numeric;
 i integer;
 v_col_list TEXT;
 v_postion_list CHARACTER VARYING(1000);
 v_len  integer;
 v_delim varchar;
 v_file_name CHARACTER VARYING(1000);
 v_directory CHARACTER VARYING(1000);
 v_table_name_stg CHARACTER VARYING(1000);
 v_sql_col TEXT;
 v_sql TEXT;
 v_sql1 TEXT;
 v_sql2 TEXT;
 v_sql3 TEXT;
 v_cnt integer;
 v_sql_dynamic TEXT;
 v_sql_ins TEXT;
 proc_rec_COUNT integer;
 error_rec_COUNT integer;
 tot_rec_COUNT integer;
 v_rec_val integer;
 rec record;
 v_col_cnt integer;
 kv record;
 v_val text;
 v_header text;
 j integer;
 ERCODE VARCHAR(5);
 v_region text;
 cr CURSOR FOR
 SELECT distinct DELIMETER,
   FILE_NAME,
   DIRECTORY
 FROM  meta_EXTERNAL_TABLE
 WHERE table_name = pi_ext_table
   AND DELIMETER IS NOT NULL;


 cr1 CURSOR FOR
   SELECT   col_list,
   data_type,
   start_pos,
   END_pos,
   concat_ws('',' ',TABLE_NAME_STG) as TABLE_NAME_STG,
   no_position,date_mask
 FROM  meta_EXTERNAL_TABLE
 WHERE table_name = pi_ext_table
 order by col_order asc;
cr2 cursor FOR
SELECT  distinct table_name,table_name_stg
   FROM  meta_EXTERNAL_TABLE
   WHERE upper(file_name) = upper(pi_filename);


BEGIN
 -- PERFORM utl_file_utility.init();
   v_region := 'us-east-1';
   /* find tab details from file name */


   --DELETE FROM  ERROR_TABLE WHERE file_name= pi_filename;
  -- DELETE FROM  log_table WHERE file_name= pi_filename;


 BEGIN


   SELECT distinct table_name,table_name_stg INTO strict pi_ext_table,v_table_name_stg
   FROM  meta_EXTERNAL_TABLE
   WHERE upper(file_name) = upper(pi_filename);
 EXCEPTION
   WHEN NO_DATA_FOUND THEN
    raise notice 'error 1,%',sqlerrm;
    pi_ext_table := null;
    v_table_name_stg := null;
      RAISE USING errcode = 'NTFIP' ;
    when others then
        raise notice 'error others,%',sqlerrm;
 END;
 j :=1 ;
  
for rec in  cr2
 LOOP




  pi_ext_table     := rec.table_name;
  v_table_name_stg := rec.table_name_stg;
  v_col_list := null;


 IF pi_ext_table IS NOT NULL
  THEN
    --EXECUTE concat_ws('','truncate table  ' ,pi_ext_table) ;
   EXECUTE concat_ws('','truncate table  ' ,v_table_name_stg) ;




       SELECT distinct DELIMETER INTO STRICT v_delim
       FROM  meta_EXTERNAL_TABLE
       WHERE table_name = pi_ext_table;


       IF v_delim IS NOT NULL THEN
     SELECT distinct DELIMETER,
       FILE_NAME,
       DIRECTORY ,
       concat_ws('',' ',table_name_stg),
       case  header_exist when 'YES' then 'CSV HEADER' else 'CSV' end as header_exist
     INTO STRICT v_delim,v_file_name,v_directory,v_table_name_stg,v_header
     FROM  meta_EXTERNAL_TABLE
     WHERE table_name = pi_ext_table
       AND DELIMETER IS NOT NULL;


     IF    upper(v_delim) = 'CSV'
     THEN
       v_sql := concat_ws('','SELECT aws_s3.table_import_FROM_s3 ( ''',
       v_table_name_stg,''','''',
       ''DELIMITER '''','''' CSV HEADER QUOTE ''''"'''''', aws_commons.create_s3_uri ( ''',
       v_directory,''',''',v_file_name,''', ''',v_region,'''))');
       ELSE
       v_sql := concat_ws('','SELECT aws_s3.table_import_FROM_s3(''',
           v_table_name_stg, ''','''', ''DELIMITER AS ''''^''''',''',','
          aws_commons.create_s3_uri
           ( ''',v_directory, ''',''',
           v_file_name, ''',',
            '''',v_region,''')
          )');
          raise notice 'v_sql , %',v_sql;
       begin
        EXECUTE  v_sql;
       EXCEPTION
         WHEN OTHERS THEN
           raise notice 'error 1';
         RAISE USING errcode = 'S3IMP' ;
       END;


       select count(col_list) INTO v_col_cnt
       from  meta_EXTERNAL_TABLE where table_name = pi_ext_table;






        -- raise notice 'v_sql 2, %',concat_ws('','update ',v_table_name_stg, ' set col1 = col1||''',v_delim,'''');


       execute concat_ws('','update ',v_table_name_stg, ' set col1 = col1||''',v_delim,'''');




       i :=1;
       FOR rec in cr1
       loop
       v_sql1 := concat_ws('',v_sql1,'split_part(col1,''',v_delim,''',', i,')',' as ',rec.col_list,',');
       v_sql2 := concat_ws('',v_sql2,rec.col_list,',');
   --    v_sql3 := concat_ws('',v_sql3,'rec.',rec.col_list,'::',rec.data_type,',');


       case
         WHEN upper(rec.data_type) = 'NUMERIC'
         THEN v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0
                THEN null
                 ELSE
                 coalesce((trim(split_part(col1,''',v_delim,''',', i,')))::NUMERIC,0)::',rec.data_type,' END as ',rec.col_list,',') ;
         WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask = 'YYYYMMDD'
         THEN v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0
                THEN null
                 ELSE
                 to_date(coalesce((trim(split_part(col1,''',v_delim,''',', i,'))),''99990101''),''YYYYMMDD'')::',rec.data_type,' END as ',rec.col_list,',');
         WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask =  'MM/DD/YYYY hh24:mi:ss'
         THEN v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0
                THEN null
                 ELSE
                 to_date(coalesce((trim(split_part(col1,''',v_delim,''',', i,'))),''01/01/9999 0024:00:00''),''MM/DD/YYYY hh24:mi:ss'')::',rec.data_type,' END as ',rec.col_list,',');
          ELSE
        v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0
                THEN null
                 ELSE
                  coalesce((trim(split_part(col1,''',v_delim,''',', i,'))),'''')::',rec.data_type,' END as ',rec.col_list,',') ;
       END case;


       i :=i+1;
       end loop;


         -- raise notice 'v_sql 3, %',v_sql3;


       SELECT trim(trailing ' ' FROM v_sql1) INTO v_sql1;
       SELECT trim(trailing ',' FROM v_sql1) INTO v_sql1;


       SELECT trim(trailing ' ' FROM v_sql2) INTO v_sql2;
       SELECT trim(trailing ',' FROM v_sql2) INTO v_sql2;


       SELECT trim(trailing ' ' FROM v_sql3) INTO v_sql3;
       SELECT trim(trailing ',' FROM v_sql3) INTO v_sql3;


       END IF;
      raise notice 'v_delim , %',v_delim;


     EXECUTE concat_ws('','SELECT COUNT(*) FROM ',v_table_name_stg)  INTO v_cnt;


    raise notice 'stg cnt , %',v_cnt;


    /* if upper(v_delim) = 'CSV' then
       v_sql_ins := concat_ws('', ' SELECT * from ' ,v_table_name_stg );
     else
      -- v_sql_ins := concat_ws('',' SELECT ',v_sql1,'  from (select col1 from ' ,v_table_name_stg , ')sub ');
       v_sql_ins := concat_ws('',' SELECT ',v_sql3,'  from (select col1 from ' ,v_table_name_stg , ')sub ');
       END IF;*/


v_chunk := v_cnt/100;




for i in 1..101
loop
     BEGIN
    -- raise notice 'v_sql , %',v_sql;
       -- raise notice 'Chunk number , %',i;
       v_sql_ins := concat_ws('',' SELECT ',v_sql3,'  from (select col1 from ' ,v_table_name_stg , ' offset ',v_chunk*(i-1), ' limit ',v_chunk,') sub ');


     v_sql := concat_ws('','insert into  ', pi_ext_table ,' ', v_sql_ins);
     -- raise notice 'select statement , %',v_sql_ins;
          -- v_sql := null;
     -- EXECUTE concat_ws('','insert into  ', pi_ext_table ,' ', v_sql_ins, 'offset ',v_chunk*(i-1), ' limit ',v_chunk );
     --v_sql := concat_ws('','insert into  ', pi_ext_table ,' ', v_sql_ins );


     -- raise notice 'insert statement , %',v_sql;


    raise NOTICE 'CHUNK START %',v_chunk*(i-1);
   raise NOTICE 'CHUNK END %',v_chunk;


     EXECUTE v_sql;


  EXCEPTION
       WHEN OTHERS THEN
       -- v_sql_ins := concat_ws('',' SELECT ',v_sql1, '  from (select col1 from ' ,v_table_name_stg , ' )sub ');
         -- raise notice 'Chunk number for cursor , %',i;


    raise NOTICE 'Cursor - CHUNK START %',v_chunk*(i-1);
   raise NOTICE 'Cursor -  CHUNK END %',v_chunk;
         v_sql_ins := concat_ws('',' SELECT ',v_sql3, '  from (select col1 from ' ,v_table_name_stg , ' )sub ');


         v_final_sql := REPLACE (v_sql_ins, ''''::text, ''''''::text);
        -- raise notice 'v_final_sql %',v_final_sql;
         v_sql :=concat_ws('','do $a$ declare  r refcursor;v_sql text; i numeric;v_conname text;  v_typ  ',pi_ext_table,'[]; v_rec  ','record',';
           begin






           open r for execute ''select col1 from ',v_table_name_stg ,'  offset ',v_chunk*(i-1), ' limit ',v_chunk,''';
           loop
           begin
           fetch r into v_rec;
           EXIT WHEN NOT FOUND;




           v_sql := concat_ws('''',''insert into  ',pi_ext_table,' SELECT ',REPLACE (v_sql3, ''''::text, ''''''::text) , '  from ( select '''''',v_rec.col1,'''''' as col1) v'');
            execute v_sql;


           exception
            when others then
          v_sql := ''INSERT INTO  ERROR_TABLE VALUES (concat_ws('''''''',''''Error Name: '''',$$''||SQLERRM||''$$,''''Error State: '''',''''''||SQLSTATE||'''''',''''record : '''',$$''||v_rec.col1||''$$),'''''||pi_filename||''''',now())'';


               execute v_sql;
             continue;
           end ;
           end loop;
           close r;
           exception
           when others then
         raise;
           end ; $a$');
      -- raise notice ' inside excp v_sql %',v_sql;
          execute v_sql;
      --  raise notice 'v_sql %',v_sql;
       END;
  END LOOP;
     ELSE


     SELECT distinct DELIMETER,FILE_NAME,DIRECTORY ,concat_ws('',' ',table_name_stg),
       case  header_exist when 'YES' then 'CSV HEADER' else 'CSV' end as header_exist
       INTO STRICT v_delim,v_file_name,v_directory,v_table_name_stg,v_header
     FROM  meta_EXTERNAL_TABLE
     WHERE table_name = pi_ext_table                  ;
     v_sql := concat_ws('','SELECT aws_s3.table_import_FROM_s3(''',
       v_table_name_stg, ''','''', ''DELIMITER AS ''''#'''' ',v_header,' '',','
      aws_commons.create_s3_uri
       ( ''',v_directory, ''',''',
       v_file_name, ''',',
        '''',v_region,''')
      )');
         EXECUTE  v_sql;


     FOR rec in cr1
     LOOP


      IF rec.start_pos IS NULL AND rec.END_pos IS NULL AND rec.no_position = 'recnum'
      THEN
        v_rec_val := 1;
      ELSE


       case
         WHEN upper(rec.data_type) = 'NUMERIC'
         THEN v_sql1 := concat_ws('',' case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0
                THEN null
                 ELSE
                 coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1)))::NUMERIC,0)::',rec.data_type,' END as ',rec.col_list,',') ;
         WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask = 'YYYYMMDD'
         THEN v_sql1 := concat_ws('','case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0
                THEN null
                 ELSE
                 to_date(coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))),''99990101''),''YYYYMMDD'')::',rec.data_type,' END as ',rec.col_list,',');
         WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask = 'YYYYMMDDHH24MISS'
         THEN v_sql1 := concat_ws('','case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0
                THEN null
                 ELSE
                 to_date(coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))),''9999010100240000''),''YYYYMMDDHH24MISS'')::',rec.data_type,' END as ',rec.col_list,',');
          ELSE
        v_sql1 := concat_ws('',' case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0
                THEN null
                 ELSE
                  coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))),'''')::',rec.data_type,' END as ',rec.col_list,',') ;
       END case;


      END IF;
      v_col_list := concat_ws('',v_col_list ,v_sql1);
     END LOOP;




           SELECT trim(trailing ' ' FROM v_col_list) INTO v_col_list;
           SELECT trim(trailing ',' FROM v_col_list) INTO v_col_list;


           v_sql_col   :=  concat_ws('',trim(trailing ',' FROM v_col_list) , ' FROM  ',v_table_name_stg,' WHERE col1 IS NOT NULL AND length(col1)>0 ');




           v_sql_dynamic := v_sql_col;


           EXECUTE  concat_ws('','SELECT COUNT(*) FROM ',v_table_name_stg) INTO v_cnt;




         IF v_rec_val = 1 THEN
             v_sql_ins := concat_ws('',' select row_number() over(order by ctid) as line_number ,' ,v_sql_dynamic) ;


         ELSE
               v_sql_ins := concat_ws('',' SELECT' ,v_sql_dynamic) ;
           END IF;


     BEGIN
       EXECUTE concat_ws('','insert into  ', pi_ext_table ,' ', v_sql_ins);
           EXCEPTION
              WHEN OTHERS THEN
          IF v_rec_val = 1 THEN
                  v_final_sql := ' select row_number() over(order by ctid) as line_number ,col1 from ';
                ELSE
                 v_final_sql := ' SELECT col1 from';
               END IF;
       v_sql :=concat_ws('','do $a$ declare  r refcursor;v_rec_val numeric := ',coalesce(v_rec_val,0),';line_number numeric; col1 text; v_typ  ',pi_ext_table,'[]; v_rec  ',pi_ext_table,';
             begin
             open r for execute ''',v_final_sql, ' ',v_table_name_stg,' WHERE col1 IS NOT NULL AND length(col1)>0 '' ;
             loop
             begin
             if   v_rec_val = 1 then
             fetch r into line_number,col1;
             else
             fetch r into col1;
             end if;


             EXIT WHEN NOT FOUND;
              if v_rec_val = 1 then
              select line_number,',trim(trailing ',' FROM v_col_list) ,' into v_rec;
              else
                select ',trim(trailing ',' FROM v_col_list) ,' into v_rec;
              end if;


             insert into  ',pi_ext_table,' select v_rec.*;
              exception
              when others then
               INSERT INTO  ERROR_TABLE VALUES (concat_ws('''',''Error Name: '',SQLERRM,''Error State: '',SQLSTATE,''record : '',v_rec),''',pi_filename,''',now());
               continue;
              end ;
               end loop;
             close r;
              exception
              when others then
              raise;
              end ; $a$');
         execute v_sql;


     END;


         END IF;


   EXECUTE concat_ws('','SELECT COUNT(*) FROM  ' ,pi_ext_table)   INTO proc_rec_COUNT;


   EXECUTE concat_ws('','SELECT COUNT(*) FROM  error_table WHERE file_name =''',pi_filename,''' and processed_time::date = clock_timestamp()::date')  INTO error_rec_COUNT;


   EXECUTE concat_ws('','SELECT COUNT(*) FROM ',v_table_name_stg)   INTO tot_rec_COUNT;


   INSERT INTO  log_table values(pi_filename,now(),tot_rec_COUNT,proc_rec_COUNT, error_rec_COUNT);


   raise notice 'v_directory, %',v_directory;


   raise notice 'pi_filename, %',pi_filename;


   raise notice 'v_region, %',v_region;


  perform aws_s3.query_export_to_s3('SELECT replace(trim(substring(error_details,position(''('' in error_details)+1),'')''),'','','';''),file_name,processed_time FROM  error_table WHERE file_name = '''||pi_filename||'''',
   aws_commons.create_s3_uri(v_directory, pi_filename||'.bad', v_region),
   options :='FORmat csv, header, delimiter $$,$$'
   );


raise notice 'v_directory, %',v_directory;


   raise notice 'pi_filename, %',pi_filename;


   raise notice 'v_region, %',v_region;


  perform aws_s3.query_export_to_s3('SELECT * FROM  log_table WHERE file_name = '''||pi_filename||'''',
   aws_commons.create_s3_uri(v_directory, pi_filename||'.log', v_region),
   options :='FORmat csv, header, delimiter $$,$$'
   );




   END IF;
 j := j+1;
 END LOOP;


       RETURN 'OK';
EXCEPTION
    WHEN  OTHERS THEN
  raise notice 'error %',sqlerrm;
   ERCODE=SQLSTATE;
   IF ERCODE = 'NTFIP' THEN
     v_sqlerrm := concat_Ws('',sqlerrm,'No data for the filename');
   ELSIF ERCODE = 'S3IMP' THEN
    v_sqlerrm := concat_Ws('',sqlerrm,'Error While exporting the file from S3');
   ELSE
      v_sqlerrm := sqlerrm;
   END IF;


 select distinct directory into v_directory from  meta_EXTERNAL_TABLE;




 raise notice 'exc v_directory, %',v_directory;


   raise notice 'exc pi_filename, %',pi_filename;


   raise notice 'exc v_region, %',v_region;


  perform aws_s3.query_export_to_s3('SELECT * FROM  error_table WHERE file_name = '''||pi_filename||'''',
   aws_commons.create_s3_uri(v_directory, pi_filename||'.bad', v_region),
   options :='FORmat csv, header, delimiter $$,$$'
   );
    RETURN null;
END;
$function$
```