

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 将 Oracle 外部表迁移到 Amazon Aurora PostgreSQL-Compatible
<a name="migrate-oracle-external-tables-to-amazon-aurora-postgresql-compatible"></a>

*anuradha chintha 和 Rakesh Raghav，Amazon Web Services*

## Summary
<a name="migrate-oracle-external-tables-to-amazon-aurora-postgresql-compatible-summary"></a>

外部表使 Oracle 能够查询存储在数据库外部的平面文件中的数据。您可以使用 ORACLE\$1LOADER 驱动程序访问以任何格式存储的、可由 SQL\$1Loader 实用程序加载的任何数据。不能对外部表使用数据操作语言 (DML)，但可以使用外部表进行查询、联接和排序操作。

Amazon Aurora PostgreSQL-Compatible 不提供与 Oracle 中外部表相似的功能。相反，您必须使用现代化来开发符合功能要求且节俭的可扩展解决方案。

此模式提供了使用 `aws_s3` 扩展程序将不同类型的 Oracle 外部表迁移到 Amazon Web Services（AWS）Cloud 上 Aurora PostgreSQL-Compatible Edition 的步骤。

建议在生产环境中实施该解决方案之前，对方案进行全面测试。

## 先决条件和限制
<a name="migrate-oracle-external-tables-to-amazon-aurora-postgresql-compatible-prereqs"></a>

**先决条件**
+ 一个有效的 Amazon Web Services account
+ AWS 命令行界面（AWS CLI）
+ 可用的 Aurora PostgreSQL-Compatible 数据库实例。
+ 带有外部表的本地 Oracle 数据库
+ pg.Client API
+ 数据文件 

**限制**
+ 这种模式不提供可替代 Oracle 外部表的功能。但是，可以进一步增强步骤和示例代码，以实现数据库现代化目标。
+ 文件不应包含在 `aws_s3` 导出和导入函数中作为分隔符传递的字符。

**产品版本**
+ 要从 Amazon S3 导入到 RDS for PostgreSQL，数据库必须运行 PostgreSQL 版本 10.7 或更高版本。

## 架构
<a name="migrate-oracle-external-tables-to-amazon-aurora-postgresql-compatible-architecture"></a>

**源技术堆栈**
+ Oracle

**源架构**

