1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 #Author: nulige 4 5 import threading #线程 6 import time 7 8 def Hi(num): #有一个参数 9 print("hello %s" %num) 10 time.sleep(3) 11 12 if __name__ == '__main__': 13 14 t1=threading.Thread(target=Hi,args=(10,)) #创建了一个线程对象t1,10做为一个参数,传给num 15 t1.start() 16 17 t2=threading.Thread(target=Hi,args=(9,)) #创建了一个线程对象t2,9做为一个参数,传给num 18 t2.start() 19 20 print("ending.........") #主线程输出ending
1 hello 10 #子线程 2 hello 9 #子线程 3 ending......... #主线程 4 #上面三个同时出来,再停顿三秒才结束 5 Process finished with exit code 0 #停顿3秒才结束
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 #Author: nulige 4 5 import threading 6 import time 7 8 def music(): 9 print("begin to listen %s"%time.ctime()) 10 time.sleep(3) 11 print("stop to listen %s" %time.ctime()) 12 13 def game(): 14 print("begin to play game %s"%time.ctime()) 15 time.sleep(5) 16 print("stop to play game %s" %time.ctime()) 17 18 if __name__ == '__main__': 19 20 t1=threading.Thread(target=music) 21 t1.start() 22 t2=threading.Thread(target=game) 23 t2.start()
1 #总共花了5秒时间 2 3 begin to listen Sat Jan 14 12:34:43 2017 4 begin to play game Sat Jan 14 12:34:43 2017 #1、先打印2个 5 6 stop to listen Sat Jan 14 12:34:46 2017 #2、等待3秒再打印一个 7 8 stop to play game Sat Jan 14 12:34:48 2017 #3、再等待2秒,打印一个
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 #Author: nulige 4 5 import threading 6 import time 7 8 def music(): 9 print("begin to listen %s"%time.ctime()) 10 time.sleep(3) 11 print("stop to listen %s" %time.ctime()) 12 13 def game(): 14 print("begin to play game %s"%time.ctime()) 15 time.sleep(5) 16 print("stop to play game %s" %time.ctime()) 17 18 if __name__ == '__main__': 19 20 t1=threading.Thread(target=music) 21 t2=threading.Thread(target=game) 22 23 t1.start() #运行实例的方法 24 t2.start() 25 26 t1.join() #子线程对象调用join()方法 27 t2.join() 28 29 print("ending") #在主线程中
begin to listen Sat Jan 14 12:58:34 2017 begin to play game Sat Jan 14 12:58:34 2017 #先打印2个 stop to listen Sat Jan 14 12:58:37 2017 #等待3秒,再打印一个 stop to play game Sat Jan 14 12:58:39 2017 #等待2秒,再打印两个 ending
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 #Author: nulige 4 5 import threading 6 import time 7 8 def music(): 9 print("begin to listen %s"%time.ctime()) 10 time.sleep(3) 11 print("stop to listen %s" %time.ctime()) 12 13 def game(): 14 print("begin to play game %s"%time.ctime()) 15 time.sleep(5) 16 print("stop to play game %s" %time.ctime()) 17 18 if __name__ == '__main__': 19 20 t1=threading.Thread(target=music) 21 t2=threading.Thread(target=game) 22 23 t1.start() #运行实例的方法 24 t2.start() 25 26 t1.join() #t1线程不结束,谁都不往下走 27 28 print("ending")
1 begin to listen Sat Jan 14 13:06:07 2017 2 begin to play game Sat Jan 14 13:06:07 2017 #先打印这两行 3 4 stop to listen Sat Jan 14 13:06:10 2017 #再等待3秒打印这两行 5 ending 6 7 stop to play game Sat Jan 14 13:06:12 2017 #再等待2秒打印这行
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 #Author: nulige 4 5 import threading 6 import time 7 8 def music(): 9 print("begin to listen %s"%time.ctime()) 10 time.sleep(3) 11 print("stop to listen %s" %time.ctime()) 12 13 def game(): 14 print("begin to play game %s"%time.ctime()) 15 time.sleep(5) 16 print("stop to play game %s" %time.ctime()) 17 18 if __name__ == '__main__': 19 20 t1=threading.Thread(target=music) 21 t2=threading.Thread(target=game) 22 23 t1.start() #运行实例的方法 24 t2.start() 25 26 t2.join() 27 28 print("ending") #在主线程中
1 begin to listen Sat Jan 14 13:12:34 2017 #先打印这两行 2 begin to play game Sat Jan 14 13:12:34 2017 3 4 stop to listen Sat Jan 14 13:12:37 2017 #等待3秒,打印这一行 5 6 stop to play game Sat Jan 14 13:12:39 2017 #等待2秒,打印这两行 7 ending
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 #Author: nulige 4 5 import threading 6 import time 7 8 def music(): 9 print("begin to listen %s"%time.ctime()) 10 time.sleep(3) 11 print("stop to listen %s" %time.ctime()) 12 13 def game(): 14 print("begin to play game %s"%time.ctime()) 15 time.sleep(5) 16 print("stop to play game %s" %time.ctime()) 17 18 if __name__ == '__main__': 19 20 t1=threading.Thread(target=music) 21 t2=threading.Thread(target=game) 22 23 t1.start() 24 25 t1.join() 26 t2.start() 27 28 t2.join() 29 30 print("ending") #在主线程中
1 begin to listen Sat Jan 14 13:26:18 2017 #先打印条1行 2 3 stop to listen Sat Jan 14 13:26:21 2017 #等待3秒再打印2行 4 begin to play game Sat Jan 14 13:26:21 2017 5 6 stop to play game Sat Jan 14 13:26:26 2017 #等待5秒打印2行 7 ending
threading 模块建立在 thread 模块之上。thread 模块以低级、原始的方式来处理和控制线程,而 threading 模块通过对 thread
进行二次封装,提供了更方便的 api 来处理线程。
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 #Author: nulige 4 5 import threading 6 import time 7 8 9 def sayhi(num): # 定义每个线程要运行的函数 10 11 print("running on number:%s" % num) 12 13 time.sleep(3) 14 15 16 if __name__ == '__main__': 17 t1 = threading.Thread(target=sayhi, args=(1,)) # 生成一个线程实例 18 t2 = threading.Thread(target=sayhi, args=(2,)) # 生成另一个线程实例 19 20 t1.start() # 启动线程 21 t2.start() # 启动另一个线程 22 23 print(t1.getName()) # 获取线程名 24 print(t2.getName())
1 running on number:1 2 running on number:2 3 Thread-1 4 Thread-2
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 #Author: nulige 4 5 import threading 6 import time 7 8 #自己定制一个MyThread的类 9 class MyThread(threading.Thread): 10 def __init__(self, num): 11 threading.Thread.__init__(self) 12 self.num = num 13 14 def run(self): # 定义每个线程要运行的函数 15 16 print("running on number:%s" % self.num) 17 18 time.sleep(3) 19 20 21 if __name__ == '__main__': 22 t1 = MyThread(1) #继承这个类,把1这个参数,传给num ,t1就是个线程对象 23 t2 = MyThread(2) 24 t1.start() 25 t2.start() 26 27 print("ending......")
1 running on number:1 2 running on number:2 3 ending......
四、 threading.thread的实例方法
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 #Author: nulige 4 5 import threading 6 from time import ctime,sleep 7 import time 8 9 def ListenMusic(name): 10 11 print ("Begin listening to %s. %s" %(name,ctime())) 12 sleep(3) 13 print("end listening %s"%ctime()) 14 15 def RecordBlog(title): 16 17 print ("Begin recording the %s! %s" %(title,ctime())) 18 sleep(5) 19 print('end recording %s'%ctime()) 20 21 #创建一个列表,把t1和t2加到列表中去 22 threads = [] 23 t1 = threading.Thread(target=ListenMusic,args=('水手',)) 24 t2 = threading.Thread(target=RecordBlog,args=('python线程',)) 25 threads.append(t1) 26 threads.append(t2) 27 28 if __name__ == '__main__': 29 30 for t in threads: 31 t.start() 32 33 print ("all over %s" %ctime())
1 Begin listening to 水手. Sat Jan 14 13:44:10 2017 2 Begin recording the python线程! Sat Jan 14 13:44:10 2017 3 all over Sat Jan 14 13:44:10 2017 #先打印三个出来; 主线程结束了 4 5 end listening Sat Jan 14 13:44:13 2017 #等待3秒,打印这1个; 子线程还没有结束,会继续往下运行 6 7 end recording Sat Jan 14 13:44:15 2017 #再等待2秒,打印这1个
示例2: 用Daemon方法示例(设置t为守护线程,就是子线程,跟着主线程一起退出)
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 #Author: nulige 4 5 import threading 6 from time import ctime,sleep 7 import time 8 9 def ListenMusic(name): 10 11 print ("Begin listening to %s. %s" %(name,ctime())) 12 sleep(3) 13 print("end listening %s"%ctime()) 14 15 def RecordBlog(title): 16 17 print ("Begin recording the %s! %s" %(title,ctime())) 18 sleep(5) 19 print('end recording %s'%ctime()) 20 21 #创建一个列表,把t1和t2加到列表中去 22 threads = [] 23 t1 = threading.Thread(target=ListenMusic,args=('水手',)) 24 t2 = threading.Thread(target=RecordBlog,args=('python线程',)) 25 threads.append(t1) 26 threads.append(t2) 27 28 if __name__ == '__main__': 29 30 for t in threads: 31 t.setDaemon(True) #设置t为守护线程; 注意:一定在start()之前设置,否则会报错 32 33 t.start() 34 35 print ("all over %s" %ctime())
1 Begin listening to 水手. Sat Jan 14 13:51:30 2017 #三个同时打印出来 2 Begin recording the python线程! Sat Jan 14 13:51:30 2017 3 all over Sat Jan 14 13:51:30 2017
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 #Author: nulige 4 5 import threading 6 from time import ctime,sleep 7 import time 8 9 def ListenMusic(name): 10 11 print ("Begin listening to %s. %s" %(name,ctime())) 12 sleep(3) 13 print("end listening %s"%ctime()) 14 15 def RecordBlog(title): 16 17 print ("Begin recording the %s! %s" %(title,ctime())) 18 sleep(5) 19 print('end recording %s'%ctime()) 20 21 #创建一个列表,把t1和t2加到列表中去 22 threads = [] 23 t1 = threading.Thread(target=ListenMusic,args=('水手',)) 24 t2 = threading.Thread(target=RecordBlog,args=('python线程',)) 25 threads.append(t1) 26 threads.append(t2) 27 28 if __name__ == '__main__': 29 30 t1.setDaemon(True) #设置t1为守护线程; 注意:一定在start之前设置,否则会报错 31 for t in threads: 32 33 t.start() 34 35 print ("all over %s" %ctime())
1 Begin listening to 水手. Sat Jan 14 14:02:07 2017 2 Begin recording the python线程! Sat Jan 14 14:02:07 2017 3 all over Sat Jan 14 14:02:07 2017 #设置t1为守护线程,所以会先把这三条先打印出来 4 5 end listening Sat Jan 14 14:02:10 2017 #再等待3秒打印t2, 6 7 end recording Sat Jan 14 14:02:12 2017 #再等待3秒打印这条出来
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 #Author: nulige 4 5 import threading 6 from time import ctime,sleep 7 import time 8 9 def ListenMusic(name): 10 11 print ("Begin listening to %s. %s" %(name,ctime())) 12 sleep(3) 13 print("end listening %s"%ctime()) 14 15 def RecordBlog(title): 16 17 print ("Begin recording the %s! %s" %(title,ctime())) 18 sleep(5) 19 print('end recording %s'%ctime()) 20 21 #创建一个列表,把t1和t2加到列表中去 22 threads = [] 23 t1 = threading.Thread(target=ListenMusic,args=('水手',)) 24 t2 = threading.Thread(target=RecordBlog,args=('python线程',)) 25 threads.append(t1) 26 threads.append(t2) 27 28 if __name__ == '__main__': 29 30 t2.setDaemon(True) # 设置t2为守护线程; 注意:一定在start之前设置,否则会报错 31 for t in threads: 32 33 t.start() 34 35 print ("all over %s" %ctime())
1 Begin listening to 水手. Sat Jan 14 14:17:09 2017 2 Begin recording the python线程! Sat Jan 14 14:17:09 2017 3 all over Sat Jan 14 14:17:09 2017 #先打印这三条 4 5 end listening Sat Jan 14 14:17:12 2017 #等待3秒,再打印这条;t1结束后,主线程也结束了。
1 #执行结果是什么? 2 3 i = 0 4 for i in range(10): 5 i += 1 6 print(i) 7 8 执行结果: 9 10
示例:getName()方法 (一般没什么用)
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 #Author: nulige 4 5 import threading 6 from time import ctime,sleep 7 import time 8 9 def ListenMusic(name): 10 11 print ("Begin listening to %s. %s" %(name,ctime())) 12 sleep(3) 13 print("end listening %s"%ctime()) 14 15 def RecordBlog(title): 16 17 print ("Begin recording the %s! %s" %(title,ctime())) 18 sleep(5) 19 print('end recording %s'%ctime()) 20 21 #创建一个列表,把t1和t2加到列表中去 22 threads = [] 23 t1 = threading.Thread(target=ListenMusic,args=('水手',)) 24 t2 = threading.Thread(target=RecordBlog,args=('python线程',)) 25 threads.append(t1) 26 threads.append(t2) 27 28 if __name__ == '__main__': 29 30 t2.setDaemon(True) # 设置t为守护进程; 注意:一定在start之前设置,否则会报错 31 for t in threads: 32 t.start() 33 print(t.getName()) #返回线程名称:Thread-1 34 35 print ("all over %s" %ctime())
1 Begin listening to 水手. Sat Jan 14 14:36:44 2017 2 Thread-1 #返回线程名称 3 Begin recording the python线程! Sat Jan 14 14:36:44 2017 4 Thread-2 #返回默认的线程名称 5 all over Sat Jan 14 14:36:44 2017 6 end listening Sat Jan 14 14:36:47 2017
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 #Author: nulige 4 5 import threading 6 from time import ctime,sleep 7 import time 8 9 def ListenMusic(name): 10 11 print ("Begin listening to %s. %s" %(name,ctime())) 12 sleep(3) 13 print("end listening %s"%ctime()) 14 15 def RecordBlog(title): 16 17 print ("Begin recording the %s! %s" %(title,ctime())) 18 sleep(5) 19 print('end recording %s'%ctime()) 20 21 #创建一个列表,把t1和t2加到列表中去 22 threads = [] 23 t1 = threading.Thread(target=ListenMusic,args=('水手',)) 24 t2 = threading.Thread(target=RecordBlog,args=('python线程',)) 25 threads.append(t1) 26 threads.append(t2) 27 28 if __name__ == '__main__': 29 30 t2.setDaemon(True) #设置t为守护进程; 注意:一定在start之前设置,否则会报错 31 for t in threads: 32 t.start() 33 34 print("count:", threading.active_count()) #判断有多少个线程的数量 35 36 while threading.active_count()==1: #等于1就相当于只有一个主线程,没有子线程 37 38 print ("all over %s" %ctime())
1 Begin listening to 水手. Sat Jan 14 14:49:00 2017 2 count: 2 3 Begin recording the python线程! Sat Jan 14 14:49:00 2017 4 count: 3 #得到的线程数量 5 end listening Sat Jan 14 14:49:03 2017
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once.
This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features havegrown
to depend on the guarantees that it enforces.) 上面的核心意思就是:无论你启多少个线程,你有多少个cpu, Python在执行的时候会淡定的在同一时刻只允许一个线程运行。
同步: 当进程执行到一个IO(等待外部数据)的时候你---->会一直等:同步 (示例: 打电话)
1、对于IO密集型的任务: python的多线程是有意义的,可以采用:多进程+协程的方式
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 #Author: nulige 4 5 import threading 6 import time 7 8 def sub(): 9 global num 10 11 # num -= 1 12 temp=num 13 time.sleep(0.001) #别外75个线程,拿到100了,时间不固定。 14 num=temp-1 15 16 num =100 17 18 l=[] 19 20 for i in range(100): 21 t=threading.Thread(target=sub) 22 t.start() 23 l.append(t) 24 25 for t in l: 26 t.join() 27 28 print(num)
1 73 or 75 (这个值是随机的,会不断变化)
示例2:加锁 (加锁的作用:就是把多线程变成串行,结果不会变)
1 #加锁的作用:就是把多线程变成串行,结果就不会变) 2 3 #!/usr/bin/env python 4 # -*- coding:utf-8 -*- 5 #Author: nulige 6 7 import threading 8 import time 9 10 def sub(): 11 12 global num 13 14 # num -= 1 15 lock.acquire() #获取锁 16 temp=num 17 time.sleep(0.001) 18 num=temp-1 19 lock.release() #释放锁 20 21 num =100 22 23 l=[] 24 lock=threading.Lock() 25 26 for i in range(100): 27 t=threading.Thread(target=sub) 28 t.start() 29 l.append(t) 30 31 for t in l: 32 t.join() 33 34 print (num)
1 0
为什么这里还需要lock? 注意啦,这里的lock是用户级的lock,跟那个GIL没关系 ,具体我们通过下图进行讲解
既然用户程序已经自己有锁了,那为什么C python还需要GIL呢?加入GIL主要的原因是为了降低程序的开发的复杂度,比如现在的你写python不需要关心内存回收的问题,因为Python解释器帮你自动定期进行内存回收,你可以理解为python解释器里有一个独立的线程,每过一段时间它起wake up做一次全局轮询看看哪些内存数据是可以被清空的,此时你自己的程序 里的线程和 py解释器自己的线程是并发运行的,假设你的线程删除了一个变量,py解释器的垃圾回收线程在清空这个变量的过程中的clearing时刻,可能一个其它线程正好又重新给这个还没来及得清空的内存空间赋值了,结果就有可能新赋值的数据被删除了,为了解决类似的问题,python解释器简单粗暴的加了锁,即当一个线程运行时,其它人都不能动,这样就解决了上述的问题,这可以说是Python早期版本的遗留问题。
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 # Author: nulige 4 5 import threading 6 import time 7 8 9 class MyThread(threading.Thread): 10 def actionA(self): 11 A.acquire() # count=1 12 print(self.name, "gotA", time.ctime()) 13 time.sleep(2) 14 15 B.acquire() # count=2 16 print(self.name, "gotB", time.ctime()) 17 time.sleep(1) 18 19 B.release() # count=1 20 A.release() # count=0 21 22 def actionB(self): 23 B.acquire() # count=1 24 print(self.name, "gotB", time.ctime()) 25 time.sleep(2) 26 27 A.acquire() # count=2 28 print(self.name, "gotA", time.ctime()) 29 time.sleep(1) 30 31 A.release() # count=1 32 B.release() # count=0 33 34 def run(self): 35 self.actionA() 36 self.actionB() 37 38 39 if __name__ == '__main__': 40 41 A = threading.Lock() 42 B = threading.Lock() 43 44 L = [] 45 46 for i in range(5): 47 t = MyThread() 48 t.start() 49 L.append(t) 50 51 for i in L: 52 i.join() 53 54 print("ending.....")
1 Thread-1 gotA Mon Jan 16 17:33:58 2017 2 Thread-1 gotB Mon Jan 16 17:34:00 2017 3 Thread-1 gotB Mon Jan 16 17:34:01 2017 4 Thread-2 gotA Mon Jan 16 17:34:01 2017 #死锁,一直卡在这里
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 #Author: nulige 4 5 import threading 6 import time 7 8 class MyThread(threading.Thread): 9 10 def actionA(self): 11 12 r_lcok.acquire() #count=1 13 print(self.name,"gotA",time.ctime()) 14 time.sleep(2) 15 16 r_lcok.acquire() #count=2 17 print(self.name,"gotB",time.ctime()) 18 time.sleep(1) 19 20 r_lcok.release() #count=1 21 r_lcok.release() #count=0 22 23 24 def actionB(self): 25 26 r_lcok.acquire() #count=1 27 print(self.name,"gotB",time.ctime()) 28 time.sleep(2) 29 30 r_lcok.acquire() #count=2 31 print(self.name,"gotA",time.ctime()) 32 time.sleep(1) 33 34 r_lcok.release() #count=1 35 r_lcok.release() #count=0 36 37 def run(self): 38 39 self.actionA() 40 self.actionB() 41 42 if __name__ == '__main__': 43 44 r_lcok=threading.RLock() 45 L=[] 46 47 for i in range(5): 48 t=MyThread() 49 t.start() 50 L.append(t) 51 52 for i in L: 53 i.join() 54 55 print("ending.....")

