当前位置:   article > 正文

Python脚本之操作MySQL【一】_python cur.executemany

python cur.executemany

本文为博主原创,未经授权,严禁转载及使用。
本文链接:https://blog.csdn.net/zyooooxie/article/details/108316723

之前曾经写过 连接MySQL【二】连接MySQL【一】,最近对我的代码做了些优化,想着做些分享;

个人博客:https://blog.csdn.net/zyooooxie

Cursor类的execute()、executemany()

往表里造数据时,我常常用到这2个方法;

以前在使用execute() 都是直接传了 我要执行的sql,偶然看了下源码,才发现 和executemany() 有点相似,都有args。

execute()

在这里插入图片描述

executemany()

在这里插入图片描述

所以,就把自己写的 公共方法做了优化,然后发现 实际调用这些方法时,很多造数方法 也得优化,干脆就全部优化一遍了。

代码

公共方法

(代码有删改)

@File: common_mysql.py

"""
@blog: https://blog.csdn.net/zyooooxie
"""


import pymysql
import traceback
from pymysql.connections import Connection
from pymysql.cursors import Cursor
from user_log import Log


def connect_db(**kwargs):
    """
    数据库链接,传参为 具体的db
    :param kwargs:
    :return:
    """

    if kwargs.get('ABC_db'):
        mysql_user = ''
        mysql_host = ''
        mysql_pwd = ''
        mysql_db = ''

    elif kwargs.get('abc_db'):
        mysql_user = ''
        mysql_host = ''
        mysql_pwd = ''
        mysql_db = ''


    else:
        raise Exception('传参不合法')

    db = pymysql.connect(host=mysql_host, passwd=mysql_pwd, user=mysql_user, port=3306, autocommit=True,
                         database=mysql_db, charset='utf8', use_unicode=True)
    cur = db.cursor()

    return db, cur


def execute_sql_no_args_no_close(sql: str, db: Connection, cur: Cursor):
    """
    execute() 没有args
    :param sql:
    :param db:
    :param cur:
    :return:
    """
    Log.debug(sql)

    try:
        result = cur.execute(sql)

    except Exception as e:
        db.rollback()
        Log.debug(e.args)
        Log.info(traceback.format_exc())
        result = False

    execute_sql_result_check(result)


def execute_sql_no_close(data_list_tuple: list or tuple, sql: str, db: Connection, cur: Cursor):
    """
    execute() 要传args
    :param data_list_tuple:
    :param sql:
    :param db:
    :param cur:
    :return:
    """
    Log.debug(sql)

    try:
        assert sql.find('(%s') != -1

        # %s可以用作查询中的占位符
        # If args is a list or tuple, %s can be used as a placeholder in the query.

        result = cur.execute(sql, data_list_tuple)

    except Exception as e:
        db.rollback()
        Log.debug(e.args)
        Log.info(traceback.format_exc())

        result = False

    execute_sql_result_check(result)


def executemany_sql_no_close(data_list: list, sql: str, db: Connection, cur: Cursor):
    """
    executemany() 要传args
    :param data_list:
    :param sql:
    :param db:
    :param cur:
    :return:
    """
    Log.debug(sql)

    try:
        db.ping(reconnect=True)

        assert sql.find('(%s') != -1
        result = cur.executemany(sql, data_list)

    except Exception as e:
        db.rollback()
        Log.debug(e.args)
        Log.info(traceback.format_exc())

        result = False

    execute_sql_result_check(result)


def execute_sql_result_check(result):
    """

    :param result:
    :return:
    """
    Log.debug(result)
    if result is False:
        Log.error('Execute Fail')
        raise
    else:
        assert isinstance(result, int) is True
        Log.debug('Execute Succeed')


def fetchall_data_no_close(sql: str, db: Connection, cur: Cursor):
    """
    fetchall()
    :param sql:
    :param db:
    :param cur:
    :return:
    """
    Log.debug(sql)

    try:
        cur.execute(sql)
        data = cur.fetchall()
    except Exception as e:
        db.rollback()
        Log.debug(e.args)
        Log.info(traceback.format_exc())

        data = False

    fetch_sql_result_check(data)

    return data


