赞
踩
系统发出调查问卷,问卷的填写记录入了数据库,现在需要对用户的调查问卷填写情况进行统计。
这里使用Python对数据进行简单的处理,并将统计结果保存为excel文件。
因为数据存储在MySQL,我使用pymysql库连接数据库。
这里编写了三条查询SQL语句,将数据分为三部分:调查问卷基本信息、调查问卷填写详情、调查问卷问题和选项。
将这三部分数据保存在一个列表中,方便后续处理。
- import traceback
- from pprint import pprint
- import pymysql
- import yaml
-
-
- def data_detail(dbinfo, survey_name):
- """
- 连接数据库查询详细数据,保存为excel
- :param dbinfo: 数据库连接信息
- :param survey_name: 调查名称
- :return: 数据库查询结果
- """
- # SQL列表示例
- sql_list = [f'调查问卷基本信息查询SQL:select * from table1 where name={survey_name}', '调查问卷填写详情查询SQL', '调查问卷问题和选项查询SQL']
- data = [execute_sql(dbinfo, sql) for sql in sql_list]
- return data
-
-
- def execute_sql(dbinfo, sql):
- """
- 连接数据库,执行sql查询语句,返回查询结果
- :param dbinfo: 数据库配置信息
- :param sql: 数据库查询语句
- :return:
- """
- try:
- conn = pymysql.connect(**dbinfo) # 连接数据库
- cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) # 数据库游标
- cursor.execute(sql) # 执行sql语句
- results = cursor.fetchall() # 查询结果
- conn.close() # 关闭数据库连接
- return results
- except Exception as err:
- info = f"出了点小问题!\n{repr(err)}\n{traceback.format_exc()}"
- print(info)
-
-
- def yaml_read(filename):
- """
- 读取yaml文件
- """
- with open(file=filename, mode="r", encoding="utf-8") as f:
- data = yaml.safe_load(f.read())
- return data
-
-
- if __name__ == "__main__":
- db_info = yaml_read('cfg_mysql_test.yaml')
- datas = data_detail(db_info, '调查问卷数据统计测试')
- pprint(datas)
运行结果:
将查询的数据结果保存到同一个excel表格中,用于后续分析统计。
先看一下查询的数据结果示例:[[{k:v,k:v},{k:v,k:v}],[{k:v,k:v},{k:v,k:v}],[{k:v,k:v},{k:v,k:v}]]
从内到外:字典-列表-字典
这里我做了如下处理:
1.使用xlwt模块写入数据
2.查询结果写入到同一个表格的多个工作表
3.xlwt.xfstyle()设置表格的样式:表头、表格、时间数据
- import datetime
- import os
- import time
- import traceback
- import xlwt
-
-
- def to_excel(data, sheet_name=None, file_name=None):
- """
- 数据写入excel
- :param data: 数据查询结果
- :param sheet_name: 工作表名称
- :param file_name: excel文件名称
- :return:
- """
- try:
- book = xlwt.Workbook(encoding='utf-8') # 新建工作簿
- new_datas = data_pre_handle(data) # 预处理数据
- if sheet_name is None:
- sheet_name = ["new_sheet" + str(i + 1) for i in range(len(new_datas))]
- write_data(book, new_datas, sheet_name) # 数据写入工作表
- save_excel(book, file_name) # 保存工作簿
- except Exception as err:
- info = f"出了点小问题!\n{repr(err)}\n{traceback.format_exc()}"
- print(info)
-
-
- def save_excel(workbook, file_name=None):
- """
- 保存工作簿
- :param workbook: 工作簿
- :param file_name: excel文件名称
- :return:
- """
- if file_name is None:
- sj = datetime.datetime.now().strftime("%Y-%m-%d %H%M%S")
- name = sj + ".xlsx"
- else:
- name = file_name + ".xlsx"
- workbook.save(name)
- cur_file = os.path.dirname(os.path.abspath(__file__)) + os.sep + name # 当前excel的保存路径
- print(f"excel保存成功!【{cur_file}】")
-
-
- def data_pre_handle(data):
- """
- 预处理数据库查询到的数据
- :param data: 数据查询结果
- :return:
- """
- return [[list(v[0].keys())] + [list(val.values()) for index, val in enumerate(v)] for i, v in enumerate(data)]
-
-
- def write_data(workbook, data, sheet_name):
- """
- 数据写入sheet
- :param workbook: 工作簿
- :param data: 要写入的数据
- :param sheet_name: 工作表名称
- :return:
- """
- try:
- for index, value in enumerate(data):
- sheet = workbook.add_sheet(sheet_name[index], cell_overwrite_ok=True) # 新建sheet
- size = [12 for i in value[0]]
- head_style = style_head(sheet, size, wrap=1, is_bg=1) # 定义表头样式
- time_style = style_table_time() # 时间内容样式
- normal_style = style_table_normal({"horz": "CENTER", "vert": "CENTER"}) # 普通内容样式
- for col, val in enumerate(value[0]):
- sheet.write(0, col, val, head_style)
- print(f"写入表头成功!{value[0]}")
- for i, v in enumerate(value[1:]):
- for co, va in enumerate(v):
- val = str(va)
- if is_date(val):
- if len(val) > 19:
- new_val = datetime.datetime.strptime(val, "%Y-%m-%d %H:%M:%S.%f")
- elif 10 < len(val) <= 19:
- new_val = datetime.datetime.strptime(val, "%Y-%m-%d %H:%M:%S")
- else:
- new_val = datetime.datetime.strptime(val, '%Y-%m-%d')
- sheet.write(i + 1, co, new_val, time_style)
- elif is_number(val) == 1 and len(val) < 11:
- sheet.write(i + 1, co, int(val), normal_style)
- else:
- sheet.write(i + 1, co, val, normal_style)
- print(f"写入表格内容成功!共{len(value) - 1}行")
- except Exception as err:
- info = f"出了点小问题!\n{repr(err)}\n{traceback.format_exc()}"
- print(info)
-
-
- def is_date(date_str):
- """
- 判断date_str是否为日期格式
- :param date_str:
- :return:
- """
- date_format = "%Y-%m-%d"
- try:
- if isinstance(date_str, str):
- new_str = date_str[:10]
- valid_date = time.strptime(new_str, date_format)
- return True
- else:
- return False
- except ValueError or TypeError as e:
- return False
-
-
- def is_number(val: str):
- """
- 判断字符串的数字类型
- :param val: 字符串
- :return: 1--整数,2--小数,0--非数字
- """
- try:
- float(val)
- if val.isdigit() or val.split('-')[-1].isdigit():
- return 1
- elif val.split('.')[0].isdigit() or val.split('-')[-1].split('.')[-1].isdigit():
- return 2
- else:
- return 0
- except ValueError:
- pass
- return 0
-
-
- def style_head(worksheet, size, wrap=0, is_bg=0, color=22):
- """
- 表头的样式
- :param worksheet: 表格
- :param size: 表格列宽
- :param wrap: 1--自动换行,默认不换行
- :param is_bg: 1--设置背景色,默认不设置
- :param color: 默认浅灰色背景
- :return:
- """
- style = xlwt.XFStyle()
- set_font(style, height=14)
- set_border(style)
- set_widths(worksheet, size)
- dicts = {"horz": "CENTER", "vert": "CENTER"}
- set_alignments(style, wrap=wrap, **dicts)
- if is_bg == 1:
- set_pattern(style, color=color)
- return style
-
-
- def style_table_normal(dicts: dict):
- """
- 普通表格内容样式
- :param dicts: 对齐方式
- :return:
- """
- style = xlwt.XFStyle()
- set_font(style, bold=False)
- set_border(style)
- set_alignments(style, wrap=1, **dicts)
- return style
-
-
- def style_table_time():
- """
- 时间格式表格内容样式
- :return:
- """
- style = xlwt.XFStyle()
- set_border(style)
- style.num_format_str = "yyyy/MM/dd HH:MM:SS"
- dicts = {"horz": "LEFT", "vert": "CENTER"}
- set_alignments(style, wrap=1, **dicts)
- return style
-
-
- def set_font(style, bold=True, name='宋体', height=11):
- """
- 设置字体,默认宋体加粗,高度11
- :param style:
- :param bold:
- :param name:
- :param height:
- :return:
- """
- style.font.bold = bold
- style.font.name = name
- style.font.height = 20 * height
-
-
- def set_border(style, status=1):
- """
- 设置边框
- :param style:
- :param status:
- :return:
- """
- style.borders.left = status
- style.borders.right = status
- style.borders.top = status
- style.borders.bottom = status
-
-
- def set_pattern(style, color=23):
- """
- 设置表格背景颜色,默认深灰
- 0 = 黑, 1 = 白, 2 = 红, 3 = 绿, 4 = 蓝, 5 = 黄, 6 = 品红, 7 = 蓝绿,
- 16 = 褐红, 17 = 深绿, 18 = 深蓝, 19 = 棕色, 20 = 暗洋红, 21 = 蓝绿色, 22 = 浅灰, 23 = 深灰......
- :param style:
- :param color:
- :return:
- """
- style.pattern.pattern = xlwt.Pattern.SOLID_PATTERN
- style.pattern.pattern_fore_colour = color
-
-
- def set_widths(worksheet, size):
- """
- 设置宽度
- :param worksheet:
- :param size:
- :return:
- """
- for i, v in enumerate(size):
- worksheet.col(i).width = v * 256
-
-
- def set_alignments(style, wrap=1, **kwargs):
- """
- 设置对齐方式,默认自动换行
- 中心对齐参数:{"horz": "CENTER", "vert": "CENTER"}
- horz(水平):CENTER(居中),DISTRIBUTED(两端),GENERAL,CENTER_ACROSS_SEL(分散),RIGHT(右),LEFT(左)
- vert(垂直):CENTER(居中),DISTRIBUTED(两端),BOTTOM(下),TOP(上)
- """
-
- if "horz" in kwargs.keys():
- style.alignment.horz = eval(f"xlwt.Alignment.HORZ_{kwargs['horz'].upper()}")
- if "vert" in kwargs.keys():
- style.alignment.vert = eval(f"xlwt.Alignment.VERT_{kwargs['vert'].upper()}")
- style.alignment.wrap = wrap # 设置自动换行
-
-
- if __name__ == "__main__":
- db_info = yaml_read('cfg_mysql_test.yaml')
- datas = data_detail(db_info, '调查问卷数据统计测试')
- sheets_name = ['调查问卷基本信息', '调查问卷填写详情', '调查问卷问题和选项']
- to_excel(datas, sheets_name)
运行结果:
同样使用xlwt模块,统计我们想要的数据,写入excel,并设置单元格的样式
- import datetime
- import os
- import time
- import traceback
- import xlwt
-
-
- def to_excel(data, sheet_name=None, file_name=None):
- """
- 数据写入excel
- :param data: 数据查询结果
- :param sheet_name: 工作表名称
- :param file_name: excel文件名称
- :return:
- """
- try:
- book = xlwt.Workbook(encoding='utf-8') # 新建工作簿
- new_datas = data_pre_handle(data) # 预处理数据
- if sheet_name is None:
- sheet_name = ["new_sheet" + str(i + 1) for i in range(len(new_datas))]
- write_data(book, new_datas, sheet_name) # 数据写入工作表
- save_excel(book, file_name) # 保存工作簿
- except Exception as err:
- info = f"出了点小问题!\n{repr(err)}\n{traceback.format_exc()}"
- print(info)
-
-
- def save_excel(workbook, file_name=None):
- """
- 保存工作簿
- :param workbook: 工作簿
- :param file_name: excel文件名称
- :return:
- """
- if file_name is None:
- sj = datetime.datetime.now().strftime("%Y-%m-%d %H%M%S")
- name = sj + ".xlsx"
- else:
- name = file_name + ".xlsx"
- workbook.save(name)
- cur_file = os.path.dirname(os.path.abspath(__file__)) + os.sep + name # 当前excel的保存路径
- print(f"excel保存成功!【{cur_file}】")
-
-
- def data_pre_handle(data):
- """
- 预处理数据库查询到的数据
- :param data: 数据查询结果
- :return:
- """
- return [[list(v[0].keys())] + [list(val.values()) for index, val in enumerate(v)] for i, v in enumerate(data)]
-
-
- def write_data(workbook, data, sheet_name):
- """
- 数据写入sheet
- :param workbook: 工作簿
- :param data: 要写入的数据
- :param sheet_name: 工作表名称
- :return:
- """
- try:
- for index, value in enumerate(data):
- sheet = workbook.add_sheet(sheet_name[index], cell_overwrite_ok=True) # 新建sheet
- size = [12 for i in value[0]]
- head_style = style_head(sheet, size, wrap=1, is_bg=1) # 定义表头样式
- time_style = style_table_time() # 时间内容样式
- normal_style = style_table_normal({"horz": "CENTER", "vert": "CENTER"}) # 普通内容样式
- for col, val in enumerate(value[0]):
- sheet.write(0, col, val, head_style)
- print(f"写入表头成功!{value[0]}")
- for i, v in enumerate(value[1:]):
- for co, va in enumerate(v):
- val = str(va)
- if is_date(val):
- if len(val) > 19:
- new_val = datetime.datetime.strptime(val, "%Y-%m-%d %H:%M:%S.%f")
- elif 10 < len(val) <= 19:
- new_val = datetime.datetime.strptime(val, "%Y-%m-%d %H:%M:%S")
- else:
- new_val = datetime.datetime.strptime(val, '%Y-%m-%d')
- sheet.write(i + 1, co, new_val, time_style)
- elif is_number(val) == 1 and len(val) < 11:
- sheet.write(i + 1, co, int(val), normal_style)
- else:
- sheet.write(i + 1, co, val, normal_style)
- print(f"写入表格内容成功!共{len(value) - 1}行")
- except Exception as err:
- info = f"出了点小问题!\n{repr(err)}\n{traceback.format_exc()}"
- print(info)
-
-
- def is_date(date_str):
- """
- 判断date_str是否为日期格式
- :param date_str:
- :return:
- """
- date_format = "%Y-%m-%d"
- try:
- if isinstance(date_str, str):
- new_str = date_str[:10]
- valid_date = time.strptime(new_str, date_format)
- return True
- else:
- return False
- except ValueError or TypeError as e:
- return False
-
-
- def is_number(val: str):
- """
- 判断字符串的数字类型
- :param val: 字符串
- :return: 1--整数,2--小数,0--非数字
- """
- try:
- float(val)
- if val.isdigit() or val.split('-')[-1].isdigit():
- return 1
- elif val.split('.')[0].isdigit() or val.split('-')[-1].split('.')[-1].isdigit():
- return 2
- else:
- return 0
- except ValueError:
- pass
- return 0
-
-
- def style_label():
- """
- 主表头的样式
- :return:
- """
- style = xlwt.XFStyle()
- set_font(style, height=16)
- set_border(style)
- set_pattern(style)
- dicts = {"horz": "CENTER", "vert": "CENTER"}
- set_alignments(style, wrap=0, **dicts)
- return style
-
-
- def style_head(worksheet, size, wrap=0, is_bg=0, color=22):
- """
- 表头的样式
- :param worksheet: 表格
- :param size: 表格列宽
- :param wrap: 1--自动换行,默认不换行
- :param is_bg: 1--设置背景色,默认不设置
- :param color: 默认浅灰色背景
- :return:
- """
- style = xlwt.XFStyle()
- set_font(style, height=14)
- set_border(style)
- set_widths(worksheet, size)
- dicts = {"horz": "CENTER", "vert": "CENTER"}
- set_alignments(style, wrap=wrap, **dicts)
- if is_bg == 1:
- set_pattern(style, color=color)
- return style
-
-
- def style_table_normal(dicts: dict):
- """
- 普通表格内容样式
- :param dicts: 对齐方式
- :return:
- """
- style = xlwt.XFStyle()
- set_font(style, bold=False)
- set_border(style)
- set_alignments(style, wrap=1, **dicts)
- return style
-
-
- def style_table_time():
- """
- 时间格式表格内容样式
- :return:
- """
- style = xlwt.XFStyle()
- set_border(style)
- style.num_format_str = "yyyy/MM/dd HH:MM:SS"
- dicts = {"horz": "LEFT", "vert": "CENTER"}
- set_alignments(style, wrap=1, **dicts)
- return style
-
-
- def style_table_percent():
- """
- 百分比格式表格内容样式
- :return:
- """
- style = xlwt.XFStyle()
- set_border(style)
- style.num_format_str = '0%'
- dicts = {"horz": "CENTER", "vert": "CENTER"}
- set_alignments(style, **dicts)
- return style
-
-
- def style_table_separator():
- """
- 分隔列表格内容样式
- :return:
- """
- style = xlwt.XFStyle()
- set_border(style)
- set_pattern(style, color=22)
- return style
-
-
- def set_font(style, bold=True, name='宋体', height=11):
- """
- 设置字体,默认宋体加粗,高度11
- :param style:
- :param bold:
- :param name:
- :param height:
- :return:
- """
- style.font.bold = bold
- style.font.name = name
- style.font.height = 20 * height
-
-
- def set_border(style, status=1):
- """
- 设置边框
- :param style:
- :param status:
- :return:
- """
- style.borders.left = status
- style.borders.right = status
- style.borders.top = status
- style.borders.bottom = status
-
-
- def set_pattern(style, color=23):
- """
- 设置表格背景颜色,默认深灰
- 0 = 黑, 1 = 白, 2 = 红, 3 = 绿, 4 = 蓝, 5 = 黄, 6 = 品红, 7 = 蓝绿,
- 16 = 褐红, 17 = 深绿, 18 = 深蓝, 19 = 棕色, 20 = 暗洋红, 21 = 蓝绿色, 22 = 浅灰, 23 = 深灰......
- :param style:
- :param color:
- :return:
- """
- style.pattern.pattern = xlwt.Pattern.SOLID_PATTERN
- style.pattern.pattern_fore_colour = color
-
-
- def set_widths(worksheet, size):
- """
- 设置宽度
- :param worksheet:
- :param size:
- :return:
- """
- for i, v in enumerate(size):
- worksheet.col(i).width = v * 256
-
-
- def set_alignments(style, wrap=1, **kwargs):
- """
- 设置对齐方式,默认自动换行
- 中心对齐参数:{"horz": "CENTER", "vert": "CENTER"}
- horz(水平):CENTER(居中),DISTRIBUTED(两端),GENERAL,CENTER_ACROSS_SEL(分散),RIGHT(右),LEFT(左)
- vert(垂直):CENTER(居中),DISTRIBUTED(两端),BOTTOM(下),TOP(上)
- """
-
- if "horz" in kwargs.keys():
- style.alignment.horz = eval(f"xlwt.Alignment.HORZ_{kwargs['horz'].upper()}")
- if "vert" in kwargs.keys():
- style.alignment.vert = eval(f"xlwt.Alignment.VERT_{kwargs['vert'].upper()}")
- style.alignment.wrap = wrap # 设置自动换行
-
-
- def data_statistics(data):
- """
- 统计详细数据,保存为excel
- :param data: 数据库查询结果
- :return:
- """
- wb = xlwt.Workbook(encoding='utf-8') # 新建工作簿
- ws = wb.add_sheet('活动问卷数据汇总', cell_overwrite_ok=True) # 新建sheet
-
- label_style = style_label()
- head_style = style_head(ws, size=[20, 20, 2, 10, 20, 20, 25, 2, 10, 25, 20, 2, 25, 20, 8, 25, 25])
- dicts1 = {"horz": "CENTER", "vert": "CENTER"}
- dicts2 = {"horz": "LEFT", "vert": "CENTER"}
- normal_table_style_center = style_table_normal(dicts1)
- normal_table_style_left = style_table_normal(dicts2)
- separator_table_style = style_table_separator()
- time_table_style = style_table_time()
- percent_table_style = style_table_percent()
-
- # 基本信息
- ws.write_merge(0, 0, 0, 1, '基本信息', label_style)
- ws.write(1, 0, '调查名称', head_style)
- ws.write(1, 1, data[0][0]['调查名称'], normal_table_style_center)
- ws.write(2, 0, '调查问卷名称', head_style)
- ws.write(2, 1, data[0][0]['调查问卷名称'], normal_table_style_center)
- ws.write(3, 0, '问卷发送人数', head_style)
- result1 = [v['姓名'] for v in data[0] if v['目前卡片数'] == 18]
- ws.write(3, 1, len(result1), normal_table_style_center)
- ws.write(4, 0, '问卷提交人数', head_style)
- result2 = [v['姓名'] for v in data[0] if v['填写时长'] is not None]
- ws.write(4, 1, len(result2), normal_table_style_center)
-
- # 分隔
- ws.write_merge(0, 142, 2, 2, '', separator_table_style)
-
- # 用户问卷发送及提交信息
- ws.write_merge(0, 0, 3, 6, '用户问卷发送及提交信息', label_style)
- head1 = ['姓名', '问卷发送时间', '问卷提交时间', '填写时长(s)']
- for i, v in enumerate(head1):
- ws.write(1, i + 3, v, head_style)
- for row, v in enumerate(data[0]):
- ws.write(row + 2, 3, v['姓名'], normal_table_style_left)
- if v['目前卡片数'] != 18:
- ws.write(row + 2, 4, '', normal_table_style_left)
- else:
- ws.write(row + 2, 4, v['最后卡片时间'], time_table_style)
- for j in data[1]:
- if v['会员id'] == j['会员id']:
- ws.write(row + 2, 5, j['问卷提交时间'], time_table_style)
- break
- if v['会员id'] not in [i['会员id'] for i in data[1]]:
- ws.write(row + 2, 5, '', normal_table_style_left)
- if v['填写时长'] is None:
- ws.write(row + 2, 6, '', normal_table_style_left)
- else:
- ws.write(row + 2, 6, v['填写时长'], normal_table_style_center)
-
- # 分隔
- ws.write_merge(0, 142, 7, 7, '', separator_table_style)
-
- # 用户问卷填写内容详情
- ws.write_merge(0, 0, 8, 10, '用户问卷填写内容详情', label_style)
- head2 = ['姓名', '问题', '答案']
- for i, v in enumerate(head2):
- ws.write(1, i + 8, v, head_style)
- for row, val in enumerate(data[1]):
- ws.write(row + 2, 8, val['姓名'], normal_table_style_left)
- ws.write(row + 2, 9, val['问题'], normal_table_style_left)
- ws.write(row + 2, 10, val['答案'], normal_table_style_left)
-
- # 分隔
- ws.write_merge(0, 142, 11, 11, '', separator_table_style)
-
- # 问卷问题选项信息
- ws.write_merge(0, 0, 12, 16, '问卷问题选项信息', label_style)
- head3 = ['问题', '问题选项', '题型', '选项被选择的人数', '选项被选择的占比']
- for i, v in enumerate(head3):
- ws.write(1, i + 12, v, head_style)
- for row, val in enumerate(data[2]):
- ws.write(row + 2, 12, val['题目'], normal_table_style_left)
- ws.write(row + 2, 13, val['选项'], normal_table_style_left)
- ws.write(row + 2, 14, val['题型'], normal_table_style_center)
- result3 = [v['答案'] for v in data[1] if val['题目'] in v['问题'] if val['选项'] in v['答案']]
- ws.write(row + 2, 15, len(result3), normal_table_style_center)
- ws.write(row + 2, 16, len(result3) / len(result2), percent_table_style)
-
- sj = datetime.datetime.now().strftime('%Y-%m-%d %H%M%S')
- wb.save('调查问卷数据统计' + sj + '.xlsx') # 保存excel
-
-
- if __name__ == "__main__":
- db_info = yaml_read('cfg_mysql_test.yaml')
- datas = data_detail(db_info, '调查问卷数据统计测试')
- sheets_name = ['调查问卷基本信息', '调查问卷填写详情', '调查问卷问题和选项']
- # to_excel(datas, sheets_name)
- data_statistics(datas)
运行结果:
把上面的方法封装成类,这里使用静态方法装饰器@staticmethod:不需self参数,通过类或者类实例来调用
- import datetime
- import os
- import time
- import traceback
- import pymysql
- import xlwt
- import yaml
-
- class ExcelStyle:
-
- @staticmethod
- def style_label():
- """
- 主表头的样式
- :return:
- """
- se = ExcelStyle()
- style = se.styles()
- se.set_font(style, height=16)
- se.set_border(style)
- se.set_pattern(style)
- dicts = {"horz": "CENTER", "vert": "CENTER"}
- se.set_alignments(style, wrap=0, **dicts)
- return style
-
- @staticmethod
- def style_head(worksheet, size, wrap=0, is_bg=0, color=22):
- """
- 表头的样式
- :param worksheet: 表格
- :param size: 表格列宽
- :param wrap: 1--自动换行,默认不换行
- :param is_bg: 1--设置背景色,默认不设置
- :param color: 默认浅灰色背景
- :return:
- """
- se = ExcelStyle()
- style = se.styles()
- se.set_font(style, height=14)
- se.set_border(style)
- se.set_widths(worksheet, size)
- dicts = {"horz": "CENTER", "vert": "CENTER"}
- se.set_alignments(style, wrap=wrap, **dicts)
- if is_bg == 1:
- se.set_pattern(style, color=color)
- return style
-
- @staticmethod
- def style_table_normal(dicts: dict):
- """
- 普通表格内容样式
- :param dicts: 对齐方式
- :return:
- """
- se = ExcelStyle()
- style = se.styles()
- se.set_font(style, bold=False)
- se.set_border(style)
- se.set_alignments(style, wrap=1, **dicts)
- return style
-
- @staticmethod
- def style_table_time():
- """
- 时间格式表格内容样式
- :return:
- """
- se = ExcelStyle()
- style = se.styles()
- se.set_border(style)
- style.num_format_str = "yyyy/MM/dd HH:MM:SS"
- dicts = {"horz": "LEFT", "vert": "CENTER"}
- se.set_alignments(style, wrap=1, **dicts)
- return style
-
- @staticmethod
- def style_table_percent():
- """
- 百分比格式表格内容样式
- :return:
- """
- se = ExcelStyle()
- style = se.styles()
- se.set_border(style)
- style.num_format_str = '0%'
- dicts = {"horz": "CENTER", "vert": "CENTER"}
- se.set_alignments(style, **dicts)
- return style
-
- @staticmethod
- def style_table_separator():
- """
- 分隔列表格内容样式
- :return:
- """
- se = ExcelStyle()
- style = se.styles()
- se.set_border(style)
- se.set_pattern(style, color=22)
- return style
-
- @staticmethod
- def styles():
- """设置单元格的样式的基础方法"""
- style = xlwt.XFStyle()
- return style
-
- @staticmethod
- def set_font(style, bold=True, name='宋体', height=11):
- """
- 设置字体,默认宋体加粗,高度11
- :param style:
- :param bold:
- :param name:
- :param height:
- :return:
- """
- style.font.bold = bold
- style.font.name = name
- style.font.height = 20 * height
-
- @staticmethod
- def set_border(style, status=1):
- """
- 设置边框
- :param style:
- :param status:
- :return:
- """
- style.borders.left = status
- style.borders.right = status
- style.borders.top = status
- style.borders.bottom = status
-
- @staticmethod
- def set_pattern(style, color=23):
- """
- 设置表格背景颜色,默认深灰
- 0 = 黑, 1 = 白, 2 = 红, 3 = 绿, 4 = 蓝, 5 = 黄, 6 = 品红, 7 = 蓝绿,
- 16 = 褐红, 17 = 深绿, 18 = 深蓝, 19 = 棕色, 20 = 暗洋红, 21 = 蓝绿色, 22 = 浅灰, 23 = 深灰......
- :param style:
- :param color:
- :return:
- """
- style.pattern.pattern = xlwt.Pattern.SOLID_PATTERN
- style.pattern.pattern_fore_colour = color
-
- @staticmethod
- def set_widths(worksheet, size):
- """
- 设置宽度
- :param worksheet:
- :param size:
- :return:
- """
- for i, v in enumerate(size):
- worksheet.col(i).width = v * 256
-
- @staticmethod
- def set_alignments(style, wrap=1, **kwargs):
- """
- 设置对齐方式,默认自动换行
- 中心对齐参数:{"horz": "CENTER", "vert": "CENTER"}
- horz(水平):CENTER(居中),DISTRIBUTED(两端),GENERAL,CENTER_ACROSS_SEL(分散),RIGHT(右),LEFT(左)
- vert(垂直):CENTER(居中),DISTRIBUTED(两端),BOTTOM(下),TOP(上)
- """
-
- if "horz" in kwargs.keys():
- style.alignment.horz = eval(f"xlwt.Alignment.HORZ_{kwargs['horz'].upper()}")
- if "vert" in kwargs.keys():
- style.alignment.vert = eval(f"xlwt.Alignment.VERT_{kwargs['vert'].upper()}")
- style.alignment.wrap = wrap # 设置自动换行
-
-
- class DataStatistics(ExcelStyle):
-
- @staticmethod
- def data_detail(dbinfo, survey_name):
- """
- 连接数据库查询详细数据,保存为excel
- :param dbinfo: 数据库连接信息
- :param survey_name: 调查名称
- :return: 数据库查询结果
- """
- se = DataStatistics()
- sql_name = ['调查问卷基本信息', '调查问卷填写详情', '调查问卷问题和选项']
- sql_list = [f'调查问卷基本信息查询SQL:select * from table1 where name={survey_name}', '调查问卷填写详情查询SQL', '调查问卷问题和选项查询SQL']
- data = [se.execute_sql(dbinfo, sql) for sql in sql_list]
- se.to_excel(data, sql_name)
- return data
-
- @staticmethod
- def execute_sql(dbinfo, sql):
- """
- 连接数据库,执行sql查询语句,返回查询结果
- :param dbinfo: 数据库配置信息
- :param sql: 数据库查询语句
- :return:
- """
- try:
- conn = pymysql.connect(**dbinfo) # 连接数据库
- cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) # 数据库游标
- cursor.execute(sql) # 执行sql语句
- results = cursor.fetchall() # 查询结果
- conn.close() # 关闭数据库连接
- return results
- except Exception as err:
- info = f"出了点小问题!\n{repr(err)}\n{traceback.format_exc()}"
- print(info)
-
- @staticmethod
- def to_excel(data, sheet_name=None, file_name=None):
- """
- 数据写入excel
- :param data: 数据查询结果
- :param sheet_name: 工作表名称
- :param file_name: excel文件名称
- :return:
- """
- se = DataStatistics()
- try:
- book = xlwt.Workbook(encoding='utf-8') # 新建工作簿
- new_datas = se.data_pre_handle(data) # 预处理数据
- if sheet_name is None:
- sheet_name = ["new_sheet" + str(i + 1) for i in range(len(new_datas))]
- se.write_data(book, new_datas, sheet_name) # 数据写入工作表
- se.save_excel(book, file_name) # 保存工作簿
- except Exception as err:
- info = f"出了点小问题!\n{repr(err)}\n{traceback.format_exc()}"
- print(info)
-
- @staticmethod
- def save_excel(workbook, file_name=None):
- """
- 保存工作簿
- :param workbook: 工作簿
- :param file_name: excel文件名称
- :return:
- """
- if file_name is None:
- sj = datetime.datetime.now().strftime("%Y-%m-%d %H%M%S")
- name = sj + ".xlsx"
- else:
- name = file_name + ".xlsx"
- workbook.save(name)
- cur_file = os.path.dirname(os.path.abspath(__file__)) + os.sep + name # 当前excel的保存路径
- print(f"excel保存成功!【{cur_file}】")
-
- @staticmethod
- def data_pre_handle(data):
- """
- 预处理数据库查询到的数据
- :param data: 数据查询结果
- :return:
- """
- return [[list(v[0].keys())] + [list(val.values()) for index, val in enumerate(v)] for i, v in enumerate(data)]
-
- @staticmethod
- def write_data(workbook, data, sheet_name):
- """
- 数据写入sheet
- :param workbook: 工作簿
- :param data: 要写入的数据
- :param sheet_name: 工作表名称
- :return:
- """
- se = DataStatistics()
- try:
- for index, value in enumerate(data):
- sheet = workbook.add_sheet(sheet_name[index], cell_overwrite_ok=True) # 新建sheet
- size = [12 for i in value[0]]
- head_style = se.style_head(sheet, size, wrap=1, is_bg=1) # 定义表头样式
- time_style = se.style_table_time() # 时间内容样式
- normal_style = se.style_table_normal({"horz": "CENTER", "vert": "CENTER"}) # 普通内容样式
- for col, val in enumerate(value[0]):
- sheet.write(0, col, val, head_style)
- print(f"写入表头成功!{value[0]}")
- for i, v in enumerate(value[1:]):
- for co, va in enumerate(v):
- val = str(va)
- if se.is_date(val):
- if len(val) > 19:
- new_val = datetime.datetime.strptime(val, "%Y-%m-%d %H:%M:%S.%f")
- elif 10 < len(val) <= 19:
- new_val = datetime.datetime.strptime(val, "%Y-%m-%d %H:%M:%S")
- else:
- new_val = datetime.datetime.strptime(val, '%Y-%m-%d')
- sheet.write(i + 1, co, new_val, time_style)
- elif se.is_number(val) == 1 and len(val) < 11:
- sheet.write(i + 1, co, int(val), normal_style)
- else:
- sheet.write(i + 1, co, val, normal_style)
- print(f"写入表格内容成功!共{len(value) - 1}行")
- except Exception as err:
- info = f"出了点小问题!\n{repr(err)}\n{traceback.format_exc()}"
- print(info)
-
- @staticmethod
- def is_date(date_str):
- """
- 判断date_str是否为日期格式
- :param date_str:
- :return:
- """
- date_format = "%Y-%m-%d"
- try:
- if isinstance(date_str, str):
- new_str = date_str[:10]
- valid_date = time.strptime(new_str, date_format)
- return True
- else:
- return False
- except ValueError or TypeError as e:
- return False
-
- @staticmethod
- def is_number(val: str):
- """
- 判断字符串的数字类型
- :param val: 字符串
- :return: 1--整数,2--小数,0--非数字
- """
- try:
- float(val)
- if val.isdigit() or val.split('-')[-1].isdigit():
- return 1
- elif val.split('.')[0].isdigit() or val.split('-')[-1].split('.')[-1].isdigit():
- return 2
- else:
- return 0
- except ValueError:
- pass
- return 0
-
- @staticmethod
- def data_statistics(data):
- """
- 统计详细数据,保存为excel
- :param data: 数据库查询结果
- :return:
- """
- se = DataStatistics()
- wb = xlwt.Workbook(encoding='utf-8') # 新建工作簿
- ws = wb.add_sheet('活动问卷数据汇总', cell_overwrite_ok=True) # 新建sheet
-
- label_style = se.style_label()
- head_style = se.style_head(ws, size=[20, 20, 2, 10, 20, 20, 25, 2, 10, 25, 20, 2, 25, 20, 8, 25, 25])
- dicts1 = {"horz": "CENTER", "vert": "CENTER"}
- dicts2 = {"horz": "LEFT", "vert": "CENTER"}
- normal_table_style_center = se.style_table_normal(dicts1)
- normal_table_style_left = se.style_table_normal(dicts2)
- separator_table_style = se.style_table_separator()
- time_table_style = se.style_table_time()
- percent_table_style = se.style_table_percent()
-
- # 基本信息
- ws.write_merge(0, 0, 0, 1, '基本信息', label_style)
- ws.write(1, 0, '调查名称', head_style)
- ws.write(1, 1, data[0][0]['调查名称'], normal_table_style_center)
- ws.write(2, 0, '调查问卷名称', head_style)
- ws.write(2, 1, data[0][0]['调查问卷名称'], normal_table_style_center)
- ws.write(3, 0, '问卷发送人数', head_style)
- result1 = [v['姓名'] for v in data[0] if v['目前卡片数'] == 18]
- ws.write(3, 1, len(result1), normal_table_style_center)
- ws.write(4, 0, '问卷提交人数', head_style)
- result2 = [v['姓名'] for v in data[0] if v['填写时长'] is not None]
- ws.write(4, 1, len(result2), normal_table_style_center)
-
- # 分隔
- ws.write_merge(0, 142, 2, 2, '', separator_table_style)
-
- # 用户问卷发送及提交信息
- ws.write_merge(0, 0, 3, 6, '用户问卷发送及提交信息', label_style)
- head1 = ['姓名', '问卷发送时间', '问卷提交时间', '填写时长(s)']
- for i, v in enumerate(head1):
- ws.write(1, i + 3, v, head_style)
- for row, v in enumerate(data[0]):
- ws.write(row + 2, 3, v['姓名'], normal_table_style_left)
- if v['目前卡片数'] != 18:
- ws.write(row + 2, 4, '', normal_table_style_left)
- else:
- ws.write(row + 2, 4, v['最后卡片时间'], time_table_style)
- for j in data[1]:
- if v['会员id'] == j['会员id']:
- ws.write(row + 2, 5, j['问卷提交时间'], time_table_style)
- break
- if v['会员id'] not in [i['会员id'] for i in data[1]]:
- ws.write(row + 2, 5, '', normal_table_style_left)
- if v['填写时长'] is None:
- ws.write(row + 2, 6, '', normal_table_style_left)
- else:
- ws.write(row + 2, 6, v['填写时长'], normal_table_style_center)
-
- # 分隔
- ws.write_merge(0, 142, 7, 7, '', separator_table_style)
-
- # 用户问卷填写内容详情
- ws.write_merge(0, 0, 8, 10, '用户问卷填写内容详情', label_style)
- head2 = ['姓名', '问题', '答案']
- for i, v in enumerate(head2):
- ws.write(1, i + 8, v, head_style)
- for row, val in enumerate(data[1]):
- ws.write(row + 2, 8, val['姓名'], normal_table_style_left)
- ws.write(row + 2, 9, val['问题'], normal_table_style_left)
- ws.write(row + 2, 10, val['答案'], normal_table_style_left)
-
- # 分隔
- ws.write_merge(0, 142, 11, 11, '', separator_table_style)
-
- # 问卷问题选项信息
- ws.write_merge(0, 0, 12, 16, '问卷问题选项信息', label_style)
- head3 = ['问题', '问题选项', '题型', '选项被选择的人数', '选项被选择的占比']
- for i, v in enumerate(head3):
- ws.write(1, i + 12, v, head_style)
- for row, val in enumerate(data[2]):
- ws.write(row + 2, 12, val['题目'], normal_table_style_left)
- ws.write(row + 2, 13, val['选项'], normal_table_style_left)
- ws.write(row + 2, 14, val['题型'], normal_table_style_center)
- result3 = [v['答案'] for v in data[1] if val['题目'] in v['问题'] if val['选项'] in v['答案']]
- ws.write(row + 2, 15, len(result3), normal_table_style_center)
- ws.write(row + 2, 16, len(result3) / len(result2), percent_table_style)
-
- sj = datetime.datetime.now().strftime('%Y-%m-%d %H%M%S')
- wb.save('调查问卷数据统计' + sj + '.xlsx') # 保存excel
-
-
- def yaml_read(filename):
- """
- 读取yaml文件
- """
- with open(file=filename, mode="r", encoding="utf-8") as f:
- data = yaml.safe_load(f.read())
- return data
-
-
- if __name__ == "__main__":
- db_info = yaml_read('cfg_mysql_test.yaml')
- ds = DataStatistics()
- datas = ds.data_detail(db_info, '调查问卷数据统计测试')
- ds.data_statistics(datas)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。