当前位置:   article > 正文

Data Lake Analytics,大数据的ETL神器!_datalake中数据同步工具

datalake中数据同步工具

0. Data Lake Analytics(简称DLA)介绍

数据湖(Data Lake)是时下大数据行业热门的概念:https://en.wikipedia.org/wiki/Data_lake。基于数据湖做分析,可以不用做任何ETL、数据搬迁等前置过程,实现跨各种异构数据源进行大数据关联分析,从而极大的节省成本和提升用户体验。关于Data Lake的概念。

终于,阿里云现在也有了自己的数据湖分析产品:https://www.aliyun.com/product/datalakeanalytics
可以点击申请使用(目前公测阶段还属于邀测模式),体验本教程分析OTS数据之旅。
产品文档:https://help.aliyun.com/product/70174.html

1. ETL介绍

ETL(https://en.wikipedia.org/wiki/Extract,_transform,_load)就是Extract、Transfrom、Load即抽取、转换、加载,是传统数仓和大数据的重要工具。

抽取:就是从源系统抽取需要的数据,这些源系统是同构或异构的:比如Excel表格、XML文件、关系型数据库。
转换:源系统的数据按照分析目的,转换成目标系统要求的格式,或者做数据清洗和数据加工。
加载:把转换后的数据装载到目标数据库,作为联机分析、数据挖掘、数据展示的基础。

整个ETL过程就像是在源系统和目标系统之间构建一个管道,数据在这个管道里源源不断的流动。

2. DLA与ETL

Data Placement Optimization(数据摆放优化)是目前云平台上的业务系统的主流架构方向和思路。架构师们会从读写性能、稳定性、强一致性、成本、易用性、开发效率等方面来考量不同存储引擎给业务上带来的好处,从而实现整个业务系统的完美的平衡状态。

而这种跨异构数据源之间的数据搬迁,却不是一件容易的事情。很多ELT工具基本上属于框架级别,需要自己开发不少的辅助工具;同时表达能力也较弱,无法满足很多场景;另外对异构数据源的抽象和兼容性也不是那么完美。

反观DLA,无论从哪方面来看,DLA都完美的契合ETL的需求场景。下图是DLA的简易架构图,DLA一开始就是基于“MPP计算引擎+存储计算分离+弹性高可用+异构数据集源”等架构原则来设计的,支持各种异构数据源读写是DLA的核心目标!

通过连接异构数据源来执行select + join + subQuery等逻辑实现Extract,通过Filter+ Project + Aggregation + Sort + Functions等实现数据流转换和映射Transform,而通过insert实现Load,下面是一个例子:

  1. --基本格式
  2. insert into target_table (col1, col2, col3, ....)  --需要导入的列以及列的顺序
  3. select c1, c2, c3, ....     --需要与导入列的类型兼容,顺序要确认清楚
  4. from ... --可以是任何你想要查询的数据目标
  5. where ...
  6. --下面是一个例子
  7. insert into target_table (id, name, age)
  8. select s1.pk1, s2.name, s1.age
  9. from source_table1 s1
  10. join source_table2 s2
  11. on s1.sid = s2.sid
  12. where s1.xxx = 'yyy'

下面我们就尝试往不同的数据源导入数据吧。

3. 实际测试(以TableStore:为例)

  • 准备DLA账号(已有测试账号)

    • 测试集群:上海region;
    • 账号账号:DLA测试账号;
  • 准备两个来源表(两个TPC-H的OSS表,customer和nation),用来做join和数据查询;
  • 准备一个TableStore(https://help.aliyun.com/document_detail/27280.html)的目标表;
  • 执行导入SQL,写入数据后校验结果;

a)两个来源表定义:

  1. mysql> show create database tpch_50x_text;
  2. +---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
  3. | Database | Create Database |
  4. +---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
  5. | tpch_50x_text | CREATE DATABASE `tpch_50x_text`
  6. WITH DBPROPERTIES (
  7. catalog = 'hive',
  8. location = 'oss://${您的bucket}/datasets/tpch/50x/text_date/'
  9. )
  10. COMMENT '' |
  11. +---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
  12. 1 row in set (0.02 sec)
  13. mysql> show tables;
  14. +------------+
  15. | Table_Name |
  16. +------------+
  17. | customer |
  18. | nation     |
  19. +------------+
  20. 2 rows in set (0.03 sec)
  21. mysql> show create table customer;
  22. +----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
  23. | Table | Create Table |
  24. +----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
  25. | customer | CREATE EXTERNAL TABLE `tpch_50x_text`.`customer` (
  26. `c_custkey` int,
  27. `c_name` string,
  28. `c_address` string,
  29. `c_nationkey` int,
  30. `c_phone` string,
  31. `c_acctbal` double,
  32. `c_mktsegment` string,
  33. `c_comment` string
  34. )
  35. ROW FORMAT DELIMITED
  36. FIELDS TERMINATED BY '|'
  37. STORED AS `TEXTFILE`
  38. LOCATION 'oss://${您的bucket}/datasets/tpch/50x/text_date/customer_text' |
  39. +----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
  40. 1 row in set (0.90 sec)
  41. mysql> show create table nation;
  42. +------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
  43. | Table | Create Table |
  44. +------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
  45. | nation     | CREATE EXTERNAL TABLE `tpch_50x_text`.`nation` (
  46. `n_nationkey` int,
  47. `n_name` string,
  48. `n_regionkey` int,
  49. `n_comment` string
  50. )
  51. ROW FORMAT DELIMITED
  52. FIELDS TERMINATED BY '|'
  53. STORED AS `TEXTFILE`
  54. LOCATION 'oss://${您的bucket}/datasets/tpch/50x/text_date/nation_text' |
  55. +------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
  56. 1 row in set (0.73 sec)

b)准备TableStore的库和表

  1. ## 建库
  2. mysql> show create database etl_ots_test;
  3. +--------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
  4. | Database | Create Database |
  5. +--------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
  6. | etl_ots_test | CREATE DATABASE `etl_ots_test`
  7. WITH DBPROPERTIES (
  8. catalog = 'ots',
  9. location = 'https://${您的instance}.cn-shanghai.ots-internal.aliyuncs.com',
  10. instance = '${您的instance}'
  11. )
  12. COMMENT '' |
  13. +--------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
  14. 1 row in set (0.02 sec)
  15. ## 使用库
  16. mysql> use etl_ots_test;
  17. Database changed
  18. ## 建表
  19. mysql> show create table test_insert;
  20. +-------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
  21. | Table | Create Table |
  22. +-------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
  23. | test_insert | CREATE EXTERNAL TABLE `test_insert` (
  24. `id1_int` int NOT NULL COMMENT '客户id主键',
  25. `c_address` varchar(20) NULL COMMENT '客户的地址',
  26. `c_acctbal` double NULL COMMENT '客户的account balance',
  27. PRIMARY KEY (`id1_int`)
  28. )
  29. COMMENT '' |
  30. +-------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
  31. 1 row in set (0.03 sec)

