当前位置:   article > 正文

python pymysql multiprocessing.dummy多线程 读写数据库报错_from multiprocessing.dummy import queue 不存在

from multiprocessing.dummy import queue 不存在


一、例子

  • 需求
    使用多线程下载视频到本地,将视频的名字保存在数据库表中,数据库表中不能保存重复视频名字
  • demo.py
from multiprocessing.dummy import Pool
import traceback
import requests
import pymysql
import os
# 习惯函数名开头大写,变量名开头小写,还没适应Python写代码规范,见谅

# 数据库链接类
class KsMySql:
    def __init__(self):
        self.conn = pymysql.Connect(host='127.0.0.1', port=3306, user='root', password='1234567', db='pythonspider', charset='utf8mb4') # 普通链接
        self.cursor = self.conn.cursor()

    # 是否保存过了通过视频名称查找, 返回true为不存在,false为存在
    def IsSaveVideoByName(self, filename):
        try:
            self.cursor.execute('select * from ksvideoinfo where filename = "%s"' %(filename))
            result = self.cursor.fetchone()
            return result is not None
        except:
            print('IsSaveVideoByName 查询错误')
            traceback.print_exc()
            return True

    # 插入视频信息
    def SaveVideoInfo(self, filename):
        try:
            self.cursor.execute('insert into ksvideoinfo(filename) values("%s")'%(filename))
            self.conn.commit()
            print('SaveVideoInfo 插入数据成功')
        except Exception as e:
            self.conn.rollback()
            print('SaveVideoInfo 插入数据错误')
            print(e)
            traceback.print_exc()

    def __del__(self):
        self.cursor.close()
        self.conn.close()

# 全局变量
ksmysql = KsMySql()# 数据库类实例
infolist = []
dirName = 'E:/AllWorkSpace1/Pytharm/pythonProjectPaWeb/Testdemo'# 保存目录
if not os.path.exists(dirName):
    os.mkdir(dirName)

def Select():
    name = '杨洋迪丽热巴《烟火星辰》,用歌声致敬中国航天'
    # 需求3:数据库表中不能保存重复视频名字(这里只是模拟)
    isSave = ksmysql.IsSaveVideoByName(name)

    # 为了方便,默认为不存在,直接添加url到list中
    mp4url = 'https://video.pearvideo.com/mp4/short/20220206/cont-1751191-15823342-hd.mp4'
    infolist.append({'name': name, 'videoUrl': mp4url})

def SaveInfo(dic):
    name = dic['name']
    pathName = dirName + '/' + name + '.mp4'
    url = dic['videoUrl']
    try:
        # if not os.path.exists(pathName):
        mp4Data = requests.get(url=url).content # 从网络下载视频
        with open(pathName, 'wb') as f:# 需求1:视频保存在本地
            f.write(mp4Data)
            print(name, "下载完成")
        # else:
        #     print(name,'已存在,无需下载')
        # 需求2:视频的名字保存在数据库表中
        ksmysql.SaveVideoInfo(name)
    except Exception as e:
        print(name, '下载失败失败或者保存数据库失败')
        print(e)
        traceback.print_exc()

def Main():
    pool1 = Pool(20) # 线程池
    for cur in range(0, 100):
        infolist.clear()
        Select()
        pool1.map(SaveInfo, infolist) # 使用多线程下载
    pool1.close()
    pool1.join()

Main()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85

二、报错及原因

  • 常见错误
    1). Packet sequence number wrong
    2). Exception _mysql_exceptions.OperationalError: (2013, ‘Lost connection to MySQL server during query’)
    3). pymysql AttributeError: ‘NoneType‘ object has no attribute ‘settimeout‘
  • 原因
    如上demo.py,是因为各个线程共享同一个数据库链接而导致的错误

三、解决方法

1.在每个execute前加上互斥锁

如:

...同上
import threading
class KsMySql:
    def __init__(self):
        self.conn = pymysql.Connect(host='127.0.0.1', port=3306, user='root', password='tiger', db='pythonspider', charset='utf8mb4') # 普通链接
        self.cursor = self.conn.cursor()
		self.lock = threading.Lock()# 实例化
		
	# 是否保存过了通过视频名称查找, 返回true为不存在,false为存在
    def IsSaveVideoByName(self, filename):
        try:
        	self.lock.acquire() # 上锁
            self.cursor.execute('select * from ksvideoinfo where filename = "%s"' %(filename))
            result = self.cursor.fetchone()
            self.lock.release() # 解锁
            return result is not None
        except:
            print('IsSaveVideoByName 查询错误')
            traceback.print_exc()
            return True
...同上
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

但经过我个人测试发现,没有用,还是会报新错,这个方法理论上是没问题的,但是在multiprocessing.dummy多线程情况下却不行。仅代表我个人想法,也许自己能力不足,哪里写错了

2.在pool1.map(func, list)中参数的func函数中,实例化一个数据库对象

...
def SaveInfo(dic):
	ksmysql = KsMySql()# 数据库类实例
    name = dic['name']
    pathName = dirName + '/' + name + '.mp4'
    url = dic['videoUrl']
    try:
        # if not os.path.exists(pathName):
        mp4Data = requests.get(url=url).content # 从网络下载视频
        with open(pathName, 'wb') as f:# 需求1:视频保存在本地
            f.write(mp4Data)
            print(name, "下载完成")
        # else:
        #     print(name,'已存在,无需下载')
        # 需求2:视频的名字保存在数据库表中
        ksmysql.SaveVideoInfo(name)
    except Exception as e:
        print(name, '下载失败失败或者保存数据库失败')
        print(e)
        traceback.print_exc()
...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

可以完美解决,因为这样每个线程都有自己的数据库链接对象。
优点:简单、方便
缺点:每调用SaveInfo函数一次就建立一个数据库链接,并函数结束时关闭链接,可能性能有损

3.在KsMySql数据库链接类中使用数据库链接池获取链接,将pool链接池为类对象

...
from dbutils.pooled_db import PooledDB

class KsMySql:
    pool = None
    def __init__(self):
        # self.conn = pymysql.Connect(host='127.0.0.1', port=3306, user='root', password='tiger', db='pythonspider', charset='utf8mb4') # 普通链接,每实例化一个对象就会新建一个链接
        self.conn = KsMySql.Getmysqlconn()# 从链接池中获取链接
        self.cursor = self.conn.cursor()

    # 静态方法
    @staticmethod
    def Getmysqlconn():
        if KsMySql.pool is None:
            mysqlInfo = {
                "host": '127.0.0.1',
                "user": 'root',
                "passwd": 'tiger',
                "db": 'pythonspider',
                "port": 3306,
                "charset": 'utf8mb4'
            }
            KsMySql.pool = PooledDB(creator=pymysql, mincached=1, maxcached=20, host=mysqlInfo['host'],
                              user=mysqlInfo['user'], passwd=mysqlInfo['passwd'], db=mysqlInfo['db'],
                              port=mysqlInfo['port'], charset=mysqlInfo['charset'], blocking=True)
            print(KsMySql.pool)
        # else:
            # print('新KsMySql实例,从数据库链接池获取链接')
        return KsMySql.pool.connection()
        ...
         def __del__(self):
	        # 链接不是真正的被关闭,而是放回链接池中
	        self.cursor.close()
	        self.conn.close()

def SaveInfo(dic):
	ksmysql = KsMySql()# 同样要写上实例化数据库类对象
	...
...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

注意

KsMySql.pool = PooledDB(creator=pymysql, mincached=1, maxcached=20, host=mysqlInfo['host'],
                              user=mysqlInfo['user'], passwd=mysqlInfo['passwd'], db=mysqlInfo['db'],
                              port=mysqlInfo['port'], charset=mysqlInfo['charset'], blocking=True)
'''
blocking参数,代表当链接都被占用了,是否等待新的空闲链接
True :等待, 可能影响程序速度
False:不等待,(个人猜测。。好像是代表同用已占有的数据库链接对象,会重复一开始的报错),反正会报错,最好写成True
'''
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

可以完美解决,因为这样每个线程也都有自己的数据库链接对象。
优点:从链接池中获取自己的链接,优化点性能把
缺点:代码稍微复杂,坑多。。。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/74914
推荐阅读
相关标签
  

闽ICP备14008679号