1 Thread-1 gotA Mon Jan 16 17:38:42 2017 2 Thread-1 gotB Mon Jan 16 17:38:44 2017 3 Thread-1 gotB Mon Jan 16 17:38:45 2017 4 Thread-1 gotA Mon Jan 16 17:38:47 2017 5 Thread-3 gotA Mon Jan 16 17:38:48 2017 6 Thread-3 gotB Mon Jan 16 17:38:50 2017 7 Thread-4 gotA Mon Jan 16 17:38:51 2017 8 Thread-4 gotB Mon Jan 16 17:38:53 2017 9 Thread-5 gotA Mon Jan 16 17:38:54 2017 10 Thread-5 gotB Mon Jan 16 17:38:56 2017 11 Thread-5 gotB Mon Jan 16 17:38:57 2017 12 Thread-5 gotA Mon Jan 16 17:38:59 2017 13 Thread-3 gotB Mon Jan 16 17:39:00 2017 14 Thread-3 gotA Mon Jan 16 17:39:02 2017 15 Thread-4 gotB Mon Jan 16 17:39:03 2017 16 Thread-4 gotA Mon Jan 16 17:39:05 2017 17 Thread-2 gotA Mon Jan 16 17:39:06 2017 18 Thread-2 gotB Mon Jan 16 17:39:08 2017 19 Thread-2 gotB Mon Jan 16 17:39:09 2017 20 Thread-2 gotA Mon Jan 16 17:39:11 2017 21 ending.....
An event is a simple synchronization object;the event represents an internal flag, and threads can wait for the flag to be set, or set or clear the flag themselves. event = threading.Event() # a client thread can wait for the flag to be set event.wait() # a server thread can set or reset it event.set() event.clear() If the flag is set, the wait method doesn’t do anything. If the flag is cleared, wait will block until it becomes set again. Any number of threads may wait for the same event.
1 import threading,time 2 class Boss(threading.Thread): 3 def run(self): 4 print("BOSS:今晚大家都要加班到22:00。") 5 print(event.isSet()) 6 event.set() 7 time.sleep(5) 8 print("BOSS:<22:00>可以下班了。") 9 print(event.isSet()) 10 event.set() 11 class Worker(threading.Thread): 12 def run(self): 13 event.wait() 14 print("Worker:哎……命苦啊!") 15 time.sleep(1) 16 event.clear() 17 event.wait() 18 print("Worker:OhYeah!") 19 if __name__=="__main__": 20 event=threading.Event() 21 threads=[] 22 for i in range(5): 23 threads.append(Worker()) 24 threads.append(Boss()) 25 for t in threads: 26 t.start() 27 for t in threads: 28 t.join()

