当前位置:   article > 正文







  1. #方式一
  2. from threading import Thread
  3. import time
  4. def sayhi(name):
  5. time.sleep(2)
  6. print('%s say hello' %name)
  7. if __name__ == '__main__':
  8. t=Thread(target=sayhi,args=('egon',))
  9. t.start()
  10. print('主线程')


  1. # 方式二
  2. from threading import Thread
  3. import time
  4. class Sayhi(Thread):
  5. def __init__(self,name):
  6. super().__init__()
  7. self.name=name
  8. def run(self):
  9. time.sleep(2)
  10. print('%s say hello' % self.name)
  11. if __name__ == '__main__':
  12. t = Sayhi('ly')
  13. t.start()
  14. print('主线程')


1 谁的开启速度快

  1. from threading import Thread
  2. from multiprocessing import Process
  3. import os
  4. def work():
  5. print('hello')
  6. if __name__ == '__main__':
  7. # 在主进程下开启线程
  8. t=Thread(target=work)
  9. t.start()
  10. print('主线程/主进程')
  11. '''
  12. 打印结果:
  13. hello
  14. 主线程/主进程
  15. '''
  16. # 在主进程下开启子进程
  17. t=Process(target=work)
  18. t.start()
  19. print('主线程/主进程')
  20. '''
  21. 打印结果:
  22. 主线程/主进程
  23. hello
  24. '''

2 瞅一瞅pid

  1. from threading import Thread
  2. from multiprocessing import Process
  3. import os
  4. def work():
  5. print('hello',os.getpid())
  6. if __name__ == '__main__':
  7. # part1:在主进程下开启多个线程,每个线程都跟主进程的pid一样
  8. t1=Thread(target=work)
  9. t2=Thread(target=work)
  10. t1.start()
  11. t2.start()
  12. print('主线程/主进程pid',os.getpid())
  13. # part2:开多个进程,每个进程都有不同的pid
  14. p1=Process(target=work)
  15. p2=Process(target=work)
  16. p1.start()
  17. p2.start()
  18. print('主线程/主进程pid',os.getpid())

