赞
踩
1、首先 influxdb 1.x 版本的查询很多无法使用高级函数
2、存入 influxdb中的时间为UTC需要转成东八区
influxdb中数据结构表是meansurement
里面存的字段是field
我们需要查找 field中指定的key(字符串)
如下面代码中的category_mapping
字段中的value
然后根据这些field中的key,对应的value值,组装成一行存入下面mysql
import configparser
import logging
from datetime import datetime, timedelta
import time
import pytz
import schedule as schedule
from influxdb import InfluxDBClient
import mysql.connector
# from src.config import DB_CONFIG
logger = logging.getLogger(__name__)
config = configparser.ConfigParser()
config.read('config.ini')
# InfluxDB配置
influxdb_config = {
'host': config.get('DB_INFLUXDB_CONFIG', 'host'),
'port': config.get('DB_INFLUXDB_CONFIG', 'port'),
'username': config.get('DB_INFLUXDB_CONFIG', 'user'),
'password': config.get('DB_INFLUXDB_CONFIG', 'password'),
'database': config.get('DB_INFLUXDB_CONFIG', 'database'),
}
# MySQL配置
mysql_config = {
'host': config.get('DB_MYSQL_CONFIG', 'host'),
'port': config.get('DB_MYSQL_CONFIG', 'port'),
'user': config.get('DB_MYSQL_CONFIG', 'user'),
'password': config.get('DB_MYSQL_CONFIG', 'password'),
'database': config.get('DB_MYSQL_CONFIG', 'database'),
}
# 数据库字段对应关系
category_mapping = {
"12061021C170051": ["9LnrgoqZ_yc", "8PQ3L0qJ_yc", "forBg507_yc"],
"12061021C170052": ["BxTFGbJy_yc", "Gqdh47Kb_yc", "9PsyYoLB_yc"],
"12061021C170253": ["KL4KLEW7_yc", "qNbkeDnT_yc", "EoSismcC_yc"],
"12061021C170254": ["y7DsN31p_yc", "omtPgmvQ_yc", "2nS6RVk5_yc"],
"120D20222230044": ["qMZducGo_yc", "eBUaw8mT_yc", "m1GWrLCS_yc"],
"120D20222230158": ["GMjvuMy5_yc", "N4a4mpQK_yc", "6KVqFgX3_yc"],
"120D20222230162": ["rHPiWfDv_yc", "tYn9vmyi_yc", "DGDSZ0e3_yc"],
"120D20222230180": ["rJqVeGiA_yc", "Upc9daRk_yc", "Qr8t2dpk_yc"],
"20050B0222110211": ["buvzdC8j_yc", "8dtoofVm_yc", "1c5tf9mH_yc"],
"20050B0222110215": ["Vhq6VjWd_yc", "La1KqgGr_yc", "2mnQJpsM_yc"],
"20050B0222110230": ["w4pGQ7Uh_yc", "6vWarpBn_yc", "hP8PnxRM_yc"]
}
# output_power_names = ["wNE7S0xc_yc", "MLpUH8AD_yc", "jKpykAkC_yc", "0Jb0FdYH_yc", "C3FSVxsZ_yc",
# "28gKpTav_yc", "GGgMeK5q_yc", "4isoym3R_yc", "5eQLce2Y_yc", "1VGhSzm5_yc", "MCGvNjXV_yc"]
output_power_names = ["forBg507_yc", "9PsyYoLB_yc", "EoSismcC_yc", "2nS6RVk5_yc", "m1GWrLCS_yc",
"6KVqFgX3_yc", "DGDSZ0e3_yc", "Qr8t2dpk_yc", "1c5tf9mH_yc", "2mnQJpsM_yc", "hP8PnxRM_yc"]
day_power_names = ["9LnrgoqZ_yc", "BxTFGbJy_yc", "KL4KLEW7_yc", "y7DsN31p_yc", "qMZducGo_yc",
"GMjvuMy5_yc", "rHPiWfDv_yc", "rJqVeGiA_yc", "buvzdC8j_yc", "Vhq6VjWd_yc", "w4pGQ7Uh_yc"]
total_power_names = ["8PQ3L0qJ_yc", "Gqdh47Kb_yc", "qNbkeDnT_yc", "omtPgmvQ_yc", "eBUaw8mT_yc",
"N4a4mpQK_yc", "tYn9vmyi_yc", "Upc9daRk_yc", "8dtoofVm_yc", "La1KqgGr_yc", "6vWarpBn_yc"]
sql_fields = ["trans_date", "day_power", "total_power", "output_power"]
def format_time(time_str):
formats = ["%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S.%fZ"]
for fmt in formats:
try:
dt = datetime.strptime(time_str, fmt)
# Save as 年月日 时分秒
formatted_time = dt.strftime("%Y-%m-%d %H:%M:%S")
return formatted_time
except ValueError:
pass
raise ValueError(f"Time data '{time_str}' does not match any of the expected formats")
def get_inverter_data(mysql_cursor, sn, trans_date):
# 查询逆变器信息表中的id
query = f'''
SELECT id FROM ods_mk_inverter_data
WHERE inverter_info_id = "{sn}" and trans_date ="{trans_date}"
'''
# print(query)
mysql_cursor.execute(query)
result = mysql_cursor.fetchone()
return result[0] if result else None
def get_inverter_info_id(mysql_cursor, sn):
# 查询逆变器信息表中的id
query = f'''
SELECT id FROM ods_mk_inverter_info
WHERE sn = "{sn}"
'''
mysql_cursor.execute(query)
result = mysql_cursor.fetchone()
return result[0] if result else None
def convert_utc_to_eight_hours(time_str):
# Parse the string to a datetime object
utc_time = datetime.strptime(time_str, '%Y-%m-%d %H:%M:%S')
# Assign UTC timezone to the datetime object
utc_time = pytz.utc.localize(utc_time)
# Convert to East Eight timezone
east_eight_time = utc_time.astimezone(pytz.timezone('Asia/Shanghai'))
# Return the formatted time string
return east_eight_time.strftime('%Y-%m-%d %H:%M:%S')
def insert_data_to_mysql(mysql_cursor, table, sql_fields, category_mapping, data):
values_str_list = []
filtered_data = {key: value for key, value in data.items() if any(v is not None for v in value.values())}
print(filtered_data)
for category, names in filtered_data.items():
print(data[category])
trans_date = convert_utc_to_eight_hours(
data[category]["time"]) # Assuming each category has a field named "time"
# trans_date_str = trans_date.replace(" ", " ") # Replace space with T in the datetime string
# sn = data[category]["sn"] # Assuming each category has a field named "sn"
inverter_info_id = get_inverter_info_id(mysql_cursor, category)
# 查询逆变器信息表,获取对应的id
is_exit = get_inverter_data(mysql_cursor, inverter_info_id, trans_date)
if is_exit:
# 如果记录存在,执行更新操作
# update_query = f'''
# UPDATE {table}
# SET {', '.join([f"{field}={data[category][name]}" for name, field in zip(names, sql_fields)])}
# WHERE inverter_info_id = {inverter_info_id} AND trans_date = "{trans_date}"
# '''
# mysql_cursor.execute(update_query)
print("----已经存在拉")
else:
# 如果记录不存在,执行插入操作
values_str = ', '.join(
[f'{data[category][name]}' if data[category][name] is not None else 'NULL' for name in names if
name != 'time']
)
print("----不经存-添加:", values_str)
# values_str = ', '.join([f'{data[category][name]}' for name in names])
values_str_list.append(
f'({inverter_info_id}, "{trans_date}", {values_str}, now(), now(),"{trans_date.split(" ")[0]}")')
if values_str_list:
values_str = ', '.join(values_str_list)
# 构建插入语句
query = f'''
INSERT INTO {table} (inverter_info_id, {', '.join(sql_fields)}, created_at, updated_at,info_date)
VALUES {values_str}
'''
print(query)
# 执行插入语句
mysql_cursor.execute(query)
def getMin(min_stamp):
# 获取当前时间(带有时区信息)
current_time = datetime.now(pytz.utc)
# 计算当前时间的前 min_stamp 分钟
new_time = current_time - timedelta(minutes=min_stamp)
# 返回带有 UTC 时区信息的时间字符串
return new_time.strftime('%Y-%m-%dT%H:%M:%SZ')
def query_inlfux_to_mysql():
min_start = 16
min_end = 11
start_time_str = getMin(min_start)
end_time_str = getMin(min_end)
# 计算当前时间的前1分钟和前6分钟
# temp = datetime.strptime(current_time - timedelta(minutes=130), '%Y-%m-%d %H:%M:%S.%f')
# end_time_str = temp.strftime('%Y-%m-%dT%H:%M:%SZ')
# start_time_str = datetime.strptime(current_time - timedelta(minutes=136), '%Y-%m-%d %H:%M:%S.%f').strftime(
# '%Y-%m-%dT%H:%M:%SZ')
print("--------------start_time_str:{", start_time_str, "},end_time_str:{", end_time_str, "}")
# 连接到InfluxDB
influxdb_client = InfluxDBClient(**influxdb_config)
# 连接到MySQL
mysql_connection = mysql.connector.connect(**mysql_config)
mysql_cursor = mysql_connection.cursor()
# 构建查询参数
# names_conditions = ' OR '.join([f'"name" = \'{name}\'' for name in (rated_power_names + output_power_names + day_power_names + total_power_names)])
names_conditions = ' OR '.join(
[f'"name" = \'{name}\'' for name in (output_power_names + day_power_names + total_power_names)])
query = f'''
SELECT "time", "name", "value"
FROM "bjtnyh1q1"
WHERE ({names_conditions})
-- AND time >= '2024-01-24T16:55:00Z' AND time < '2024-01-24T17:10:00Z'
AND time >= '{start_time_str}' AND time < '{end_time_str}'
'''
# -- AND time >= '2024-01-24T16:55:00Z' AND time < '2024-01-24T17:10:00Z'
print(query)
# 获取查询结果
result = influxdb_client.query(query)
# 处理查询结果并插入到MySQL表中
print(result)
if len(result) <= 0:
return
data = {category: {name: None for name in names} for category, names in category_mapping.items()}
for point in result.get_points():
# print(point)
time = point['time']
name = point['name']
value = point['value']
# 根据 name 和 category 映射到对应的数据结构
for category, names in category_mapping.items():
# print(category)
# print(names)
if name in names:
data[category][name] = value
data[category]["time"] = str(format_time(time))
# 插入到 MySQL
print(data)
insert_data_to_mysql(mysql_cursor, 'ods_mk_inverter_data', sql_fields, category_mapping, data)
# 提交并关闭连接
mysql_connection.commit()
mysql_connection.close()
influxdb_client.close()
# schedule_config = {
# "daily_job_time": config.get('SCHEDULE_CONFIG', 'daily_job_time'),
# "daily_job_time1": config.get('SCHEDULE_CONFIG', 'daily_job_time1')
# }
# schedule.every().day.at(schedule_config['daily_job_time1']).do(job)
# 手动执行一次
query_inlfux_to_mysql()
# 设置每5分钟执行一次
schedule.every(5).minutes.do(query_inlfux_to_mysql)
while True:
schedule.run_pending()
time.sleep(1)
if __name__ == '__main__':
# query_inlfux_to_mysql('', '')
# Calculate the start time as one year ago from the current time
start_time = datetime.utcnow() - timedelta(days=1)
while start_time < datetime.utcnow():
# Round down to the nearest 5 minutes
start_time = start_time.replace(minute=(start_time.minute // 5) * 5, second=0, microsecond=0)
# Format the start and end times
start_time_str = start_time.strftime("%Y-%m-%dT%H:%M:%SZ")
end_time = start_time + timedelta(minutes=5)
end_time_str = end_time.strftime("%Y-%m-%dT%H:%M:%SZ")
print(start_time_str, end_time_str)
query_inlfux_to_mysql(start_time_str, end_time_str)
# Increment the start time for the next iteration
start_time = end_time
# time.sleep(5 * 60) # Sleep for 5 minutes (in seconds)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。