1 BOSS:今晚大家都要加班到22:00。 2 False 3 Worker:哎……命苦啊! 4 Worker:哎……命苦啊! 5 Worker:哎……命苦啊! 6 Worker:哎……命苦啊! 7 Worker:哎……命苦啊! 8 BOSS:<22:00>可以下班了。 9 False 10 Worker:OhYeah! 11 Worker:OhYeah! 12 Worker:OhYeah! 13 Worker:OhYeah! 14 Worker:OhYeah! 15 ending.....
信号量用来控制线程并发数的,BoundedSemaphore或Semaphore管理一个内置的计数 器,每当调用acquire()时-1,调用release()时+1。
计数器不能小于0,当计数器为 0时,acquire()将阻塞线程至同步锁定状态,直到其他线程调用release()。(类似于停车位的概念)
BoundedSemaphore与Semaphore的唯一区别在于前者将在调用release()时检查计数 器的值是否超过了计数器的初始值,如果超过了将抛出一个异常。
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 #Author: nulige 4 5 import threading,time 6 7 class myThread(threading.Thread): 8 def run(self): #启动后,执行run方法 9 if semaphore.acquire(): #加把锁,可以放进去多个(相当于5把锁,5个钥匙,同时有5个线程) 10 print(self.name) 11 time.sleep(5) 12 semaphore.release() 13 14 if __name__=="__main__": 15 semaphore=threading.Semaphore(5) #同时能有几个线程进去(设置为5就是一次5个线程进去),类似于停车厂一次能停几辆车 16 17 thrs=[] #空列表 18 for i in range(100): #100个线程 19 thrs.append(myThread()) #加线程对象 20 21 for t in thrs: 22 t.start() #分别启动
1 Thread-1 2 Thread-2 3 Thread-3 4 Thread-4 5 Thread-5 #5个线程同时出来 6 7 Thread-8 8 Thread-6 9 Thread-9 10 Thread-7 11 Thread-10 #每隔3秒再打印5个出来 12 13 部分省略.......
十一、多线程利器---队列(queue) ( 重点掌握)
1 #两个线程同时删除5,所以会报错,因为只有一个5,一个线程删除了就没有啦。 2 3 import threading,time 4 5 li=[1,2,3,4,5] 6 7 def pri(): 8 while li: 9 a=li[-1] #取值 10 print(a) 11 time.sleep(1) 12 li.remove(a) #remove按索引去删除内容 13 14 t1=threading.Thread(target=pri,args=()) #线程1 15 t1.start() 16 t2=threading.Thread(target=pri,args=()) #线程2 17 t2.start()

