
Python: batch-writing a pandas.DataFrame into ClickHouse (fast bulk loading of large data volumes)

Python package imports

import json
import logging
import math
import os
import pandas as pd
import datetime
import requests
from clickhouse_driver import Client
from cmc.config import config

# A global variable defined outside the class, so the whole .py file can share it
process_date = datetime.datetime.now().strftime("%Y%m%d")

class MyPyClassDemo:
    def __init__(self, api_key: str):
        self.api_key = api_key

    def tstFuctions(self):
        pass

    # ... the methods below all belong to this class ...
    def getClickHouseClient(self):
        try:
            host_name = 'xxxx.xxx.com'
            client = Client(
                host=host_name,
                database='your db name',
                user='root',
                password='123123',
                send_receive_timeout=20,
                settings={'use_numpy': True}
            )
            return client
        except Exception as e:
            print("Error: " + str(e))
            return None

Note: the settings={'use_numpy': True} option is required here. Without it, inserting a DataFrame fails with:

TypeError: Unsupported column type: <class 'numpy.ndarray'>. list or tuple is expected.
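If enabling use_numpy is not an option in your environment, one fallback (a minimal sketch, not what this post uses; the host, credentials, table and columns below are placeholders) is to convert the DataFrame into plain Python tuples and insert them with client.execute() instead of insert_dataframe():

import pandas as pd
from clickhouse_driver import Client

# Fallback when use_numpy is unavailable: feed client.execute() a list of tuples.
client = Client(host='xxxx.xxx.com', database='your db name', user='root', password='123123')
df = pd.DataFrame({'id': [1, 2], 'name': ['BTC', 'ETH']})
rows = list(df.itertuples(index=False, name=None))  # -> [(1, 'BTC'), (2, 'ETH')]
client.execute('INSERT INTO tstdb.ods_infos (id, name) VALUES', rows)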

    def process_json_files(self):
        tmp_dir = 'out/cmc_data/'
        files = os.listdir(tmp_dir)  # list all file names in the directory
        print(files)
        storage_client = self.getClickHouseClient()  # open the ClickHouse connection
        for file in files:
            if not file.startswith('cmc_projects'):
                continue
            with open(tmp_dir + file, 'r') as f:  # open the JSON file via its relative path
                json_string = f.read()  # read the JSON string
                data_list = json.loads(json_string)  # parse it into a list
                # df = pd.DataFrame.from_dict(json_normalize(data_list), orient='columns')  # old style: read every column (including nested JSON fields) and build the DataFrame from all of them
                df = pd.json_normalize(data_list)  # recommended form in newer pandas versions
                # print(df.T)  # print the columns that were read
                insert_df = df[['id', 'name', 'symbol', 'slug', 'rank', 'is_active', 'first_historical_data', 'platform.id', 'platform.name', 'platform.symbol', 'platform.slug', 'platform.token_address']].copy()  # select the required columns into a new DataFrame (.copy() avoids SettingWithCopyWarning below)
                insert_df.insert(loc=12, column='update_time', value=process_date)  # append a column holding the load date
                insert_df["platform.id"] = insert_df["platform.id"].apply(self.modify)  # use apply() to transform the values of one column
                # insert_df = insert_df.loc[0:1]  # (debug) keep only the first rows
                # insert_df.iloc[:, 0:12]  # (debug) take columns 0-12
                insert_df.rename(columns={"platform.id": "platform_id", "platform.name": "platform_name", "platform.symbol": "platform_symbol", "platform.slug": "platform_slug", "platform.token_address": "token_address"}, inplace=True)  # the DataFrame columns must match the ClickHouse table columns one-to-one, otherwise a KeyError is raised
                # print(insert_df)
                # print(type(insert_df["platform_id"]))
                storage_client.insert_dataframe('INSERT INTO tstdb.ods_infos (*) VALUES', insert_df)  # use the clickhouse_driver client to flush the whole DataFrame into ClickHouse in one batch

Note: the number of rows flushed to ClickHouse in one batch equals the number of records in the JSON file. If you need to cap the batch size, control the data volume at the source files or where data_list is built, as in the sketch below.
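A minimal sketch of such batch control (insert_in_batches and batch_size are hypothetical names, not part of the original script), splitting an already-prepared DataFrame into fixed-size chunks before flushing:

import pandas as pd
from clickhouse_driver import Client

def insert_in_batches(client: Client, insert_df: pd.DataFrame, batch_size: int = 10000):
    # Flush the DataFrame batch_size rows at a time instead of in one huge insert.
    for start in range(0, len(insert_df), batch_size):
        chunk = insert_df.iloc[start:start + batch_size]
        client.insert_dataframe('INSERT INTO tstdb.ods_infos (*) VALUES', chunk)

The same idea works one step earlier: slice data_list before calling json_normalize if a single JSON file is too large to turn into one DataFrame.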

    def modify(self, id):
        if math.isnan(id):  # the column arrives as Python floats, so NaN has to be checked for
            return int(0)
        else:
            return int(id)
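For reference, the same NaN handling can also be done as a vectorized column operation instead of apply() (a sketch with toy data, not part of the original script):

import pandas as pd

# Fill NaN with 0 first, then cast the whole column to int in one step.
df = pd.DataFrame({"platform.id": [1027.0, float("nan"), 5426.0]})
df["platform.id"] = df["platform.id"].fillna(0).astype(int)
print(df["platform.id"].tolist())  # [1027, 0, 5426]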

ClickHouse CREATE TABLE statement and notes

CREATE TABLE tstdb.ods_infos (
    id UInt32,
    name String,
    symbol String,
    slug String,
    rank UInt32,
    is_active UInt32,
    first_historical_data String,
    platform_id UInt32,
    platform_name String,
    platform_symbol String,
    platform_slug String,
    token_address String,
    update_time String
)
ENGINE = ReplacingMergeTree
PARTITION BY update_time
PRIMARY KEY id -- make sure a key is defined: ReplacingMergeTree only replaces rows with the same sorting key during merges, otherwise duplicate records accumulate
ORDER BY (
    id,
    name,
    symbol,
    platform_id,
    platform_name
)
SETTINGS index_granularity = 8192,
    storage_policy = 'policy_name_eth';
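With ReplacingMergeTree, duplicates are only collapsed when parts are merged in the background. A quick way to check the result (a sketch that reuses getClickHouseClient from above, not part of the original post) is to force a merge with OPTIMIZE ... FINAL, or to read with the FINAL modifier:

# Sketch: verify that rows sharing the same key have been replaced.
client = MyPyClassDemo(api_key='').getClickHouseClient()

# Force pending merges so duplicate rows are actually replaced on disk
client.execute('OPTIMIZE TABLE tstdb.ods_infos FINAL')

# Or deduplicate at read time without waiting for merges
print(client.execute('SELECT count() FROM tstdb.ods_infos FINAL'))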


 
