当前位置:   article > 正文

【python】使用python将influxdb1.x版本数据转成mysql数据库表 | influxdb2mysql | influxdb1.x 版本数据读取

【python】使用python将influxdb1.x版本数据转成mysql数据库表 | influxdb2mysql | influxdb1.x 版本数据读取

一、influxdb2mysql

1、问题

1、首先,influxdb 1.x 版本的查询语言(InfluxQL)中很多高级函数无法使用;
2、存入 influxdb 中的时间为 UTC,需要转换成东八区(UTC+8)时间。

在这里插入图片描述

2、分析

influxdb 中的数据表称为 measurement,
里面存储的字段称为 field。
我们需要查找 field 中指定的 key(字符串),
即下面代码中 category_mapping 字典里的 value(测点名列表),
然后根据这些 key 对应的 value 值,组装成一行存入下面的 mysql 表。

在这里插入图片描述

3、源码

import configparser
import logging
from datetime import datetime, timedelta
import time

import pytz
import schedule as schedule
from influxdb import InfluxDBClient
import mysql.connector

logger = logging.getLogger(__name__)

# Connection settings come from config.ini, resolved against the current
# working directory. ConfigParser.read silently skips a missing file, so a
# wrong CWD surfaces below as configparser.NoSectionError.
config = configparser.ConfigParser()
config.read('config.ini')

# InfluxDB connection settings (section [DB_INFLUXDB_CONFIG]).
# The INI key is `user`; InfluxDBClient's keyword is `username`.
influxdb_config = {
    'host': config.get('DB_INFLUXDB_CONFIG', 'host'),
    # getint: hand the client an int and fail fast on a non-numeric port.
    'port': config.getint('DB_INFLUXDB_CONFIG', 'port'),
    'username': config.get('DB_INFLUXDB_CONFIG', 'user'),
    'password': config.get('DB_INFLUXDB_CONFIG', 'password'),
    'database': config.get('DB_INFLUXDB_CONFIG', 'database'),
}

# MySQL connection settings (section [DB_MYSQL_CONFIG]), passed as keyword
# arguments to mysql.connector.connect.
mysql_config = {
    'host': config.get('DB_MYSQL_CONFIG', 'host'),
    'port': config.getint('DB_MYSQL_CONFIG', 'port'),
    'user': config.get('DB_MYSQL_CONFIG', 'user'),
    'password': config.get('DB_MYSQL_CONFIG', 'password'),
    'database': config.get('DB_MYSQL_CONFIG', 'database'),
}

# Inverter serial number (SN) -> its three InfluxDB point names, always in the
# order [day_power, total_power, output_power], i.e. the order of sql_fields[1:].
category_mapping = {
    "12061021C170051": ["9LnrgoqZ_yc", "8PQ3L0qJ_yc", "forBg507_yc"],
    "12061021C170052": ["BxTFGbJy_yc", "Gqdh47Kb_yc", "9PsyYoLB_yc"],
    "12061021C170253": ["KL4KLEW7_yc", "qNbkeDnT_yc", "EoSismcC_yc"],
    "12061021C170254": ["y7DsN31p_yc", "omtPgmvQ_yc", "2nS6RVk5_yc"],
    "120D20222230044": ["qMZducGo_yc", "eBUaw8mT_yc", "m1GWrLCS_yc"],
    "120D20222230158": ["GMjvuMy5_yc", "N4a4mpQK_yc", "6KVqFgX3_yc"],
    "120D20222230162": ["rHPiWfDv_yc", "tYn9vmyi_yc", "DGDSZ0e3_yc"],
    "120D20222230180": ["rJqVeGiA_yc", "Upc9daRk_yc", "Qr8t2dpk_yc"],
    "20050B0222110211": ["buvzdC8j_yc", "8dtoofVm_yc", "1c5tf9mH_yc"],
    "20050B0222110215": ["Vhq6VjWd_yc", "La1KqgGr_yc", "2mnQJpsM_yc"],
    "20050B0222110230": ["w4pGQ7Uh_yc", "6vWarpBn_yc", "hP8PnxRM_yc"]
}

