赞
踩
游标(cursor):执行 SQL 语句后,用来取出返回结果的接口。借助游标可以一行一行地取出数据;取完最后一行之后,游标为空。
图示说明:
如果不使用游标功能,直接执行 select 查询,结果集会一次性打印到屏幕上,程序无法再对结果集做进一步的处理(二次编程)。
案例:student表
使用fetchall()方法,获取SQL查询结果集中的数据
# Demo: read every (sname, ssex) row of the `student` table with fetchall()
# and print a sentence for each female student.
import pymysql

db = pymysql.connect(host='localhost', user='root', db='huangwei',
                     password='123456', port=3306, charset='utf8')
try:
    cursor = db.cursor()
    cursor.execute('select sname,ssex from student')
    aa = cursor.fetchall()  # the whole result set, one tuple per row
    # print(aa)
    for a, b in aa:
        # NOTE(review): the listing lost its indentation; print() is assumed
        # to belong to the `if`, so only female students are printed.
        if b == "女":
            a = "我叫{},我是一个学生!".format(a)
            print(a)
finally:
    # Release the connection even when the query or the loop raises.
    db.close()
结果如下:
① 连接数据库,创建一个数据库对象
# Open a connection to the MySQL server; `db` is the connection object
# used by the following steps.
db = pymysql.connect(host='localhost',user='root',db='huangwei',
    password='123456',port=3306,charset='utf8')
② 开启游标功能,创建游标对象
# cursor() is a method of the connection object `db`; it returns the cursor
# through which statements are executed and results are fetched.
cursor = db.cursor()
③ 使用execute()方法,执行SQL语句
# Run the query; the rows are retrieved afterwards with the fetch* methods.
cursor.execute('select sname,ssex from student')
④ 使用 fetchone()、fetchmany(n) 或 fetchall() 获取数据
# Fetch a single row.
a = cursor.fetchone()
# Fetch all rows at once.
a = cursor.fetchall()
# Fetch n rows (n must be defined by the caller; it is not set in this snippet).
a = cursor.fetchmany(n)
⑤ 断开数据库,释放资源
# Close the connection to release its resources.
db.close()
data_old.py
import time
from contextlib import closing

import cx_Oracle
import pymysql

# Number of rows moved per fetchmany()/executemany() round trip.
_BATCH_SIZE = 100


def _now():
    """Return the current local time as 'YYYY-MM-DD HH:MM:SS'."""
    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())


def _fetch_one(cursor, sql, params):
    """Execute *sql* with bound *params* on *cursor* and return the first row (or None)."""
    cursor.execute(sql, params)
    return cursor.fetchone()


def _value_or_blank(row, index=0):
    """Return row[index], or "" when the row is missing or the value is empty."""
    if not row:
        return ""
    return row[index] if row[index] else ""


def _copy_account_numbers(cur_select, cur_insert, target_db):
    """Copy the matching ACC_NUM values from Oracle into bossInfo_prod_hlw_inst."""
    sql_hlw_select = (
        "SELECT ACC_NUM from CUS_INST.XY_PROD_INST@to_crm30db "
        "where prod_id in (select product_id from lh_zq_lx_product "
        "where flag_hlzx = 'hlw_zx' and flag_provinces='1') "
        "and prod_use_type='1000' and status_cd in (100000,120000) "
        "and lan_id ='8420600'"
    )
    sql_insert_product = 'insert into bossInfo_prod_hlw_inst(acc_num) values(%s)'
    cur_select.execute(sql_hlw_select)
    while True:
        rows = list(cur_select.fetchmany(_BATCH_SIZE))
        if not rows:  # source exhausted: nothing left to insert
            break
        cur_insert.executemany(sql_insert_product, rows)
        target_db.commit()


def _fill_instance_details(cur_select, cur_insert, target_db):
    """Fill the product-instance columns of target rows whose acc_state is empty."""
    sql_select_prod = (
        "SELECT prod_inst_id, prod_id, owner_cust_id, use_cust_id, address_desc, "
        "region_id, status_cd, create_date "
        "from CUS_INST.XY_PROD_INST@to_crm30db "
        "where prod_use_type = '1000' and acc_num = :acc_num"
    )
    sql_update_prod_inst = (
        "UPDATE bossInfo_prod_hlw_inst set prod_inst_id = %s, prod_id = %s, "
        "owner_cust_id = %s, use_cust_id = %s, address_desc = %s, region_id = %s, "
        "acc_state = %s, create_date = %s where acc_num = %s"
    )
    cur_insert.execute(
        'SELECT acc_num from bossInfo_prod_hlw_inst where acc_state = "" or acc_state is null')
    for (acc_num,) in cur_insert.fetchall():
        row = _fetch_one(cur_select, sql_select_prod, {"acc_num": acc_num})
        if row is None:
            # The account no longer exists in the source; leave the target row as is.
            continue
        # NULL columns are stored as empty strings, everything else as text
        # (the original stringified every value before writing it).
        values = ["" if value is None else str(value) for value in row]
        cur_insert.execute(sql_update_prod_inst, values + [acc_num])
        target_db.commit()