3 同一进程内的线程共享该进程的数据?

  1. from threading import Thread
  2. from multiprocessing import Process
  3. import os
  4. def work():
  5. global n
  6. n=0
  7. if __name__ == '__main__':
  8. # n=100
  9. # p=Process(target=work)
  10. # p.start()
  11. # p.join()
  12. # print('主',n) #毫无疑问子进程p已经将自己的全局的n改成了0,但改的仅仅是它自己的,查看父进程的n仍然为100
  13. n=1
  14. t=Thread(target=work)
  15. t.start()
  16. t.join()
  17. print('主',n) #查看结果为0,因为同一进程内的线程之间共享进程内的数据




  1. # -*- coding: UTF-8 -*-
  2. #!/usr/bin/env python3
  3. import multiprocessing
  4. import threading
  5. import socket
  6. s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
  7. s.bind(('',8080))
  8. s.listen(5)
  9. def action(conn):
  10. while True:
  11. data=conn.recv(1024)
  12. print(data)
  13. conn.send(data.upper())
  14. if __name__ == '__main__':
  15. while True:
  16. conn,addr=s.accept()
  17. p=threading.Thread(target=action,args=(conn,))
  18. p.start()


  1. # -*- coding: UTF-8 -*-
  2. #!/usr/bin/env python3
  3. import socket
  4. s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
  5. s.connect(('',8080))
  6. while True:
  7. msg=input('>>: ').strip()
  8. if not msg:continue
  9. s.send(msg.encode('utf-8'))
  10. data=s.recv(1024)
  11. print(data)


  1. from threading import Thread
  2. msg_l=[]
  3. format_l=[]
  4. def talk():
  5. while True:
  6. msg=input('>>: ').strip()
  7. if not msg:continue
  8. msg_l.append(msg)
  9. def format_msg():
  10. while True:
  11. if msg_l:
  12. res=msg_l.pop()
  13. format_l.append(res.upper())
  14. def save():
  15. while True:
  16. if format_l:
  17. with open('db.txt','a',encoding='utf-8') as f:
  18. res=format_l.pop()
  19. f.write('%s\n' %res)
  20. if __name__ == '__main__':
  21. t1=Thread(target=talk)
  22. t2=Thread(target=format_msg)
  23. t3=Thread(target=save)
  24. t1.start()
  25. t2.start()
  26. t3.start()


  1. Thread实例对象的方法
  2. # isAlive(): 返回线程是否活动的。
  3. # getName(): 返回线程名。
  4. # setName(): 设置线程名。
  5. threading模块提供的一些方法:
  6. # threading.currentThread(): 返回当前的线程变量。
  7. # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
  8. # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
  9. from threading import Thread
  10. import threading
  11. from multiprocessing import Process
  12. import os
  13. def work():
  14. import time
  15. time.sleep(3)
  16. print(threading.current_thread().getName())
  17. if __name__ == '__main__':
  18. #在主进程下开启线程
  19. t=Thread(target=work)
  20. t.start()
  21. print(threading.current_thread().getName())
  22. print(threading.current_thread()) #主线程
  23. print(threading.enumerate()) #连同主线程在内有两个运行的线程
  24. print(threading.active_count())
  25. print('主线程/主进程')
  26. '''
  27. 打印结果:
  28. MainThread
  29. <_MainThread(MainThread, started 140735268892672)>
  30. [<_MainThread(MainThread, started 140735268892672)>, <Thread(Thread-1, started 123145307557888)>]
  31. 主线程/主进程
  32. Thread-1
  33. '''


  1. from threading import Thread
  2. import time
  3. def sayhi(name):
  4. time.sleep(2)
  5. print('%s say hello' %name)
  6. if __name__ == '__main__':
  7. t=Thread(target=sayhi,args=('ly',))
  8. t.start()
  9. t.join()
  10. print('主线程')
  11. print(t.is_alive())
  12. '''
  13. ly say hello
  14. 主线程
  15. False
  16. '''






  1. #1 主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束,
  2. #2 主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束。
  3. from threading import Thread
  4. import time
  5. def sayhi(name):
  6. time.sleep(2)
  7. print('%s say hello' %name)
  8. if __name__ == '__main__':
  9. t=Thread(target=sayhi,args=('ly',))
  10. t.setDaemon(True) #必须在t.start()之前设置
  11. t.start()
  12. print('主线程')
  13. print(t.is_alive())
  14. '''
  15. 主线程
  16. True
  17. '''


  1. from threading import Thread
  2. import time
  3. def foo():
  4. print(123)
  5. time.sleep(1)
  6. print("end123")
  7. def bar():
  8. print(456)
  9. time.sleep(3)
  10. print("end456")
  11. t1=Thread(target=foo)
  12. t2=Thread(target=bar)
  13. t1.daemon=True
  14. t1.start()
  15. t2.start()
  16. print("main-------")

七、Python GIL(Global Interpreter Lock)


  1. '''
  2. 定义:
  3. In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple
  4. native threads from executing Python bytecodes at once. This lock is necessary mainly
  5. because CPython’s memory management is not thread-safe. (However, since the GIL
  6. exists, other features have grown to depend on the guarantees that it enforces.)
  7. '''
  8. 结论:在Cpython解释器中,同一个进程下开启的多线程,同一时刻只能有一个线程执行,无法利用多核优势

首先需要明确的一点是 GIL 并不是 Python 的特性,它是在实现Python解析器(CPython)时所引入的一个概念。就好比 C++ 是一套语言(语法)标准,但是可以用不同的编译器来编译成可执行代码。有名的编译器,例如:GCC,INTEL C++,Visual C++等。Python也一样,同样一段代码可以通过CPython,PyPy,Psyco等不同的Python执行环境来执行。像其中的JPython就没有 GIL 。然而因为 CPython 是大部分环境下默认的Python执行环境。所以在很多人的概念里 CPython 就是 Python ,也就想当然的把 GIL归结为Python语言的缺陷。所以这里要先明确一点: GIL 并不是 Python 的特性,Python完全可以不依赖于GIL。




