赞
踩
首先我们组要明白为什么要使用队列,队列的性质,
多线程并发编程的重点,是线程之间共享数据的访问问题和线程之间的通信问题
为了解决线程之间数据共享问题, PYTHON 提供了一个数据类型【队列】可以用于在多线程并发模式下,安全的访问数据而不会造成数据共享冲突。
正常请求的多线程,如果是消费之和生产者,通过列表实现,多线程会对列表中的数据取值,会出现同时访问列表数据的情况,这时候就需要对线程进行加锁或者是线程等待,手动进行解决,过于麻烦,但是队列会通过先进先出或者先进后出的模式,保证了单个数据不会进行同时被多个线程进行访问。
Queue.Queue(maxsize=0)
FIFO即First in First Out,先进先出。Queue提供了一个基本的FIFO容器,使用方法很简单,maxsize是个整数,指明了队列中能存放的数据个数的上限。一旦达到上限,插入会导致阻塞,直到队列中的数据被消费掉。如果maxsize小于或者等于0,队列大小没有限制。
Queue.LifoQueue(maxsize=0)
LIFO即Last in First Out,后进先出。与栈的类似,使用也很简单,maxsize用法同上.
构造一个优先队列。maxsize用法同上。
Queue.Queue(maxsize=0)
#FIFO, 用来定义队列的长度,如果maxsize小于1就表示队列长度无限,
Queue.LifoQueue(maxsize=0)
#LIFO, 如果maxsize小于1就表示队列长度无限
Queue.qsize()
#返回队列的大小
Queue.empty()
#如果队列为空,返回True,反之False ,在线程间通信的过程中,可以通过此来给消费者等待信息
Queue.full()
# 如果队列满了,返回True,反之False,给生产者提醒
Queue.get([block[, timeout]])
读队列,timeout等待时间
Queue.put(item, [block[, timeout]])
写队列,timeout等待时间
Queue.queue.clear()
清空队列
task_done()
#意味着之前入队的一个任务已经完成。由队列的消费者线程调用。每一个get()调用得到一个任务,接下来的task_done()调用告诉队列该任务已经处理完毕如果当前一个join()正在阻塞,它将在队列中的所有任务都处理完时恢复执行(即每一个由put()调用入队的任务都有一个对应的task_done()调用)。
join()
#阻塞调用线程,直到队列中的所有任务被处理掉。只要有数据被加入队列,未完成的任务数就会增加。当消费者线程调用task_done((意味着有消费者取得任务并完成任务),未完成的任务数就会减少。当未完成的任务数降到0,join()解除阻塞。
Python多线程主要是为了提高程序在IO方面的优势,在爬虫的过程中显得尤为重要。正常的爬虫请求直接封装多线程就ok,但是爬虫请求的过程中,对于url的请求需要通过队列来实现,这是队列的需求之一。
对于爬虫的请求地址来说,一般是有规律性可循的,如果是翻页数据,可以将请求到的url放到队列中,通过多线程对队列进行取数据,如果队列为空,线程判断自动等待,循环加入队列url,线程自动请求,以下伪代码,作为参考:
import threading from queue import Queue class ThreadCrawl(threading.Thread): def __init__(self, threadName, idQueue): # 继承父类的方法 super(ThreadCrawl, self).__init__() self.threadName = threadName # 线程名字 def run(self): print('启动' + self.threadName) while not self.idQueue.empty(): try: id = self.idQueue.get(False) # False 如果队列为空,抛出异常 time.sleep(1) print("~"*300) self.get_con(id) except Exception as e: print('队列为空。。。。。', e) pass print('#'*300) def get_con(self): #自己封装的请求自定义 pass def get_id(m, n): conn = psycopg2.connect(database='postgres', user='postgres', password='123456', host='127.0.0.1', port='5432') cur = conn.cursor() sql1 = 'SELECT doc_id from id LIMIT {} offset {};'.format(m, n) cur.execute(sql1) data = cur.fetchall() conn.commit() return data def main(): n = 60 while True: m = 20 # m是固定值,一次去20条, n是第几条开始 print('开始采集n的值为', n) if n == 200000: break # id的队列 idQueue = Queue(20) if idQueue.empty(): data = get_id(m, n) for i in data: idQueue.put(i[0]) # 采集线程的数量 crawlList = [] for id in range(1, 2): name = '采集线程{}'.format(id) crawlList.append(name) # 存储采集线程的列表集合 threadcrawl = [] for threadName in crawlList: thread = ThreadCrawl(threadName, idQueue) thread.start() threadcrawl.append(thread) for thread in threadcrawl: thread.join() n = n + m print("主线程退出..............") if __name__ == '__main__': main()
以上代码是作者从数据库中取数据,间隔性取,然后拼装到url,进行请求。
import threading import time from queue import Queue def put_id(): i = 0 while True: i = i + 1 print("添加数据", i, id_queue.qsize()) time.sleep(0.1) id_queue.put(i) def get_id(m): while True: i = id_queue.get() print("线程", m, '取值', i) if __name__ == "__main__": id_queue = Queue(20) Th1 = threading.Thread(target=put_id, ) Th2 = threading.Thread(target=get_id, args=(2, )) Th3 = threading.Thread(target=get_id, args=(3, )) Th5 = threading.Thread(target=get_id, args=(4, )) Th4 = threading.Thread(target=get_id, args=(5, )) Th2.start() Th1.start() Th3.start() Th4.start() Th5.start()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。