当前位置:   article > 正文

Python多线程——队列(Queue)_python 多线程任务过程中产生的队列条件写入队列,

python 多线程任务过程中产生的队列条件写入队列,

Queue主要就是为多线程生产值、消费者之间线程通信提供服务,具有先进先出的数据结构。

首先我们组要明白为什么要使用队列,队列的性质,

多线程并发编程的重点,是线程之间共享数据的访问问题和线程之间的通信问题
为了解决线程之间数据共享问题, PYTHON 提供了一个数据类型【队列】可以用于在多线程并发模式下,安全的访问数据而不会造成数据共享冲突。
正常请求的多线程,如果是消费之和生产者,通过列表实现,多线程会对列表中的数据取值,会出现同时访问列表数据的情况,这时候就需要对线程进行加锁或者是线程等待,手动进行解决,过于麻烦,但是队列会通过先进先出或者先进后出的模式,保证了单个数据不会进行同时被多个线程进行访问。

FIFO

Queue.Queue(maxsize=0)
FIFO即First in First Out,先进先出。Queue提供了一个基本的FIFO容器,使用方法很简单,maxsize是个整数,指明了队列中能存放的数据个数的上限。一旦达到上限,插入会导致阻塞,直到队列中的数据被消费掉。如果maxsize小于或者等于0,队列大小没有限制。

LIFO

Queue.LifoQueue(maxsize=0)
LIFO即Last in First Out,后进先出。与栈的类似,使用也很简单,maxsize用法同上.

priority

构造一个优先队列。maxsize用法同上。

基本方法:

Queue.Queue(maxsize=0) #FIFO, 用来定义队列的长度,如果maxsize小于1就表示队列长度无限,
Queue.LifoQueue(maxsize=0) #LIFO, 如果maxsize小于1就表示队列长度无限
Queue.qsize() #返回队列的大小
Queue.empty() #如果队列为空,返回True,反之False ,在线程间通信的过程中,可以通过此来给消费者等待信息
Queue.full() # 如果队列满了,返回True,反之False,给生产者提醒
Queue.get([block[, timeout]]) 读队列,timeout等待时间
Queue.put(item, [block[, timeout]]) 写队列,timeout等待时间
Queue.queue.clear() 清空队列
task_done() #意味着之前入队的一个任务已经完成。由队列的消费者线程调用。每一个get()调用得到一个任务,接下来的task_done()调用告诉队列该任务已经处理完毕如果当前一个join()正在阻塞,它将在队列中的所有任务都处理完时恢复执行(即每一个由put()调用入队的任务都有一个对应的task_done()调用)。
join() #阻塞调用线程,直到队列中的所有任务被处理掉。只要有数据被加入队列,未完成的任务数就会增加。当消费者线程调用task_done((意味着有消费者取得任务并完成任务),未完成的任务数就会减少。当未完成的任务数降到0,join()解除阻塞。

队列需求一(爬虫的请求地址)

Python多线程主要是为了提高程序在IO方面的优势,在爬虫的过程中显得尤为重要。正常的爬虫请求直接封装多线程就ok,但是爬虫请求的过程中,对于url的请求需要通过队列来实现,这是队列的需求之一。
对于爬虫的请求地址来说,一般是有规律性可循的,如果是翻页数据,可以将请求到的url放到队列中,通过多线程对队列进行取数据,如果队列为空,线程判断自动等待,循环加入队列url,线程自动请求,以下伪代码,作为参考:

import threading
from queue import Queue

class ThreadCrawl(threading.Thread):
    def __init__(self, threadName, idQueue):
        # 继承父类的方法
        super(ThreadCrawl, self).__init__()
        self.threadName = threadName          # 线程名字
    def run(self):
        print('启动' + self.threadName)
        while not self.idQueue.empty():
            try:
                id = self.idQueue.get(False)  # False 如果队列为空,抛出异常
                time.sleep(1)
                print("~"*300)
                self.get_con(id)

            except Exception as e:
                print('队列为空。。。。。', e)
                pass
            print('#'*300)

    def get_con(self):  #自己封装的请求自定义
        pass
def get_id(m, n):
    conn = psycopg2.connect(database='postgres', user='postgres', password='123456', host='127.0.0.1', port='5432')
    cur = conn.cursor()
    sql1 = 'SELECT doc_id from id LIMIT {} offset {};'.format(m, n)
    cur.execute(sql1)
    data = cur.fetchall()
    conn.commit()
    return data
def main():
    n = 60
    while True:
        m = 20
        # m是固定值,一次去20条, n是第几条开始
        print('开始采集n的值为', n)
        if n == 200000:
            break

        # id的队列
        idQueue = Queue(20)
        if idQueue.empty():
            data = get_id(m, n)
            for i in data:
                idQueue.put(i[0])

        # 采集线程的数量
        crawlList = []
        for id in range(1, 2):
            name = '采集线程{}'.format(id)
            crawlList.append(name)

        # 存储采集线程的列表集合
        threadcrawl = []
        for threadName in crawlList:
            thread = ThreadCrawl(threadName, idQueue)
            thread.start()
            threadcrawl.append(thread)

        for thread in threadcrawl:
            thread.join()
        n = n + m
    print("主线程退出..............")
if __name__ == '__main__':
    main()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67

以上代码是作者从数据库中取数据,间隔性取,然后拼装到url,进行请求。

队列需求二(生产者、消费者模型)

import threading
import time
from queue import Queue

def put_id():
    i = 0
    while True:
        i = i + 1
        print("添加数据", i, id_queue.qsize())
        time.sleep(0.1)
        id_queue.put(i)

def get_id(m):
    while True:
        i = id_queue.get()
        print("线程", m, '取值', i)


if __name__ == "__main__":


    id_queue = Queue(20)

    Th1 = threading.Thread(target=put_id, )

    Th2 = threading.Thread(target=get_id, args=(2, ))
    Th3 = threading.Thread(target=get_id, args=(3, ))
    Th5 = threading.Thread(target=get_id, args=(4, ))
    Th4 = threading.Thread(target=get_id, args=(5, ))

    Th2.start()
    Th1.start()

    Th3.start()
    Th4.start()
    Th5.start()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/189705
推荐阅读
相关标签
  

闽ICP备14008679号