# Per-metric point-name lists, taken column-wise from category_mapping so the
# two can never drift apart (dict insertion order fixes the list order).
day_power_names = [point_names[0] for point_names in category_mapping.values()]
total_power_names = [point_names[1] for point_names in category_mapping.values()]
output_power_names = [point_names[2] for point_names in category_mapping.values()]

# Target columns after inverter_info_id: the timestamp, then one per metric.
sql_fields = ["trans_date", "day_power", "total_power", "output_power"]


def format_time(time_str):
    """Convert an RFC3339 UTC timestamp like ``2024-01-24T16:55:00Z`` to ``"YYYY-MM-DD HH:MM:SS"``.

    A fractional-seconds part of any length is accepted, e.g.
    ``2024-01-24T16:55:00.123456789Z``. InfluxDB can return up to nanosecond
    (9-digit) precision, which ``strptime``'s ``%f`` (at most 6 digits) rejects,
    so the fraction is validated and then discarded; the output has
    whole-second precision anyway. No timezone conversion happens here: the
    result is still UTC.

    Raises:
        ValueError: if ``time_str`` is not of the form
            ``YYYY-MM-DDTHH:MM:SS[.digits]Z``.
    """
    message = f"Time data '{time_str}' does not match any of the expected formats"

    if not time_str.endswith("Z"):
        raise ValueError(message)

    whole_seconds, dot, fraction = time_str[:-1].partition(".")
    # A '.' must be followed by at least one ASCII digit (same rule as %f).
    if dot and not (fraction.isascii() and fraction.isdigit()):
        raise ValueError(message)

    try:
        dt = datetime.strptime(whole_seconds, "%Y-%m-%dT%H:%M:%S")
    except ValueError:
        raise ValueError(message) from None

    return dt.strftime("%Y-%m-%d %H:%M:%S")





def get_inverter_data(mysql_cursor, sn, trans_date):
    """Return the id of an existing ods_mk_inverter_data row, or None.

    Args:
        mysql_cursor: DB-API cursor using ``%s`` placeholders (mysql.connector).
        sn: value matched against the ``inverter_info_id`` column. Despite the
            name, the caller in this file passes the id looked up from
            ods_mk_inverter_info, not a serial number.
        trans_date: value matched against the ``trans_date`` column.

    Returns:
        The ``id`` of the first matching row, or None if there is none.
    """
    # Values are bound as parameters so the driver quotes/escapes them; this
    # also avoids double-quoted string literals, which MySQL treats as
    # identifiers under ANSI_QUOTES. LIMIT 1 keeps an unbuffered cursor from
    # being left with unread rows when duplicates exist.
    query = '''
        SELECT id FROM ods_mk_inverter_data
        WHERE inverter_info_id = %s AND trans_date = %s
        LIMIT 1
    '''
    mysql_cursor.execute(query, (sn, trans_date))
    result = mysql_cursor.fetchone()
    return result[0] if result else None


def get_inverter_info_id(mysql_cursor, sn):
    """Return ``ods_mk_inverter_info.id`` for the inverter with serial number ``sn``, or None.

    Args:
        mysql_cursor: DB-API cursor using ``%s`` placeholders (mysql.connector).
        sn: inverter serial number, matched against the ``sn`` column.

    Returns:
        The ``id`` of the first matching row, or None if no row matches.
    """
    # Bind sn as a parameter instead of interpolating it into the SQL; LIMIT 1
    # prevents unread rows on an unbuffered cursor if the SN is duplicated.
    query = '''
        SELECT id FROM ods_mk_inverter_info
        WHERE sn = %s
        LIMIT 1
    '''
    mysql_cursor.execute(query, (sn,))
    result = mysql_cursor.fetchone()
    return result[0] if result else None


def convert_utc_to_eight_hours(time_str):
    """Reinterpret a naive UTC ``"YYYY-MM-DD HH:MM:SS"`` string in Asia/Shanghai time.

    Returns the converted wall-clock time in the same string format.

    Raises:
        ValueError: if ``time_str`` does not match ``%Y-%m-%d %H:%M:%S``.
    """
    pattern = '%Y-%m-%d %H:%M:%S'
    naive = datetime.strptime(time_str, pattern)
    shanghai = pytz.utc.localize(naive).astimezone(pytz.timezone('Asia/Shanghai'))
    return shanghai.strftime(pattern)