1 #会报错,因为只有一个5,删除了就没有啦,不能两个线程同时删除。 2 3 5 4 5 5 4 6 Exception in thread Thread-2: 7 Traceback (most recent call last): 8 File "C:\Python3.5\lib\threading.py", line 914, in _bootstrap_inner 9 self.run() 10 File "C:\Python3.5\lib\threading.py", line 862, in run 11 self._target(*self._args, **self._kwargs) 12 File "D:/python/day34/s10.py", line 34, in pri 13 li.remove(a) 14 ValueError: list.remove(x): x not in list 15 16 3 17 2 18 1
queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.
1、先进先出模式 (谁先进去,谁先出来) ---->class queue.Queue(maxsize)
2、先进后出模式 (先进去的,最后出来) ---->class queue.LifoQueue(maxsize)
3、优先级模式 (优先级越低,先出来) ---->class queue.PriorityQueue(maxsize)
1 #先进先出 (原则:谁先进去,谁就先出来) 2 import queue #线程 队列 3 4 q=queue.Queue() #先进先出 5 q.put(12) 6 q.put("hello") 7 q.put({"name":"yuan"}) 8 9 while 1: 10 data=q.get() 11 print(data) 12 print("-----------")
1 12 #他是第1个进去的,所以他先出来 2 ----------- 3 hello 4 ----------- 5 {'name': 'yuan'} 6 -----------
1 import queue #队列,解决多线程问题 (注意:python2.7 Queue的首字母是大写的) 2 3 4 # q=queue.Queue(3) #1、设置3就是满了,默认(FIFO 先进先出 ) #先进后出(手枪弹夹,后压进去的,先出来) 5 q=queue.Queue() 6 q.put(12) 7 q.put("hello") 8 q.put({"name":"yuan"}) 9 10 q.put(34) 11 # q.put(34,False) #2、blook=True,如果改成Flase,提示你满了,会报错,但不会卡在这里 12 13 14 while 1: 15 data=q.get() #1、会卡着,等值进来 16 # data=q.get(block=False) #3、队列为空 17 print(data) 18 print("-----------")
1 #先进后出 2 import queue 3 4 q=queue.LifoQueue() #先进后出 5 6 q.put(12) 7 q.put("hello") 8 q.put({"name":"yuan"}) 9 10 while 1: 11 data=q.get() #卡着,等值进来, 12 print(data) 13 print("-----------")
1 {'name': 'yuan'} #后进来的先出去 2 ----------- 3 hello 4 ----------- 5 12 6 -----------
1 #优先级 2 import queue 3 4 q=queue.PriorityQueue() #优先级 5 6 q.put([3,12]) 7 q.put([2,"hello"]) #2先出来,按优化级 级别是:2--->3--->4 从级到高 8 q.put([4,{"name":"yuan"}]) 9 10 while 1: 11 data=q.get() 12 print(data[1]) 13 print("-----------------------")
1 hello #2先出来,按优先级 2 ----------------------- 3 12 4 ----------------------- 5 {'name': 'yuan'} 6 -----------------------
创建一个“队列”对象 import Queue q = Queue.Queue(maxsize = 10) Queue.Queue类即是一个队列的同步实现。队列长度可为无限或者有限。可通过Queue的构造函数的可选参数maxsize来设定队列长度。如果maxsize小于1就表示队列长度无限。 将一个值放入队列中 q.put(10) 调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为 1。如果队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。如果block为0,put方法将引发Full异常。 将一个值从队列中取出 q.get() 调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。如果队列为空且block为True, get()就使调用线程暂停,直至有项目可用。如果队列为空且block为False,队列将引发Empty异常。 此包中的常用方法(q = Queue.Queue()): q.qsize() 返回队列的大小 q.empty() 如果队列为空,返回True,反之False q.full() 如果队列满了,返回True,反之False q.full 与 maxsize 大小对应 q.get([block[, timeout]]) 获取队列,timeout等待时间 q.get_nowait() 相当q.get(False) 非阻塞 q.put(item) 写入队列,timeout等待时间 q.put_nowait(item) 相当q.put(item, False) q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号 q.join() 实际上意味着等到队列为空,再执行别的操作
示例1: q.qsize() and q.empty() and q.full
1 import queue 2 3 q=queue.Queue() 4 q.put(12) 5 q.put("hello") 6 q.put({"name":"yuan"}) 7 8 print(q.qsize()) #判断队列大小 9 print(q.empty()) #判断队列是否为空 10 print(q.full) #判断队列是否满了 11 12 while 1: 13 data=q.get() 14 print(data) 15 print("-----------")
3 --->q.qsize() False ---->q.empty() <bound method Queue.full of <queue.Queue object at 0x01315A70>> --->full
示例2:g.put_nowait() 相当于q.get(Flase)
1 import queue 2 3 q=queue.Queue(3) 4 5 q.put(12) 6 q.put([2,"hello"]) 7 q.put([4,{"name":"yuan"}]) 8 9 q.put_nowait(56) #相当于q.get(Flase) 10 11 while 1: 12 data=q.get() 13 print(data)
1 Traceback (most recent call last): 2 File "D:/python/day34/s7.py", line 79, in <module> 3 q.put_nowait(56) #相当于q.get(Flase) 4 File "C:\Python3.5\lib\queue.py", line 184, in put_nowait 5 return self.put(item, block=False) 6 File "C:\Python3.5\lib\queue.py", line 130, in put 7 raise Full 8 queue.Full
1 #生产者消费者模型(生产者先执行,再吃包子。) 2 3 import time,random 4 import queue,threading 5 6 q = queue.Queue() 7 8 def Producer(name): 9 count = 0 10 while count <10: 11 print("making........") 12 time.sleep(random.randrange(3)) #产生一个随机数(1-2秒之间) 13 q.put(count) 14 print('Producer %s has produced %s baozi..' %(name, count)) 15 count +=1 16 print("ok......") 17 18 def Consumer(name): 19 count = 0 20 while count <10: 21 time.sleep(random.randrange(4)) #产生一个随机数(1-3秒之间) 22 if not q.empty(): 23 data = q.get() 24 print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data)) 25 else: 26 print("-----no baozi anymore----") 27 count +=1 28 29 p1 = threading.Thread(target=Producer, args=('A君',)) 30 c1 = threading.Thread(target=Consumer, args=('B君',)) 31 32 p1.start() 33 c1.start()

