赞
踩
本文为博主原创,未经授权,严禁转载及使用。
本文链接:https://blog.csdn.net/zyooooxie/article/details/108316723
之前曾经写过 连接MySQL【二】, 连接MySQL【一】,最近对我的代码做了些优化,想着做些分享;
个人博客:https://blog.csdn.net/zyooooxie
往表里造数据时,我常常用到这2个方法;
以前在使用execute() 都是直接传了 我要执行的sql,偶然看了下源码,才发现 和executemany() 有点相似,都有args。
execute()
executemany()
所以,就把自己写的 公共方法做了优化,然后发现 实际调用这些方法时,很多造数方法 也得优化,干脆就全部优化一遍了。
(代码有删改)
@File: common_mysql.py
""" @blog: https://blog.csdn.net/zyooooxie """ import pymysql import traceback from pymysql.connections import Connection from pymysql.cursors import Cursor from user_log import Log def connect_db(**kwargs): """ 数据库链接,传参为 具体的db :param kwargs: :return: """ if kwargs.get('ABC_db'): mysql_user = '' mysql_host = '' mysql_pwd = '' mysql_db = '' elif kwargs.get('abc_db'): mysql_user = '' mysql_host = '' mysql_pwd = '' mysql_db = '' else: raise Exception('传参不合法') db = pymysql.connect(host=mysql_host, passwd=mysql_pwd, user=mysql_user, port=3306, autocommit=True, database=mysql_db, charset='utf8', use_unicode=True) cur = db.cursor() return db, cur def execute_sql_no_args_no_close(sql: str, db: Connection, cur: Cursor): """ execute() 没有args :param sql: :param db: :param cur: :return: """ Log.debug(sql) try: result = cur.execute(sql) except Exception as e: db.rollback() Log.debug(e.args) Log.info(traceback.format_exc()) result = False execute_sql_result_check(result) def execute_sql_no_close(data_list_tuple: list or tuple, sql: str, db: Connection, cur: Cursor): """ execute() 要传args :param data_list_tuple: :param sql: :param db: :param cur: :return: """ Log.debug(sql) try: assert sql.find('(%s') != -1 # %s可以用作查询中的占位符 # If args is a list or tuple, %s can be used as a placeholder in the query. result = cur.execute(sql, data_list_tuple) except Exception as e: db.rollback() Log.debug(e.args) Log.info(traceback.format_exc()) result = False execute_sql_result_check(result) def executemany_sql_no_close(data_list: list, sql: str, db: Connection, cur: Cursor): """ executemany() 要传args :param data_list: :param sql: :param db: :param cur: :return: """ Log.debug(sql) try: db.ping(reconnect=True) assert sql.find('(%s') != -1 result = cur.executemany(sql, data_list) except Exception as e: db.rollback() Log.debug(e.args) Log.info(traceback.format_exc()) result = False execute_sql_result_check(result) def execute_sql_result_check(result): """ :param result: :return: """ Log.debug(result) if result is False: Log.error('Execute Fail') raise else: assert isinstance(result, int) is True Log.debug('Execute Succeed') def fetchall_data_no_close(sql: str, db: Connection, cur: Cursor): """ fetchall() :param sql: :param db: :param cur: :return: """ Log.debug(sql) try: cur.execute(sql) data = cur.fetchall() except Exception as e: db.rollback() Log.debug(e.args) Log.info(traceback.format_exc()) data = False fetch_sql_result_check(data) return data def fetchone_data_no_close(sql: str, db: Connection, cur: Cursor): """ fetchone() :param sql: :param db: :param cur: :return: """ Log.debug(sql) try: cur.execute(sql) data = cur.fetchone() except Exception as e: db.rollback() Log.debug(e.args) Log.info(traceback.format_exc()) data = False fetch_sql_result_check(data) return data def fetch_sql_result_check(result): """ :param result: :return: """ Log.debug(result) if result is False: Log.error('Fetch Fail') raise elif not bool(result): Log.error('Fetch None') else: Log.debug('Fetch Succeed')
公共方法中 execute_sql_no_close() 和 executemany_sql_no_close() 、fetchone_data_no_close()和fetchall_data_no_close() 大部分代码一样,还可以再优化下;
""" @blog: https://blog.csdn.net/zyooooxie """ def exe_sql(sql: str, db: Connection, cur: Cursor, exe_mode: str = None, data_list: list = None): Log.debug(sql) if data_list is None and exe_mode is None: try_str = """cur.execute(sql)""" elif exe_mode == 'execute' and data_list is not None: assert sql.find('(%s') != -1 try_str = """cur.execute(sql, data_list)""" elif exe_mode == 'executemany' and data_list is not None: assert sql.find('(%s') != -1 try_str = """cur.executemany(sql, data_list)""" else: Log.error('{}--{}'.format(exe_mode, data_list)) raise try: result = eval(try_str, locals()) except Exception as e: db.rollback() Log.debug(e.args) Log.info(traceback.format_exc()) result = False execute_sql_result_check(result) def fetch_sql(sql: str, db: Connection, cur: Cursor, fetch_mode: str): Log.debug(sql) if fetch_mode == 'fetchall': try_str = """cur.fetchall()""" elif fetch_mode == 'fetchone': try_str = """cur.fetchone()""" else: Log.error('fetch_mode: {}'.format(fetch_mode)) raise try: cur.execute(sql) data = eval(try_str, locals()) except Exception as e: db.rollback() Log.debug(e.args) Log.info(traceback.format_exc()) data = False fetch_sql_result_check(data) return data
(代码有删改)
@File: send_gift.py
""" @blog: https://blog.csdn.net/zyooooxie """ def test_delete_all_data(user_id: str = 'zyooooxie'): """ :param user_id: :return: """ db_zy, cur_zy = connect_db(zy_db='YES') exe_sql("""DELETE FROM table_records WHERE USER_ID = '{}';""".format(user_id), db_coupon, cur_coupon) exe_sql("""DELETE FROM table_records_detail WHERE USER_ID = '{}';""".format(user_id), db_coupon, cur_coupon) # execute_sql_no_args_no_close("""DELETE FROM table_records WHERE USER_ID = '{}';""".format(user_id), db_coupon, cur_coupon) # execute_sql_no_args_no_close("""DELETE FROM table_records_detail WHERE USER_ID = '{}';""".format(user_id), db_coupon, cur_coupon) Log.info('数据已经清理-{}'.format(user_id)) cur_zy.close() db_zy.close()
@filename: create_order.py
""" @blog: https://blog.csdn.net/zyooooxie """ def test_data2(db: Connection, cur: Cursor, user_id: str = 'zyooooxie'): """ 造支付记录-详情 :param db: :param cur: :param user_id: :return: """ sql1 = """SELECT COUNT(1) FROM table_pay_record; """ # sql_data = fetchone_data_no_close(sql1, db, cur) sql_data = fetch_sql(sql1, db, cur, fetch_mode='fetchone') sql_data = sql_data[0] record_id = int(sql_data) + 1 Log.info(f'{record_id}') sql2 = """ SELECT package_code, package_name_cn, price FROM table_package ; """ # data = fetchall_data_no_close(sql2, db, cur) data = fetch_sql(sql2, db, cur, fetch_mode='fetchall') data = random.choice(data) Log.info(data) ele = data if len(data[1]) < 40 else (data[0], data[1][:35], data[2]) # `package_name` varchar(40) res = record_id, user_id, *ele Log.info(res) return res def test_insert(db: Connection, cur: Cursor, user_id: str = 'zyooooxie', mobile: str = '17777777777'): """ :param db: :param cur: :param user_id: :param mobile: :return: """ data2 = test_data2(db, cur, user_id=user_id) sql2 = """INSERT INTO table_pay_record_detail (record_id, user_id, package_code, package_name, package_price, receive_status,verification) VALUES (%s, %s, %s, %s, %s, %s, %s); """ receive_status = random.randint(1, 6) verification = 1 data2 = *data2, receive_status, verification # execute_sql_no_close(data2, sql2, db, cur) exe_sql(sql2, db, cur, exe_mode='execute', data_list=data2) Log.info('数据插入完成') def test_main(): """ :return: """ db_m, cur_m = connect_db(hk_cemposea_db='YES') for _ in range(10): test_insert(db_m, cur_m) else: cur_m.close() db_m.close()
""" @blog: https://blog.csdn.net/zyooooxie """ def test_1(package_code: str = 'zyooooxie'): """ 造订单数据-为了方便在管理后台查到,userId、phone写死 :param package_code: :return: """ db_m, cur_m = connect_db(hk_cemposea_db='YES') sql = """ SELECT package_name_cn,coupon_effect_tm_st,coupon_effect_tm_end FROM table_package WHERE status = 1 AND package_code = '{}'; """.format(package_code) # op_data = fetchone_data_no_close(sql, db_m, cur_m) op_data = fetch_sql(sql, db_m, cur_m, fetch_mode='fetchone') sql = """SELECT coupon_code, check_code FROM table_package_coupon WHERE package_code = '{}' AND create_by = 'zyooooxie'; """.format(package_code) # opc_data = fetchall_data_no_close(sql, db_m, cur_m) opc_data = fetch_sql(sql, db_m, cur_m, fetch_mode='fetchall') coupon_code, check_code = zip(*opc_data) # 生成2个元组 order_code = 222222222222222222 abc = random.randint(1000, 9999) order_code_list = [order_code + (i+1) * abc for i in range(len(opc_data))] zip_data = [op_data] * len(opc_data) commodity_name, order_start_time, order_end_time = zip(*zip_data) sql = """INSERT INTO table_order (order_code, user_id, phone_number, order_time, commodity_name,coupon_code,check_code,order_start_time,order_end_time,commodity_code,order_status) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s); """ dt = datetime.datetime.now() all_data = list(zip(order_code_list, ['zyooooxie'] * len(opc_data), ['zyooooxie'] * len(opc_data), [dt] * len(opc_data), commodity_name, coupon_code, check_code, order_start_time, order_end_time, [package_code] * len(opc_data), [0] * len(opc_data))) # executemany_sql_no_close(all_data, sql, db_m, cur_m) exe_sql(sql, db_m, cur_m, exe_mode='executemany', data_list=all_data) Log.info('数据插入完成') cur_m.close() db_m.close()
本文链接:https://blog.csdn.net/zyooooxie/article/details/108316723
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。