def insert_data_to_mysql(mysql_cursor, table, sql_fields, category_mapping, data):
    """Insert one row per inverter into ``table`` unless that instant is already stored.

    Args:
        mysql_cursor: DB-API cursor using ``%s`` placeholders (mysql.connector).
        table: target table name. It is interpolated into the SQL as an
            identifier (identifiers cannot be bound as parameters), so it must
            be a trusted constant.
        sql_fields: column names filled from each inverter's entry, starting
            with the timestamp column followed by one column per point value.
        category_mapping: not used here; kept for call compatibility.
        data: ``{sn: {point_name: value, ..., "time": "YYYY-MM-DD HH:MM:SS"}}``.
            "time" is treated as UTC and converted to Asia/Shanghai via
            convert_utc_to_eight_hours. The point values are taken in dict
            insertion order (excluding "time") and must line up with
            ``sql_fields[1:]``.

    Inverters with no value at all are skipped. So are inverters whose SN has
    no ods_mk_inverter_info row, with a warning. Existing rows for the same
    (inverter, trans_date) are left untouched; no UPDATE path is implemented.
    """
    rows = []

    # Drop inverters for which no point value arrived in this window.
    filtered_data = {key: value for key, value in data.items() if any(v is not None for v in value.values())}
    print(filtered_data)

    for category, fields in filtered_data.items():
        print(fields)
        trans_date = convert_utc_to_eight_hours(fields["time"])

        inverter_info_id = get_inverter_info_id(mysql_cursor, category)
        if inverter_info_id is None:
            # No id to insert; emitting NULL/None here would either violate the
            # column or break the whole batched INSERT, so skip this inverter.
            logger.warning("No ods_mk_inverter_info row for sn=%s; skipping %s", category, trans_date)
            continue

        if get_inverter_data(mysql_cursor, inverter_info_id, trans_date) is not None:
            print("----已经存在拉")
            continue

        values = [value for name, value in fields.items() if name != 'time']
        print("----不经存-添加:", values)
        # info_date is the date part of the local (UTC+8) trans_date.
        rows.append((inverter_info_id, trans_date, *values, trans_date.split(" ")[0]))

    if not rows:
        return

    # One placeholder group per row: inverter_info_id + every sql_fields column,
    # then server-side timestamps, then info_date. None binds as SQL NULL.
    placeholders = ", ".join(["%s"] * (1 + len(sql_fields)))
    row_template = f"({placeholders}, now(), now(), %s)"
    query = f'''
        INSERT INTO {table} (inverter_info_id, {', '.join(sql_fields)}, created_at, updated_at, info_date)
        VALUES {', '.join([row_template] * len(rows))}
    '''
    params = [param for row in rows for param in row]
    print(query, params)

    mysql_cursor.execute(query, params)


def getMin(min_stamp):
    """Return the UTC instant ``min_stamp`` minutes before now as ``"YYYY-MM-DDTHH:MM:SSZ"``.

    Sub-second precision is dropped by the output format.
    """
    earlier = datetime.now(pytz.utc) - timedelta(minutes=min_stamp)
    return earlier.strftime('%Y-%m-%dT%H:%M:%SZ')


def _build_influx_query(start_time_str, end_time_str):
    """Build the InfluxQL query selecting every mapped point name in [start, end).

    The bounds are interpolated verbatim, so they must be trusted RFC3339
    strings such as those produced by getMin.
    """
    names_conditions = ' OR '.join(
        f'"name" = \'{name}\'' for name in (output_power_names + day_power_names + total_power_names))
    return f'''
                SELECT "time", "name", "value"
                FROM "bjtnyh1q1"
                WHERE ({names_conditions})
                AND time >= '{start_time_str}' AND time < '{end_time_str}'
            '''


def _group_points_by_category(points):
    """Bucket InfluxDB points into ``{sn: {point_name: value, ..., "time": str}}``.

    Every SN from category_mapping is present, with None for point names that
    had no value. "time" holds the formatted time of the last matching point.
    """
    # Reverse index point name -> SNs, built once instead of scanning the
    # whole mapping for every point.
    categories_by_name = {}
    for category, names in category_mapping.items():
        for name in names:
            categories_by_name.setdefault(name, []).append(category)

    data = {category: {name: None for name in names} for category, names in category_mapping.items()}
    for point in points:
        point_name = point['name']
        for category in categories_by_name.get(point_name, ()):
            data[category][point_name] = point['value']
            data[category]["time"] = format_time(point['time'])
    return data