1 making........ 2 Producer A君 has produced 0 baozi.. 3 ok...... 4 making........ 5 Consumer B君 has eat 0 baozi... 6 Producer A君 has produced 1 baozi.. 7 ok...... 8 making........ 9 Consumer B君 has eat 1 baozi... 10 Producer A君 has produced 2 baozi.. 11 ok...... 12 making........ 13 Consumer B君 has eat 2 baozi... 14 Producer A君 has produced 3 baozi.. 15 ok...... 16 making........ 17 Producer A君 has produced 4 baozi.. 18 ok...... 19 making........ 20 Consumer B君 has eat 3 baozi... 21 Producer A君 has produced 5 baozi.. 22 ok...... 23 making........ 24 Producer A君 has produced 6 baozi.. 25 ok...... 26 making........ 27 Consumer B君 has eat 4 baozi... 28 Producer A君 has produced 7 baozi.. 29 ok...... 30 making........ 31 Producer A君 has produced 8 baozi.. 32 ok...... 33 making........ 34 Producer A君 has produced 9 baozi.. 35 ok...... 36 Consumer B君 has eat 5 baozi... 37 Consumer B君 has eat 6 baozi... 38 Consumer B君 has eat 7 baozi... 39 Consumer B君 has eat 8 baozi... 40 Consumer B君 has eat 9 baozi... 41 -----no baozi anymore---- 42 -----no baozi anymore---- 43 -----no baozi anymore---- 44 -----no baozi anymore---- 45 -----no baozi anymore---- 46 -----no baozi anymore---- 47 -----no baozi anymore---- 48 -----no baozi anymore---- 49 -----no baozi anymore---- 50 -----no baozi anymore----
示例2: 供不应求,吃包子的人太多了(1个人在生产包子,3个人在吃包子)
1 #生产者消费者模型(供不应求,吃的人太多了,生产不赢) 2 3 import time,random 4 import queue,threading 5 6 q = queue.Queue() 7 8 def Producer(name): 9 count = 0 10 while count <10: 11 print("making........") 12 time.sleep(random.randrange(3)) #产生一个随机数(1-2秒之间) 13 q.put(count) 14 print('Producer %s has produced %s baozi..' %(name, count)) 15 count +=1 16 print("ok......") 17 18 def Consumer(name): 19 count = 0 20 while count <10: 21 time.sleep(random.randrange(4)) #产生一个随机数(1-3秒之间) 22 if not q.empty(): 23 data = q.get() 24 print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data)) 25 else: 26 print("-----no baozi anymore----") 27 count +=1 28 29 p1 = threading.Thread(target=Producer, args=('A君',)) #1个人生产包子 30 c1 = threading.Thread(target=Consumer, args=('B君',)) 31 c2 = threading.Thread(target=Consumer, args=('C君',)) #3个人在吃包子,导致吃包子的人太多啦,生产不赢 32 c3 = threading.Thread(target=Consumer, args=('D君',)) 33 34 p1.start() 35 c1.start() 36 c2.start() 37 c3.start()

1 making........ 2 -----no baozi anymore---- #生产不赢,供不应求,吃包子的人太多了 3 -----no baozi anymore---- 4 Producer A君 has produced 0 baozi.. 5 ok...... 6 making........ 7 Producer A君 has produced 1 baozi.. 8 ok...... 9 making........ 10 Consumer C君 has eat 0 baozi... 11 Consumer D君 has eat 1 baozi... 12 -----no baozi anymore---- 13 -----no baozi anymore---- 14 -----no baozi anymore---- 15 -----no baozi anymore---- 16 -----no baozi anymore---- 17 -----no baozi anymore---- 18 Producer A君 has produced 2 baozi.. 19 ok...... 20 making........ 21 Producer A君 has produced 3 baozi.. 22 ok...... 23 making........ 24 Producer A君 has produced 4 baozi.. 25 ok...... 26 making........ 27 Consumer C君 has eat 2 baozi... 28 Consumer D君 has eat 3 baozi... 29 Consumer D君 has eat 4 baozi... 30 -----no baozi anymore---- 31 -----no baozi anymore---- 32 Producer A君 has produced 5 baozi.. 33 ok...... 34 making........ 35 Producer A君 has produced 6 baozi.. 36 ok...... 37 making........ 38 Producer A君 has produced 7 baozi.. 39 ok...... 40 making........ 41 Producer A君 has produced 8 baozi.. 42 ok...... 43 making........ 44 Consumer B君 has eat 5 baozi... 45 Consumer C君 has eat 6 baozi... 46 Consumer D君 has eat 7 baozi... 47 Producer A君 has produced 9 baozi.. 48 ok...... 49 Consumer B君 has eat 8 baozi... 50 Consumer C君 has eat 9 baozi... 51 -----no baozi anymore---- 52 -----no baozi anymore---- 53 -----no baozi anymore---- 54 -----no baozi anymore---- 55 -----no baozi anymore---- 56 -----no baozi anymore---- 57 -----no baozi anymore---- 58 -----no baozi anymore---- 59 -----no baozi anymore---- 60 -----no baozi anymore---- 61 -----no baozi anymore---- 62 -----no baozi anymore---- 63 -----no baozi anymore---- 64 -----no baozi anymore---- 65 -----no baozi anymore---- 66 -----no baozi anymore---- 67 -----no baozi anymore---- 68 -----no baozi anymore---- 69 -----no baozi anymore---- 70 -----no baozi anymore----
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 #Author: nulige 4 5 import time,random 6 import queue,threading 7 8 q = queue.Queue() 9 10 def Producer(name): 11 count = 0 12 while count <10: 13 print("making........") 14 time.sleep(5) 15 q.put(count) 16 print('Producer %s has produced %s baozi..' %(name, count)) 17 count +=1 18 q.task_done() #发信号告诉队列在生产包子,让join接收,就开始吃包子 19 print("ok......") 20 21 def Consumer(name): 22 count = 0 23 while count <10: 24 time.sleep(random.randrange(4)) #产生一个随机数(1秒-3秒之间) 25 print("waiting...等待包子做的过程中...") 26 q.join() #join开始接收 27 data = q.get() 28 print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data)) 29 count +=1 30 31 p1 = threading.Thread(target=Producer, args=('A君',)) 32 c1 = threading.Thread(target=Consumer, args=('B君',)) 33 c2 = threading.Thread(target=Consumer, args=('C君',)) 34 c3 = threading.Thread(target=Consumer, args=('D君',)) 35 36 p1.start() 37 c1.start() 38 c2.start() 39 c3.start()

