DataX is the open-source counterpart of Alibaba Cloud's DataWorks Data Integration service and an offline data synchronization tool/platform used widely inside Alibaba Group. DataX provides efficient data synchronization between heterogeneous data sources, including MySQL, Oracle, OceanBase, SQL Server, PostgreSQL, HDFS, Hive, ADS, HBase, TableStore (OTS), MaxCompute (ODPS), Hologres, and DRDS.
Source code: https://github.com/alibaba/DataX
The Apache Sqoop (SQL-to-Hadoop) project aims at efficient bulk data transfer between RDBMSs and Hadoop. With Sqoop, users can easily import data from a relational database into Hadoop and its related systems (such as HBase and Hive), and just as easily extract data from Hadoop and export it back to a relational database.
Sqoop migrates data in bulk between structured stores and Hadoop; the structured side can be an RDBMS such as MySQL or Oracle. Under the hood, Sqoop implements extract, transform, and load as MapReduce jobs, so parallelism and fault tolerance come for free from MapReduce. And unlike traditional ETL tools such as Kettle, the work runs on the Hadoop cluster, which lightens the load on a dedicated ETL server; in the right scenarios the extraction is markedly faster.
The MySQL database ecommerce contains the tables below (t_commodity, t_commodity_cate, t_coupon, t_coupon_member, t_coupon_order, t_delivery, t_feedback, t_member, t_member_addr, t_order, t_order_commodity, t_shop, t_shop_order, t_user), which need to be imported into the ecommerce database in Hive as the ODS layer for further processing.
The MySQL-to-Hive import can be done in two ways: Sqoop or DataX. We start with DataX.
Create the ecommerce database in Hive:
create database ecommerce;
Use Python to write gen_import_config.py, a script that batch-generates the DataX configuration files:
# coding=utf-8
import json
import getopt
import os
import sys
import MySQLdb

# MySQL connection settings; adjust to your environment
mysql_host = "hadoop102"
mysql_port = "3306"
mysql_user = "root"
mysql_passwd = "123456"

# HDFS NameNode settings; adjust to your environment
hdfs_nn_host = "hadoop102"
hdfs_nn_port = "8020"

# Target directory for the generated config files; adjust as needed
output_path = "/opt/module/datax/job/import"


# Open a MySQL connection
def get_connection():
    return MySQLdb.connect(host=mysql_host, port=int(mysql_port), user=mysql_user, passwd=mysql_passwd)


# Fetch table metadata: column names and data types
def get_mysql_meta(database, table):
    connection = get_connection()
    cursor = connection.cursor()
    sql = "SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"
    cursor.execute(sql, [database, table])
    fetchall = cursor.fetchall()
    cursor.close()
    connection.close()
    return fetchall


# Column names of the MySQL table
def get_mysql_columns(database, table):
    return map(lambda x: x[0], get_mysql_meta(database, table))


# Map the MySQL data types from the metadata to Hive data types,
# for the column list written into hdfswriter
def get_hive_columns(database, table):
    def type_mapping(mysql_type):
        mappings = {
            "bigint": "bigint",
            "int": "bigint",
            "smallint": "bigint",
            "tinyint": "bigint",
            "decimal": "string",
            "double": "double",
            "float": "float",
            "binary": "string",
            "char": "string",
            "varchar": "string",
            "datetime": "string",
            "time": "string",
            "timestamp": "string",
            "date": "string",
            "text": "string",
            "tinytext": "string"
        }
        return mappings[mysql_type]

    meta = get_mysql_meta(database, table)
    return map(lambda x: {"name": x[0], "type": type_mapping(x[1].lower())}, meta)


# Generate the DataX job JSON file
def generate_json(source_database, source_table):
    job = {
        "job": {
            "setting": {
                "speed": {"channel": 3},
                "errorLimit": {"record": 0, "percentage": 0.02}
            },
            "content": [{
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": mysql_user,
                        "password": mysql_passwd,
                        "column": get_mysql_columns(source_database, source_table),
                        "splitPk": "",
                        "connection": [{
                            "table": [source_table],
                            "jdbcUrl": ["jdbc:mysql://" + mysql_host + ":" + mysql_port + "/" + source_database]
                        }]
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://" + hdfs_nn_host + ":" + hdfs_nn_port,
                        "fileType": "text",
                        "path": "${targetdir}",
                        "fileName": source_table,
                        "column": get_hive_columns(source_database, source_table),
                        "writeMode": "append",
                        "fieldDelimiter": "\t",
                        "compress": "gzip"
                    }
                }
            }]
        }
    }
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    with open(os.path.join(output_path, ".".join([source_database, source_table, "json"])), "w") as f:
        json.dump(job, f)


def main(args):
    source_database = ""
    source_table = ""
    options, arguments = getopt.getopt(args, 'd:t:', ['sourcedb=', 'sourcetbl='])
    for opt_name, opt_value in options:
        if opt_name in ('-d', '--sourcedb'):
            source_database = opt_value
        if opt_name in ('-t', '--sourcetbl'):
            source_table = opt_value
    generate_json(source_database, source_table)


if __name__ == '__main__':
    main(sys.argv[1:])
Since the script accesses MySQL from Python, the MySQLdb driver needs to be installed first:
yum install -y MySQL-python
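To confirm the driver is usable before running the generator, a quick import check helps; the one-liner below is a sanity check of our own, not part of the original setup (MySQL-python is Python 2 only, hence the Python 2 print statement):

python -c "import MySQLdb; print MySQLdb.__version__"

If this prints a version string rather than an ImportError traceback, the generator script can reach the driver.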
Call the config-generation script once per table:
#!/bin/bash
python /root/bin/gen_import_config.py -d ecommerce -t t_commodity
python /root/bin/gen_import_config.py -d ecommerce -t t_commodity_cate
python /root/bin/gen_import_config.py -d ecommerce -t t_coupon
python /root/bin/gen_import_config.py -d ecommerce -t t_coupon_member
python /root/bin/gen_import_config.py -d ecommerce -t t_coupon_order
python /root/bin/gen_import_config.py -d ecommerce -t t_delivery
python /root/bin/gen_import_config.py -d ecommerce -t t_feedback
python /root/bin/gen_import_config.py -d ecommerce -t t_member
python /root/bin/gen_import_config.py -d ecommerce -t t_member_addr
python /root/bin/gen_import_config.py -d ecommerce -t t_order
python /root/bin/gen_import_config.py -d ecommerce -t t_order_commodity
python /root/bin/gen_import_config.py -d ecommerce -t t_shop
python /root/bin/gen_import_config.py -d ecommerce -t t_shop_order
python /root/bin/gen_import_config.py -d ecommerce -t t_user
The generated JSON files can now be seen under /opt/module/datax/job/import.
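To spot-check a job file before running it, list the directory and pretty-print one of the files; ecommerce.t_order.json below is just an example of the <database>.<table>.json naming convention the script uses:

ls /opt/module/datax/job/import
# one file per table: ecommerce.t_commodity.json, ecommerce.t_coupon.json, ...
python -m json.tool /opt/module/datax/job/import/ecommerce.t_order.json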
Next, write a script that moves the MySQL data into HDFS with DataX:
#!/bin/bash
DATAX_HOME=/opt/module/datax

# Prepare the target path: create it if it does not exist; if it exists,
# empty it, so that the sync job can be re-run safely
handle_targetdir() {
    hadoop fs -test -e $1
    if [[ $? -eq 1 ]]; then
        echo "Path $1 does not exist, creating it..."
        hadoop fs -mkdir -p $1
    else
        echo "Path $1 already exists"
        fs_count=$(hadoop fs -count $1)
        content_size=$(echo $fs_count | awk '{print $3}')
        if [[ $content_size -eq 0 ]]; then
            echo "Path $1 is empty"
        else
            echo "Path $1 is not empty, clearing it..."
            hadoop fs -rm -r -f $1/*
        fi
    fi
}

# Run one sync job
import_data() {
    datax_config=$1
    target_dir=$2
    handle_targetdir $target_dir
    python $DATAX_HOME/bin/datax.py -p"-Dtargetdir=$target_dir" $datax_config
}

case $1 in
"t_commodity")
    import_data /opt/module/datax/job/import/ecommerce.t_commodity.json /ecommerce/t_commodity
    ;;
"t_commodity_cate")
    import_data /opt/module/datax/job/import/ecommerce.t_commodity_cate.json /ecommerce/t_commodity_cate
    ;;
"t_coupon")
    import_data /opt/module/datax/job/import/ecommerce.t_coupon.json /ecommerce/t_coupon
    ;;
"t_coupon_member")
    import_data /opt/module/datax/job/import/ecommerce.t_coupon_member.json /ecommerce/t_coupon_member
    ;;
"t_coupon_order")
    import_data /opt/module/datax/job/import/ecommerce.t_coupon_order.json /ecommerce/t_coupon_order
    ;;
"t_delivery")
    import_data /opt/module/datax/job/import/ecommerce.t_delivery.json /ecommerce/t_delivery
    ;;
"t_feedback")
    import_data /opt/module/datax/job/import/ecommerce.t_feedback.json /ecommerce/t_feedback
    ;;
"t_member")
    import_data /opt/module/datax/job/import/ecommerce.t_member.json /ecommerce/t_member
    ;;
"t_member_addr")
    import_data /opt/module/datax/job/import/ecommerce.t_member_addr.json /ecommerce/t_member_addr
    ;;
"t_order")
    import_data /opt/module/datax/job/import/ecommerce.t_order.json /ecommerce/t_order
    ;;
"t_order_commodity")
    import_data /opt/module/datax/job/import/ecommerce.t_order_commodity.json /ecommerce/t_order_commodity
    ;;
"t_shop")
    import_data /opt/module/datax/job/import/ecommerce.t_shop.json /ecommerce/t_shop
    ;;
"t_shop_order")
    import_data /opt/module/datax/job/import/ecommerce.t_shop_order.json /ecommerce/t_shop_order
    ;;
"t_user")
    import_data /opt/module/datax/job/import/ecommerce.t_user.json /ecommerce/t_user
    ;;
"all")
    import_data /opt/module/datax/job/import/ecommerce.t_commodity.json /ecommerce/t_commodity
    import_data /opt/module/datax/job/import/ecommerce.t_commodity_cate.json /ecommerce/t_commodity_cate
    import_data /opt/module/datax/job/import/ecommerce.t_coupon.json /ecommerce/t_coupon
    import_data /opt/module/datax/job/import/ecommerce.t_coupon_member.json /ecommerce/t_coupon_member
    import_data /opt/module/datax/job/import/ecommerce.t_coupon_order.json /ecommerce/t_coupon_order
    import_data /opt/module/datax/job/import/ecommerce.t_delivery.json /ecommerce/t_delivery
    import_data /opt/module/datax/job/import/ecommerce.t_feedback.json /ecommerce/t_feedback
    import_data /opt/module/datax/job/import/ecommerce.t_member.json /ecommerce/t_member
    import_data /opt/module/datax/job/import/ecommerce.t_member_addr.json /ecommerce/t_member_addr
    import_data /opt/module/datax/job/import/ecommerce.t_order.json /ecommerce/t_order
    import_data /opt/module/datax/job/import/ecommerce.t_order_commodity.json /ecommerce/t_order_commodity
    import_data /opt/module/datax/job/import/ecommerce.t_shop.json /ecommerce/t_shop
    import_data /opt/module/datax/job/import/ecommerce.t_shop_order.json /ecommerce/t_shop_order
    import_data /opt/module/datax/job/import/ecommerce.t_user.json /ecommerce/t_user
    ;;
esac
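A sketch of how the script might be invoked, assuming it is saved as mysql_to_hdfs.sh (the file name is illustrative, not from the original):

chmod +x mysql_to_hdfs.sh
./mysql_to_hdfs.sh all         # sync every table
./mysql_to_hdfs.sh t_order     # or re-sync a single table
hadoop fs -ls /ecommerce       # check the per-table target directories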
After the script runs, the table files are visible on HDFS.
Start hiveserver2 and create the tables in Hive, mapping the MySQL fields as shown below.
Write the SQL that creates the tables and loads the data from HDFS:
DROP TABLE IF EXISTS `t_commodity`;
CREATE TABLE `t_commodity` (
    `id` BIGINT,
    `commodity_name` STRING,
    `commodity_price` DECIMAL(10,2),
    `commodity_cate_one` BIGINT,
    `commodity_cate_two` BIGINT,
    `create_user_id` BIGINT,
    `status` BIGINT,
    `create_time` STRING,
    `update_time` STRING
) COMMENT 'Commodity table'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
NULL DEFINED AS ''
LOCATION '/warehouse/ecommerce/t_commodity/';
LOAD DATA INPATH 'hdfs:/ecommerce/t_commodity' INTO TABLE `t_commodity`;

DROP TABLE IF EXISTS `t_commodity_cate`;
CREATE TABLE `t_commodity_cate` (
    `id` BIGINT,
    `cate_name` STRING,
    `cate_parent_id` BIGINT,
    `create_user_id` BIGINT,
    `status` BIGINT,
    `create_time` STRING,
    `update_time` STRING
) COMMENT 'Commodity category table'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
NULL DEFINED AS ''
LOCATION '/warehouse/ecommerce/t_commodity_cate/';
LOAD DATA INPATH 'hdfs:/ecommerce/t_commodity_cate' INTO TABLE `t_commodity_cate`;

DROP TABLE IF EXISTS `t_coupon`;
CREATE TABLE `t_coupon` (
    `id` BIGINT,
    `coupon_name` STRING,
    `coupon_price` DECIMAL(10,2),
    `create_user_id` BIGINT,
    `create_time` STRING,
    `update_time` STRING
) COMMENT 'Coupon table'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
NULL DEFINED AS ''
LOCATION '/warehouse/ecommerce/t_coupon/';
LOAD DATA INPATH 'hdfs:/ecommerce/t_coupon' INTO TABLE `t_coupon`;

DROP TABLE IF EXISTS `t_coupon_member`;
CREATE TABLE `t_coupon_member` (
    `id` BIGINT,
    `coupon_id` BIGINT,
    `member_id` BIGINT,
    `coupon_channel` BIGINT COMMENT '1 purchased by user, 2 issued by company',
    `create_time` STRING,
    `update_time` STRING
) COMMENT 'Member coupon table'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
NULL DEFINED AS ''
LOCATION '/warehouse/ecommerce/t_coupon_member/';
LOAD DATA INPATH 'hdfs:/ecommerce/t_coupon_member' INTO TABLE `t_coupon_member`;

DROP TABLE IF EXISTS `t_member`;
CREATE TABLE `t_member` (
    `id` BIGINT,
    `name` STRING,
    `password` STRING,
    `sex` STRING,
    `phone` STRING,
    `address_default_id` BIGINT,
    `member_channel` BIGINT COMMENT '1 iOS, 2 Android, 3 WeChat mini-program, 4 WeChat official account, 5 H5',
    `mp_open_id` STRING COMMENT 'WeChat official account openId',
    `status` BIGINT,
    `create_time` STRING,
    `update_time` STRING
) COMMENT 'Member table'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
NULL DEFINED AS ''
LOCATION '/warehouse/ecommerce/t_member/';
LOAD DATA INPATH 'hdfs:/ecommerce/t_member' INTO TABLE `t_member`;

DROP TABLE IF EXISTS `t_member_addr`;
CREATE TABLE `t_member_addr` (
    `id` BIGINT,
    `member_id` BIGINT,
    `contact_person` STRING,
    `contact_phone` STRING,
    `address` STRING,
    `create_time` STRING,
    `update_time` STRING
) COMMENT 'Member address table'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
NULL DEFINED AS ''
LOCATION '/warehouse/ecommerce/t_member_addr/';
LOAD DATA INPATH 'hdfs:/ecommerce/t_member_addr' INTO TABLE `t_member_addr`;

DROP TABLE IF EXISTS `t_delivery`;
CREATE TABLE `t_delivery` (
    `id` BIGINT,
    `delivery_no` STRING,
    `order_id` BIGINT,
    `shop_id` BIGINT,
    `postman` BIGINT,
    `pick_time` STRING,
    `arrive_time` STRING,
    `member_id` BIGINT,
    `member_addr_id` BIGINT
) COMMENT 'Delivery table'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
NULL DEFINED AS ''
LOCATION '/warehouse/ecommerce/t_delivery/';
LOAD DATA INPATH 'hdfs:/ecommerce/t_delivery' INTO TABLE `t_delivery`;

DROP TABLE IF EXISTS `t_feedback`;
CREATE TABLE `t_feedback` (
    `id` BIGINT,
    `member_id` BIGINT,
    `create_user_id` BIGINT,
    `feedback_content` STRING,
    `feedback_type` BIGINT COMMENT '1 damaged, 2 out of stock, 3 wrong item, 4 complaint'
) COMMENT 'Feedback table'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
NULL DEFINED AS ''
LOCATION '/warehouse/ecommerce/t_feedback/';
LOAD DATA INPATH 'hdfs:/ecommerce/t_feedback' INTO TABLE `t_feedback`;

DROP TABLE IF EXISTS `t_shop`;
CREATE TABLE `t_shop` (
    `id` BIGINT,
    `shop_name` STRING,
    `city_id` BIGINT,
    `city_name` STRING,
    `area_id` BIGINT,
    `area_name` STRING,
    `charge_user` STRING,
    `create_time` STRING,
    `update_time` STRING
) COMMENT 'Shop table'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
NULL DEFINED AS ''
LOCATION '/warehouse/ecommerce/t_shop/';
LOAD DATA INPATH 'hdfs:/ecommerce/t_shop' INTO TABLE `t_shop`;

DROP TABLE IF EXISTS `t_shop_order`;
CREATE TABLE `t_shop_order` (
    `id` BIGINT,
    `shop_id` BIGINT,
    `order_id` BIGINT,
    `start_time` STRING,
    `done_time` STRING
) COMMENT 'Shop order table'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
NULL DEFINED AS ''
LOCATION '/warehouse/ecommerce/t_shop_order/';
LOAD DATA INPATH 'hdfs:/ecommerce/t_shop_order' INTO TABLE `t_shop_order`;

DROP TABLE IF EXISTS `t_user`;
CREATE TABLE `t_user` (
    `id` BIGINT,
    `user_name` STRING,
    `user_password` STRING,
    `user_phone` STRING,
    `create_time` STRING,
    `update_time` STRING
) COMMENT 'Back-office user table'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
NULL DEFINED AS ''
LOCATION '/warehouse/ecommerce/t_user/';
LOAD DATA INPATH 'hdfs:/ecommerce/t_user' INTO TABLE `t_user`;

DROP TABLE IF EXISTS `t_order`;
CREATE TABLE `t_order` (
    `order_id` BIGINT,
    `member_id` BIGINT,
    `origin_price` DECIMAL(10,2),
    `pay_price` DECIMAL(10,2),
    `shop_id` BIGINT,
    `shop_name` STRING,
    `order_status` STRING COMMENT '1 in progress, 2 completed, 3 cancelled',
    `create_time` STRING,
    `update_time` STRING
) COMMENT 'Order table'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
NULL DEFINED AS ''
LOCATION '/warehouse/ecommerce/t_order/';
LOAD DATA INPATH 'hdfs:/ecommerce/t_order' INTO TABLE `t_order`;

DROP TABLE IF EXISTS `t_order_commodity`;
CREATE TABLE `t_order_commodity` (
    `id` BIGINT,
    `order_id` BIGINT,
    `commodity_id` BIGINT,
    `commodity_name` STRING,
    `commodity_num` BIGINT,
    `commodity_price` DECIMAL(10,2),
    `create_time` STRING,
    `update_time` STRING
) COMMENT 'Order commodity table'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
NULL DEFINED AS ''
LOCATION '/warehouse/ecommerce/t_order_commodity/';
LOAD DATA INPATH 'hdfs:/ecommerce/t_order_commodity' INTO TABLE `t_order_commodity`;
The tables are now visible in Hive, and the data has landed in them.
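A quick command-line check, assuming hiveserver2 is listening on its default port 10000 on hadoop102 (adjust the JDBC URL to your setup):

beeline -u jdbc:hive2://hadoop102:10000 -e "show tables in ecommerce;"
beeline -u jdbc:hive2://hadoop102:10000 -e "select * from ecommerce.t_order limit 5;"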
Now for the Sqoop approach. As before, create the ecommerce database in Hive:
create database ecommerce;
Running the script below imports the data from MySQL into Hive:
#!/bin/bash
# Define a helper that runs the sqoop import command;
# this script is meant to be run from sqoop/bin
sq() {
    ./sqoop import \
    --connect jdbc:mysql://hadoop102:3306/ecommerce \
    --username root \
    --password 000000 \
    --table $1 \
    --num-mappers 1 \
    --hive-import \
    --fields-terminated-by "\t" \
    --hive-overwrite \
    --hive-database ecommerce \
    --hive-table $1
}

sq t_commodity
sq t_commodity_cate
sq t_coupon
sq t_coupon_member
sq t_coupon_order
sq t_delivery
sq t_feedback
sq t_member
sq t_member_addr
sq t_order
sq t_order_commodity
sq t_shop
sq t_shop_order
sq t_user
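Once the script finishes, a simple sanity check is to compare row counts between source and target; the commands below reuse this article's hosts and credentials and are a suggestion, not part of the original run:

# rows in MySQL (password prompted interactively)
mysql -h hadoop102 -P 3306 -u root -p -e "select count(*) from ecommerce.t_order;"
# rows in Hive after the import
beeline -u jdbc:hive2://hadoop102:10000 -e "select count(*) from ecommerce.t_order;"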
To wrap up, a comparison of the two tools. DataX's main characteristics:
1. Data exchange between heterogeneous databases and file systems.
2. A Framework + plugin architecture: the framework takes care of most of the hard parts of high-speed data exchange, such as buffering, flow control, concurrency, and context loading, and exposes a simple interface to the plugins; a plugin only needs to implement access to its own data store (see the smoke-test sketch after this list).
3. The whole transfer happens inside a single process, entirely in memory, with no disk reads/writes and no IPC.
4. An open framework: a developer can write a new plugin in very little time to support a new database or file system.
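The plugin model can be seen in a stock install: the DataX tarball ships a stream-to-stream sample job (a streamreader plugin feeding a streamwriter plugin through the framework), which doubles as a smoke test; the path below assumes the install directory used earlier in this article:

cd /opt/module/datax
python bin/datax.py job/job.json   # streamreader -> framework -> streamwriter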
Sqoop's main characteristics:
1. It can import relational data into HDFS, Hive, HBase, and other Hadoop components, and export data from those Hadoop components back into a relational database.
2. Imports and exports are built on MapReduce: each run generates a MapReduce job from the input parameters and executes it on the Hadoop cluster. Running the import or export across multiple nodes under MapReduce is faster than running several parallel transfers on a single node, and it brings good concurrency and fault tolerance.
3. It supports insert and update modes: with the right parameters, rows that already exist are updated and the rest are inserted (see the export sketch after this list).
4. Support for the mainstream international relational databases is especially good.
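Point 3 in action: a sketch of a sqoop export that updates matching rows by key and inserts the rest. The table, key, and paths reuse this article's environment and are illustrative:

sqoop export \
  --connect jdbc:mysql://hadoop102:3306/ecommerce \
  --username root \
  --password 123456 \
  --table t_order \
  --export-dir /warehouse/ecommerce/t_order \
  --input-fields-terminated-by "\t" \
  --update-key order_id \
  --update-mode allowinsert

With --update-mode allowinsert, rows whose order_id already exists in MySQL are updated in place and new rows are inserted.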
Comparing the two:
1. Sqoop uses MapReduce for import and export, while DataX does all extraction and loading on the single machine where DataX runs, so DataX is much slower than Sqoop.
2. Sqoop can only move data between relational databases and Hadoop components: it cannot transfer data between Hadoop components themselves (for example between Hive and HBase), nor between relational databases (for example between MySQL and Oracle). DataX can migrate data between relational databases and Hadoop components, between relational databases, and between Hadoop components.
3. Sqoop was built specifically for Hadoop and supports it well, whereas DataX may fail to support newer Hadoop versions.
4. Sqoop only supports data exchange between Hadoop components and the handful of relational databases it officially ships with, while with DataX users can modify the source to fit their needs, build the corresponding rpm package, install it, and then use their own custom plugins.