赞
踩
参考博客:
https://www.cnblogs.com/franknihao/p/6627857.html
1、作用
Queue用于建立和操作队列,常和threading类一起用来建立一个简单的线程队列。
2、种类
3、一些方法:
FIFO是常用的队列,其一些常用的方法有:
消息队列实例中维护的有待完成任务变量。每接收到一个消息该值自增一次。每调用一次.task_done()可以使该值减1,当待完成任务值为0的时候,join函数才会返回。
4、queue例子:
- import threading
- import Queue
- import time
- class worker(threading.Thread):
- def __init__(self,queue):
- threading.Thread.__init__(self)
- self.queue=queue
- self.thread_stop=False
-
- def run(self):
- while not self.thread_stop:
- print("thread%d %s: waiting for tast" %(self.ident,self.name))
- try:
- task=q.get(block=True, timeout=20)#接收消息
- except Queue.Empty:
- print("Nothing to do!i will go home!")
- self.thread_stop=True
- break
- print("task recv:%s ,task No:%d" % (task[0],task[1]))
- print("i am working")
- time.sleep(3)
- print("work finished!")
- q.task_done()#完成一个任务
- res=q.qsize()#判断消息队列大小
- if res>0:
- print("fuck!There are still %d tasks to do" % (res))
-
- def stop(self):
- self.thread_stop = True
-
- if __name__ == "__main__":
- q=Queue.Queue(3)
- worker=worker(q)
- worker.start()
- q.put(["produce one cup!",1], block=True, timeout=None)#产生任务消息
- q.put(["produce one desk!",2], block=True, timeout=None)
- q.put(["produce one apple!",3], block=True, timeout=None)
- q.put(["produce one banana!",4], block=True, timeout=None)
- q.put(["produce one bag!",5], block=True, timeout=None)
- print("***************leader:wait for finish!")
- q.join()#等待所有任务完成
- print("***************leader:all task finished!")
输出:
- thread139958685849344 Thread-1: waiting for tast 1
- task recv:produce one cup! ,task No:1
- i am working
- work finished!
- fuck!There are still 3 tasks to do
- thread139958685849344 Thread-1: waiting for tast 1
- task recv:produce one desk! ,task No:2
- i am workingleader:wait for finish!
- work finished!
- fuck!There are still 3 tasks to do
- thread139958685849344 Thread-1: waiting for tast 1
- task recv:produce one apple! ,task No:3
- i am working
- work finished!
- fuck!There are still 2 tasks to do
- thread139958685849344 Thread-1: waiting for tast 1
- task recv:produce one banana! ,task No:4
- i am working
- work finished!
- fuck!There are still 1 tasks to do
- thread139958685849344 Thread-1: waiting for tast 1
- task recv:produce one bag! ,task No:5
- i am working
- work finished!
- thread139958685849344 Thread-1: waiting for tast 1
- ***************leader:all task finished!
- Nothing to do!i will go home!
运行一下就知道,上例中并没有性能的提升(毕竟还是只有一个线程在跑)。线程队列的意义并不是进一步提高运行效率,而是使线程的并发更加有组织。可以看到,在增加了线程队列之后,程序对于线程的并发数量就有了控制。新线程想要加入队列开始执行,必须等一个既存的线程完成之后才可以。
5、线程池:
- # -*- coding:utf-8 -*-
-
- import threading
- import Queue
- import time
- import random
-
- from faker import Faker
-
- class MyThread(threading.Thread):
- '''
- 线程模型
- '''
- def __init__(self,queue):
- threading.Thread.__init__(self)
- self.queue = queue
- self.start() # 因为作为一个工具,线程必须永远“在线”,所以不如让它在创建完成后直接运行,省得我们手动再去start它
-
- def run(self):
- while True: # 除非确认队列中已经无任务,否则时刻保持线程在运行
- try:
- task = self.queue.get(block=False) # 如果队列空了,直接结束线程。根据具体场景不同可能不合理,可以修改
- time.sleep(random.random()) # 假设处理了一段时间
- print 'Task %s Done' % task # 提示信息而已
- self.queue.task_done()
- except Exception,e:
- break
-
- class MyThreadPool():
- def __init__(self,queue,size):
- self.queue = queue
- self.pool = []
- for i in range(size):
- self.pool.append(MyThread(queue))
-
- def joinAll(self):
- for thd in self.pool:
- if thd.isAlive(): thd.join()
-
- if __name__ == '__main__':
- q = Queue.Queue(10)
- fake = Faker()
- for i in range(5):
- q.put(fake.word())
- pool = MyThreadPool(queue=q,size=2)
- pool.joinAll()
网上有一部分示例,将队列作为一个属性维护在了线程池类中,也不失为一种办法,我这里为了能够条理清晰,没有放在类里面。这段程序首先生成了一个maxsize是10的队列。fake.word()可以随机生成一个单词,这里仅作测试用。所以向队列中添加了5个task。
这里有个坑: 如果put的数量大于队列最大长度,而且put没有设置block=False的话,那么显然程序会阻塞在put这边。此时ThreadPool未被建立,也就是说工作线程都还没有启动,因此会引起这样一个死锁。如果把线程池的建立放到put之前也不行,此时线程发现队列为空,所以所有线程都会直接结束(当然这是线程中get的block是False的时候,如果为True那么也是死锁),最终队列中的task没人处理,程序输出为空。解决这个坑的办法,一个是像上面一样保持最开始put的量小于队列长度;第二个就是干脆不要限制队列长度,用q = Queue.Queue()生产队列即可。
好的,继续往下,进入了线程池的生成。线程池内部的列表才是真·线程池,另外其关联了queue对象,所以在创建的时候可以将队列对象传递给线程对象。线程对象在创建时就启动了,并且被添加到线程池的那个列表中。线程池的大小由参数给出,线程启动后会去队列里面get任务,并且进行处理。处理完成后进行task_done声明并且再次去尝试get。如果队列为空那么就直接抛出异常,也就是跳出循环,线程结束。
通过这样一个模型,根据线程池的大小,这才真正地给线程并发做了一个限制,可促进较大程度的资源利用。
6、进阶:
在上面这个示例中,实际上处理任务的实际逻辑是被写在了MyThread类里面。如果我们想要一个通用性更加高的工具类,那么势必要想想如何将这个线程类解耦具体逻辑。另一方面,队列中的任务的内容,不仅仅可以是字符串,也可以是任何python对象。这就使得灵活性大大提高。
比如我们可以在队列中put内容是(func, args, kwargs)这样一个元组。其中func是一个函数对象,描述了任务的处理逻辑过程,args是一个元组,代表所有func函数的匿名参数,kwargs则是func函数的所有具名参数。如此,可以将线程类的run方法改写成这样:
- def run(self):
- while True:
- try:
- func,args,kwargs = self.queue.get()
- try:
- func(*args,**kwargs)
- except Exception,e:
- raise ('bad execution: %s' % str(e))
- self.queue.task_done()
- except Exception,e:
- break
这样一个run就可以做到很大程度的解耦了。
类似的思想,线程池类和线程类也不必是一一对应的。可以将线程类作为一个参数传递给线程池类。这样一个线程池类就可以作为容器容纳各种各样的线程了。具体实例就不写了。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。