1 making........ 2 waiting...等待包子做的过程中... 3 waiting...等待包子做的过程中... 4 waiting...等待包子做的过程中... 5 Producer A君 has produced 0 baozi.. 6 ok...... 7 making........ 8 Consumer D君 has eat 0 baozi... 9 waiting...等待包子做的过程中... 10 Producer A君 has produced 1 baozi.. 11 Consumer B君 has eat 1 baozi... 12 waiting...等待包子做的过程中... 13 ok...... 14 making........ 15 部分代码省略......
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 #Author: nulige 4 5 import time,random 6 import queue,threading 7 8 q = queue.Queue() 9 10 def Producer(name): 11 count = 0 12 while count <10: 13 print("making.....正在制作包子...") 14 time.sleep(5) 15 q.put(count) 16 print('Producer %s has produced %s baozi..' %(name, count)) 17 count +=1 18 q.join() 19 print("ok......") 20 21 def Consumer(name): 22 count = 0 23 while count <10: 24 time.sleep(random.randrange(4)) #产生一个随机数(1秒-3秒之间) 25 data = q.get() 26 print("eating.......") 27 time.sleep(4) #4秒钟这后 28 q.task_done() #给他发一个信号,才打印ok 29 print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data)) 30 count +=1 31 32 p1 = threading.Thread(target=Producer, args=('A君',)) 33 c1 = threading.Thread(target=Consumer, args=('B君',)) 34 c2 = threading.Thread(target=Consumer, args=('C君',)) 35 c3 = threading.Thread(target=Consumer, args=('D君',)) 36 37 p1.start() 38 c1.start() 39 c2.start() 40 c3.start()

1 making.....正在制作包子... 2 Producer A君 has produced 0 baozi.. 3 eating....... 4 Consumer B君 has eat 0 baozi... 5 ok...... 6 making.....正在制作包子... 7 Producer A君 has produced 1 baozi.. 8 eating....... 9 Consumer C君 has eat 1 baozi... 10 ok...... 11 making.....正在制作包子... 12 Producer A君 has produced 2 baozi.. 13 eating....... 14 Consumer D君 has eat 2 baozi... 15 ok...... 16 making.....正在制作包子... 17 Producer A君 has produced 3 baozi.. 18 eating....... 19 Consumer B君 has eat 3 baozi... 20 ok...... 21 making.....正在制作包子... 22 Producer A君 has produced 4 baozi.. 23 eating....... 24 Consumer C君 has eat 4 baozi... 25 ok...... 26 making.....正在制作包子... 27 Producer A君 has produced 5 baozi.. 28 eating....... 29 Consumer D君 has eat 5 baozi... 30 ok...... 31 making.....正在制作包子... 32 Producer A君 has produced 6 baozi.. 33 eating....... 34 Consumer B君 has eat 6 baozi... 35 ok...... 36 making.....正在制作包子...
(例如:put or get) 对方就是join。如果我put or get 你就处理。(类似于收到信号就处理)
multiprocessing (主要解决GIL问题)
multiprocessing (主要解决GIL问题)
is a package that supports spawning processes using an API similar to the threading module. The ultiprocessing
package offers both local and remote concurrency,effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing
module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.multiprocessing
multiprocessing包是Python中的多进程管理包。与threading.Thread类似,它可以利用multiprocessing.Process对象来创建一个进程。该进程可以运行在Python程序内部编写的函数。该Process对象与Thread对象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition类 (这些对象可以像多线程那样,通过参数传递给各个进程),用以同步进程,其用法与threading包中的同名类一致。所以,multiprocessing的很大一部份与threading使用同一套API,只不过换到了多进程的情景。
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 #Author: nulige 4 5 #多进程调用(并行) 6 7 from multiprocessing import Process 8 import time 9 10 11 def f(name): 12 time.sleep(1) 13 print('hello', name,time.ctime()) 14 15 if __name__ == '__main__': 16 p_list=[] 17 for i in range(3): #子进程 18 19 p = Process(target=f, args=('alvin',)) 20 p_list.append(p) 21 p.start() 22 23 for i in p_list: 24 i.join() 25 26 print('end') #主进程
1 hello alvin Mon Jan 16 18:38:08 2017 #并行,三个同时出现 2 hello alvin Mon Jan 16 18:38:08 2017 3 hello alvin Mon Jan 16 18:38:08 2017 4 end
示例1: 1秒钟这后,4条消息同时执行
1 from multiprocessing import Process 2 import time 3 4 class MyProcess(Process): 5 6 def run(self): 7 time.sleep(1) 8 print ('hello', self.name,time.ctime()) 9 10 11 if __name__ == '__main__': 12 p_list=[] 13 for i in range(3): 14 p = MyProcess() #进程对象 15 p.start() #启动执行run方法 16 p_list.append(p) 17 18 for p in p_list: 19 p.join() #子进程没有执行完,主进程会一直等待 20 21 print('end')
1 hello MyProcess-1 Mon Jan 16 18:56:58 2017 #结果同时出来 2 hello MyProcess-2 Mon Jan 16 18:56:58 2017 3 hello MyProcess-3 Mon Jan 16 18:56:58 2017 4 end
示例2:daemon=True 是属性,不是方法
1 #设置为守护进程,打印的就是end 2 3 from multiprocessing import Process 4 import time 5 6 class MyProcess(Process): 7 8 def run(self): 9 time.sleep(1) 10 print ('hello', self.name,time.ctime()) 11 12 if __name__ == '__main__': 13 p_list=[] 14 15 for i in range(3): 16 p = MyProcess() #进程对象 17 p.daemon=True #是属性,不是方法 18 p.start() #启动执行sun方法 19 p_list.append(p) 20 21 print('end') #主进程执行完之后,不管守护进程
1 end
1 from multiprocessing import Process 2 import os 3 import time 4 5 def info(title): 6 print("title:", title) 7 print('parent process:', os.getppid()) #父进程的pid 8 print('process id:', os.getpid()) #打印进程号 9 10 def f(name): 11 info('function f') 12 print('hello', name) 13 14 if __name__ == '__main__': 15 info('main process line') 16 17 time.sleep(1) 18 print("------------------") 19 p = Process(target=info, args=('yuan',)) 20 p.start() 21 p.join()
1 title: main process line 2 parent process: 4204 3 process id: 7280 4 ------------------ 5 title: yuan 6 parent process: 7280 7 process id: 3596
Process([group [, target [, name [, args [, kwargs]]]]])
group: 线程组,目前还没有实现,库引用中提示必须是None;
target: 要执行的方法;
name: 进程名;
args/kwargs: 要传入方法的参数。
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 #Author: nulige 4 5 import time 6 from multiprocessing import Process 7 8 class MyProcess(Process): 9 10 def __init__(self,num): 11 super(MyProcess,self).__init__() 12 self.num=num 13 14 def run(self): 15 time.sleep(1) 16 print(self.pid) 17 print(self.is_alive()) 18 print(self.num) 19 time.sleep(1) 20 21 22 if __name__ == '__main__': 23 p_list=[] 24 for i in range(10): 25 p = MyProcess(i) 26 p_list.append(p) 27 28 for p in p_list: 29 p.start() 30 31 print('main process end')