![\[数据文件进入本地 Oracle 数据库中目录和表的示意图。\]](http://docs.aws.amazon.com/zh_cn/prescriptive-guidance/latest/patterns/images/pattern-img/555e69af-36fc-4ff5-b66c-af22b4cf262a/images/3fbc507d-b0fa-4e05-b999-043dc7327ed7.png)


**目标技术堆栈**
+ 兼容 Amazon Aurora PostgreSQL
+ Amazon CloudWatch
+ AWS Lambda
+ AWS Secrets Manager
+ Amazon Simple Notification Service(Amazon SNS)
+ Amazon Simple Storage Service（Amazon S3）

**目标架构**

下图高度概括此解决方案。

![\[图后面有详细描述。\]](http://docs.aws.amazon.com/zh_cn/prescriptive-guidance/latest/patterns/images/pattern-img/555e69af-36fc-4ff5-b66c-af22b4cf262a/images/5421540e-d2e3-4361-89cc-d8415fcb21fd.png)


1. 文件上传到 S3 存储桶。

1. Lambda 函数已启动。

1. Lambda 函数启动数据库函数调用。

1. Secrets Manager 提供访问数据库的凭证。

1. 根据数据库功能，创建 SNS 警报。

**自动化和扩缩**

对外部表的任何添加或更改都可以通过元数据维护来处理。

## 工具
<a name="migrate-oracle-external-tables-to-amazon-aurora-postgresql-compatible-tools"></a>
+ [Amazon Aurora PostgreSQL-Compatible](https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/CHAP_AuroraOverview.html) – Amazon Aurora PostgreSQL-Compatible Edition 是一个完全托管式、兼容 PostgreSQL 和 ACID 的关系数据库引擎，结合了高端商用数据库的速度和可靠性，同时还具有开源数据库的成本效益。
+ [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-welcome.html) – AWS 命令行界面（AWS CLI）是用于管理 Amazon Web Services 的统一工具。只通过一个工具进行下载和配置，您就可以使用命令行控制多个 Amazon Web Services 并利用脚本来自动执行这些服务。
+ [亚马逊 CloudWatch](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/WhatIsCloudWatch.html) — 亚马逊 CloudWatch 监控亚马逊 S3 的资源和利用率。
+ [AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/welcome.html) – AWS Lambda 是一种无服务器计算服务，支持在不预置或管理服务器的情况下运行代码、创建工作负载感知型集群扩展逻辑、维护事件集成或管理运行时。在这种模式下，每当文件上传到 Amazon S3 时，Lambda 都会运行数据库函数。
+ [AWS Secrets Manager](https://docs.aws.amazon.com/secretsmanager/latest/userguide/intro.html) – AWS Secrets Manager 是一项用于凭证存储和检索的服务。使用 Secrets Manager，您可以将代码中的硬编码凭证（包括密码）替换为对 Secrets Manager 的 API 调用，以便以编程方式检索密钥。
+ [Amazon S3](https://docs.aws.amazon.com/AmazonS3/latest/userguide/Welcome.html) – Amazon Simple Storage Service（Amazon S3）提供了一个存储层，用于接收和存储文件，以便在 Aurora PostgreSQL-Compatible 集群之间使用和传输这些文件。
+ [aws\$1s3](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/PostgreSQL.Procedural.Importing.html#aws_s3.table_import_from_s3) – 该 `aws_s3` 扩展程序集成了 Amazon S3 和 Aurora PostgreSQL-Compatible。
+ [Amazon SNS](https://docs.aws.amazon.com/sns/latest/dg/welcome.html) – Amazon Simple Notification Service（Amazon SNS）可协调和管理发布者和客户端之间消息的传送或发送。在这种模式中，Amazon SNS 用于发送通知。

**代码**

每当在 S3 存储桶中放置文件时，都必须创建数据库函数并从处理应用程序或 Lambda 函数中调用。有关详细信息，请参阅代码（附件）。

## 操作说明
<a name="migrate-oracle-external-tables-to-amazon-aurora-postgresql-compatible-epics"></a>

### 创建外部文件
<a name="create-an-external-file"></a>


| Task | 说明 | 所需技能 | 
| --- | --- | --- | 
| 向源数据库添加外部文件。 | 创建外部文件，并将其移至 `oracle` 目录。 | 数据库管理员 | 

### 配置目标 (Aurora PostgreSQL-Compatible)
<a name="configure-the-target-aurora-postgresql-compatible"></a>


| Task | 说明 | 所需技能 | 
| --- | --- | --- | 
| 创建 Aurora PostgreSQL 数据库。 | 在您的 Amazon Aurora PostgreSQL-Compatible 集群中创建数据库实例。 | 数据库管理员 | 
| 创建架构、aws\$1s3 扩展程序和表。 | 使用*其他信息*部分 `ext_tbl_scripts` 下方的代码。这些表包括实际表、暂存表、错误和日志表以及元表。 | 数据库管理员、开发人员 | 
| 创建数据库函数。 | 要创建数据库函数，请使用*附加信息*部分 `load_external_table_latest` 函数下的代码。 | 数据库管理员、开发人员 | 

### 创建并配置 Lambda 函数
<a name="create-and-configure-the-lambda-function"></a>


| Task | 说明 | 所需技能 | 
| --- | --- | --- | 
| 创建角色。 | 创建有权访问 Amazon S3 和 Amazon Relational Database Service（Amazon RDS）的角色。此角色将分配给 Lambda 以运行该模式。 | 数据库管理员 | 
| 创建 Lambda 函数。 | 创建一个 Lambda 函数，该函数从 Amazon S3 读取文件名（例如 `file_key = info.get('object', {}).get('key')`），并使用文件名作为输入参数调用数据库函数（例如 `curs.callproc("load_external_tables", [file_key])`）。根据函数调用结果，启动 SNS 通知（例如 `client.publish(TopicArn='arn:',Message='fileloadsuccess',Subject='fileloadsuccess')`）。根据您的业务需求，可以使用额外代码创建 Lambda 函数。有关更多信息，请参阅 [Lambda 文档](https://docs.aws.amazon.com/lambda/latest/dg/welcome.html)。 | 数据库管理员 | 
| 配置 S3 存储桶事件触发器。 | 配置一种机制，以便为 S3 存储桶中的所有对象创建事件调用 Lambda 函数。 | 数据库管理员 | 
| 创建密钥。 | 通过使用 Secrets Manager 为数据库凭证创建密钥名称。在 Lambda 函数中传递密钥。 | 数据库管理员 | 
| 上传 Lambda 支持文件。 | 上传一个 .zip 文件，其中包含 Lambda 支持包和所附的用于连接到 Aurora PostgreSQL-Compatible 的 Python 脚本。Python 代码调用您在数据库中创建的函数。 | 数据库管理员 | 
| 创建一个 SNS 主题。 | 创建 SNS 主题以发送有关数据加载成功或失败的邮件。 | 数据库管理员 | 

### 添加与 Amazon S3 的集成
<a name="add-integration-with-amazon-s3"></a>


| Task | 说明 | 所需技能 | 
| --- | --- | --- | 
| 创建 S3 存储桶。 | 在 Amazon S3 控制台上，创建一个 S3 存储桶。该存储桶名称具有唯一性，且不包含前导斜杠。S3 存储桶名称是全局唯一的，并且命名空间由所有 Amazon Web Services account 共享。 | 数据库管理员 | 
| 创建 IAM policy。 | 要创建 AWS Identity and Access Management (IAM) 策略，请使用*其他信息*部分 `s3bucketpolicy_for_import` 下方的代码。 | 数据库管理员 | 
| 创建角色。 | 为 Aurora PostgreSQL-Compatible 创建两个角色：一个用于导入的角色和一个用于导出的角色。为角色分配相应的策略。 | 数据库管理员 | 
| 将角色附加到 Aurora PostgreSQL-Compatible 集群。 | 在**管理角色**下方，将导入和导出角色附加到 Aurora PostgreSQL 集群。 | 数据库管理员 | 
| 为 Aurora PostgreSQL-Compatible 创建支持对象。 | 对于表格脚本，使用*其他信息*部分 `ext_tbl_scripts` 下方的代码。对于定制脚本，使用*其他信息*部分 `load_external_Table_latest` 下方的代码。 | 数据库管理员 | 

### 处理测试文件
<a name="process-a-test-file"></a>


| Task | 说明 | 所需技能 | 
| --- | --- | --- | 
| 将文件上传到 S3 存储桶。 | 要将测试文件上传到 S3 存储桶，请使用控制台或在 AWS CLI 中使用以下命令。 <pre>aws s3 cp /Users/Desktop/ukpost/exttbl/"testing files"/aps s3://s3importtest/inputext/aps</pre>文件上传后，存储桶事件就会启动 Lambda 函数，该函数运行 Aurora PostgreSQL-Compatible 函数。 | 数据库管理员 | 
| 检查数据、日志和错误文件。 | Aurora PostgreSQL-Compatible 函数将文件加载到主表中，然后在 S3 存储桶中创建 `.log` 和 `.bad` 文件。 | 数据库管理员 | 
| 监控解决方案。 | 在亚马逊 CloudWatch 控制台中，监控 Lambda 函数。 | 数据库管理员 | 

## 相关的资源
<a name="migrate-oracle-external-tables-to-amazon-aurora-postgresql-compatible-resources"></a>
+ [Amazon S3 集成](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/oracle-s3-integration.html)
+ [Amazon S3](https://aws.amazon.com/s3/)
+ [使用 Amazon Aurora PostgreSQL-Compatible Edition](https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/Aurora.AuroraPostgreSQL.html)
+ [AWS Lambda](https://aws.amazon.com/lambda/)
+ [Amazon CloudWatch](https://aws.amazon.com/cloudwatch/)
+ [AWS Secrets Manager](https://aws.amazon.com/secrets-manager/)
+ [设置 Amazon SNS 通知](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/US_SetupSNS.html)

## 附加信息
<a name="migrate-oracle-external-tables-to-amazon-aurora-postgresql-compatible-additional"></a>

**ext\$1table\$1scripts**

```
CREATE EXTENSION aws_s3 CASCADE;
CREATE TABLE IF NOT EXISTS meta_EXTERNAL_TABLE
(
    table_name_stg character varying(100) ,
    table_name character varying(100)  ,
    col_list character varying(1000)  ,
    data_type character varying(100)  ,
    col_order numeric,
    start_pos numeric,
    end_pos numeric,
    no_position character varying(100)  ,
    date_mask character varying(100)  ,
    delimeter character(1)  ,
    directory character varying(100)  ,
    file_name character varying(100)  ,
    header_exist character varying(5)
);
CREATE TABLE IF NOT EXISTS ext_tbl_stg
(
    col1 text
);
CREATE TABLE IF NOT EXISTS error_table
(
    error_details text,
    file_name character varying(100),
    processed_time timestamp without time zone
);
CREATE TABLE IF NOT EXISTS log_table
(
    file_name character varying(50) COLLATE pg_catalog."default",
    processed_date timestamp without time zone,
    tot_rec_count numeric,
    proc_rec_count numeric,
    error_rec_count numeric
);
sample insert scripts of meta data:
INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'source_filename', 'character varying', 2, 8, 27, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO');
INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'record_type_identifier', 'character varying', 3, 28, 30, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO');
INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'fad_code', 'numeric', 4, 31, 36, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO');
INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'session_sequence_number', 'numeric', 5, 37, 42, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO');
INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'transaction_sequence_number', 'numeric', 6, 43, 48, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO');
```

**s3bucketpolicy\$1for import**

```
---Import role policy
--Create an IAM policy to allow, Get,  and list actions on S3 bucket
 {
    "Version": "2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "s3import",
            "Action": [
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Effect": "Allow",
            "Resource": [
                "arn:aws:s3:::s3importtest",
                "arn:aws:s3:::s3importtest/*"
            ]
        }
    ]
}
--Export Role policy
--Create an IAM policy to allow, put,  and list actions on S3 bucket
{
    "Version": "2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "s3export",
            "Action": [
                "S3:PutObject",
                "s3:ListBucket"
            ],
            "Effect": "Allow",
            "Resource": [
                "arn:aws:s3:::s3importtest/*"
            ]
        }
    ]
}
```

**示例数据库函数 load\$1external\$1tables\$1latest**

```
CREATE OR REPLACE FUNCTION public.load_external_tables(pi_filename text)
 RETURNS character varying
 LANGUAGE plpgsql
AS $function$
/* Loading data from S3 bucket into a APG table */
DECLARE
 v_final_sql TEXT;
 pi_ext_table TEXT;
 r refCURSOR;
 v_sqlerrm text;
 v_chunk numeric;
 i integer;
 v_col_list TEXT;
 v_postion_list CHARACTER VARYING(1000);
 v_len  integer;
 v_delim varchar;
 v_file_name CHARACTER VARYING(1000);
 v_directory CHARACTER VARYING(1000);
 v_table_name_stg CHARACTER VARYING(1000);
 v_sql_col TEXT;
 v_sql TEXT;
 v_sql1 TEXT;
 v_sql2 TEXT;
 v_sql3 TEXT;
 v_cnt integer;
 v_sql_dynamic TEXT;
 v_sql_ins TEXT;
 proc_rec_COUNT integer;
 error_rec_COUNT integer;
 tot_rec_COUNT integer;
 v_rec_val integer;
 rec record;
 v_col_cnt integer;
 kv record;
 v_val text;
 v_header text;
 j integer;
 ERCODE VARCHAR(5);
 v_region text;
 cr CURSOR FOR
 SELECT distinct DELIMETER,
   FILE_NAME,
   DIRECTORY
 FROM  meta_EXTERNAL_TABLE
 WHERE table_name = pi_ext_table
   AND DELIMETER IS NOT NULL;


 cr1 CURSOR FOR
   SELECT   col_list,
   data_type,
   start_pos,
   END_pos,
   concat_ws('',' ',TABLE_NAME_STG) as TABLE_NAME_STG,
   no_position,date_mask
 FROM  meta_EXTERNAL_TABLE
 WHERE table_name = pi_ext_table
 order by col_order asc;
cr2 cursor FOR
SELECT  distinct table_name,table_name_stg
   FROM  meta_EXTERNAL_TABLE
   WHERE upper(file_name) = upper(pi_filename);


BEGIN
 -- PERFORM utl_file_utility.init();
   v_region := 'us-east-1';
   /* find tab details from file name */


   --DELETE FROM  ERROR_TABLE WHERE file_name= pi_filename;
  -- DELETE FROM  log_table WHERE file_name= pi_filename;


 BEGIN


   SELECT distinct table_name,table_name_stg INTO strict pi_ext_table,v_table_name_stg
   FROM  meta_EXTERNAL_TABLE
   WHERE upper(file_name) = upper(pi_filename);
 EXCEPTION
   WHEN NO_DATA_FOUND THEN
    raise notice 'error 1,%',sqlerrm;
    pi_ext_table := null;
    v_table_name_stg := null;
      RAISE USING errcode = 'NTFIP' ;
    when others then
        raise notice 'error others,%',sqlerrm;
 END;
 j :=1 ;
  
for rec in  cr2
 LOOP




  pi_ext_table     := rec.table_name;
  v_table_name_stg := rec.table_name_stg;
  v_col_list := null;


 IF pi_ext_table IS NOT NULL
  THEN
    --EXECUTE concat_ws('','truncate table  ' ,pi_ext_table) ;
   EXECUTE concat_ws('','truncate table  ' ,v_table_name_stg) ;




       SELECT distinct DELIMETER INTO STRICT v_delim
       FROM  meta_EXTERNAL_TABLE
       WHERE table_name = pi_ext_table;


       IF v_delim IS NOT NULL THEN
     SELECT distinct DELIMETER,
       FILE_NAME,
       DIRECTORY ,
       concat_ws('',' ',table_name_stg),
       case  header_exist when 'YES' then 'CSV HEADER' else 'CSV' end as header_exist
     INTO STRICT v_delim,v_file_name,v_directory,v_table_name_stg,v_header
     FROM  meta_EXTERNAL_TABLE
     WHERE table_name = pi_ext_table
       AND DELIMETER IS NOT NULL;


     IF    upper(v_delim) = 'CSV'
     THEN
       v_sql := concat_ws('','SELECT aws_s3.table_import_FROM_s3 ( ''',
       v_table_name_stg,''','''',
       ''DELIMITER '''','''' CSV HEADER QUOTE ''''"'''''', aws_commons.create_s3_uri ( ''',
       v_directory,''',''',v_file_name,''', ''',v_region,'''))');
       ELSE
       v_sql := concat_ws('','SELECT aws_s3.table_import_FROM_s3(''',
           v_table_name_stg, ''','''', ''DELIMITER AS ''''^''''',''',','
          aws_commons.create_s3_uri
           ( ''',v_directory, ''',''',
           v_file_name, ''',',
            '''',v_region,''')
          )');
          raise notice 'v_sql , %',v_sql;
       begin
        EXECUTE  v_sql;
       EXCEPTION
         WHEN OTHERS THEN
           raise notice 'error 1';
         RAISE USING errcode = 'S3IMP' ;
       END;


       select count(col_list) INTO v_col_cnt
       from  meta_EXTERNAL_TABLE where table_name = pi_ext_table;






        -- raise notice 'v_sql 2, %',concat_ws('','update ',v_table_name_stg, ' set col1 = col1||''',v_delim,'''');


       execute concat_ws('','update ',v_table_name_stg, ' set col1 = col1||''',v_delim,'''');




       i :=1;
       FOR rec in cr1
       loop
       v_sql1 := concat_ws('',v_sql1,'split_part(col1,''',v_delim,''',', i,')',' as ',rec.col_list,',');
       v_sql2 := concat_ws('',v_sql2,rec.col_list,',');
   --    v_sql3 := concat_ws('',v_sql3,'rec.',rec.col_list,'::',rec.data_type,',');


       case
         WHEN upper(rec.data_type) = 'NUMERIC'
         THEN v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0
                THEN null
                 ELSE
                 coalesce((trim(split_part(col1,''',v_delim,''',', i,')))::NUMERIC,0)::',rec.data_type,' END as ',rec.col_list,',') ;
         WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask = 'YYYYMMDD'
         THEN v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0
                THEN null
                 ELSE
                 to_date(coalesce((trim(split_part(col1,''',v_delim,''',', i,'))),''99990101''),''YYYYMMDD'')::',rec.data_type,' END as ',rec.col_list,',');
         WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask =  'MM/DD/YYYY hh24:mi:ss'
         THEN v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0
                THEN null
                 ELSE
                 to_date(coalesce((trim(split_part(col1,''',v_delim,''',', i,'))),''01/01/9999 0024:00:00''),''MM/DD/YYYY hh24:mi:ss'')::',rec.data_type,' END as ',rec.col_list,',');
          ELSE
        v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0
                THEN null
                 ELSE
                  coalesce((trim(split_part(col1,''',v_delim,''',', i,'))),'''')::',rec.data_type,' END as ',rec.col_list,',') ;
       END case;


       i :=i+1;
       end loop;


         -- raise notice 'v_sql 3, %',v_sql3;


       SELECT trim(trailing ' ' FROM v_sql1) INTO v_sql1;
       SELECT trim(trailing ',' FROM v_sql1) INTO v_sql1;


       SELECT trim(trailing ' ' FROM v_sql2) INTO v_sql2;
       SELECT trim(trailing ',' FROM v_sql2) INTO v_sql2;


       SELECT trim(trailing ' ' FROM v_sql3) INTO v_sql3;
       SELECT trim(trailing ',' FROM v_sql3) INTO v_sql3;


       END IF;
      raise notice 'v_delim , %',v_delim;


     EXECUTE concat_ws('','SELECT COUNT(*) FROM ',v_table_name_stg)  INTO v_cnt;


    raise notice 'stg cnt , %',v_cnt;


    /* if upper(v_delim) = 'CSV' then
       v_sql_ins := concat_ws('', ' SELECT * from ' ,v_table_name_stg );
     else
      -- v_sql_ins := concat_ws('',' SELECT ',v_sql1,'  from (select col1 from ' ,v_table_name_stg , ')sub ');
       v_sql_ins := concat_ws('',' SELECT ',v_sql3,'  from (select col1 from ' ,v_table_name_stg , ')sub ');
       END IF;*/


v_chunk := v_cnt/100;




for i in 1..101
loop
     BEGIN
    -- raise notice 'v_sql , %',v_sql;
       -- raise notice 'Chunk number , %',i;
       v_sql_ins := concat_ws('',' SELECT ',v_sql3,'  from (select col1 from ' ,v_table_name_stg , ' offset ',v_chunk*(i-1), ' limit ',v_chunk,') sub ');


     v_sql := concat_ws('','insert into  ', pi_ext_table ,' ', v_sql_ins);
     -- raise notice 'select statement , %',v_sql_ins;
          -- v_sql := null;
     -- EXECUTE concat_ws('','insert into  ', pi_ext_table ,' ', v_sql_ins, 'offset ',v_chunk*(i-1), ' limit ',v_chunk );
     --v_sql := concat_ws('','insert into  ', pi_ext_table ,' ', v_sql_ins );


     -- raise notice 'insert statement , %',v_sql;


    raise NOTICE 'CHUNK START %',v_chunk*(i-1);
   raise NOTICE 'CHUNK END %',v_chunk;


     EXECUTE v_sql;


  EXCEPTION
       WHEN OTHERS THEN
       -- v_sql_ins := concat_ws('',' SELECT ',v_sql1, '  from (select col1 from ' ,v_table_name_stg , ' )sub ');
         -- raise notice 'Chunk number for cursor , %',i;


    raise NOTICE 'Cursor - CHUNK START %',v_chunk*(i-1);
   raise NOTICE 'Cursor -  CHUNK END %',v_chunk;
         v_sql_ins := concat_ws('',' SELECT ',v_sql3, '  from (select col1 from ' ,v_table_name_stg , ' )sub ');


         v_final_sql := REPLACE (v_sql_ins, ''''::text, ''''''::text);
        -- raise notice 'v_final_sql %',v_final_sql;
         v_sql :=concat_ws('','do $a$ declare  r refcursor;v_sql text; i numeric;v_conname text;  v_typ  ',pi_ext_table,'[]; v_rec  ','record',';
           begin






           open r for execute ''select col1 from ',v_table_name_stg ,'  offset ',v_chunk*(i-1), ' limit ',v_chunk,''';
           loop
           begin
           fetch r into v_rec;
           EXIT WHEN NOT FOUND;




           v_sql := concat_ws('''',''insert into  ',pi_ext_table,' SELECT ',REPLACE (v_sql3, ''''::text, ''''''::text) , '  from ( select '''''',v_rec.col1,'''''' as col1) v'');
            execute v_sql;


           exception
            when others then
          v_sql := ''INSERT INTO  ERROR_TABLE VALUES (concat_ws('''''''',''''Error Name: '''',$$''||SQLERRM||''$$,''''Error State: '''',''''''||SQLSTATE||'''''',''''record : '''',$$''||v_rec.col1||''$$),'''''||pi_filename||''''',now())'';


               execute v_sql;
             continue;
           end ;
           end loop;
           close r;
           exception
           when others then
         raise;
           end ; $a$');
      -- raise notice ' inside excp v_sql %',v_sql;
          execute v_sql;
      --  raise notice 'v_sql %',v_sql;
       END;
  END LOOP;
     ELSE


     SELECT distinct DELIMETER,FILE_NAME,DIRECTORY ,concat_ws('',' ',table_name_stg),
       case  header_exist when 'YES' then 'CSV HEADER' else 'CSV' end as header_exist
       INTO STRICT v_delim,v_file_name,v_directory,v_table_name_stg,v_header
     FROM  meta_EXTERNAL_TABLE
     WHERE table_name = pi_ext_table                  ;
     v_sql := concat_ws('','SELECT aws_s3.table_import_FROM_s3(''',
       v_table_name_stg, ''','''', ''DELIMITER AS ''''#'''' ',v_header,' '',','
      aws_commons.create_s3_uri
       ( ''',v_directory, ''',''',
       v_file_name, ''',',
        '''',v_region,''')
      )');
         EXECUTE  v_sql;


     FOR rec in cr1
     LOOP


      IF rec.start_pos IS NULL AND rec.END_pos IS NULL AND rec.no_position = 'recnum'
      THEN
        v_rec_val := 1;
      ELSE


       case
         WHEN upper(rec.data_type) = 'NUMERIC'
         THEN v_sql1 := concat_ws('',' case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0
                THEN null
                 ELSE
                 coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1)))::NUMERIC,0)::',rec.data_type,' END as ',rec.col_list,',') ;
         WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask = 'YYYYMMDD'
         THEN v_sql1 := concat_ws('','case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0
                THEN null
                 ELSE
                 to_date(coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))),''99990101''),''YYYYMMDD'')::',rec.data_type,' END as ',rec.col_list,',');
         WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask = 'YYYYMMDDHH24MISS'
         THEN v_sql1 := concat_ws('','case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0
                THEN null
                 ELSE
                 to_date(coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))),''9999010100240000''),''YYYYMMDDHH24MISS'')::',rec.data_type,' END as ',rec.col_list,',');
          ELSE
        v_sql1 := concat_ws('',' case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0
                THEN null
                 ELSE
                  coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))),'''')::',rec.data_type,' END as ',rec.col_list,',') ;
       END case;


      END IF;
      v_col_list := concat_ws('',v_col_list ,v_sql1);
     END LOOP;




           SELECT trim(trailing ' ' FROM v_col_list) INTO v_col_list;
           SELECT trim(trailing ',' FROM v_col_list) INTO v_col_list;


           v_sql_col   :=  concat_ws('',trim(trailing ',' FROM v_col_list) , ' FROM  ',v_table_name_stg,' WHERE col1 IS NOT NULL AND length(col1)>0 ');




           v_sql_dynamic := v_sql_col;


           EXECUTE  concat_ws('','SELECT COUNT(*) FROM ',v_table_name_stg) INTO v_cnt;




         IF v_rec_val = 1 THEN
             v_sql_ins := concat_ws('',' select row_number() over(order by ctid) as line_number ,' ,v_sql_dynamic) ;


         ELSE
               v_sql_ins := concat_ws('',' SELECT' ,v_sql_dynamic) ;
           END IF;


     BEGIN
       EXECUTE concat_ws('','insert into  ', pi_ext_table ,' ', v_sql_ins);
           EXCEPTION
              WHEN OTHERS THEN
          IF v_rec_val = 1 THEN
                  v_final_sql := ' select row_number() over(order by ctid) as line_number ,col1 from ';
                ELSE
                 v_final_sql := ' SELECT col1 from';
               END IF;
       v_sql :=concat_ws('','do $a$ declare  r refcursor;v_rec_val numeric := ',coalesce(v_rec_val,0),';line_number numeric; col1 text; v_typ  ',pi_ext_table,'[]; v_rec  ',pi_ext_table,';
             begin
             open r for execute ''',v_final_sql, ' ',v_table_name_stg,' WHERE col1 IS NOT NULL AND length(col1)>0 '' ;
             loop
             begin
             if   v_rec_val = 1 then
             fetch r into line_number,col1;
             else
             fetch r into col1;
             end if;


             EXIT WHEN NOT FOUND;
              if v_rec_val = 1 then
              select line_number,',trim(trailing ',' FROM v_col_list) ,' into v_rec;
              else
                select ',trim(trailing ',' FROM v_col_list) ,' into v_rec;
              end if;


             insert into  ',pi_ext_table,' select v_rec.*;
              exception
              when others then
               INSERT INTO  ERROR_TABLE VALUES (concat_ws('''',''Error Name: '',SQLERRM,''Error State: '',SQLSTATE,''record : '',v_rec),''',pi_filename,''',now());
               continue;
              end ;
               end loop;
             close r;
              exception
              when others then
              raise;
              end ; $a$');
         execute v_sql;


     END;


         END IF;


   EXECUTE concat_ws('','SELECT COUNT(*) FROM  ' ,pi_ext_table)   INTO proc_rec_COUNT;


   EXECUTE concat_ws('','SELECT COUNT(*) FROM  error_table WHERE file_name =''',pi_filename,''' and processed_time::date = clock_timestamp()::date')  INTO error_rec_COUNT;


   EXECUTE concat_ws('','SELECT COUNT(*) FROM ',v_table_name_stg)   INTO tot_rec_COUNT;


   INSERT INTO  log_table values(pi_filename,now(),tot_rec_COUNT,proc_rec_COUNT, error_rec_COUNT);


   raise notice 'v_directory, %',v_directory;


   raise notice 'pi_filename, %',pi_filename;


   raise notice 'v_region, %',v_region;


  perform aws_s3.query_export_to_s3('SELECT replace(trim(substring(error_details,position(''('' in error_details)+1),'')''),'','','';''),file_name,processed_time FROM  error_table WHERE file_name = '''||pi_filename||'''',
   aws_commons.create_s3_uri(v_directory, pi_filename||'.bad', v_region),
   options :='FORmat csv, header, delimiter $$,$$'
   );


raise notice 'v_directory, %',v_directory;


   raise notice 'pi_filename, %',pi_filename;


   raise notice 'v_region, %',v_region;


  perform aws_s3.query_export_to_s3('SELECT * FROM  log_table WHERE file_name = '''||pi_filename||'''',
   aws_commons.create_s3_uri(v_directory, pi_filename||'.log', v_region),
   options :='FORmat csv, header, delimiter $$,$$'
   );




   END IF;
 j := j+1;
 END LOOP;


       RETURN 'OK';
EXCEPTION
    WHEN  OTHERS THEN
  raise notice 'error %',sqlerrm;
   ERCODE=SQLSTATE;
   IF ERCODE = 'NTFIP' THEN
     v_sqlerrm := concat_Ws('',sqlerrm,'No data for the filename');
   ELSIF ERCODE = 'S3IMP' THEN
    v_sqlerrm := concat_Ws('',sqlerrm,'Error While exporting the file from S3');
   ELSE
      v_sqlerrm := sqlerrm;
   END IF;


 select distinct directory into v_directory from  meta_EXTERNAL_TABLE;




 raise notice 'exc v_directory, %',v_directory;


   raise notice 'exc pi_filename, %',pi_filename;


   raise notice 'exc v_region, %',v_region;


  perform aws_s3.query_export_to_s3('SELECT * FROM  error_table WHERE file_name = '''||pi_filename||'''',
   aws_commons.create_s3_uri(v_directory, pi_filename||'.bad', v_region),
   options :='FORmat csv, header, delimiter $$,$$'
   );
    RETURN null;
END;
$function$
```