赞
踩
${}为传参
{ "job": { "setting": { "speed": { "channel": 5 }, "errorLimit":{ "record":0, "percentage":0 } }, "content": [ { "reader": { "name": "mysqlreader", "parameter": { "username": "${SRC_MYSQL_业务名称_USR}", "password": "${SRC_MYSQL_业务名称_PWD}", "connection": [ { "querySql": [ "select id, pro_id, rtrim(ltrim(replace(replace(replace(device_id,char(9),''),char(10),''),char(13),''))), '${CURRENT_FLOW_START_DAY}', ${EXECUTION_ID} from ${SRC_MYSQL_业务名称_DB_库名称}.表名称_98" ], "jdbcUrl": [ "${SRC_MYSQL_业务名称_URL}" ] } ] } }, "writer": { "name": "hdfswriter", "parameter": { "column": [ { "name": "id", "type": "bigint" }, { "name": "pro_id", "type": "bigint" }, { "name": "device_id", "type": "string" }, { "name": "op_time", "type": "string" }, { "name": "execution_id", "type": "string" } ], "defaultFS": "${HDFS_ROOT_URI}", "fieldDelimiter": "\t", "fileName": "表名称", "fileType": "orc", "path": "${HDFS_HV_EXTTB_ROOT}/${HIVE_DB_ODS_业务名称}/表名称/load_date=${CURRENT_FLOW_START_DAY}/record_id=98", "writeMode": "append" }, "extendScript": { "post": [ { "type":"hql", "script":"ALTER TABLE ${HIVE_DB_ODS_业务名称}.表名称 ADD IF NOT EXISTS PARTITION(load_date='${CURRENT_FLOW_START_DAY}',record_id='98')" } ] } } } ] } }
本质:将"querysql"拆分成"column","where","table",MysqlReader根据指定的column、table、where条件拼接SQL
并发task数。看自己需求调整
记得加双引号
将全量改为增量抽取
将100张表放入一个脚本
{ "job": { "setting": { "speed": { "channel": 20 }, "errorLimit":{ "record":0, "percentage":0 } }, "content": [ { "reader": { "name": "mysqlreader", "parameter": { "username": "${SRC_MYSQL_业务名称_USR}", "password": "${SRC_MYSQL_业务名称_PWD}", "column": [ "id", "pro_id", "rtrim(ltrim(replace(replace(replace(device_id,char(9),''),char(10),''),char(13),'')))", "update_time", "${CURRENT_FLOW_START_DAY}", "${EXECUTION_ID}" ], "where": "date_format(update_time,'%Y-%m-%d') >='${START_DATEKEY}'", "connection": [ { "table": [ "${SRC_MYSQL_业务名称_DB_库名称}.表名称_0", 。。。。。。。此处省略。。。。。。。。。 "${SRC_MYSQL_业务名称_DB_库名称}.表名称_99" ], "jdbcUrl": [ "${SRC_MYSQL_业务名称_URL}" ] } ] } }, "writer": { "name": "hdfswriter", "parameter": { "column": [ { "name": "id", "type": "bigint" }, { "name": "pro_id", "type": "bigint" }, { "name": "device_id", "type": "string" }, { "name": "update_time", "type": "string" }, { "name": "op_time", "type": "string" }, { "name": "execution_id", "type": "string" } ], "defaultFS": "${HDFS_ROOT_URI}", "fieldDelimiter": "\t", "fileName": "表名称", "fileType": "orc", "path": "${HDFS_HV_EXTTB_ROOT}/${HIVE_DB_ODS_业务名称}/表名称/load_date=${CURRENT_FLOW_START_DAY}", "writeMode": "append" }, "extendScript": { "post": [ { "type":"hql", "script":"ALTER TABLE ${HIVE_DB_ODS_业务名称}.表名称 ADD IF NOT EXISTS PARTITION(load_date='${CURRENT_FLOW_START_DAY}')" } ] } } } ] } }
原先的全量变增量
use ${HIVE_DB_DWD_业务名称}; insert overwrite table dwd_表名称 select t.id, t.pro_id, t.device_id, t.update_time, t.op_time, t.execution_id, '${CURRENT_FLOW_START_DAY}' as load_date from ( select --取当天增量数据/重跑的全部数据 ods.id, ods.pro_id, ods.device_id, ods.update_time, ods.op_time, ods.execution_id from (select * from ${HIVE_DB_ODS_业务名称}.表名称 where load_date = '${CURRENT_FLOW_START_DAY}' ) ods union all select --如果是增量数据,取原数据,否则,不取 dwd.id, dwd.pro_id, dwd.device_id, dwd.update_time, dwd.op_time, dwd.execution_id from (select * from ${HIVE_DB_DWD_业务名称}.表名称 ) dwd left join (select * from ${HIVE_DB_ODS_业务名称}.表名称 where load_date = '${CURRENT_FLOW_START_DAY}' )ods on dwd.id=ods.id where ods.id is null )t ;
table
”, “1”, “‘bazhen.csy’”, “null”, “to_char(a + 1)”, “2.3” , “true”] id为普通列名,table
为包含保留在的列名,1为整形数字常量,'bazhen.csy’为字符串常量,null为空指针,to_char(a + 1)为表达式,2.3为浮点数,true为布尔值。https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。