1 main process end 2 7104 3 True 4 2 5 7960 6 True 7 3 8 9016 9 True 10 1 11 4604 12 True 13 7 14 1280 15 True 16 6 17 7896 18 True 19 5 20 7704 21 True 22 0 23 8760 24 True 25 9 26 7088 27 True 28 8 29 2976 30 True 31 4
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 #Author: nulige 4 5 #进程通信 (拷贝了一份数据),在linux上ID是一样的。 6 import time 7 import multiprocessing 8 9 def foo(q): 10 time.sleep(1) 11 print("son process",id(q)) 12 q.put(123) 13 q.put("yuan") 14 15 if __name__ == '__main__': 16 q=multiprocessing.Queue() 17 p=multiprocessing.Process(target=foo,args=(q,)) #传q是数据复制了一份,消耗内存 18 p.start() 19 print("main process",id(q)) 20 print(q.get()) 21 print(q.get())
1 main process 31888848 #在windows上运行结果 2 son process 35289968 3 123 4 yuan 5 ------------------------------------------------------------ 6 ('main process', 20253136) #在linux上运行结果 7 ('son process', 20253136) 8 123 9 yuan
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 #Author: nulige 4 5 #管道 6 from multiprocessing import Process, Pipe 7 8 def f(conn): 9 conn.send([12, {"name":"yuan"}, 'hello']) #发送消息 10 response=conn.recv() #接收消息 11 print("response",response) 12 conn.close() 13 print("q_ID2:",id(conn)) 14 15 if __name__ == '__main__': 16 17 parent_conn, child_conn = Pipe() 18 print("q_ID1:",id(child_conn)) 19 p = Process(target=f, args=(child_conn,)) 20 p.start() 21 22 print(parent_conn.recv()) #接收消息 prints "[42, None, 'hello']" 23 parent_conn.send("儿子你好!") #发送消息 24 p.join()
1 q_ID1: 11578512 2 [12, {'name': 'yuan'}, 'hello'] 3 response 儿子你好! 4 q_ID2: 31830928
A manager object returned by Manager()
controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
A manager returned by Manager()
will support types list
, dict
, Namespace
, Lock
, RLock
, Semaphore
, BoundedSemaphore
, Condition
, Event
, Barrier
, Queue
, Value
and Array
. For example:
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 #Author: nulige 4 5 from multiprocessing import Process, Manager 6 7 def f(d, l,n): 8 d[n] = '1' #{0:"1"} 9 d['2'] = 2 #{0:"1","2":2} 10 l.append(n) #[] 11 # print(l) 12 print("son process:",id(d),id(l)) 13 14 if __name__ == '__main__': 15 16 with Manager() as manager: #用with相当于打开文件就不要关闭了 17 18 d = manager.dict() #{} #主进程 19 20 l = manager.list(range(5)) #[0,1,2,3,4] 21 22 print("main process:",id(d),id(l)) 23 24 p_list = [] 25 26 for i in range(10): #10个子进程 27 p = Process(target=f, args=(d,l,i)) #主进程传过去 28 p.start() 29 p_list.append(p) 30 31 for res in p_list: 32 res.join() 33 34 print(d) 35 print(l)
1 main process: 23793136 50481712 2 son process: 34508592 34286736 3 son process: 23695120 23342192 4 son process: 30969648 30682256 5 son process: 23760656 23473264 6 son process: 23432976 23080048 7 son process: 31428368 31140976 8 son process: 18976592 18754736 9 son process: 20025072 19606608 10 son process: 23301904 23014512 11 son process: 26185488 25898096 12 {0: '1', 1: '1', 2: '1', 3: '1', 4: '1', '2': 2, 6: '1', 7: '1', 8: '1', 9: '1', 5: '1'} 13 [0, 1, 2, 3, 4, 1, 2, 3, 9, 6, 0, 8, 5, 4, 7]
python2: 在python2中是元组,因为共享一个屏幕,会出现串行的情况,所以需要加把锁
python3: 在python3中,加不加锁,都不会出现串行的问题
1 #有锁的情况 2 3 #!/usr/bin/env python 4 # -*- coding:utf-8 -*- 5 #Author: nulige 6 7 from multiprocessing import Process,Lock 8 import time 9 10 def f(l, i): 11 l.acquire() 12 time.sleep(1) 13 print('hello world %s' % i) 14 l.release() 15 16 if __name__ == '__main__': 17 lock = Lock() 18 19 for num in range(10): 20 Process(target=f, args=(lock, num)).start()
1 hello world 4 2 hello world 5 3 hello world 1 4 hello world 2 5 hello world 0 6 hello world 8 7 hello world 3 8 hello world 9 9 hello world 7 10 hello world 6
示例2: 有锁的情况
1 #有锁的情况 2 from multiprocessing import Process,Lock 3 4 def f(l,i): 5 l.acquire() 6 7 print('hello world',i) #python2是元组(因为共享一个屏幕,在python2中会串行,所以需要加把锁) 8 # print('hello world %s' %i) 9 10 l.release() 11 12 if __name__ == '__main__': 13 lock = Lock() 14 15 for num in range(10): 16 Process(target=f,args=(lock,num)).start()
1 hello world 1 2 hello world 7 3 hello world 3 4 hello world 2 5 hello world 4 6 hello world 0 7 hello world 8 8 hello world 9 9 hello world 6 10 hello world 5
1 #没有锁的情况(在python2.7下,才能看到效果) 2 from multiprocessing import Process,Lock 3 4 def f(l,i): 5 print('hello world %s' %i) 6 7 if __name__ == '__main__': 8 lock = Lock() 9 10 for num in range(100): 11 Process(target=f,args=(lock,num)).start()

