当前位置:   article > 正文

Oracle数据库和Informatica ETL逻辑及视图、用户定义函数和存储过程迁移到AWS的Hive数据仓库基本流程_informatica etl数据迁移

informatica etl数据迁移

首先,我们需要注册AWS帐号,并启用EC2(Elastic Cloud Computing,高性能云计算)、EMR(Elastic Map-Reduce,高性能大数据计算)、Redshift/Snowflake数据仓库和S3(Simple Storage Service,简单存储服务)、DevOps(用于Github持续集成)和Github Enterprise这些服务。
其次,我们需要梳理清楚Oracle数据库中Informatica Designer中的数据源、目标表和数据转换映射对象中的表、视图以及用户定义函数和存储过程之间的依赖关系。操作上可以使用Oracle的系统存储过程,如dbms_metadata.get_ddl,以及系统视图如user_tables、user_tab_columns来查看数据库对象之间的关系。Oracle数据库中的user_tables和user_tab_columns用法如下所示。
1.查询数据库的所有表和视图:
select * from user_tables;
2.查询表的所有列的信息
– 获取表字段
– user_tab_columns 字段
– table_name 表名称
– column_name 字段名
– data_type 字段类型
– data_length 字段长度
select * from user_tab_columns where table_name=’<表名/视图名>’;
3.获取表定义
set long 2000 select dbms_metadata.get_ddl(‘TABLE’,’<表名>’,’’) from dual;
4.获取物化视图定义
set pagesize 0
set long 2000
select dbms_metadata.get_ddl(‘MATERIALIZED VIEW’,’<视图名>’,’’) from dual;
5.获取普通视图定义
set pagesize 0
set long 10000
SELECT DBMS_METADATA.GET_DDL(‘VIEW’,’<视图名>’,’’) FROM dual;
对于数据迁移,如果EMR集群可以连接到Oracle服务器,我们可以用sqoop来同时连接数据库和Hive数仓导入数据或用PySpark程序去连接数据库,然后读取表数据到DataFrame对象,再存储为Parquet文件,连接到Hive外部表。否则,我们只能用sqlplus命令行工具、SQL Developer工具或Python等编程语言编写的程序将表导出为数据文件,比如Csv文件,导出的时候需要将数据字段全部导出为字符串以保留原始数据的精度,Csv文件的编码为UTF-8。在导出Csv文件时,二进制类型IMAGE、TEXT、NTEXT、BINARY和VARBINARY字段数据是无法导出到Csv文件的,这时我们可以使用SSIS的EXPORT组件将这些二进制类型的数据以主键字段数据为文件名并选择合适的扩展名导出到以字段名命名的文件夹里,放在以表名命名的目录里。用sqoop命令将Oracle表数据导入Hive的命令如下所示。
sqoop import --connect jdbc:oracle:thin:@<Oracle服务器名或IP地址>:1521:<数据库名> --username <用户名> --password <密码> --table <表名> -m 1 --hive-import --hive-database <Hive数据库名>
用sqlplus命令行工具导出字段带引号表数据的Csv文件的命令行如下所示。
spool.sql文件内容:
set colsep ,
set feedback off
set heading off
set trimout on
spool <Csv文件路径>
select ‘"’ || <字段名1> || ‘","’ || <字段名2> || ‘","’ || <字段名3> || ‘","’ || … || ‘","’ || <字段名n> || ‘"’ from <表名>;
spool off
exit

sqlplus命令行调用:
sqlplus -s <用户名>/<密码>@<数据库名> @spool.sql