def fetchone_data_no_close(sql: str, db: Connection, cur: Cursor):
    """
    fetchone()
    :param sql:
    :param db:
    :param cur:
    :return:
    """
    Log.debug(sql)

    try:
        cur.execute(sql)
        data = cur.fetchone()

    except Exception as e:
        db.rollback()

        Log.debug(e.args)
        Log.info(traceback.format_exc())

        data = False

    fetch_sql_result_check(data)
    return data


def fetch_sql_result_check(result):
    """

    :param result:
    :return:
    """
    Log.debug(result)

    if result is False:
        Log.error('Fetch Fail')
        raise

    elif not bool(result):
        Log.error('Fetch None')

    else:
        Log.debug('Fetch Succeed')
		

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205

方法优化

公共方法中 execute_sql_no_close() 和 executemany_sql_no_close() 、fetchone_data_no_close()和fetchall_data_no_close() 大部分代码一样,还可以再优化下;

"""
@blog: https://blog.csdn.net/zyooooxie
"""

def exe_sql(sql: str, db: Connection, cur: Cursor, exe_mode: str = None, data_list: list = None):
    Log.debug(sql)

    if data_list is None and exe_mode is None:
        try_str = """cur.execute(sql)"""

    elif exe_mode == 'execute' and data_list is not None:
        assert sql.find('(%s') != -1
        try_str = """cur.execute(sql, data_list)"""

    elif exe_mode == 'executemany' and data_list is not None:
        assert sql.find('(%s') != -1
        try_str = """cur.executemany(sql, data_list)"""

    else:
        Log.error('{}--{}'.format(exe_mode, data_list))
        raise

    try:
        result = eval(try_str, locals())

    except Exception as e:
        db.rollback()
        Log.debug(e.args)
        Log.info(traceback.format_exc())
        result = False

    execute_sql_result_check(result)


def fetch_sql(sql: str, db: Connection, cur: Cursor, fetch_mode: str):
    Log.debug(sql)

    if fetch_mode == 'fetchall':
        try_str = """cur.fetchall()"""

    elif fetch_mode == 'fetchone':
        try_str = """cur.fetchone()"""

    else:
        Log.error('fetch_mode: {}'.format(fetch_mode))
        raise

    try:
        cur.execute(sql)
        data = eval(try_str, locals())

    except Exception as e:
        db.rollback()

        Log.debug(e.args)
        Log.info(traceback.format_exc())

        data = False

    fetch_sql_result_check(data)
    return data
    
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62

实际应用

(代码有删改)

@File: send_gift.py

"""
@blog: https://blog.csdn.net/zyooooxie
"""

def test_delete_all_data(user_id: str = 'zyooooxie'):
    """
    
    :param user_id:
    :return:
    """

    db_zy, cur_zy = connect_db(zy_db='YES')
    
    exe_sql("""DELETE FROM table_records  WHERE USER_ID = '{}';""".format(user_id), db_coupon, cur_coupon)
    exe_sql("""DELETE FROM table_records_detail  WHERE USER_ID = '{}';""".format(user_id), db_coupon, cur_coupon)

    # execute_sql_no_args_no_close("""DELETE FROM table_records  WHERE USER_ID = '{}';""".format(user_id), db_coupon, cur_coupon)
    # execute_sql_no_args_no_close("""DELETE FROM table_records_detail  WHERE USER_ID = '{}';""".format(user_id), db_coupon, cur_coupon)


    Log.info('数据已经清理-{}'.format(user_id))

    cur_zy.close()
    db_zy.close()

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

@filename: create_order.py

"""
@blog: https://blog.csdn.net/zyooooxie
"""