1 #在python2.7中,没有锁会出现串行的情况 2 3 D:\python\day35>python2 s6.py 4 hello world 1 5 hello world 0 6 hello world 6 7 hello world 2 8 hello world 5 9 hhello world 4ello world 3 10 11 hhhhello world 15 12 hello world 9 13 hhello world 24 14 hhhello world 25 15 hello world 29 16 ello world 12hello world 28hello world 10hello world 49 17 hhhhhello world 59 18 hello world 63 19 hello world 78 20 hello world 77 21 ello world 46hello world 70hhello world 57hhello world 58 22 h 23 hhello world 13hhello world 54ello world 39h 24 ello world 44hhello world 69 25 hello world 88 26 hhhhhhello world 45 27 hello world 38hello world 22 28 hhello world 42 29 ello world 52hhello world 90ello world 30 30 hh 31 32 hhhello world 18 33 ello world 21h 34 hhhello world 32hhhello world 91 35 hh 36 ello world 89ello world 53hello world 85hh 37 hhhello world 27ello world 35 38 h 39 h 40 ello world 62h 41 hhhello world 23ello world 68ello world 14 42 ello world 65ello world 66ello world 73 43 hhhhhello world 71hhello world 37hh 44 45 ello world 11hello world 7ello world 8ello world 20ello world 50ello world 81ello world 17 46 ello world 31hello world 82 47 48 49 ello world 16ello world 83 50 51 ello world 94ello world 61 52 53 ello world 26hello world 96ello world 40ello world 64ello world 51ello world 48 54 55 ello world 33hello world 86hhello world 55ello world 75ello world 79ello world 41 56 ello world 74 57 58 59 60 ello world 76ello world 67ello world 56ello world 95 61 62 ello world 47ello world 84 63 64 ello world 72
- apply:同步方法
- apply_async :异步方法
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 #Author: nulige 4 5 from multiprocessing import Process,Pool 6 import time,os 7 8 def Foo(i): 9 time.sleep(1) 10 print(i) 11 if __name__ == '__main__': 12 13 pool = Pool(5) #进程池的数量 14 for i in range(100): #进程数 15 # pool.apply(func=Foo, args=(i,)) #同步接口 16 pool.apply_async(func=Foo, args=(i,)) #用异步接口 17 18 pool.close() 19 pool.join() #join和close顺序是固定的 20 print('end')

1 0 2 2 3 3 4 1 5 4 6 7 5 8 7 9 8 10 6 11 9 12 13 10 14 11 15 13 16 12 17 14 18 19 15 20 18 21 16 22 17 23 19 24 省略部分代码.......
1 from multiprocessing import Process,Pool 2 import time,os 3 4 def Foo(i): 5 time.sleep(1) 6 print(i) 7 print("son:",os.getpid()) 8 return "HELLO %s "%i 9 10 def Bar(arg): #回调函数 11 print(arg) 12 # print("hello") 13 # print("Bar:",os.getpid()) 14 15 if __name__ == '__main__': 16 17 pool = Pool(5) #进程池的数量 18 print("main pid", os.getpid()) 19 for i in range(100): #进程数 20 #pool.apply(func=Foo, args=(i,)) 21 # pool.apply_async(func=Foo, args=(i,)) 22 23 #回调函数: 就是某个动作或者函数执行成功后再去执行的函数 24 pool.apply_async(func=Foo, args=(i,),callback=Bar) 25 26 pool.close() 27 pool.join() 28 print('end')
1 main pid 1220 2 0 3 son: 3920 4 HELLO 0 5 1 6 son: 8660 7 HELLO 1 8 2 9 son: 1948 10 HELLO 2 11 3 12 son: 5808 13 HELLO 3 14 4 15 部分结果省略............
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 #Author: nulige 4 5 from multiprocessing import Process,Lock,Pool 6 import time 7 8 #进程池 9 def f(i): 10 print('hello world %s'% i) 11 time.sleep(1) 12 13 return i 14 15 def callback(data): #i的返回值给callback,其实就是个回执,告诉你干完活了。 16 print("exec done--->",data) 17 18 if __name__=='__main__': 19 lock = Lock() 20 21 pool = Pool(processes=5) 22 for num in range(100): 23 24 pool.apply_async(func=f, args=(num,), callback=callable) #子进程干完的结果,返回给callable 25 26 pool.close() 27 pool.join() #进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。
1 hello world 0 #分别5行,5行输出结果 2 hello world 1 3 hello world 2 4 hello world 3 5 hello world 4 6 7 hello world 5 8 hello world 6 9 hello world 7 10 hello world 8 11 hello world 9 12 13 hello world 10 14 hello world 11 15 hello world 12 16 hello world 13 17 hello world 14
优点1: 协程极高的执行效率。因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。
优点2: 不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。
1 #生成器 2 def f(): 3 4 print("ok") 5 s = yield 6 6 print(s) 7 print("ok2") 8 yield 9 10 gen=f() 11 # print(gen) 12 # next(gen) #方法一 13 # next(gen) 14 15 RET=gen.__next__() #方法二 16 print(RET) 17 18 gen.send(5) #方法三
1 ok 2 6 3 5 4 ok2
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 #Author: nulige 4 5 import time 6 import queue 7 8 def consumer(name): 9 print("--->ready to eat baozi........") 10 while True: 11 new_baozi = yield #yield实现上下文切换,传包子进来 12 print("[%s] is eating baozi %s" % (name,new_baozi)) 13 #time.sleep(1) 14 15 def producer(): 16 17 r = con.__next__() 18 r = con2.__next__() 19 n = 0 20 while 1: 21 time.sleep(1) 22 print("\033[32;1m[producer]\033[0m is making baozi %s and %s" %(n,n+1) ) 23 con.send(n) #发送告诉他有包子了 24 con2.send(n+1) 25 26 n +=2 27 28 if __name__ == '__main__': 29 con = consumer("c1") 30 con2 = consumer("c2") 31 producer()
1 --->ready to eat baozi........ 2 --->ready to eat baozi........ 3 [producer] is making baozi 0 and 1 4 [c1] is eating baozi 0 5 [c2] is eating baozi 1 6 [producer] is making baozi 2 and 3 7 [c1] is eating baozi 2 8 [c2] is eating baozi 3 9 [producer] is making baozi 4 and 5 10 [c1] is eating baozi 4 11 [c2] is eating baozi 5 12 [producer] is making baozi 6 and 7 13 [c1] is eating baozi 6 14 [c2] is eating baozi 7 15 [producer] is making baozi 8 and 9 16 [c1] is eating baozi 8 17 [c2] is eating baozi 9 18 [producer] is making baozi 10 and 11 19 [c1] is eating baozi 10 20 [c2] is eating baozi 11 21 省略部分........
1 from greenlet import greenlet 2 3 4 def test1(): 5 print(12) 6 gr2.switch() 7 print(34) 8 gr2.switch() 9 10 11 def test2(): 12 print(56) 13 gr1.switch() 14 print(78) 15 16 17 gr1 = greenlet(test1) 18 gr2 = greenlet(test2) 19 gr1.switch()
20 test2()
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 #Author: nulige 4 5 import gevent 6 7 import requests,time 8 9 start=time.time() 10 11 def f(url): 12 print('GET: %s' % url) 13 resp =requests.get(url) 14 data = resp.text 15 print('%d bytes received from %s.' % (len(data), url)) 16 17 gevent.joinall([ 18 19 gevent.spawn(f, 'https://www.python.org/'), 20 gevent.spawn(f, 'https://www.yahoo.com/'), 21 gevent.spawn(f, 'https://www.baidu.com/'), 22 gevent.spawn(f, 'https://www.sina.com.cn/'), 23 24 ]) 25 26 #上下时间对比 27 28 # f('https://www.python.org/') 29 # 30 # f('https://www.yahoo.com/') 31 # 32 # f('https://baidu.com/') 33 # 34 # f('https://www.sina.com.cn/') 35 36 print("cost time:",time.time()-start)