以下是实际数据的截图:

c)开始导入数据,确保导入字段顺序和类型兼容性

  1. ## 检查数据,都是空的
  2. mysql> select * from etl_ots_test.test_insert;
  3. Empty set (0.31 sec)
  1. mysql> use tpch_50x_text;
  2. Database changed
  3. ## 查询下nation数据,其中CANADA的nationkey是3,后续要找这个数据
  4. mysql> select n_nationkey, n_name from nation;
  5. +-------------+----------------+
  6. | n_nationkey | n_name |
  7. +-------------+----------------+
  8. | 0 | ALGERIA |
  9. | 1 | ARGENTINA |
  10. | 2 | BRAZIL |
  11. | 3 | CANADA |
  12. | 4 | EGYPT |
  13. | 5 | ETHIOPIA |
  14. | 6 | FRANCE |
  15. | 7 | GERMANY |
  16. | 8 | INDIA |
  17. | 9 | INDONESIA |
  18. | 10 | IRAN |
  19. | 11 | IRAQ |
  20. | 12 | JAPAN |
  21. | 13 | JORDAN |
  22. | 14 | KENYA |
  23. | 15 | MOROCCO |
  24. | 16 | MOZAMBIQUE |
  25. | 17 | PERU |
  26. | 18 | CHINA |
  27. | 19 | ROMANIA |
  28. | 20 | SAUDI ARABIA |
  29. | 21 | VIETNAM |
  30. | 22 | RUSSIA |
  31. | 23 | UNITED KINGDOM |
  32. | 24 | UNITED STATES |
  33. +-------------+----------------+
  34. 25 rows in set (0.37 sec)
  35. ## 查询下customer数据,我们只关注nationkey=3以及c_mktsegment='BUILDING'的数据
  36. mysql> select count(*) from customer where c_nationkey = 3 and c_mktsegment = 'BUILDING';
  37. +----------+
  38. | count(*) |
  39. +----------+
  40. | 60350 |
  41. +----------+
  42. 1 row in set (0.66 sec)
  43. ## 查询下customer数据,我们只关注nationkey=3以及c_mktsegment='BUILDING'的数据
  44. mysql> select * from customer where c_nationkey = 3 and c_mktsegment = 'BUILDING' order by c_custkey limit 3;
  45. +-----------+--------------------+-------------------------+-------------+-----------------+-----------+--------------+----------------------------------------------------------------------------------------------------+
  46. | c_custkey | c_name | c_address | c_nationkey | c_phone | c_acctbal | c_mktsegment | c_comment |
  47. +-----------+--------------------+-------------------------+-------------+-----------------+-----------+--------------+----------------------------------------------------------------------------------------------------+
  48. | 13 | Customer#000000013 | nsXQu0oVjD7PM659uC3SRSp | 3 | 13-761-547-5974 | 3857.34 | BUILDING | ounts sleep carefully after the close frays. carefully bold notornis use ironic requests. blithely |
  49. | 27 | Customer#000000027 | IS8GIyxpBrLpMT0u7 | 3 | 13-137-193-2709 | 5679.84 | BUILDING | about the carefully ironic pinto beans. accoun |
  50. | 40 | Customer#000000040 | gOnGWAyhSV1ofv | 3 | 13-652-915-8939 | 1335.3 | BUILDING | rges impress after the slyly ironic courts. foxes are. blithely |
  51. +-----------+--------------------+-------------------------+-------------+-----------------+-----------+--------------+----------------------------------------------------------------------------------------------------+
  52. 3 rows in set (0.78 sec)