def test_data2(db: Connection, cur: Cursor, user_id: str = 'zyooooxie'):
    """
    造支付记录-详情
    :param db:
    :param cur:
    :param user_id:
    :return:
    """
    sql1 = """SELECT COUNT(1) FROM table_pay_record; """
    
    # sql_data = fetchone_data_no_close(sql1, db, cur)
    sql_data = fetch_sql(sql1, db, cur, fetch_mode='fetchone')

    sql_data = sql_data[0]
    record_id = int(sql_data) + 1
    Log.info(f'{record_id}')

    sql2 = """ SELECT package_code, package_name_cn, price FROM table_package ; """
    # data = fetchall_data_no_close(sql2, db, cur)
    data = fetch_sql(sql2, db, cur, fetch_mode='fetchall')

    data = random.choice(data)
    Log.info(data)

    ele = data if len(data[1]) < 40 else (data[0], data[1][:35], data[2])           # `package_name` varchar(40)
    res = record_id, user_id, *ele

    Log.info(res)
    return res


def test_insert(db: Connection, cur: Cursor, user_id: str = 'zyooooxie',
                mobile: str = '17777777777'):
    """

    :param db:
    :param cur:
    :param user_id:
    :param mobile:
    :return:
    """

    data2 = test_data2(db, cur, user_id=user_id)

    sql2 = """INSERT INTO table_pay_record_detail (record_id, user_id, package_code, package_name, package_price, receive_status,verification) VALUES (%s, %s, %s, %s, %s, %s, %s); """
    receive_status = random.randint(1, 6)
    verification = 1

    data2 = *data2, receive_status, verification

    # execute_sql_no_close(data2, sql2, db, cur)
    exe_sql(sql2, db, cur, exe_mode='execute', data_list=data2)    
    Log.info('数据插入完成')


def test_main():
    """

    :return:
    """
    db_m, cur_m = connect_db(hk_cemposea_db='YES')

    for _ in range(10):
        test_insert(db_m, cur_m)

    else:

        cur_m.close()
        db_m.close()

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
"""
@blog: https://blog.csdn.net/zyooooxie
"""

def test_1(package_code: str = 'zyooooxie'):
    """
	造订单数据-为了方便在管理后台查到,userId、phone写死
    :param package_code:
    :return:
    """
    db_m, cur_m = connect_db(hk_cemposea_db='YES')
	
    sql = """ SELECT package_name_cn,coupon_effect_tm_st,coupon_effect_tm_end FROM table_package WHERE status = 1 AND package_code = '{}'; """.format(package_code)

    # op_data = fetchone_data_no_close(sql, db_m, cur_m)
    op_data = fetch_sql(sql, db_m, cur_m, fetch_mode='fetchone')

    sql = """SELECT coupon_code, check_code FROM table_package_coupon WHERE package_code = '{}' AND create_by = 'zyooooxie'; """.format(package_code)
    
    # opc_data = fetchall_data_no_close(sql, db_m, cur_m)
    opc_data = fetch_sql(sql, db_m, cur_m, fetch_mode='fetchall')

    coupon_code, check_code = zip(*opc_data)        # 生成2个元组

    order_code = 222222222222222222
    abc = random.randint(1000, 9999)
    order_code_list = [order_code + (i+1) * abc for i in range(len(opc_data))]

    zip_data = [op_data] * len(opc_data)
    commodity_name, order_start_time, order_end_time = zip(*zip_data)

    sql = """INSERT INTO table_order (order_code, user_id, phone_number, order_time, commodity_name,coupon_code,check_code,order_start_time,order_end_time,commodity_code,order_status) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s); """
	
    dt = datetime.datetime.now()
    all_data = list(zip(order_code_list, ['zyooooxie'] * len(opc_data), ['zyooooxie'] * len(opc_data),
                        [dt] * len(opc_data), commodity_name, coupon_code, check_code, order_start_time, order_end_time, [package_code] * len(opc_data), [0] * len(opc_data)))

    # executemany_sql_no_close(all_data, sql, db_m, cur_m)
    exe_sql(sql, db_m, cur_m, exe_mode='executemany', data_list=all_data)

    Log.info('数据插入完成')
    cur_m.close()
    db_m.close()

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

本文链接:https://blog.csdn.net/zyooooxie/article/details/108316723

个人博客 https://blog.csdn.net/zyooooxie

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/天景科技苑/article/detail/980512
推荐阅读
相关标签
  

闽ICP备14008679号