def _fill_display_names(cur_select, cur_insert, target_db):
    """Fill customer, manager, region and product names of rows without product_name."""
    sql_cust_name = ("select cust_name from CUS_CUST.XY_CUSTOMER@to_crm30db "
                     "where cust_id = :cust_id")
    sql_cust_phone = (
        "select distinct mobile_phone, home_phone from CUS_CUST.CONTACTS_INFO@to_crm30db "
        "where contact_id in (select contact_id from CUS_CUST.CUST_CONTACT_INFO_REL@to_crm30db "
        "where cust_id = :cust_id)"
    )
    sql_attr = ("select attr_value from ord_so.ord_prod_inst_attr@to_crm30db "
                "where prod_inst_id = :prod_inst_id and attr_id = :attr_id")
    sql_region_name = "SELECT region_name FROM bossInfo_common_region where common_region_id = %s"
    sql_product_name = "SELECT product_name FROM bossInfo_product where product_id = %s"
    sql_update_prod_inst = (
        "UPDATE bossInfo_prod_hlw_inst set owner_cust_name = %s, use_cust_name = %s, "
        "cust_mobile_phone = %s, cust_manager = %s, manager_mobile_phone = %s, "
        "region_name = %s, product_name = %s where prod_inst_id = %s"
    )
    cur_insert.execute(
        'SELECT acc_num, prod_inst_id, owner_cust_id, use_cust_id, region_id, prod_id '
        'from bossInfo_prod_hlw_inst where product_name = "" or product_name is null')
    for _acc, prod, owner, use, region, prod_id in cur_insert.fetchall():
        owner_name = _fetch_one(cur_select, sql_cust_name, {"cust_id": owner})
        use_name = _fetch_one(cur_select, sql_cust_name, {"cust_id": use})
        phone = _fetch_one(cur_select, sql_cust_phone, {"cust_id": owner})
        manager = _fetch_one(cur_select, sql_attr, {"prod_inst_id": prod, "attr_id": 4604})
        manager_mobile = _fetch_one(cur_select, sql_attr, {"prod_inst_id": prod, "attr_id": 4605})
        region_name = _fetch_one(cur_insert, sql_region_name, (region,))
        product_name = _fetch_one(cur_insert, sql_product_name, (prod_id,))

        # Prefer the mobile number; fall back to the home number, then to "".
        phone_str = _value_or_blank(phone, 0) or _value_or_blank(phone, 1)
        cur_insert.execute(sql_update_prod_inst, (
            str(_value_or_blank(owner_name)),
            str(_value_or_blank(use_name)),
            str(phone_str),
            str(_value_or_blank(manager)),
            str(_value_or_blank(manager_mobile)),
            str(_value_or_blank(region_name)),
            str(_value_or_blank(product_name)),
            prod,
        ))
        target_db.commit()


def migratedat():
    """Load broadband product instances from Oracle into MySQL and enrich them.

    Three steps run in order, each committing as it goes:

    1. copy the selected ACC_NUM values into ``bossInfo_prod_hlw_inst``;
    2. for target rows with an empty ``acc_state``, look the account up in
       Oracle and store its instance columns;
    3. for target rows with an empty ``product_name``, resolve customer,
       manager, region and product names and store them.

    All values are passed as bound parameters instead of being concatenated
    into the SQL text, and both connections are closed even when a step fails.
    """
    # PyMySQL >= 1.0 accepts keyword arguments only.
    with closing(cx_Oracle.connect('********', '********', '133.0.**.**:**/xydb')) as source_db, \
            closing(pymysql.connect(host='10.37.6.**', user='root', password='********',
                                    database='enterprise_internet')) as target_db:
        with closing(source_db.cursor()) as cur_select, \
                closing(target_db.cursor()) as cur_insert:
            print('开始执行insert_hlw:', _now())
            _copy_account_numbers(cur_select, cur_insert, target_db)
            print('执行update_hlw:', _now())
            _fill_instance_details(cur_select, cur_insert, target_db)
            _fill_display_names(cur_select, cur_insert, target_db)
            print('执行成功hlw:', _now())
data_temp.py
import time
from contextlib import closing

import cx_Oracle
import pymysql


def _copy_rows(cur_select, cur_insert, target_db, select_sql, insert_sql, batch_size=100):
    """Stream the result of *select_sql* into *insert_sql*, committing per batch."""
    cur_select.execute(select_sql)
    while True:
        rows = list(cur_select.fetchmany(batch_size))
        if not rows:  # source exhausted
            break
        cur_insert.executemany(insert_sql, rows)
        target_db.commit()


