赞
踩
为了避免反复的手手工从后台数据库导出某些数据表到Excel文件、高效率到多份离线数据。
支持一次性导出多个数据源表、自动获取各表的字段名。
支持控制批次的写入速率。例如:每5000行一个批次写入到excel。
支持结构相同的表导入到同一个Excel文件。可适用于经过水平切分后的分布式表。
1、概览
A[创建类] -->|方法1| B(创建数据库连接)
A[创建类] -->|方法2| C(取查询结果集)
A[创建类] -->|方法3| D(利用句柄写入Excel)
A[创建类] -->|方法4| E(读取多个源表)
B(创建数据库连接) -->U(调用示例)
C(取查询结果集) -->U(调用示例)
D(利用句柄写入Excel) -->U(调用示例)
E(读取多个源表) -->U(调用示例)
2、主要方法
首先需要安装第三方库pymssql实现对SQLServer的连接访问,自定义方法__getConn()需要指定如下五个参数:服务器host、登录用户名user、登录密码pwd、指定的数据库db、字符编码charset。连接成功后,通过cursor()获取游标对象,它将用来执行数据库脚本,并得到返回结果集和数据总量。
创建数据库连接和执行SQL的源码:
def __init__(self,host,user,pwd,db):
self.host = host
self.user = user
self.pwd = pwd
self.db = db
def __getConn(self):
if not self.db:
raise(NameError,'没有设置数据库信息')
self.conn = pymssql.connect(host=self.host, user=self.user, password=self.pwd, database=self.db, charset='utf8')
cur = self.conn.cursor()
if not cur:
raise(NameError,'连接数据库失败')
else:
return cur
3、方法3中写入Excel时,注意一定要用到Pandas中的公共句柄ExcelWriter对象writer。当数据被分批多次写入同一个文件时,如果直接使用to_excel()方法,则前面批次的结果集将会被后续结果覆盖。增加了这个公共句柄限制后,后面的写入会累加到前面写入的数据尾部行,而不是全部覆盖。
writer = pd.ExcelWriter(file)
df_fetch_data[rs_startrow:i*N].to_excel(writer, header=isHeader, index=False, startrow=startRow)
分批次写入到目标Excel时的另一个要注意的参数是写入行startrow的设置。每次写入完成后需要重新指下一批次数据的初始位置值。每个批次的数据会记录各自的所属批次信息。
利用关键字参数**args 指定多个数据源表和数据库连接。
def exportToExcel(self, **args):
for sourceTB in args['sourceTB']:
arc_dict = dict(
sourceTB = sourceTB,
path=args['path'],
startRow=args['startRow'],
isHeader=args['isHeader'],
batch=args['batch']
)
print('\n当前导出的数据表为:%s' %(sourceTB))
self.writeToExcel(**arc_dict)
return 'success'
#!/usr/bin/env python # coding: utf-8 # 主要功能:分批次导出大数据量、结构相同的数据表到excel # 导出多个表的数据到各自的文件, # 目前问题:to_excel 虽然设置了分批写入,但先前的数据会被下一次写入覆盖, # 利用Pandas包中的ExcelWriter()方法增加一个公共句柄,在写入新的数据之时保留原来写入的数据,等到把所有的数据都写进去之后关闭这个句柄 import pymssql import pandas as pd import datetime import math class MSSQL(object): def __init__(self,host,user,pwd,db): self.host = host self.user = user self.pwd = pwd self.db = db def __getConn(self): if not self.db: raise(NameError,'没有设置数据库信息') self.conn = pymssql.connect(host=self.host, user=self.user, password=self.pwd, database=self.db, charset='utf8') cur = self.conn.cursor() if not cur: raise(NameError,'连接数据库失败') else: return cur def executeQuery(self,sql): cur = self.__getConn() cur.execute(sql) # 获取所有数据集 # fetchall()获取结果集中的剩下的所有行 # 如果数据量太大,是否需要分批插入 resList, rowcount = cur.fetchall(),cur.rowcount self.conn.close() return (resList, rowcount) # 导出单个数据表到excel def writeToExcel(self,**args): sourceTB = args['sourceTB'] columns = args.get('columns') path=args['path'] fname=args.get('fname') startRow=args['startRow'] isHeader=args['isHeader'] N=args['batch'] # 获取指定源数据列 if columns is None: columns_select = ' * ' else: columns_select = ','.join(columns) if fname is None: fname=sourceTB+'_exportData.xlsx' file = path + fname # 增加一个公共句柄,写入新数据时,保留原数据 writer = pd.ExcelWriter(file) sql_select = 'select '+ columns_select + ' from '+ sourceTB fetch_data, rowcount = self.executeQuery(sql_select) # print(rowcount) df_fetch_data = pd.DataFrame(fetch_data) # 一共有roucount行数据,每N行一个batch提交写入到excel times = math.floor(rowcount/N) i = 1 rs_startrow = 0 # 当总数据量 > 每批插入的数据量时 print(i, times) is_while=0 while i <= times: is_while = 1 # 如果是首次,且指定输入标题,则有标题 if i==1: # isHeader = True startRow = 1 else: # isHeader = False startRow+=N # 切片取指定的每个批次的数据行 ,前闭后开 # startrow: 写入到目标文件的起始行。0表示第1行,1表示第2行。。。 df_fetch_data['batch'] = 'batch'+str(i) df_fetch_data[rs_startrow:i*N].to_excel(writer, header=isHeader, index=False, startrow=startRow) print('第',str(i),'次循环,取源数据第',rs_startrow,'行至',i*N,'行','写入到第',startRow,'行') print('第',str(i),'次写入数据为:',df_fetch_data[rs_startrow:i*N]) # 重新指定源数据的读取起始行 rs_startrow =i * N i+=1 # 写入文件的开始行数 # 当没有做任何循环时,仍然从第一行开始写入 if is_while == 0: startRow = startRow else: startRow+=N df_fetch_data['batch'] = 'batch'+str(i) print('第{0}次读取数据,从第{1}行开始,写入到第{2}行!'.format(str(i), str(rs_startrow), str(startRow))) print('第',str(i),'写入数据为:',df_fetch_data[rs_startrow:i*N]) df_fetch_data[rs_startrow:i*N].to_excel(writer, header=isHeader, index=False, startrow=startRow) # 注: 这里一定要saver()将数据从缓存写入磁盘!!!!!!!!!!!!!!!!!!!!!1 writer.save() start_time=datetime.datetime.now() # 导出结构相同的多个表到同一样excel def exportToExcel(self, **args): for sourceTB in args['sourceTB']: arc_dict = dict( sourceTB = sourceTB, path=args['path'], startRow=args['startRow'], isHeader=args['isHeader'], batch=args['batch'] ) print('\n当前导出的数据表为:%s' %(sourceTB)) self.writeToExcel(**arc_dict) return 'success' start_time=datetime.datetime.now() if __name__ == "__main__": ms = MSSQL(host="localhost",user="test",pwd="test",db="db_jun") args = dict( sourceTB = ['tb2', 'tb1'],# 待导出的表 path='D:\\myPC\\Python\\',# 导出到指定路径 startRow=1,#设定写入文件的首行,第2行为数据首行 isHeader=False,# 是否包含源数据的标题 batch=5 ) # 导出多个文件 ms.exportToExcel(**args)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。