导入之前我们想清楚需求:把国家是'CANADA'的,客户的market segmentation为'BUILDING'的客户找到,然后对c_custkey排序,选择前10条数据,然后选择他们的c_custkey、c_address、c_acctbal三列,清晰到OTS的test_insert表中,以备后续使用

  1. ##先查询下数据,看看有几条数据
  2. mysql> select c.c_custkey, c.c_address, c.c_acctbal
  3. -> from tpch_50x_text.customer c
  4. -> join tpch_50x_text.nation n
  5. -> on c.c_nationkey = n.n_nationkey
  6. -> where n.n_name = 'CANADA'
  7. -> and c.c_mktsegment = 'BUILDING'
  8. -> order by c.c_custkey
  9. -> limit 10;
  10. +-----------+--------------------------------+-----------+
  11. | c_custkey | c_address | c_acctbal |
  12. +-----------+--------------------------------+-----------+
  13. | 13 | nsXQu0oVjD7PM659uC3SRSp | 3857.34 |
  14. | 27 | IS8GIyxpBrLpMT0u7 | 5679.84 |
  15. | 40 | gOnGWAyhSV1ofv | 1335.3 |
  16. | 64 | MbCeGY20kaKK3oalJD,OT | -646.64 |
  17. | 255 | I8Wz9sJBZTnEFG08lhcbfTZq3S | 3196.07 |
  18. | 430 | s2yfPEGGOqHfgkVSs5Rs6 qh,SuVmR | 7905.17 |
  19. | 726 | 4w7DOLtN9Hy,xzZMR | 6253.81 |
  20. | 905 | f iyVEgCU2lZZPCebx5bGp5 | -600.73 |
  21. | 1312 | f5zgMB4MHLMSHaX0tDduHAmVd4 | 9459.5 |
  22. | 1358 | t23gsl4TdVXqTZha DioEHIq5w7y | 5149.23 |
  23. +-----------+--------------------------------+-----------+
  24. 10 rows in set (1.09 sec)
  25. ##开始导入
  26. mysql> insert into etl_ots_test.test_insert (id1_int ,c_address, c_acctbal)
  27. -> select c.c_custkey, c.c_address, c.c_acctbal
  28. -> from tpch_50x_text.customer c
  29. -> join tpch_50x_text.nation n
  30. -> on c.c_nationkey = n.n_nationkey
  31. -> where n.n_name = 'CANADA'
  32. -> and c.c_mktsegment = 'BUILDING'
  33. -> order by c.c_custkey
  34. -> limit 10;
  35. +------+
  36. | rows |
  37. +------+
  38. | 10 |
  39. +------+
  40. 1 row in set (2.14 sec)
  41. ## 验证结果,没有问题:
  42. mysql> select * from etl_ots_test.test_insert;
  43. +---------+--------------------------------+-----------+
  44. | id1_int | c_address | c_acctbal |
  45. +---------+--------------------------------+-----------+
  46. | 13 | nsXQu0oVjD7PM659uC3SRSp | 3857.34 |
  47. | 27 | IS8GIyxpBrLpMT0u7 | 5679.84 |
  48. | 40 | gOnGWAyhSV1ofv | 1335.3 |
  49. | 64 | MbCeGY20kaKK3oalJD,OT | -646.64 |
  50. | 255 | I8Wz9sJBZTnEFG08lhcbfTZq3S | 3196.07 |
  51. | 430 | s2yfPEGGOqHfgkVSs5Rs6 qh,SuVmR | 7905.17 |
  52. | 726 | 4w7DOLtN9Hy,xzZMR | 6253.81 |
  53. | 905 | f iyVEgCU2lZZPCebx5bGp5 | -600.73 |
  54. | 1312 | f5zgMB4MHLMSHaX0tDduHAmVd4 | 9459.5 |
  55. | 1358 | t23gsl4TdVXqTZha DioEHIq5w7y | 5149.23 |
  56. +---------+--------------------------------+-----------+
  57. 10 rows in set (0.27 sec)

d)注意点:

虽然有ETL工具快速导入导出,但也有些问题需要注意的,比如:

  • 如果导入任务时间太长,请走异步模式,否则连接断开可能会影响任务正常运行;
  • TableStore目前的insert是根据主键覆盖,主键不会去重判断的,请务必不能对你正常的数据表做插入;
  • 目前DLA和TableStore的事务能力还不够,可能会出现中断,已导入的数据不会清楚,需要自行清理;
  • 列的个数和列的类型,需要自己对齐保障,否则会报错;

4. 其他数据源导入

整个过程是不是很简单?是不是想要导入其他场景的数据源?对DLA而言,底层任何数据源都以相同方式处理,只要确保其他数据源的库、表在DLA中正常创建,就可以正常的读写,实现ETL啦!赶紧试试吧!

其他相关的文档:

原文链接 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/375211
推荐阅读
相关标签
  

闽ICP备14008679号