赞
踩
Python 包引入（本文件用到的依赖）
- import json
- import logging
- import math
- import os
- import pandas as pd
- import datetime
- import requests
- from clickhouse_driver import Client
- from cmc.config import config
-
# Module-level global shared by every function in this file: today's date as
# "YYYYMMDD", used as the update_time / partition value for inserted rows.
# NOTE(review): naive local time — confirm the host timezone matches the
# intended partition date.
process_date = datetime.datetime.now().strftime("%Y%m%d")
-
class MyPyClassDemo:
    """Demo loader: reads CoinMarketCap JSON dumps and bulk-inserts the
    flattened records into ClickHouse.
    """

    def __init__(self, api_key: str):
        # API key for the CMC HTTP API; stored for later request methods.
        self.api_key = api_key

    def tstFuctions(self):
        # Placeholder method from the original article (name typo kept so any
        # existing callers keep working); does nothing.
        pass
    # NOTE(review): the original text contained a literal "...." here — a
    # copy/paste artifact standing for omitted methods. It is not valid
    # Python ("..." is, "...." is not), so it was removed.
def getClickHouseClient(self):
    """Create and return a ClickHouse ``Client`` connection.

    Returns:
        The connected ``Client``, or ``None`` if construction fails.
        Callers MUST check for ``None`` before use.
    """
    try:
        # NOTE(review): host/user/password are hard-coded; they belong in
        # configuration (the file already imports cmc.config), not in source.
        host_name = 'xxxx.xxx.com'
        client = Client(
            host=host_name,
            database='your db name',
            user='root',
            password='123123',
            send_receive_timeout=20,
            # Required so insert_dataframe() can send numpy-backed columns;
            # without it the driver raises:
            # TypeError: Unsupported column type: <class 'numpy.ndarray'>.
            settings={'use_numpy': True}
        )
        return client
    except Exception as e:
        # Log with traceback instead of print() so failures are visible in
        # production logs; preserve the original return-None contract.
        logging.exception("Failed to create ClickHouse client: %s", e)
        return None
注意这里一定要有 settings={'use_numpy': True} 这个设置,否则会报错:
TypeError: Unsupported column type: <class 'numpy.ndarray'>. list or tuple is expected.
def process_json_files(self):
    """Load every ``cmc_projects*`` JSON dump from ``out/cmc_data/`` and
    bulk-insert the flattened records into ClickHouse (tstdb.ods_infos).

    One ``insert_dataframe`` call is issued per file, so the batch size
    equals the number of records in that file.
    """
    tmp_dir = 'out/cmc_data/'
    files = os.listdir(tmp_dir)  # every file name in the dump directory
    logging.info("candidate dump files: %s", files)

    storage_client = self.getClickHouseClient()
    if storage_client is None:
        # getClickHouseClient() returns None on failure; bail out early
        # instead of crashing with AttributeError on the first insert below.
        logging.error("ClickHouse connection unavailable; aborting import")
        return

    for file in files:
        if not file.startswith('cmc_projects'):
            continue
        with open(os.path.join(tmp_dir, file), 'r', encoding='utf-8') as f:
            data_list = json.loads(f.read())  # list of project dicts
        # Flatten nested JSON objects (platform.*) into dotted columns.
        df = pd.json_normalize(data_list)
        # Select the columns the target table needs; .copy() makes the frame
        # independent so the mutations below cannot trigger pandas'
        # SettingWithCopyWarning.
        insert_df = df[['id', 'name', 'symbol', 'slug', 'rank', 'is_active',
                        'first_historical_data', 'platform.id',
                        'platform.name', 'platform.symbol', 'platform.slug',
                        'platform.token_address']].copy()
        # Record the load date so rows land in today's partition.
        insert_df.insert(loc=12, column='update_time', value=process_date)
        # platform.id arrives as float with NaN for coins without a platform;
        # modify() maps NaN -> 0 and truncates the rest to int.
        insert_df["platform.id"] = insert_df["platform.id"].apply(self.modify)
        # Column names must match the ClickHouse table exactly, otherwise
        # insert_dataframe raises KeyError.
        insert_df.rename(columns={"platform.id": "platform_id",
                                  "platform.name": "platform_name",
                                  "platform.symbol": "platform_symbol",
                                  "platform.slug": "platform_slug",
                                  "platform.token_address": "token_address"},
                         inplace=True)
        # Bulk-flush the whole frame for this file in one round trip.
        storage_client.insert_dataframe('INSERT INTO tstdb.ods_infos (*) VALUES', insert_df)
注意: 这里一次批量刷入CH的条数,取决于json文件当中的数据条数,可在源文件或者data_list
的位置做数据量的控制操作
def modify(self, id):
    """Normalize a platform id read from pandas.

    pandas loads the nullable ``platform.id`` column as float, so missing
    values (coins with no platform) arrive as NaN. Map NaN to 0 and
    truncate everything else to ``int`` for the UInt32 table column.

    Raises TypeError if ``id`` is not a number.
    """
    # NOTE(review): parameter name "id" shadows the builtin but is kept —
    # renaming it would break keyword callers.
    # The original body had an unreachable "return id" after the if/else
    # (both branches already return); it has been removed.
    if math.isnan(id):
        return 0
    return int(id)
CH建表语句和说明
-- Target table for process_json_files(). The database name matches the
-- Python INSERT statement ("tstdb.ods_infos"); the original article wrote
-- "tst_db" here, which would make every insert fail with an unknown-table
-- error.
CREATE TABLE tstdb.ods_infos (
    id UInt32,
    name String,
    symbol String,
    slug String,
    rank UInt32,
    is_active UInt32,
    first_historical_data String,
    platform_id UInt32,          -- 0 when the coin has no platform (NaN in source)
    platform_name String,
    platform_symbol String,
    platform_slug String,
    token_address String,
    update_time String           -- load date "YYYYMMDD"; also the partition key
)
ENGINE = ReplacingMergeTree
PARTITION BY update_time
-- NOTE(review): ReplacingMergeTree deduplicates rows sharing the same
-- ORDER BY (sorting) key, and only at background merge time — PRIMARY KEY
-- is just the index prefix of that key, not a uniqueness constraint. The
-- original comment claiming PRIMARY KEY alone causes overwrite is wrong.
PRIMARY KEY id
ORDER BY (
    id,
    name,
    symbol,
    platform_id,
    platform_name
)
SETTINGS index_granularity = 8192,
    storage_policy = 'policy_name_eth';
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。