def sqlMigrate():
    """Copy five reference tables from the Oracle source into the MySQL target.

    Products, order statuses, account statuses, service offers and common
    regions are copied one after another; a progress line is printed before
    each table and once more after both connections have been closed.
    """
    # (progress label, source SELECT, target INSERT) for every table to copy.
    jobs = [
        ('开始执行product:',
         "SELECT product_id, product_name, flag1, flag2 FROM xy_report.ldl_zq_zw_hlw_product ",
         # One placeholder per selected column (four of them).
         'insert into bossInfo_product(product_id, product_name,flag_provinces,flag_hlzw)'
         ' values(%s,%s,%s,%s)'),
        ('开始执行order_status:',
         "select attr_value,attr_value_name,attr_value_desc "
         "from cpcp_spec.attr_value@to_crm30db where attr_id ='9990010422'",
         'insert into bossInfo_order_status(order_value, value_name,value_desc)'
         ' values(%s,%s,%s)'),
        ('开始执行acc_status:',
         "select attr_value,attr_value_name,attr_value_desc "
         "from cpcp_spec.attr_value@to_crm30db where attr_id ='9990010486'",
         'insert into bossInfo_acc_status(acc_value, acc_name,acc_desc)'
         ' values(%s,%s,%s)'),
        ('开始执行Service_Offer:',
         "SELECT service_offer_id,service_offer_name from CPCP_SPEC.SERVICE_OFFER@to_crm30db",
         'insert into bossInfo_service_offer(service_offer_id, service_offer_name)'
         ' values(%s,%s)'),
        ('开始执行common_region:',
         "select common_region_id, par_region_id, region_nbr, region_name "
         "from cpcp_spec.common_region@to_crm30db where lan_id = 8420600",
         'insert into bossInfo_common_region(common_region_id, par_region_id, region_nbr, region_name )'
         ' values(%s,%s,%s,%s)'),
    ]

    # PyMySQL >= 1.0 accepts keyword arguments only.
    with closing(cx_Oracle.connect('********', '********', '133.0.**.**:**/xydb')) as source_db, \
            closing(pymysql.connect(host='10.37.6.**', user='root', password='********',
                                    database='enterprise_internet')) as target_db:
        with closing(source_db.cursor()) as cur_select, \
                closing(target_db.cursor()) as cur_insert:
            for label, select_sql, insert_sql in jobs:
                print(label, time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
                _copy_rows(cur_select, cur_insert, target_db, select_sql, insert_sql)
    print('执行成功:', time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
migrate.py
def sqlMigrate_boss():
    # Synchronises yesterday's order items from Oracle into the MySQL tables
    # bossInfo_order_item and bossInfo_prod_hlw_inst, then applies the effect
    # of each order (install / removal / ownership transfer / move / suspend).
    #
    # NOTE(review): the published listing lost all indentation; the nesting
    # below is reconstructed from the data flow and must be confirmed against
    # the real file. Only layout and comments were changed.
    # NOTE(review): every statement is built by string concatenation, so a
    # None / non-str value raises TypeError and values are not escaped; several
    # fetchone() results are indexed without a None check.
    source_db = cx_Oracle.connect('********', '********', '133.0.**.**:**/xydb')  # source database (Oracle)
    target_db = pymysql.connect('10.37.6.**', 'root', '********', 'enterprise_internet')  # target database (MySQL)
    # NOTE(review): `port=*6` is a redacted value and is not valid Python.
    source_db_ip = pymysql.connect(host='10.36.*.*', user='ip**', passwd='*******', db='ipzl', port=*6)  # IP database
    # 2. Create the cursors
    cur_select = source_db.cursor()  # reads from the source database
    cur_insert = target_db.cursor()  # reads from / writes to the target database
    cur_select_ip = source_db_ip.cursor()  # reads from the IP database
    # 3. Run the SQL
    sql_select_prod_inst = "SELECT distinct prod_inst_id,order_item_id,acc_num,address_desc,prod_id," \
                           "owner_cust_id,use_cust_id,region_id,create_date " \
                           "FROM ORD_SO.ORD_PROD_INST@to_crm30db where prod_id in (" \
                           "select product_id from lh_zq_lx_product) " \
                           "and lan_id = '8420600'" \
                           "and to_char(create_date, 'yyyy-mm-dd') = to_char(sysdate - 1, 'yyyy-mm-dd')"
    # INSERT into the target table bossInfo_order_item
    sql_insert_order_item = 'insert into bossInfo_order_item(prod_inst_id,order_item_id,acc_num,owner_cust_id,use_cust_id,region_id,product_id) ' \
                            'values(%s,%s,%s,%s,%s,%s,%s)'
    # Existing (prod_inst_id, order_item_id) pairs, used as a duplicate check.
    sql_insert_order_item_r = 'select prod_inst_id,order_item_id from bossInfo_order_item where status_cd not in (30120)'
    print('开始执行insert_order_item:', time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
    cur_select.execute(sql_select_prod_inst)
    while True:
        # One source row per iteration (fetchmany(1)); wrapped in list().
        rows = list(cur_select.fetchmany(1))
        if not rows:
            break  # source exhausted
        # Re-read the existing pairs on every iteration.
        cur_insert.execute(sql_insert_order_item_r)
        data = cur_insert.fetchall()
        # NOTE(review): the pair is compared as strings; it only matches if the
        # target columns also come back as str — confirm the column types.
        data_insert = str(rows[0][0]), str(rows[0][1])
        rows_insert = [(str(rows[0][0]), str(rows[0][1]), str(rows[0][2]), str(rows[0][5]), str(rows[0][6]),
                        str(rows[0][7]), str(rows[0][4]))]
        if data_insert not in data:
            cur_insert.executemany(sql_insert_order_item, rows_insert)
        target_db.commit()  # commit

    print('开始执行update_order_item:', time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
    cur_insert.execute(
        'select prod_inst_id,order_item_id, product_id from bossInfo_order_item where create_date = "" or create_date is null')
    data_order = cur_insert.fetchall()
    for prod, order, prod_id in data_order:
        sql_select_order = "SELECT cust_order_id, status_cd, service_offer_id, service_offer_name, apply_obj_spec_name, create_staff_name, " \
                           "create_org_name, update_staff_name, update_org_name,accept_region_id,create_date FROM ORD_SO.ORDER_ITEM @ to_crm30db " \
                           "where order_item_id =" + order
        # Collect the result row
        cur_select.execute(sql_select_order)
        rows_order = cur_select.fetchone()
        # NOTE(review): this loop evaluates an expression and discards it, so
        # None values are NOT replaced by "" (they are written as 'None' below).
        for i in range(9):
            rows_order[i] if rows_order[i] else ""
        # Update order_item
        sql_update_order_item = "UPDATE bossInfo_order_item set cust_order_id = " + "'" + str(rows_order[0]) + \
                                "', status_cd =" + "'" + str(rows_order[1]) + \
                                "', service_offer_id =" + "'" + str(rows_order[2]) + \
                                "', service_offer_name =" + "'" + str(rows_order[3]) + \
                                "', apply_obj_spec_name =" + "'" + str(rows_order[4]) + \
                                "', create_staff_name =" + "'" + str(rows_order[5]) + \
                                "', create_org_name =" + "'" + str(rows_order[6]) + \
                                "', update_staff_name =" + "'" + str(rows_order[7]) + \
                                "', update_org_name =" + "'" + str(rows_order[8]) + \
                                "', accept_region_id =" + "'" + str(rows_order[9]) + \
                                "', create_date =" + "'" + str(rows_order[10]) + "'" + \
                                " where order_item_id =" + order
        cur_insert.execute(sql_update_order_item)
        target_db.commit()  # commit
        # Update flag_provinces / flag_hlzw from the product table
        sql_select_order_hlzw = "SELECT flag_provinces, flag_hlzw FROM bossInfo_product WHERE product_id =" + prod_id
        cur_insert.execute(sql_select_order_hlzw)
        rows_order_hlzw = cur_insert.fetchone()
        sql_update_order_hlzw = "UPDATE bossInfo_order_item set flag_provinces =" + "'" + str(rows_order_hlzw[0]) + \
                                "',flag_hlzw=" + "'" + str(rows_order_hlzw[1]) + \
                                "'where cust_order_id =" + str(rows_order[0])
        cur_insert.execute(sql_update_order_hlzw)
        target_db.commit()  # commit
        # Update cust_order_nbr
        sql_select_order_nbr = "SELECT cust_order_nbr FROM ORD_SO.CUSTOMER_ORDER @ to_crm30db " \
                               " where cust_order_id =" + str(rows_order[0])
        cur_select.execute(sql_select_order_nbr)
        rows_order_nbr = cur_select.fetchone()
        # NOTE(review): cust_order_nbr is written without quotes.
        sql_update_order_nbr = "UPDATE bossInfo_order_item set cust_order_nbr =" + str(rows_order_nbr[0]) + \
                               " where cust_order_id =" + str(rows_order[0])
        cur_insert.execute(sql_update_order_nbr)
        target_db.commit()  # commit

    print('开始执行prod_inst:', time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
    sql_select_order_item = "SELECT cust_order_nbr, prod_inst_id, acc_num, owner_cust_id, use_cust_id,region_id ," \
                            "order_item_id,flag_provinces,flag_hlzw FROM bossInfo_order_item " \
                            "where status_cd not in ('30120')"
    cur_insert.execute(sql_select_order_item)
    data_order = cur_insert.fetchall()
    for order_nbr, prod, acc, owner, use, region, order, provinces, hlzw in data_order:
        sql_select_status_cd = "SELECT status_cd,service_offer_id FROM ORD_SO.ORDER_ITEM @ to_crm30db " \
                               " where order_item_id =" + order + " and obj_id =" + prod
        # Collect the current status and service offer of the order item
        cur_select.execute(sql_select_status_cd)
        rows_status = cur_select.fetchone()
        rows_status_str = rows_status[0] if rows_status else ""
        rows_service_offer = rows_status[1] if rows_status else ""
        # print(rows_status_str, rows_service_offer, provinces,hlzw)
        # NOTE(review): the status is compared with the string "301200" while
        # the filters above use 30120 / '30120', and the service offer is
        # compared with an int — confirm the types returned by the driver.
        # New installation completed
        if rows_status_str == "301200" and rows_service_offer == 4010100000 \
                and provinces == "1" and hlzw == "hlw_zx":
            rows_insert = [(str(order_nbr), str(prod), str(acc), str(owner), str(use), str(region))]
            # Insert the new product instance
            sql_insert_prod_inst = "insert into bossInfo_prod_hlw_inst(cust_order_nbr, prod_inst_id, acc_num, owner_cust_id, use_cust_id, region_id) " \
                                   "values(%s,%s,%s,%s,%s,%s)"
            cur_insert.executemany(sql_insert_prod_inst, rows_insert)
            print("新装insert:" + acc)
            target_db.commit()  # commit
            # (A commented-out duplicate check on (prod_inst_id, acc_num)
            # against bossInfo_prod_hlw_inst preceded the lookups below.)
            # NOTE(review): the lookups and UPDATE that follow are assumed to
            # belong to this `if` because the UPDATE targets the row inserted
            # above — confirm.
            # owner_cust_name
            sql_select_owner_cust_name = "select cust_name from CUS_CUST.XY_CUSTOMER@to_crm30db " \
                                         " where cust_id = " + owner
            # use_cust_name
            sql_select_use_cust_name = "select cust_name from CUS_CUST.XY_CUSTOMER@to_crm30db " \
                                       " where cust_id = " + use
            # cust_mobile_phone
            ql_select_cust_mobile_phone = "select mobile_phone from CUS_CUST.CONTACTS_INFO@to_crm30db where contact_id = (" \
                                          "select contact_id from CUS_CUST.CUST_CONTACT_INFO_REL@to_crm30db " \
                                          " where cust_id =" + owner + ")"
            # Product-instance details
            # NOTE(review): no space before "and" in this and the two
            # attribute queries below.
            sql_select_prod_inst = "SELECT address_desc, prod_id, acc_prod_name, create_date FROM ORD_SO.ORD_PROD_INST @ to_crm30db " \
                                   " where prod_inst_id =" + prod + "and order_item_id=" + order
            # cust_manager
            sql_select_manager = "select attr_value from ord_so.ord_prod_inst_attr@to_crm30db " \
                                 " where prod_inst_id =" + prod + "and attr_id = 4604"
            # manager_mobile_phone
            sql_select_manage_mobile = "select attr_value from ord_so.ord_prod_inst_attr@to_crm30db " \
                                       " where prod_inst_id =" + prod + "and attr_id = 4605"
            # region_name
            sql_select_region_name = "SELECT region_name FROM bossInfo_common_region where common_region_id=" + "'" + region + "'"
            # ip
            sql_select_ip = "SELECT 代表IP FROM ipzlcx where 业务号 =" + "'" + acc + "'"
            # Collect the result rows
            cur_select.execute(sql_select_owner_cust_name)
            rows_owner_cust_name = cur_select.fetchone()
            cur_select.execute(sql_select_use_cust_name)
            rows_use_cust_name = cur_select.fetchone()
            cur_select.execute(ql_select_cust_mobile_phone)
            rows_cust_mobile_phone = cur_select.fetchone()
            cur_select.execute(sql_select_prod_inst)
            rows_prod_inst = cur_select.fetchone()
            cur_select.execute(sql_select_manager)
            rows_manager = cur_select.fetchone()
            cur_select.execute(sql_select_manage_mobile)
            rows_manage_mobile = cur_select.fetchone()
            cur_insert.execute(sql_select_region_name)
            rows_region_name = cur_insert.fetchone()
            cur_select_ip.execute(sql_select_ip)
            rows_ip = cur_select_ip.fetchone()
            # Use "" when a lookup returned no row
            rows_owner_cust_name_str = rows_owner_cust_name[0] if rows_owner_cust_name else ""
            rows_use_cust_name_str = rows_use_cust_name[0] if rows_use_cust_name else ""
            rows_cust_mobile_phone_str = rows_cust_mobile_phone[0] if rows_cust_mobile_phone else ""
            rows_ip_str = rows_ip[0] if rows_ip else ""
            rows_region_name_str = rows_region_name[0] if rows_region_name else ""
            # NOTE(review): no-op loop (result discarded); also fails when
            # rows_prod_inst is None.
            for i in range(4):
                rows_prod_inst[i] if rows_prod_inst[i] else ""
            rows_manager_str = rows_manager[0] if rows_manager else ""
            rows_manager_mobile_str = rows_manage_mobile[0] if rows_manage_mobile else ""
            # Update prod_inst
            sql_update_prod_inst = "UPDATE bossInfo_prod_hlw_inst set owner_cust_name =" + "'" + str(
                rows_owner_cust_name_str) + \
                                   "', use_cust_name =" + "'" + str(rows_use_cust_name_str) + \
                                   "', cust_mobile_phone =" + "'" + str(rows_cust_mobile_phone_str) + \
                                   "', address_desc =" + "'" + str(rows_prod_inst[0]) + \
                                   "', prod_id =" + "'" + str(rows_prod_inst[1]) + \
                                   "', product_name =" + "'" + str(rows_prod_inst[2]) + \
                                   "', create_date =" + "'" + str(rows_prod_inst[3]) + \
                                   "', cust_manager =" + "'" + str(rows_manager_str) + \
                                   "', manager_mobile_phone =" + "'" + str(rows_manager_mobile_str) + \
                                   "', ip =" + "'" + str(rows_ip_str) + \
                                   "', acc_state ='100000', acc_state_name ='在用'" \
                                   ", region_name =" + "'" + str(rows_region_name_str) + \
                                   "' where cust_order_nbr =" + order_nbr + " and prod_inst_id =" + prod
            cur_insert.execute(sql_update_prod_inst)
            target_db.commit()  # commit
        # Removal (service disconnected)
        if rows_status_str == "301200" and rows_service_offer in (4020100000, 4020300003)\
                and provinces == "1" and hlzw == "hlw_zx":
            print("拆机:" + acc)
            # Foreign-key checks are switched off around the DELETE.
            sql_delete_prod_inst = "SET foreign_key_checks = 0"
            cur_insert.execute(sql_delete_prod_inst)
            sql_delete_prod_inst = "DELETE FROM bossInfo_prod_hlw_inst WHERE acc_num = " + "'" + acc + "'"
            cur_insert.execute(sql_delete_prod_inst)
            sql_delete_prod_inst = "SET foreign_key_checks = 1"
            cur_insert.execute(sql_delete_prod_inst)
        # The remaining order types only apply to accounts that are present in
        # bossInfo_prod_hlw_inst. The whole acc_num column is re-read per order.
        sql_select_acc_num = "SELECT acc_num FROM bossInfo_prod_hlw_inst "
        cur_insert.execute(sql_select_acc_num)
        data_acc_num = cur_insert.fetchall()
        if (acc,) in data_acc_num:
            # Ownership transfer
            if rows_status_str == "301200" and rows_service_offer == 4040400000 \
                    and provinces == "1" and hlzw == "hlw_zx":
                print("过户:" + acc)
                # owner_cust_id
                sql_select_prod_inst = "SELECT owner_cust_id, use_cust_id from CUS_INST.XY_PROD_INST@to_crm30db " \
                                       " WHERE acc_num = " + "'" + acc + "'"
                cur_select.execute(sql_select_prod_inst)
                rows_cust = cur_select.fetchone()
                sql_update_prod_inst = "UPDATE bossInfo_prod_hlw_inst set owner_cust_id =" + "'" + str(rows_cust[0]) + \
                                       "',use_cust_id=" + "'" + str(rows_cust[1]) + \
                                       "'WHERE acc_num = " + "'" + acc + "'"
                cur_insert.execute(sql_update_prod_inst)
                # owner_cust_name
                sql_select_owner_cust_name = "select cust_name from CUS_CUST.XY_CUSTOMER@to_crm30db " \
                                             " where cust_id = " + str(rows_cust[0])
                # use_cust_name
                sql_select_use_cust_name = "select cust_name from CUS_CUST.XY_CUSTOMER@to_crm30db " \
                                           " where cust_id = " + str(rows_cust[1])
                # Collect the result rows
                cur_select.execute(sql_select_owner_cust_name)
                rows_owner_cust_name = cur_select.fetchone()
                cur_select.execute(sql_select_use_cust_name)
                rows_use_cust_name = cur_select.fetchone()
                # NOTE(review): the same id UPDATE is executed a second time here.
                cur_insert.execute(sql_update_prod_inst)
                # Use "" when a lookup returned no row
                rows_owner_cust_name_str = rows_owner_cust_name[0] if rows_owner_cust_name else ""
                rows_use_cust_name_str = rows_use_cust_name[0] if rows_use_cust_name else ""
                # Update prod_inst
                sql_update_prod_inst = "UPDATE bossInfo_prod_hlw_inst set owner_cust_name =" + "'" \
                                       + str(rows_owner_cust_name_str) + "', use_cust_name =" + "'" \
                                       + str(rows_use_cust_name_str) + "' WHERE acc_num = " + "'" + acc + "'"
                cur_insert.execute(sql_update_prod_inst)
                target_db.commit()  # commit
            # Relocation (address change)
            if rows_status_str == "301200" and rows_service_offer == 4030100000 \
                    and provinces == "1" and hlzw == "hlw_zx":
                # address_desc
                sql_select_address_desc = "SELECT address_desc from CUS_INST.XY_PROD_INST@to_crm30db " \
                                          "where prod_use_type = '1000' and acc_num = " + "'" + acc + "'"
                cur_select.execute(sql_select_address_desc)
                rows_address = cur_select.fetchone()
                sql_update_address_desc = "UPDATE bossInfo_prod_hlw_inst set address_desc =" + "'" + str(
                    rows_address[0]) + \
                                          "'WHERE acc_num = " + "'" + acc + "'"
                cur_insert.execute(sql_update_address_desc)
            # Suspension with number retention
            if rows_status_str == "301200" and rows_service_offer == 4060100002 \
                    and provinces == "1" and hlzw == "hlw_zx":
                # status_cd
                # NOTE(review): the query selects address_desc although the
                # value is written to status_cd below — looks like a copy/paste
                # slip; confirm the intended column.
                sql_select_status_cd = "SELECT address_desc from CUS_INST.XY_PROD_INST@to_crm30db " \
                                       "where prod_use_type = '1000' and acc_num = " "'" + acc + "'"
                cur_select.execute(sql_select_status_cd)
                rows_status = cur_select.fetchone()
                if rows_status:
                    sql_update_status_cd = "UPDATE bossInfo_prod_hlw_inst set status_cd =" + "'" + str(rows_status[0]) + \
                                           "'WHERE acc_num = ""'" + acc + "'"
                    cur_insert.execute(sql_update_status_cd)
        # Record the status read from Oracle on the order item.
        # NOTE(review): when no status row was found rows_status_str is "" and
        # the statement becomes "set status_cd = where ...", which is invalid SQL.
        sql_update_order_status = " UPDATE bossInfo_order_item set status_cd =" + rows_status_str + \
                                  " where order_item_id =" + order + " and prod_inst_id =" + prod
        cur_insert.execute(sql_update_order_status)
        target_db.commit()  # commit
    print('执行成功:', time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
    cur_select.close()
    cur_insert.close()
    cur_select_ip.close()
    source_db.close()
    target_db.close()
    source_db_ip.close()
import time
from contextlib import closing

import pymysql


def sqlMigrate_ip():
    """Copy every row of the source table ``ipzlcx`` into ``ipInfo_ip``.

    Rows are read in batches of 100 and each batch is committed on its own.
    The number of ``%s`` placeholders is taken from the source cursor's
    column description, so ``ipzlcx`` must expose exactly the columns listed
    in the INSERT statement, in the same order.
    Both connections are closed even when the copy fails.
    """
    # NOTE: host, credentials and port are redacted in the published listing.
    # The redacted port ('****') is not valid Python; 3306 (the MySQL default)
    # is a placeholder to be replaced with the real value.
    with closing(pymysql.connect(host='10.', user='ip**', password='******',
                                 database='******', port=3306)) as source_db_ip, \
            closing(pymysql.connect(host='10.', user='root', password='******',
                                    database='*******', port=3306)) as target_db:
        with closing(source_db_ip.cursor()) as cur_select_ip, \
                closing(target_db.cursor()) as cur_insert:
            cur_select_ip.execute("SELECT * FROM ipzlcx")
            # One placeholder per source column.
            val_str = ','.join(['%s'] * len(cur_select_ip.description))
            sql_insert_ip = 'insert into ipInfo_ip(region_name,ip_r,acc_num,owner_cust_name,bras_ip,' \
                            'bras_rate,boss_rate,contract_rate,ip_f,ip_e,disassembled,web_port,interface,' \
                            'vlan,address_desc,cust_manager,remark)' \
                            + ' values(' + val_str + ')'
            print('开始执行insert_ip:', time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
            while True:
                rows = list(cur_select_ip.fetchmany(100))
                if not rows:  # source exhausted
                    break
                cur_insert.executemany(sql_insert_ip, rows)
                target_db.commit()
            print('执行成功insert_ip:', time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))


sqlMigrate_ip()
import time
from contextlib import closing

import cx_Oracle
import pymysql


def sqlMigrate():
    """Tag rows of ``targetdata_lhtarget5g`` with their 5G offer and order data.

    For every target row whose ``flag_5G`` is NULL the function looks up an
    active 5G offer instance in Oracle. When one exists it stores the offer id
    and name and sets ``flag_5G``; if an order for that offer instance was
    created within the last ten days it also stores the order item id, its
    creation date and the staff / organisation / region of the order item.

    All values are passed as bound parameters (``:name`` for Oracle, ``%s``
    for MySQL) and both connections are closed even when a lookup fails.
    """
    sql_select_offer = (
        "SELECT offer_id, offer_inst_id from CUS_INST.XY_OFFER_INST@to_crm30db where "
        "offer_type = 11 and status_cd = '1000' and offer_id in "
        "(select offer_id from xy_user.offer where offer_type=11 and offer_name like '%5G%') "
        "and offer_inst_id in ("
        "SELECT offer_inst_id from CUS_INST.XY_OFFER_PROD_INST_REL@to_crm30db "
        "where prod_inst_id = :serv_id)"
    )
    sql_Oracle_order = (
        "SELECT order_item_id, create_date from ORD_SO.ORD_OFFER_INST@to_crm30db "
        "where to_char(create_date, 'yyyy-mm-dd') > to_char(SYSDATE - 10, 'yyyy-mm-dd') "
        "and oper_type = '1000' and offer_inst_id = :offer_inst_id"
    )
    sql_Oracle_staff = (
        "SELECT create_staff_name, create_org_name, region_id "
        "FROM ORD_SO.ORDER_ITEM@to_crm30db where order_item_id = :order_item_id"
    )
    sql_update_offer = ("update targetdata_lhtarget5g set offer_id = %s, flag_5G = '1' "
                        "WHERE serv_id = %s")
    sql_update_offerName = ("update targetdata_lhtarget5g set offer_name = "
                            "(SELECT offer_name from offer where offer_id = %s) "
                            "WHERE serv_id = %s")
    sql_update_order = ("update targetdata_lhtarget5g set order_item_id = %s, create_date = %s "
                        "WHERE product_offer_instance_id2 = %s")
    sql_update_staff = ("update targetdata_lhtarget5g set update_staff_name = %s, "
                        "update_org_name = %s, region_id = %s "
                        "WHERE product_offer_instance_id2 = %s")

    # PyMySQL >= 1.0 accepts keyword arguments only.
    with closing(cx_Oracle.connect('xy_', 'xyrpt**', '133.0.*.*:5/xydb')) as Oracle_db, \
            closing(pymysql.connect(host='10.36.*.*', user='root', password='mysql*',
                                    database='target5g')) as Mysql_db:
        with closing(Oracle_db.cursor()) as cur_Oracle, \
                closing(Mysql_db.cursor()) as cur_Mysql:
            cur_Mysql.execute("SELECT serv_id, product_offer_instance_id2 "
                              "FROM targetdata_lhtarget5g where flag_5G is null")
            rows_serv = cur_Mysql.fetchall()
            print(rows_serv)
            for serv, offer in rows_serv:
                cur_Oracle.execute(sql_select_offer, {"serv_id": serv})
                rows_offer = cur_Oracle.fetchone()
                if rows_offer is None:
                    continue  # no active 5G offer for this service
                offer_id, offer_inst_id = rows_offer[0], rows_offer[1]
                print(offer, offer_id, offer_inst_id)
                cur_Mysql.execute(sql_update_offer, (offer_id, serv))
                cur_Mysql.execute(sql_update_offerName, (offer_id, serv))
                Mysql_db.commit()

                cur_Oracle.execute(sql_Oracle_order, {"offer_inst_id": offer_inst_id})
                rows_order = cur_Oracle.fetchone()
                if rows_order is None:
                    continue  # no recent order for this offer instance
                cur_Mysql.execute(sql_update_order, (rows_order[0], rows_order[1], offer))
                Mysql_db.commit()

                cur_Oracle.execute(sql_Oracle_staff, {"order_item_id": rows_order[0]})
                rows_staff = cur_Oracle.fetchone()
                if rows_staff is None:
                    continue  # order item not found: nothing to record
                cur_Mysql.execute(sql_update_staff,
                                  (rows_staff[0], rows_staff[1], rows_staff[2], offer))
                Mysql_db.commit()
    print('执行成功:', time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))


sqlMigrate()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。