当前位置:   article > 正文

python系列——多线程之queue及线程池_python线程池和队列

python线程池和队列

参考博客:

https://www.cnblogs.com/franknihao/p/6627857.html

1、作用

Queue用于建立和操作队列,常和threading类一起用来建立一个简单的线程队列。

2、种类

  • Queue.Queue(maxsize)  FIFO(先进先出队列)
  •     Queue.LifoQueue(maxsize)  LIFO(先进后出队列)
  •     Queue.PriorityQueue(maxsize)  为优先级越高的越先出来,对于一个队列中的所有元素组成的entries,优先队列优先返回的一个元素是sorted(list(entries))[0]。至于对于一般的数据,优先队列取什么东西作为优先度要素进行判断,官方文档给出的建议是一个tuple如(priority, data),取priority作为优先度。

3、一些方法:

FIFO是常用的队列,其一些常用的方法有:

  •     Queue.qsize()  返回队列大小
  •     Queue.empty()  判断队列是否为空
  •     Queue.full()  判断队列是否满了
  •     Queue.get([block[,timeout]])  从队列头删除并返回一个item,block默认为True,表示当队列为空却去get的时候会阻塞线程,等待直到有有item出现为止来get出这个item。如果是False的话表明当队列为空你却去get的时候,会引发异常。在block为True的情况下可以再设置timeout参数。表示当队列为空,get阻塞timeout指定的秒数之后还没有get到的话就引发Full异常。
  •     Queue.put(...[,block[,timeout]])  向队尾插入一个item,同样若block=True的话队列满时就阻塞等待有空位出来再put,block=False时引发异常。同get的timeout,put的timeout是在block为True的时候进行超时设置的参数。
  •     Queue.task_done()  从场景上来说,处理完一个get出来的item之后,调用task_done将向队列发出一个信号,表示本任务已经完成
  •     Queue.join()  监视所有item并阻塞主线程,直到所有item都调用了task_done之后主线程才继续向下执行。这么做的好处在于,假如一个线程开始处理最后一个任务,它从任务队列中拿走最后一个任务,此时任务队列就空了但最后那个线程还没处理完。当调用了join之后,主线程就不会因为队列空了而擅自结束,而是等待最后那个线程处理完成了。

消息队列实例中维护的有待完成任务变量。每接收到一个消息该值自增一次。每调用一次.task_done()可以使该值减1,当待完成任务值为0的时候,join函数才会返回。

4、queue例子:

  1. import threading
  2. import Queue
  3. import time
  4. class worker(threading.Thread):
  5. def __init__(self,queue):
  6. threading.Thread.__init__(self)
  7. self.queue=queue
  8. self.thread_stop=False
  9. def run(self):
  10. while not self.thread_stop:
  11. print("thread%d %s: waiting for tast" %(self.ident,self.name))
  12. try:
  13. task=q.get(block=True, timeout=20)#接收消息
  14. except Queue.Empty:
  15. print("Nothing to do!i will go home!")
  16. self.thread_stop=True
  17. break
  18. print("task recv:%s ,task No:%d" % (task[0],task[1]))
  19. print("i am working")
  20. time.sleep(3)
  21. print("work finished!")
  22. q.task_done()#完成一个任务
  23. res=q.qsize()#判断消息队列大小
  24. if res>0:
  25. print("fuck!There are still %d tasks to do" % (res))
  26. def stop(self):
  27. self.thread_stop = True
  28. if __name__ == "__main__":
  29. q=Queue.Queue(3)
  30. worker=worker(q)
  31. worker.start()
  32. q.put(["produce one cup!",1], block=True, timeout=None)#产生任务消息
  33. q.put(["produce one desk!",2], block=True, timeout=None)
  34. q.put(["produce one apple!",3], block=True, timeout=None)
  35. q.put(["produce one banana!",4], block=True, timeout=None)
  36. q.put(["produce one bag!",5], block=True, timeout=None)
  37. print("***************leader:wait for finish!")
  38. q.join()#等待所有任务完成
  39. print("***************leader:all task finished!")

输出:

  1. thread139958685849344 Thread-1: waiting for tast 1
  2. task recv:produce one cup! ,task No:1
  3. i am working
  4. work finished!
  5. fuck!There are still 3 tasks to do
  6. thread139958685849344 Thread-1: waiting for tast 1
  7. task recv:produce one desk! ,task No:2
  8. i am workingleader:wait for finish!
  9. work finished!
  10. fuck!There are still 3 tasks to do
  11. thread139958685849344 Thread-1: waiting for tast 1
  12. task recv:produce one apple! ,task No:3
  13. i am working
  14. work finished!
  15. fuck!There are still 2 tasks to do
  16. thread139958685849344 Thread-1: waiting for tast 1
  17. task recv:produce one banana! ,task No:4
  18. i am working
  19. work finished!
  20. fuck!There are still 1 tasks to do
  21. thread139958685849344 Thread-1: waiting for tast 1
  22. task recv:produce one bag! ,task No:5
  23. i am working
  24. work finished!
  25. thread139958685849344 Thread-1: waiting for tast 1
  26. ***************leader:all task finished!
  27. Nothing to do!i will go home!