要想了解GIL,首先确定一点:每次执行python程序,都会产生一个独立的进程。例如python test.py,python aaa.py,python bbb.py会产生3个不同的python进程

  1. '''
  2. # 验证python test.py只会产生一个进程
  3. # test.py内容
  4. import os,time
  5. print(os.getpid())
  6. time.sleep(1000)
  7. '''
  8. python3 test.py
  9. # 在windows下
  10. tasklist |findstr python
  11. # 在linux下
  12. ps aux |grep python


#1 所有数据都是共享的,这其中,代码作为一种数据也是被所有线程共享的(test.py的所有代码以及Cpython解释器的所有代码)

#2 所有线程的任务,都需要将任务的代码当做参数传给解释器的代码去执行,即所有的线程要想运行自己的任务,首先需要解决的是能够访问到解释器的代码。










#1. cpu到底是用来做计算的,还是用来做I/O的?

#2. 多cpu,意味着可以有多个核并行完成计算,所以多核提升的是计算性能

#3. 每个cpu一旦遇到I/O阻塞,仍然需要等待,所以多核对I/O操作没什么用处






# 分析:

# 单核情况下,分析结果:

# 多核情况下,分析结果:

# 结论:现在的计算机基本上都是多核,python对于计算密集型的任务开多线程的效率并不能带来多大性能上的提升,甚至不如串行(没有大量切换),但是,对于IO密集型的任务效率还是有显著提升的。



  1. from multiprocessing import Process
  2. from threading import Thread
  3. import os,time
  4. def work():
  5. res=0
  6. for i in range(100000000):
  7. res*=i
  8. if __name__ == '__main__':
  9. l=[]
  10. print(os.cpu_count()) # 本机为4核
  11. start=time.time()
  12. for i in range(4):
  13. p=Process(target=work) # 耗时5s多
  14. p=Thread(target=work) # 耗时18s多
  15. l.append(p)
  16. p.start()
  17. for p in l:
  18. p.join()
  19. stop=time.time()
  20. print('run time is %s' %(stop-start))


  1. from multiprocessing import Process
  2. from threading import Thread
  3. import threading
  4. import os,time
  5. def work():
  6. time.sleep(2)
  7. print('===>')
  8. if __name__ == '__main__':
  9. l=[]
  10. print(os.cpu_count()) # 本机为4核
  11. start=time.time()
  12. for i in range(400):
  13. # p=Process(target=work) # 耗时12s多,大部分时间耗费在创建进程上
  14. p=Thread(target=work) # 耗时2s多
  15. l.append(p)
  16. p.start()
  17. for p in l:
  18. p.join()
  19. stop=time.time()
  20. print('run time is %s' %(stop-start))


多线程用于IO密集型,如:socket,爬虫,web 多进程用于计算密集型,如:金融分析

6、CPU 和 GIL 必须都具备才可以执行代码

拿到 CPU 权限 -> 拿到 GIL 解释器锁 -> 执行代码

在 Python 3.2 之后 GIL 有了新的实现,目的是为了解决 That GIL Thrashing 问题,这是Antoine Pitrou 的功劳

7、GIL 解释器锁会在两种情况下释放

遇到 IO 操作或者分配的 CPU 时间片到时间了。





