当前位置:   article > 正文

python脚本通过dataX增量同步mysql数据至HIVE_python mysql 增量同步

python mysql 增量同步

python通过dataX同步mysql数据

1、介绍

该脚本主要是使用python脚本通过dataX查询mysql的数据增量同步至HIVE。
  • 1

2、环境

1、python3 
2、dataX
3、pyhive
4、pyspark
  • 1
  • 2
  • 3
  • 4

3、功能

(1)支持测试和生产的自由切换。
(2)支持增量同步。
(3)支持补历史数据。
(4)运行环境简单。
(5)支持HIVE队列的切换。
  • 1
  • 2
  • 3
  • 4
  • 5

4、优化

对于同步数据,该脚本基本已经都支持。还有优化空间就是:
	1、连接HIVE时可以使用HA模式,不连接单节点。减少宕机风险。
	2、日志打印的规范性。
	3、dataX脚本的缺失。由于其他原因不能放出来。我后面见放一个demo出来。
	4、可以通过变量的形式将要同步的表目标表传进来。实现举一反三的同步。
后续在其他脚本优化。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

5、源码

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# 增量同步消息
from pyhive import hive
import os, sys,datetime

isPrd = True
hiveInfo = {'host':'192.168.1.1','port':10000, 'user':'root','database':'ods','hdfs':'hdfs://192.168.1.1:8020'} \
	if(isPrd)  else {'host':'192.168.1.122','port':10000, 'user':'root','database':'ods','hdfs':'hdfs://192.168.1.1:8020'}
sourceDbInfo = {'url':'192.168.1.1:3306/db','user':'root','passwd':'123'} \
	if(isPrd)  else {'url':'192.168.1.122:3306/db','user':'root','passwd':'root123'}
sys.path.append(os.getcwd())
UTF8 = "UTF-8";
class HiveClient:
    def __init__(self):
        self.conn = hive.connect(
                        host=hiveInfo.get('host'),
                        port=hiveInfo.get('port'),
                        username=hiveInfo.get('user'),
                        database=hiveInfo.get('database'),)

    def query(self, sql):
        sql = sql.replace("\n", "").replace("\t", "").replace("\r\n", "")
        print(sql)
        with self.conn.cursor() as cursor:
            cursor.execute("set mapreduce.job.queuename=root.users.project")
            cursor.execute(sql)
            return cursor.fetchall()

    def execute(self, sql):
        sql = sql.replace("\n", "").replace("\t", "").replace("\r\n", "")
        print(sql)
        with self.conn.cursor() as cursor:
            cursor.execute("set mapreduce.job.queuename=root.users.project")
            cursor.execute(sql)

    def close(self):
        self.conn.close()




def __getMaxPk():
#增加分区
    addPartion="alter table ods.ods_message_incr add if not exists partition (dt='{dt}') ".format(dt=dt)
    HiveClient().execute(addPartion)
#获取最大ID
    sql = """select max(id) from ods.ods_message_incr where dt='{dt}'""".format(dt=dt)
    data = HiveClient().query(sql)
    HiveClient().close()

    print(data)
    if (data[0][0] == None):
        return 0
    return data[0][0]


# 增量同步推送消息
def syncPushMessage(dt):
    maxPk = __getMaxPk();
    datax_json_path = os.getcwd() + '/ods_message_incr.json'
    etime = (datetime.datetime.strptime(dt, "%Y-%m-%d") + datetime.timedelta(days=1)).strftime('%Y-%m-%d')
    # 这是执行dataX 命令,后面是传参。
    commandStr = "python /home/datax/bin/datax.py %s -p '-DmaxId=%s -Dsurl=%s -Dsuser=%s -Dspasswd=%s -Dstime=%s -Detime=%s -Dtime=%s  -Dhdfs=%s '" % (
    datax_json_path, maxPk, sourceDbInfo.get("url"), sourceDbInfo.get("user"), sourceDbInfo.get("passwd"), dt, etime, dt, hiveInfo.get('hdfs'));
    print(commandStr)
    os.system(commandStr)

# 补充缺失消息
def syncPushMessage_history(dt,maxPk):
    datax_json_path = os.getcwd() + '/ods_message_incr.json'
    etime = (datetime.datetime.strptime(dt, "%Y-%m-%d") + datetime.timedelta(days=1)).strftime('%Y-%m-%d')
    commandStr = "python /home/datax/bin/datax.py %s -p '-DmaxId=%s -Dsurl=%s -Dsuser=%s -Dspasswd=%s -Dstime=%s -Detime=%s -Dtime=%s -Dhdfs=%s '" % (
    datax_json_path, maxPk, sourceDbInfo.get("url"), sourceDbInfo.get("user"), sourceDbInfo.get("passwd"), dt, etime, dt, hiveInfo.get('hdfs'));
    print(commandStr)
    os.system(commandStr)

if __name__ == '__main__':
    if len(sys.argv) == 1:
        dt = (datetime.datetime.now()).strftime('%Y-%m-%d')
        syncPushMessage(dt)
    elif len(sys.argv) == 2:
        dt = sys.argv[1]
        syncPushMessage(dt)
    elif len(sys.argv) == 3:
        dt = sys.argv[1]
        maxPk = sys.argv[2]
        syncPushMessage_history(dt, maxPk)
    else:
        print('参数输入错误')
        sys.exit(1)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91

6、最后

文章为原创,转载请出示原地址。
感谢你的阅读,如果这篇文章能帮到你。是我的荣幸!谢谢~

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/141168
推荐阅读
相关标签
  

闽ICP备14008679号