运行一下就知道,上例中并没有性能的提升(毕竟还是只有一个线程在跑)。线程队列的意义并不是进一步提高运行效率,而是使线程的并发更加有组织。可以看到,在增加了线程队列之后,程序对于线程的并发数量就有了控制。新线程想要加入队列开始执行,必须等一个既存的线程完成之后才可以。

5、线程池

  1. # -*- coding:utf-8 -*-
  2. import threading
  3. import Queue
  4. import time
  5. import random
  6. from faker import Faker
  7. class MyThread(threading.Thread):
  8. '''
  9. 线程模型
  10. '''
  11. def __init__(self,queue):
  12. threading.Thread.__init__(self)
  13. self.queue = queue
  14. self.start() # 因为作为一个工具,线程必须永远“在线”,所以不如让它在创建完成后直接运行,省得我们手动再去start它
  15. def run(self):
  16. while True: # 除非确认队列中已经无任务,否则时刻保持线程在运行
  17. try:
  18. task = self.queue.get(block=False) # 如果队列空了,直接结束线程。根据具体场景不同可能不合理,可以修改
  19. time.sleep(random.random()) # 假设处理了一段时间
  20. print 'Task %s Done' % task # 提示信息而已
  21. self.queue.task_done()
  22. except Exception,e:
  23. break
  24. class MyThreadPool():
  25. def __init__(self,queue,size):
  26. self.queue = queue
  27. self.pool = []
  28. for i in range(size):
  29. self.pool.append(MyThread(queue))
  30. def joinAll(self):
  31. for thd in self.pool:
  32. if thd.isAlive(): thd.join()
  33. if __name__ == '__main__':
  34. q = Queue.Queue(10)
  35. fake = Faker()
  36. for i in range(5):
  37. q.put(fake.word())
  38. pool = MyThreadPool(queue=q,size=2)
  39. pool.joinAll()

网上有一部分示例,将队列作为一个属性维护在了线程池类中,也不失为一种办法,我这里为了能够条理清晰,没有放在类里面。这段程序首先生成了一个maxsize是10的队列。fake.word()可以随机生成一个单词,这里仅作测试用。所以向队列中添加了5个task。

  这里有个坑: 如果put的数量大于队列最大长度,而且put没有设置block=False的话,那么显然程序会阻塞在put这边。此时ThreadPool未被建立,也就是说工作线程都还没有启动,因此会引起这样一个死锁。如果把线程池的建立放到put之前也不行,此时线程发现队列为空,所以所有线程都会直接结束(当然这是线程中get的block是False的时候,如果为True那么也是死锁),最终队列中的task没人处理,程序输出为空。解决这个坑的办法,一个是像上面一样保持最开始put的量小于队列长度;第二个就是干脆不要限制队列长度,用q = Queue.Queue()生产队列即可。

  好的,继续往下,进入了线程池的生成。线程池内部的列表才是真·线程池,另外其关联了queue对象,所以在创建的时候可以将队列对象传递给线程对象。线程对象在创建时就启动了,并且被添加到线程池的那个列表中。线程池的大小由参数给出,线程启动后会去队列里面get任务,并且进行处理。处理完成后进行task_done声明并且再次去尝试get。如果队列为空那么就直接抛出异常,也就是跳出循环,线程结束。

  通过这样一个模型,根据线程池的大小,这才真正地给线程并发做了一个限制,可促进较大程度的资源利用。

6、进阶:

在上面这个示例中,实际上处理任务的实际逻辑是被写在了MyThread类里面。如果我们想要一个通用性更加高的工具类,那么势必要想想如何将这个线程类解耦具体逻辑。另一方面,队列中的任务的内容,不仅仅可以是字符串,也可以是任何python对象。这就使得灵活性大大提高。

  比如我们可以在队列中put内容是(func, args, kwargs)这样一个元组。其中func是一个函数对象,描述了任务的处理逻辑过程,args是一个元组,代表所有func函数的匿名参数,kwargs则是func函数的所有具名参数。如此,可以将线程类的run方法改写成这样:

  1. def run(self):
  2. while True:
  3. try:
  4. func,args,kwargs = self.queue.get()
  5. try:
  6. func(*args,**kwargs)
  7. except Exception,e:
  8. raise ('bad execution: %s' % str(e))
  9. self.queue.task_done()
  10. except Exception,e:
  11. break

这样一个run就可以做到很大程度的解耦了。

  类似的思想,线程池类和线程类也不必是一一对应的。可以将线程类作为一个参数传递给线程池类。这样一个线程池类就可以作为容器容纳各种各样的线程了。具体实例就不写了。

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/189696
推荐阅读
相关标签
  

闽ICP备14008679号