/ Python/ceval.c /*
static volatile int gil_drop_request = 0;




gil_drop_request = 1;线程thread1就会被强制释放GIL,然后线程thread2开始运行并返回一个ack给线程thread1,线程thread1开始调用cv_wait(gil,TIMEOUT)




#3. 一定要看本小节最后的GIL与互斥锁的经典分析




最后,问题就很明朗了,GIL 与Lock是两把锁,保护的数据不一样,前者是解释器级别的(当然保护的就是解释器级别的数据,比如垃圾回收的数据),后者是保护用户自己开发的应用程序的数据,很明显GIL不负责这件事,只能用户自定义加锁处理,即Lock










  1. 因为Python解释器帮你自动定期进行内存回收,你可以理解为python解释器里有一个独立的线程,每过一段时间它起wake up做一次全局轮询看看哪些内存数据是可以被清空的,此时你自己的程序里的线程和py解释器自己的线程是并发运行的,假设你的线程删除了一个变量,py解释器的垃圾回收线程在清空这个变量的过程中的clearing时刻,可能一个其它线程正好又重新给这个还没来及得清空的内存空间赋值了,结果就有可能新赋值的数据被删除了,为了解决类似的问题,python解释器简单粗暴的加了锁,即当一个线程运行时,其它人都不能动,这样就解决了上述的问题, 这可以说是Python早期版本的遗留问题。
  2. from threading import Thread
  3. import os,time
  4. def work():
  5. global n
  6. temp=n
  7. time.sleep(0.1)
  8. n=temp-1
  9. if __name__ == '__main__':
  10. n=100
  11. l=[]
  12. for i in range(100):
  13. p=Thread(target=work)
  14. l.append(p)
  15. p.start()
  16. for p in l:
  17. p.join()
  18. print(n) #结果可能为99


  1. import threading
  2. R=threading.Lock()
  3. R.acquire()
  4. '''
  5. 对公共数据的操作
  6. '''
  7. R.release()
  8. from threading import Thread,Lock
  9. import os,time
  10. def work():
  11. global n
  12. lock.acquire()
  13. temp=n
  14. time.sleep(0.1)
  15. n=temp-1
  16. lock.release()
  17. if __name__ == '__main__':
  18. lock=Lock()
  19. n=100
  20. l=[]
  21. for i in range(100):
  22. p=Thread(target=work)
  23. l.append(p)
  24. p.start()
  25. for p in l:
  26. p.join()
  27. print(n) #结果肯定为0,由原来的并发执行变成串行,牺牲了执行效率保证了数据安全



#1. 100个线程去抢GIL锁,即抢执行权限

#2. 肯定有一个线程先抢到GIL(暂且称为线程1),然后开始执行,一旦执行就会拿到lock.acquire()

#3. 极有可能线程1还未运行完毕,就有另外一个线程2抢到GIL,然后开始运行,但线程2发现互斥锁lock还未被线程1释放,于是阻 塞,被迫交出执行权限,即释放GIL

#4. 直到线程1重新抢到GIL,开始从上次暂停的位置继续执行,直到正常释放互斥锁lock,然后其他的线程再重复2 3 4的过程


  1. # 不加锁:并发执行,速度快,数据不安全
  2. from threading import current_thread,Thread,Lock
  3. import os,time
  4. def task():
  5. global n
  6. print('%s is running' %current_thread().getName())
  7. temp=n
  8. time.sleep(0.5)
  9. n=temp-1
  10. if __name__ == '__main__':
  11. n=100
  12. lock=Lock()
  13. threads=[]
  14. start_time=time.time()
  15. for i in range(100):
  16. t=Thread(target=task)
  17. threads.append(t)
  18. t.start()
  19. for t in threads:
  20. t.join()
  21. stop_time=time.time()
  22. print('主:%s n:%s' %(stop_time-start_time,n))
  23. '''
  24. Thread-1 is running
  25. Thread-2 is running
  26. ......
  27. Thread-100 is running
  28. 主:0.5216062068939209 n:99
  29. '''
  30. # 不加锁:未加锁部分并发执行,加锁部分串行执行,速度慢,数据安全
  31. from threading import current_thread,Thread,Lock
  32. import os,time
  33. def task():
  34. # 未加锁的代码并发运行
  35. time.sleep(3)
  36. print('%s start to run' %current_thread().getName())
  37. global n
  38. # 加锁的代码串行运行
  39. lock.acquire()
  40. temp=n
  41. time.sleep(0.5)
  42. n=temp-1
  43. lock.release()
  44. if __name__ == '__main__':
  45. n=100
  46. lock=Lock()
  47. threads=[]
  48. start_time=time.time()
  49. for i in range(100):
  50. t=Thread(target=task)
  51. threads.append(t)
  52. t.start()
  53. for t in threads:
  54. t.join()
  55. stop_time=time.time()
  56. print('主:%s n:%s' %(stop_time-start_time,n))
  57. '''
  58. Thread-1 is running
  59. Thread-2 is running
  60. ......
  61. Thread-100 is running
  62. 主:53.294203758239746 n:0
  63. '''
  64. # 有的同学可能有疑问:既然加锁会让运行变成串行,那么我在start之后立即使用join,就不用加锁了啊,也是串行的效果啊
  65. # 没错:在start之后立刻使用jion,肯定会将100个任务的执行变成串行,毫无疑问,最终n的结果也肯定是0,是安全的,但问题是
  66. # start后立即join:任务内的所有代码都是串行执行的,而加锁,只是加锁的部分即修改共享数据的部分是串行的
  67. # 单从保证数据安全方面,二者都可以实现,但很明显是加锁的效率更高.
  68. from threading import current_thread,Thread,Lock
  69. import os,time
  70. def task():
  71. time.sleep(3)
  72. print('%s start to run' %current_thread().getName())
  73. global n
  74. temp=n
  75. time.sleep(0.5)
  76. n=temp-1
  77. if __name__ == '__main__':
  78. n=100
  79. lock=Lock()
  80. start_time=time.time()
  81. for i in range(100):
  82. t=Thread(target=task)
  83. t.start()
  84. t.join()
  85. stop_time=time.time()
  86. print('主:%s n:%s' %(stop_time-start_time,n))
  87. '''
  88. Thread-1 start to run
  89. Thread-2 start to run
  90. ......
  91. Thread-100 start to run
  92. 主:350.6937336921692 n:0 #耗时是多么的恐怖
  93. '''



所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁

  1. from threading import Thread,Lock
  2. import time
  3. mutexA=Lock()
  4. mutexB=Lock()
  5. class MyThread(Thread):
  6. def run(self):
  7. self.func1()
  8. self.func2()
  9. def func1(self):
  10. mutexA.acquire()
  11. print('\033[41m%s 拿到A锁\033[0m' %self.name)
  12. mutexB.acquire()
  13. print('\033[42m%s 拿到B锁\033[0m' %self.name)
  14. mutexB.release()
  15. mutexA.release()
  16. def func2(self):
  17. mutexB.acquire()
  18. print('\033[43m%s 拿到B锁\033[0m' %self.name)
  19. time.sleep(2)
  20. mutexA.acquire()
  21. print('\033[44m%s 拿到A锁\033[0m' %self.name)
  22. mutexA.release()
  23. mutexB.release()
  24. if __name__ == '__main__':
  25. for i in range(10):
  26. t=MyThread()
  27. t.start()
  28. '''
  29. Thread-1 拿到A锁
  30. Thread-1 拿到B锁
  31. Thread-1 拿到B锁
  32. Thread-2 拿到A锁
  33. 然后就卡住,死锁了
  34. '''



mutexA=mutexB=threading.RLock() #一个线程拿到锁,counter加1,该线程内又碰到加锁的情况,则counter继续加1,这期间所有其他线程都只能等待,等待该线程释放所有锁,即counter递减到0为止


同进程的一样,Semaphore管理一个内置的计数器, 每当调用acquire()时内置计数器-1; 调用release() 时内置计数器+1; 计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。


  1. from threading import Thread,Semaphore
  2. import threading
  3. import time
  4. # def func():
  5. # if sm.acquire():
  6. # print (threading.currentThread().getName() + ' get semaphore')
  7. # time.sleep(2)
  8. # sm.release()
  9. def func():
  10. sm.acquire()
  11. print('%s get sm' %threading.current_thread().getName())
  12. time.sleep(3)
  13. sm.release()
  14. if __name__ == '__main__':
  15. sm=Semaphore(5)
  16. for i in range(23):
  17. t=Thread(target=func)
  18. t.start()




线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行

  1. event.isSet():返回event的状态值;
  2. event.wait():如果 event.isSet()==False将阻塞线程;
  3. event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
  4. event.clear():恢复event的状态值为False


  1. from threading import Thread,Event
  2. import threading
  3. import time,random
  4. def conn_mysql():
  5. count=1
  6. while not event.is_set():
  7. if count > 3:
  8. raise TimeoutError('链接超时')
  9. print('<%s>第%s次尝试链接' % (threading.current_thread().getName(), count))
  10. event.wait(0.5)
  11. count+=1
  12. print('<%s>链接成功' %threading.current_thread().getName())
  13. def check_mysql():
  14. print('\033[45m[%s]正在检查mysql\033[0m' % threading.current_thread().getName())
  15. time.sleep(random.randint(2,4))
  16. event.set()
  17. if __name__ == '__main__':
  18. event=Event()
  19. conn1=Thread(target=conn_mysql)
  20. conn2=Thread(target=conn_mysql)
  21. check=Thread(target=check_mysql)
  22. conn1.start()
  23. conn2.start()
  24. check.start()



  1. import threading
  2. def run(n):
  3. con.acquire()
  4. con.wait()
  5. print("run the thread: %s" %n)
  6. con.release()
  7. if __name__ == '__main__':
  8. con = threading.Condition()
  9. for i in range(10):
  10. t = threading.Thread(target=run, args=(i,))
  11. t.start()
  12. while True:
  13. inp = input('>>>')
  14. if inp == 'q':
  15. break
  16. con.acquire()
  17. con.notify(int(inp))
  18. con.release()
  19. def condition_func():
  20. ret = False
  21. inp = input('>>>')
  22. if inp == '1':
  23. ret = True
  24. return ret
  25. def run(n):
  26. con.acquire()
  27. con.wait_for(condition_func)
  28. print("run the thread: %s" %n)
  29. con.release()
  30. if __name__ == '__main__':
  31. con = threading.Condition()
  32. for i in range(10):
  33. t = threading.Thread(target=run, args=(i,))
  34. t.start()



  1. from threading import Timer
  2. def hello():
  3. print("hello, world")
  4. t = Timer(1, hello)
  5. t.start() # after 1 seconds, "hello, world" will be printed


  1. from threading import Timer
  2. import random,time
  3. class Code:
  4. def __init__(self):
  5. self.make_cache()
  6. def make_cache(self,interval=5):
  7. self.cache=self.make_code()
  8. print(self.cache)
  9. self.t=Timer(interval,self.make_cache)
  10. self.t.start()
  11. def make_code(self,n=4):
  12. res=''
  13. for i in range(n):
  14. s1=str(random.randint(0,9))
  15. s2=chr(random.randint(65,90))
  16. res+=random.choice([s1,s2])
  17. return res
  18. def check(self):
  19. while True:
  20. inp=input('>>: ').strip()
  21. if inp.upper() == self.cache:
  22. print('验证成功',end='\n')
  23. self.t.cancel()
  24. break
  25. if __name__ == '__main__':
  26. obj=Code()
  27. obj.check()


queue队列 :使用import queue,用法与进程Queue一样

queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

• class queue.Queue(maxsize=0) 先进先出

  1. import queue
  2. q=queue.Queue()
  3. q.put('first')
  4. q.put('second')
  5. q.put('third')
  6. print(q.get())
  7. print(q.get())
  8. print(q.get())
  9. '''
  10. 结果(先进先出):
  11. first
  12. second
  13. third
  14. '''
  • class queue.LifoQueue(maxsize=0) 先进先出 # last in fisrt out 后进先出
  1. import queue
  2. q=queue.LifoQueue()
  3. q.put('first')
  4. q.put('second')
  5. q.put('third')
  6. print(q.get())
  7. print(q.get())
  8. print(q.get())
  9. '''
  10. 结果(后进先出):
  11. third
  12. second
  13. first
  14. '''
  • class queue.PriorityQueue(maxsize=0) # 存储数据时可设置优先级的队列
  1. import queue
  2. q=queue.PriorityQueue()
  3. #put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
  4. q.put((20,'a'))
  5. q.put((10,'b'))
  6. q.put((30,'c'))
  7. print(q.get())
  8. print(q.get())
  9. print(q.get())
  10. '''
  11. 结果(数字越小优先级越高,优先级高的优先出队):
  12. (10, 'b')
  13. (20, 'a')
  14. (30, 'c')
  15. '''


  1. Constructor for a priority queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.
  2. 构造一个优先级队列,其中maxsize是一个整数,用于设置可以放入队列的项目数量的上限.一旦达到这个上限,插入就会阻塞,直到队列中有项目被消耗。如果maxsize小于或等于0,则队列长度为无穷大。
  3. The lowest valued entries are retrieved first (the lowest valued entry is the one returned by sorted(list(entries))[0]). A typical pattern for entries is a tuple in the form: (priority_number, data).
  4. 首先检索最低值的条目(最低值的条目是指列表经过排序后取到的索引为0的那个元素,一般条目是(优先级数字,数据)这种元组的形式
  5. exception queue.Empty
  6. Exception raised when non-blocking get() (or get_nowait()) is called on a Queue object which is empty.
  7. 当表示非阻塞的get()或get_nowait()在一个空的队列对象中被调用时,会抛出异常
  8. exception queue.Full
  9. Exception raised when non-blocking put() (or put_nowait()) is called on a Queue object which is full.
  10. 当表示非阻塞的put()或put_nowait()在一个满的队列对象中被调用时,会抛出异常
  11. Queue.qsize()
  12. Queue.empty() #return True if empty
  13. 当队列为空返回True
  14. Queue.full() # return True if full
  15. 当队列为满返回True
  16. Queue.put(item, block=True, timeout=None)
  17. Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Full exception if no free slot was available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (timeout is ignored in that case).
  18. 将一个项放入队列。如果可选参数block为true并且timeout为None(默认值),则在必要时阻塞,直到有空闲槽可用。如果参数timeout是一个正数,它最多阻塞timeout秒,如果在这段时间内没有可用的空闲槽,则会引发Full异常。否则(block为false),如果有空闲槽可用,则将一个项目放入队列中,否则引发Full异常(在这种情况下,timeout被忽略)。
  19. Queue.put_nowait(item)
  20. Equivalent to put(item, False).
  21. Queue.get(block=True, timeout=None)
  22. Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case).
  23. 从队列中移除并返回一个项。如果可选参数block为true并且timeout为None(默认值),则在必要时阻塞,直到有可用的项。如果timeout为正数,则最多阻塞timeout秒,如果在该时间内没有可用项,则抛出Empty异常。否则(block为false),如果一个项目可用,则返回那个项目,否则引发Empty异常(在这种情况下,timeout被忽略)。
  24. Queue.get_nowait()
  25. Equivalent to get(False).
  26. Two methods are offered to support tracking whether enqueued tasks have been fully processed by daemon consumer threads.
  27. 提供了两种方法来支持追踪进入队列的任务是否已被生产者的守护线程完全处理。
  28. Queue.task_done()
  29. Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.
  30. 假定先前进入队列的任务已完成。并且被队列生产者使用。对于每个用于获取任务的get(),后续对task_done()的调用都会告诉队列任务的处理已经完成。
  31. If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).
  32. 如果join()当前正被阻塞,它将在所有项都被处理完时恢复(这意味着对于每个已经put()到队列中的项都接收到task_done()调用)。
  33. Raises a ValueError if called more times than there were items placed in the queue.
  34. 如果调用次数超过放入队列的项数,将引发ValueError。
  35. Queue.join()
  36. 阻塞,直到queue被消费完毕


  1. # 1 介绍
  2. concurrent.futures模块提供了高度封装的异步调用接口
  3. ThreadPoolExecutor:线程池,提供异步调用
  4. ProcessPoolExecutor: 进程池,提供异步调用
  5. Both implement the same interface, which is defined by the abstract Executor class.
  6. # 2 基本方法
  7. # submit(fn, *args, **kwargs)
  8. 异步提交任务
  9. # map(func, *iterables, timeout=None, chunksize=1)
  10. 取代for循环submit的操作
  11. # shutdown(wait=True)
  12. 相当于进程池的pool.close()+pool.join()操作
  13. wait=True,等待池内所有任务执行完毕回收完资源后才继续
  14. wait=False,立即返回,并不会等待池内的任务执行完毕
  15. 但不管wait参数为何值,整个程序都会等到所有任务执行完毕
  16. submit和map必须在shutdown之前
  17. # result(timeout=None)
  18. 取得结果
  19. # add_done_callback(fn)
  20. 回调函数


  1. # 介绍
  2. The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned.
  3. ProcessPoolExecutor类是Executor的子类,它使用一个进程池来异步执行调用。ProcessPoolExecutor会调用多进程模块,这允许它避开全局解释器锁,但也意味着只能执行和返回可pickle的对象。
  4. class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None)
  5. An Executor subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers is None or not given, it will default to the number of processors on the machine. If max_workers is lower or equal to 0, then a ValueError will be raised.
  6. 能够异步调用数量不超过参数max_workers的子进程,如果max_workers为None或未给出,则默认值为机器上的处理器数。如果max_workers小于或等于0,则会抛出异常ValueError
  7. # 用法
  8. from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
  9. import os,time,random
  10. def task(n):
  11. print('%s is runing' %os.getpid())
  12. time.sleep(random.randint(1,3))
  13. return n**2
  14. if __name__ == '__main__':
  15. executor=ProcessPoolExecutor(max_workers=3)
  16. futures=[]
  17. for i in range(11):
  18. future=executor.submit(task,i)
  19. futures.append(future)
  20. executor.shutdown(True)
  21. print('+++>')
  22. for future in futures:
  23. print(future.result())


ThreadPoolExecutor is an Executor subclass that uses a pool of threads to execute calls asynchronously.
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='')
An Executor subclass that uses a pool of at most max_workers threads to execute calls asynchronously.

Changed in version 3.5: If max_workers is None or not given, it will default to the number of processors on the machine, multiplied by 5, assuming that ThreadPoolExecutor is often used to overlap I/O instead of CPU work and the number of workers should be higher than the number of workers for ProcessPoolExecutor.
3.5版本中的变化:如果max_workers为None或者没有被指定,它将默认为计算机的处理器个数乘以5,假设ThreadPoolExecutor(线程池)通常用于重复I / O操作而不是CPU的计算,那么它的实际效率会低于ProcessPoolExecutor(进程池)

New in version 3.6: The thread_name_prefix argument was added to allow users to control the threading.Thread names for worker threads created by the pool for easier debugging.



  1. from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
  2. import os,time,random
  3. def task(n):
  4. print('%s is runing' %os.getpid())
  5. time.sleep(random.randint(1,3))
  6. return n**2
  7. if __name__ == '__main__':
  8. executor=ThreadPoolExecutor(max_workers=3)
  9. # for i in range(11):
  10. # future=executor.submit(task,i)
  11. executor.map(task,range(1,12)) #map取代了for+submit
  12. 回调函数
  13. from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
  14. from multiprocessing import Pool
  15. import requests
  16. import json
  17. import os
  18. def get_page(url):
  19. print('<进程%s> get %s' %(os.getpid(),url))
  20. respone=requests.get(url)
  21. if respone.status_code == 200:
  22. return {'url':url,'text':respone.text}
  23. def parse_page(res):
  24. res=res.result()
  25. print('<进程%s> parse %s' %(os.getpid(),res['url']))
  26. parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))
  27. with open('db.txt','a') as f:
  28. f.write(parse_res)
  29. if __name__ == '__main__':
  30. urls=[
  31. 'https://www.baidu.com',
  32. 'https://www.python.org',
  33. 'https://www.openstack.org',
  34. 'https://help.github.com/',
  35. 'http://www.sina.com.cn/'
  36. ]
  37. # p=Pool(3)
  38. # for url in urls:
  39. # p.apply_async(get_page,args=(url,),callback=pasrse_page)
  40. # p.close()
  41. # p.join()
  42. p=ProcessPoolExecutor(3)
  43. for url in urls:
  44. p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一个future对象obj,需要用obj.result()拿到结果


