赞
踩
该脚本主要是使用python脚本通过dataX查询mysql的数据增量同步至HIVE。
1、python3
2、dataX
3、pyhive
4、pyspark
(1)支持测试和生产的自由切换。
(2)支持增量同步。
(3)支持补历史数据。
(4)运行环境简单。
(5)支持HIVE队列的切换。
对于同步数据,该脚本基本已经都支持。还有优化空间就是:
1、连接HIVE时可以使用HA模式,不连接单节点。减少宕机风险。
2、日志打印的规范性。
3、dataX脚本的缺失。由于其他原因不能放出来。我后面见放一个demo出来。
4、可以通过变量的形式将要同步的表目标表传进来。实现举一反三的同步。
后续在其他脚本优化。
#!/usr/bin/env python # -*- coding: utf-8 -*- # 增量同步消息 from pyhive import hive import os, sys,datetime isPrd = True hiveInfo = {'host':'192.168.1.1','port':10000, 'user':'root','database':'ods','hdfs':'hdfs://192.168.1.1:8020'} \ if(isPrd) else {'host':'192.168.1.122','port':10000, 'user':'root','database':'ods','hdfs':'hdfs://192.168.1.1:8020'} sourceDbInfo = {'url':'192.168.1.1:3306/db','user':'root','passwd':'123'} \ if(isPrd) else {'url':'192.168.1.122:3306/db','user':'root','passwd':'root123'} sys.path.append(os.getcwd()) UTF8 = "UTF-8"; class HiveClient: def __init__(self): self.conn = hive.connect( host=hiveInfo.get('host'), port=hiveInfo.get('port'), username=hiveInfo.get('user'), database=hiveInfo.get('database'),) def query(self, sql): sql = sql.replace("\n", "").replace("\t", "").replace("\r\n", "") print(sql) with self.conn.cursor() as cursor: cursor.execute("set mapreduce.job.queuename=root.users.project") cursor.execute(sql) return cursor.fetchall() def execute(self, sql): sql = sql.replace("\n", "").replace("\t", "").replace("\r\n", "") print(sql) with self.conn.cursor() as cursor: cursor.execute("set mapreduce.job.queuename=root.users.project") cursor.execute(sql) def close(self): self.conn.close() def __getMaxPk(): #增加分区 addPartion="alter table ods.ods_message_incr add if not exists partition (dt='{dt}') ".format(dt=dt) HiveClient().execute(addPartion) #获取最大ID sql = """select max(id) from ods.ods_message_incr where dt='{dt}'""".format(dt=dt) data = HiveClient().query(sql) HiveClient().close() print(data) if (data[0][0] == None): return 0 return data[0][0] # 增量同步推送消息 def syncPushMessage(dt): maxPk = __getMaxPk(); datax_json_path = os.getcwd() + '/ods_message_incr.json' etime = (datetime.datetime.strptime(dt, "%Y-%m-%d") + datetime.timedelta(days=1)).strftime('%Y-%m-%d') # 这是执行dataX 命令,后面是传参。 commandStr = "python /home/datax/bin/datax.py %s -p '-DmaxId=%s -Dsurl=%s -Dsuser=%s -Dspasswd=%s -Dstime=%s -Detime=%s -Dtime=%s -Dhdfs=%s '" % ( datax_json_path, maxPk, sourceDbInfo.get("url"), sourceDbInfo.get("user"), sourceDbInfo.get("passwd"), dt, etime, dt, hiveInfo.get('hdfs')); print(commandStr) os.system(commandStr) # 补充缺失消息 def syncPushMessage_history(dt,maxPk): datax_json_path = os.getcwd() + '/ods_message_incr.json' etime = (datetime.datetime.strptime(dt, "%Y-%m-%d") + datetime.timedelta(days=1)).strftime('%Y-%m-%d') commandStr = "python /home/datax/bin/datax.py %s -p '-DmaxId=%s -Dsurl=%s -Dsuser=%s -Dspasswd=%s -Dstime=%s -Detime=%s -Dtime=%s -Dhdfs=%s '" % ( datax_json_path, maxPk, sourceDbInfo.get("url"), sourceDbInfo.get("user"), sourceDbInfo.get("passwd"), dt, etime, dt, hiveInfo.get('hdfs')); print(commandStr) os.system(commandStr) if __name__ == '__main__': if len(sys.argv) == 1: dt = (datetime.datetime.now()).strftime('%Y-%m-%d') syncPushMessage(dt) elif len(sys.argv) == 2: dt = sys.argv[1] syncPushMessage(dt) elif len(sys.argv) == 3: dt = sys.argv[1] maxPk = sys.argv[2] syncPushMessage_history(dt, maxPk) else: print('参数输入错误') sys.exit(1)
文章为原创,转载请出示原地址。
感谢你的阅读,如果这篇文章能帮到你。是我的荣幸!谢谢~
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。