def query_inlfux_to_mysql(start_time_str=None, end_time_str=None, *, min_start=16, min_end=11):
    """Copy one time window of inverter readings from InfluxDB into MySQL.

    Args:
        start_time_str: inclusive window start as ``"%Y-%m-%dT%H:%M:%SZ"`` (UTC).
            Defaults to ``getMin(min_start)``.
        end_time_str: exclusive window end in the same format. Defaults to
            ``getMin(min_end)``.
        min_start: minutes before now for the default start (keyword-only).
        min_end: minutes before now for the default end (keyword-only).

    Both connections are closed on every path, including an empty result
    and exceptions. The MySQL transaction is committed only after a
    successful insert.
    """
    if start_time_str is None:
        start_time_str = getMin(min_start)
    if end_time_str is None:
        end_time_str = getMin(min_end)
    print("--------------start_time_str:{", start_time_str, "},end_time_str:{", end_time_str, "}")

    influxdb_client = InfluxDBClient(**influxdb_config)
    try:
        mysql_connection = mysql.connector.connect(**mysql_config)
        try:
            mysql_cursor = mysql_connection.cursor()

            query = _build_influx_query(start_time_str, end_time_str)
            print(query)
            result = influxdb_client.query(query)
            print(result)
            if len(result) <= 0:
                return

            data = _group_points_by_category(result.get_points())
            print(data)
            insert_data_to_mysql(mysql_cursor, 'ods_mk_inverter_data', sql_fields, category_mapping, data)

            mysql_connection.commit()
        finally:
            mysql_connection.close()
    finally:
        influxdb_client.close()


def _safe_sync():
    """Run one sync, logging (not propagating) any failure so the scheduler keeps going."""
    try:
        query_inlfux_to_mysql()
    except Exception:
        logger.exception("InfluxDB -> MySQL sync failed")


def run_scheduler(interval_minutes=5):
    """Sync once immediately, then every ``interval_minutes`` minutes, forever."""
    _safe_sync()
    schedule.every(interval_minutes).minutes.do(_safe_sync)

    while True:
        schedule.run_pending()
        time.sleep(1)


def backfill(days=1, step_minutes=5):
    """Re-sync the last ``days`` days in consecutive ``step_minutes``-minute UTC windows.

    Each window's bounds are passed positionally to query_inlfux_to_mysql as
    ``"%Y-%m-%dT%H:%M:%SZ"`` strings.
    """
    now = datetime.now(pytz.utc)
    start_time = now - timedelta(days=days)
    # Align the first window down to a step boundary.
    start_time = start_time.replace(minute=(start_time.minute // step_minutes) * step_minutes,
                                    second=0, microsecond=0)

    while start_time < now:
        end_time = start_time + timedelta(minutes=step_minutes)
        start_time_str = start_time.strftime("%Y-%m-%dT%H:%M:%SZ")
        end_time_str = end_time.strftime("%Y-%m-%dT%H:%M:%SZ")
        print(start_time_str, end_time_str)
        query_inlfux_to_mysql(start_time_str, end_time_str)
        start_time = end_time


# Only start the (blocking) scheduler when run as a script, so importing this
# module has no side effects beyond reading the config.
if __name__ == '__main__':
    run_scheduler()

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218
  • 219
  • 220
  • 221
  • 222
  • 223
  • 224
  • 225
  • 226
  • 227
  • 228
  • 229
  • 230
  • 231
  • 232
  • 233
  • 234
  • 235
  • 236
  • 237
  • 238
  • 239
  • 240
  • 241
  • 242
  • 243
  • 244
  • 245
  • 246
  • 247
  • 248
  • 249
  • 250
  • 251
  • 252
  • 253
  • 254
  • 255
  • 256
  • 257
  • 258
  • 259
  • 260
  • 261
  • 262
  • 263
  • 264
  • 265
  • 266
  • 267
  • 268
  • 269
  • 270
  • 271
  • 272
  • 273
  • 274
  • 275
  • 276
  • 277
  • 278
  • 279
  • 280
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/175558
推荐阅读
相关标签
  

闽ICP备14008679号