参数说明:
set colsep’ '; //-域输出分隔符
set newp none //设置查询出来的数据分多少页显示,如果需要连续的数据,中间不要出现空行就把newp设置为none,这样输出的数据行都是连续的,中间没有空行之类的
set echo off; //显示start启动的脚本中的每个sql命令,缺省为on
set echo on //设置运行命令是是否显示语句
set feedback on; //设置显示“已选择XX行”
set feedback off; //回显本次sql命令处理的记录条数,缺省为on即去掉最后的 “已经选择10000行”
set heading off; //输出域标题,缺省为on 设置为off就去掉了select结果的字段名,只显示数据
set pagesize 0; //输出每页行数,缺省为24,为了避免分页,可设定为0。
set linesize 80; //输出一行字符个数,缺省为80
set numwidth 12; //输出number类型域长度,缺省为10
set termout off; //显示脚本中的命令的执行结果,缺省为on
set trimout on; //去除标准输出每行的拖尾空格,缺省为off
set trimspool on; //去除重定向(spool)输出每行的拖尾空格,缺省为off
set serveroutput on; //设置允许显示输出类似dbms_output
set timing on; //设置显示“已用时间:XXXX”
set autotrace on-; //设置允许对执行的sql进行分析
然后将导出的Csv文件压缩为Zip或Gzip格式文件之后上传到S3,在EMR里跑PySpark程序,将压缩文件解压并将Csv读入Dataframe对象然后转换为Parquet格式。另外,我们还可以在公司内网的电脑上安装Spark,接着用PySpark程序直接连到Oracle数据库,将表的数据保存为Parquet格式数据文件,然后直接上传到S3。Csv数据分两次导入到最终Parquet文件,第一次导入stg目录,字段数据类型全为string,第二次导入ods目录,字段类型转型为Oracle表中字段对应的类型,以便在导入数据出现错误时,可以检查中间步骤输出数据是否有错误。
然后建立Hive外部表连接到S3上的Parquet格式数据文件。数据加载和Hive表创建完毕之后需要检验Hive表的字段类型是否与Oracle数据表一致。Oracle数据库中可以查询系统视图user_tab_columns,通过PL/SQL语句select table_name 表名, column_name 字段名, data_type 数据类型, data_length 数据长度, DATA_PRECISION 精度FROM user_tab_columns A where table_name = ‘<表名>’ ORDER BY column_id;来查看表的字段定义。在Hive数据仓库中可以使用命令SHOW CREATE TABLE <表名>;或DESC FORMATTED <表名>;来查看表的字段定义。要检验Hive表的记录行数是否与Oracle数据表一致,可以使用以下PL/SQL语句在Oracle数据库中查询表的行数。
SELECT COUNT(1) FROM <数据库名>.<表名>;
可以使用以下HQL语句在Hive数据仓库中获取表的行数。
SELECT COUNT(1) FROM <Hive数据库>.<表名>;
再并根据Oracle数据库里已经存在的视图定义,在Hive里面创建对应的视图。而存储过程和用户定义函数的逻辑可以在公共库里创建PySpark函数来实现,同时PySpark也能够创建Spark SQL语句里可以使用的UDF。但是由于Oracle数据库的PL/SQL编程语言的内置函数和PySpark的内置函数有所不同,所以需要进行转换。还有Parquet格文件不能修改或追加记录,只能删除后重新创建,所以需要对原来的PL/SQL里面代码进行等效代码替换。比如,对于更新PL/SQL语句,可以在表创建的时候加入更新PL/SQL语句中的字段,并以更新的条件作为CASE WHEN筛选的条件。而对于删除PL/SQL语句,可以在表创建的时候排除删除的条件。对于插入PL/SQL语句,可以在表创建的时候UNION ALL需要插入的数据集或对Hive表进行分区,新建分区插入数据。对于Merge合并语句,可以使用将需要合并数据的表和用来合并的表、视图或查询FULL JOIN一下,然后取后者的值返回,再通过判断合并目标表和源数据的字段值是否为NULL来确定要不要在合并数据的同时插入或删除数据。当然,Oracle数据库里的表分区设置还是可以继续应用在Hive表分区中。
另外,在编写PySpark代码的过程中,还有一些内存和磁盘缓存以及AQE等其它的技巧来优化运行性能。
等数仓数据和源数据迁移完毕,并且PySpark程序也开发好之后。接下来我们需要一个调度平台来管理PySpark程序的运行和监控,比如Airflow,用来连接到PySpark程序并按项目将PySpark程序放在不同的目录下分类存储在S3中。Airflow是一个编排、调度和监控workflow的web平台,由Airbnb开源,跑在EC2上面。数据仓库的数据集市层使用Snowflake或Redshift云数据库存储PySpark程序跑出来的报表数据,以供Tableau报表抽取展现,同时也存储企业的基础数据以供用户自行编写SQL查询数据。Redshift云数据库的运行性能较好,而Snowflake云数据库的易用性和易管理性较好,管理成本也较低。
另外考虑到团队开发的需要,我们这里使用Github Enterprise作为版本控制平台,并且我们在Github的DevOps里配置Jenkins来做持续集成,来实现提交代码到Git代码仓库时,同步保存代码到AWS S3中,直接就能在Airflow里看到结果,对于Hive和Snowflake/Redshift的DDL和DCL的SQL语句提交后直接在云数据库后直接运行的效果。Git代码仓库分成开发、测试和生产三个分支。测试和生产分支的代码保持一致。开发人员在开发分支提交代码后,会直接将代码应用到开发环境。要发布时,先创建一个以ticket ID和ticket描述为名称的代码分友,将修改过的代码放到该分支里,并提交到测试分支,记录下提交的commit id,接着提交代码修改的审批申请,审批申请通过后在测试环境下测试。测试通过之后,使用git cherry-pick,用刚才记录的commit id提交一个补丁到生产分支做代码发布。代码发布时一般Sprint结束的那一天,这个代码发布的分支会和这个Sprint发布的其它代码分支合并,然后使用Github Enterprise的Group Review功能,让所有参与的开发者Review合并后的代码修改,等所有开发者都Review完了,就提交到生产代码仓库,最近由持续集成功能发布修改代码到生产环境并应用相关的DDL、DML和DCL SQL语句和Airflow代码。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/176913?site
推荐阅读
相关标签
  

闽ICP备14008679号