当前位置:   article > 正文

python3 queue的多线程通信_python3 queue 多线程

python3 queue 多线程

queue分类

python3 queue分三类:

  • 先进先出队列
  • 后进先出的栈
  • 优先级队列
    他们的导入方式分别是:
from queue import Queue
from queue import LifoQueue
from queue import PriorityQueue
  • 1
  • 2
  • 3

具体方法见下面引用说明。

例子一、生产消费模式

Queue 对象已经包含了必要的锁,所以你可以通过它在多个线程间多安全地共享数据。 当使用队列时,协调生产者和消费者的关闭问题可能会有一些麻烦。一个通用的解决方法是在队列中放置一个特殊的值,当消费者读到这个值的时候,终止执行。例如:

from queue import Queue
from threading import Thread

# 用来表示终止的特殊对象
_sentinel = object()


# A thread that produces data
def producer(out_q):
    for i in range(10):
        print("生产")
        out_q.put(i)
    out_q.put(_sentinel)


# A thread that consumes data
def consumer(in_q):
    while True:
        data = in_q.get()
        if data is _sentinel:
            in_q.put(_sentinel)
            break
        else:
            print("消费", data)


# Create the shared queue and launch both threads
q = Queue()
t1 = Thread(target=consumer, args=(q,))
t2 = Thread(target=producer, args=(q,))
t1.start()
t2.start()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

结果

本例中有一个特殊的地方:消费者在读到这个特殊值之后立即又把它放回到队列中,将之传递下去。这样,所有监听这个队列的消费者线程就可以全部关闭了。 尽管队列是最常见的线程间通信机制,但是仍然可以自己通过创建自己的数据结构并添加所需的锁和同步机制来实现线程间通信。最常见的方法是使用 Condition 变量来包装你的数据结构。下边这个例子演示了如何创建一个线程安全的优先级队列。

import heapq
import threading

class PriorityQueue:
    def __init__(self):
        self._queue = []
        self._count = 0
        self._cv = threading.Condition()
    def put(self, item, priority):
        with self._cv:
            heapq.heappush(self._queue, (-priority, self._count, item))
            self._count += 1
            self._cv.notify()

    def get(self):
        with self._cv:
            while len(self._queue) == 0:
                self._cv.wait()
            return heapq.heappop(self._queue)[-1]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

例子二、task_donejoin

使用队列来进行线程间通信是一个单向、不确定的过程。通常情况下,你没有办法知道接收数据的线程是什么时候接收到的数据并开始工作的。不过队列对象提供一些基本完成的特性,比如下边这个例子中的 task_done()join()

from queue import Queue
from threading import Thread


class Producer(Thread):
    def __init__(self, q):
        super().__init__()
        self.count = 5
        self.q = q

    def run(self):
        while self.count > 0:
            print("生产")
            if self.count == 1:
                self.count -= 1
                self.q.put(2)
            else:
                self.count -= 1
                self.q.put(1)


class Consumer(Thread):

    def __init__(self, q):
        super().__init__()
        self.q = q

    def run(self):
        while True:
            print("消费")
            data = self.q.get()
            if data == 2:
                print("stop because data=", data)
                # 任务完成,从队列中清除一个元素
                self.q.task_done()
                break
            else:
                print("data is good,data=", data)
                # 任务完成,从队列中清除一个元素
                self.q.task_done()


def main():
    q = Queue()
    p = Producer(q)
    c = Consumer(q)
    p.setDaemon(True)
    c.setDaemon(True)
    p.start()
    c.start()
    # 等待队列清空
    q.join()
    print("queue is complete")


if __name__ == '__main__':
    main()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57

结果:

例子三、多线程里用queue

设置俩队列,一个是要做的任务队列todo_queue,一个是已经完成的队列done_queue
每次执行线程,先从todo_queue队列里取出一个值,然后执行完,放入done_queue队列。
如果todo_queue为空,就退出。

import logging
import logging.handlers
import threading
import queue

log_mgr = None
todo_queue = queue.Queue()
done_queue = queue.Queue()


class LogMgr:
    def __init__(self, logpath):
        self.LOG = logging.getLogger('log')
        loghd = logging.handlers.RotatingFileHandler(logpath, "a", 0, 1)
        fmt = logging.Formatter("%(asctime)s %(threadName)-10s %(message)s", "%Y-%m-%d %H:%M:%S")
        loghd.setFormatter(fmt)
        self.LOG.addHandler(loghd)
        self.LOG.setLevel(logging.INFO)

    def info(self, msg):
        if self.LOG is not None:
            self.LOG.info(msg)


class Worker(threading.Thread):
    global log_mgr

    def __init__(self, name):
        threading.Thread.__init__(self)
        self.name = name

    def run(self):
        while True:
            try:
                task = todo_queue.get(False)
                if task:
                    log_mgr.info("HANDLE_TASK: %s" % task)
                    done_queue.put(1)
            except queue.Empty:
                break
        return


def main():
    global log_mgr
    log_mgr = LogMgr("mylog")

    for i in range(30):
        todo_queue.put("data"+str(i))

    workers = []
    for i in range(3):
        w = Worker("worker"+str(i))
        workers.append(w)

    for i in range(3):
        workers[i].start()

    for i in range(3):
        workers[i].join()

    total_num = done_queue.qsize()
    log_mgr.info("TOTAL_HANDLE_TASK: %d" % total_num)
    exit(0)


if __name__ == '__main__':
    main()


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70

输出日志文件结果:

参考

https://docs.python.org/3/library/queue.html

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/143447
推荐阅读
相关标签
  

闽ICP备14008679号