赞
踩
你好,我是goldsun,一个喜欢敲代码时那种感觉的普通大学生。
让我们加油,为了更美好的明天而战!
在Python中有一个全局解释器锁GIL(Global Interpreter Lock)。GIL源于Python设计之初的考虑,目的是使数据更加安全。现在我们见到的各种电脑基本上都是多核CPU的,多核CUP比单核CPU性能要更高,为了更好的利用多核处理器的性能就出现了多线程的编程方式,而随之带来的就是线程间数据一致性和状态同步的困难。解决多线程之间数据一致性、完整性和状态同步的最简单的方法就是加锁,于是就有了GIL这把超级大锁,每一个单独的进程拥有唯一的GIL锁。
对CPU而言,在同一时间只能执行一个线程,在单核CPU下的多线程执行方式其实都是并发(Concurrent),而不是并行(Parallel)。并发和并行宏观上表现都为同时处理多路请求的情况,但它们具有本质区别。
举个例子来讲,一个进程代表一个工人,线程指工人做的工作,多线程就是一个工人同时干几样工作,但是工作时同时只能干一个工作却能快速的切换,比如他要洗衣服、做饭、扫地,你可以理解为他是按顺序来做这三件事,但是当做每件事的时间用时非常少的时候,宏观上就表现为他同时把三件事完成了。这样说你可能还不太理解,如果我们假设他需要做的工作为网购、洗衣服、做饭,假设他先去洗衣服,当把洗衣服放进洗衣机打开之后,多线程讲使他在此之间可以去做饭和网购,而不需要等洗衣机把衣服洗好之后才能去做饭和网购,这也就是我们所说的IO型代码。
但多进程就不一样了,每个进程代表一个工人,假如你电脑为8核CPU,那么你可以开8个进程也就等于雇了8个工人,他们8个就可以真正同时干8件工作了。
在Python多线程下,每个线程执行方式如下:
每个线程想要执行,必须拿到所在进程的GIL,我们可以把每个进程都看成有一把大锁将其锁着,只有拿到GIL这把唯一钥匙才能运行,因此每个进程同一时刻仅运行一个线程执行。在Python3中,GIL使用计数器进行控局,即若有一个线程执行到一定时间后还没执行完毕,那么它的GIL将被强制收回分配给别的线程。而实际上,每次“拿锁放锁”是会消耗系统资源的,因此有时候使用多线程并不会提高效率,反而会降低效率。分类说明:
一般来说多核的多线程比单核多线程效率更低,因为当单核多线程每次释放GIL的时候,下一个线程能直接获取到GIL,能够无缝执行,当多核环境中某个CPU释放GIL后,本该在其它CPU的线程也都会竞争获得此CPU的GIL,但很大可能GIL又被此CPU下的某个线程拿到,导致其它几个CPU上被唤醒的线程醒着等待到切换时间后又进入调度状态,这样会造成线程颠簸(Thrashing),导致效率更低。而多进程的每个进程有各自独立的GIL,这才保证程序真正的并行执行,因此多进程效率一般来说效率更高。
有一句话说的很好,进程是线程的容器。
在Python3中有两个模块可以创建多线程,分别是:
_thread
threading
threading
模块。threading
模块的一些使用方法(后文将会给出一些重要方法的使用示例):对象 | 作用 |
---|---|
Thread | 一个线程对象 |
Lock | 一个线程锁对象 |
Rlock | 可重入锁对象,使一个线程可以再次或多次获得已经持有的锁(递归锁),释放的次数必须和获取的次数相同才会真正释放该锁 |
Condition | 条件变量对象,使得一个线程等待另外一个线程满足特定的条件,比如改变状态或者某个数据值 |
Event | 条件变量的通用版本,任意数量的线程等待某个事件的发生,在该事件发生后所有的线程都将被激活 |
Semaphore | 为线程间的有限资源提供一个计数器,没有可用资源时会被阻塞 |
BoundedSemaphore | 与Semaphore相似,不过它不允许超过初始值 |
Timer | 与Thread类似,不过它要在运行前等待一定时间 |
Barrier | 一个障碍对象,必须达到指定数量的线程到达后才可以继续 |
Thread
类是threading
模块中非常重要和常用的功能,下面给出其属性和方法:
名称 | 作用 |
---|---|
name | 获得该线程的名字 |
ident | 获得该线程的标识符,就像身份证一样,每个线程都有 |
daemon | 布尔值,表明这个线程是否为守护线程 |
守护线程是一个很重要的概念,对于一个程序而言,至少拥有一个线程即主线程,如果你新建了一个线程,并且把它设置成了守护线程,那么在主线程执行结束的时候,不管新建的线程是否执行结束都会被强制结束,例如假如你打开了一个游戏,你操控着主线程,而背景音乐也是一个单独的被创建的线程,在后台背景音乐是个死循环不会执行结束,但是当你只要把游戏关了,就是说背景音乐这个线程会随着主线程而结束。如果不是守护线程的话,它会继续执行,整个程序也就不会结束。
#可以这样让一个线程变为守护线程
Thread1.daemon = True
名称 | 作用 |
---|---|
__init__ (group=None,target=None,name=None,args=(),kwargs=None,*,daemon=None) | Thread类的构造方法,target为执行任务对象,args为target的参数,为元组类型,kwargs为字典形式传入的参数,daemon即为守护线程设定 |
start() | 开始执行已经创建的该线程 |
run() | 线程方法,在start中已包含,此方法一般在开发者定义新方法时重写 |
join(timeout=None) | 直至启动的线程终止之前一直挂起,除非给出了timeout(s),否则一直阻塞 |
创建一个线程一般用两种方法:
在创建线程时一般根据实际需要选择创建方法,第一种和第三种方法使用较多,当需要一个更加符合面向对象的接口时,更倾向于使用第三种方法,而在使用第三种方法时,就需要重写父类的run()
方法。
#导入所用模块 import threading import time #定义一个任务函数 def wait(num): print('我将休息{}秒'.format(num)) time.sleep(num) print('休息{}秒结束'.format(num)) #主程序 if __name__ == '__main__': now = time.time() t1 = threading.Thread(target=wait,args=(3,))#实例化一个新线程 t2 = threading.Thread(target=wait,args=(5,))#实例化一个新线程 t1.start() t2.start() wait(2) print('程序到现在执行{}秒'.format(time.time()-now)) #执行结果 我将休息3秒 我将休息5秒 我将休息2秒 休息2秒结束 程序到现在执行2.001131772994995秒 休息3秒结束 休息5秒结束
其中需要注意的是,在实例化Thread时,其中的args
为元组类型,因此在参数只有一个的时候不要忘了跟一个逗号表明类型。并且每一个线程在start()
方法之后才会开始运行。且target
参数接收的函数后是不能加小括号的,因为加了小括号就默认是要运行这个函数,是不正确的,只需要给出函数名即可。
import threading import time #创建我们的线程类 class NewThread(threading.Thread): def __init__(self,func,args): threading.Thread.__init__(self) self.func = func self.args = args def run(self): self.func(*self.args) def loop(nloop,nsec): print('开始循环',nloop,'在:',time.ctime()) time.sleep(nsec) print('结束循环',nloop,'于:',time.ctime()) def main(): loops = [3,1] print('程序开始于:',time.ctime()) threads = [] nloops = range(len(loops)) for i in nloops: t = NewThread(loop,(i,loops[i]),) threads.append(t) for i in nloops: threads[i].start() for i in nloops: threads[i].join() print('所有的任务完成于:',time.ctime()) if __name__ == '__main__': main() #执行结果 程序开始于: Sat Mar 28 16:55:43 2020 开始循环 0 在: Sat Mar 28 16:55:43 2020 开始循环 1 在: Sat Mar 28 16:55:43 2020 结束循环 1 于: Sat Mar 28 16:55:44 2020 结束循环 0 于: Sat Mar 28 16:55:46 2020 所有的任务完成于: Sat Mar 28 16:55:46 2020
上面例子中定义了一个新的线程类NewThread
,其实这个继承并没有改什么东西,和原类是差不多的,只是接收的参数变成了两个而已,同时这个示例中加入了join()
方法,这个方法的作用是保证在该线程运行到此处之前主线程不会往后走,我把join()
方法去掉,大家看下输出:
程序开始于: Sat Mar 28 17:44:55 2020
开始循环 0 在: Sat Mar 28 17:44:55 2020
开始循环 1 在: Sat Mar 28 17:44:55 2020
所有的任务完成于: Sat Mar 28 17:44:55 2020
结束循环 1 于: Sat Mar 28 17:44:56 2020
结束循环 0 于: Sat Mar 28 17:44:58 2020
可以看到主线程中的最后一个打印在其它两个线程没有运行完的时候就运行了,而加了join()
就保证主线程在该线程运行完毕到此处时主线程才继续往后运行。
另外,重建了线程类中重写了RUN方法,RUN方法的内容就是START将要执行的内容!!例子中实际上并没有重写,只是把名称换了下。
Lock也是很重要的一个东西,GIL是一把大锁保证程序只能并发而不能并行,Lock则是一把小锁,因为并发近似等于并行,因此有时候当多个线程“同时”修改某一个共同享有的数据时,就可能出现错误,比如这一段程序:
counter = 0
def add():
global counter
for i in range(100000):
counter+=1
for i in range(10):
t = threading.Thread(target=add)
t.start()
print(counter)
print(100000*10)
简单看一下,我们启动了10个线程对counter进行加一操作,最终输出结果按理说应该是10*100000=100w,而实际输出是多少呢,如下:
988356
1000000
实际上,你每次运行的counter输出结果是不一样的,但几乎不可能等于100w,这是为什么呢,举个简单的例子,如你所在班级制定了一项任务,班级内30个人,然后要求完成任务之后在群内报数,比如第一个完成了就报1,第二个就报2,以此来看最终完成了多少人,但你可能会发现,当报数到了12的时候,突然有两个人甚至更多人几乎在同一时刻报了13,假如说大家没有修改,并且后边的人接着14往后报了,那么即使所有人都报数了,最后一个人报的也肯定不是30,这就是并发造成的BUG,因此,面对这种情况,可以在程序中加一个LOCK,只有拿到它的人才允许继续报数,该人报完书数之后释放LOCK,这就使得不会出现错误,但它已经和串行执行没有什么区别了,这就是以牺牲效率的方法保证数据安全,示例代码如下:
counter = 0 #创建一个互斥锁 lock = threading.Lock() def add(): global counter for i in range(100000): #得到锁 lock.acquire() counter+=1 #释放锁 lock.release() threads = [] for i in range(10): t = threading.Thread(target=add) t.start() threads.append(t) for i in threads: i.join() print(counter) print(100000*10) #输出结果 1000000 1000000
这样就保证了数据不会出错,但是执行效率降低了很多。
想要充分利用多核CPU资源,在Python中大部分情况都需要使用多进程,Python提供了multiprocessing
这个模块来实现多进程。multiprocessing
支持子进程、进程间的同步与通信,提供了Process、Quene(队列)、Pipe、Lock等组件。
multiprocessing和threading在形式上具有高度的一致性,如创建八个子进程:
import multiprocessing import time def add(num): print('我是线程{}'.format(num)) if __name__ =='__main__': new = time.time() mus = [] for i in range(8): t = multiprocessing.Process(target=add,args=(i,)) t.start() mus.append(t) for i in mus: i.join() print(time.time()-new) #输出 我是线程0 我是线程1 我是线程2 我是线程3 我是线程5 我是线程4 我是线程6 我是线程7 0.21280956268310547
创建形式是和线程差不多的,并且其它方法和属性等也都是高度相似,相关方法和属性可以参考前文的Thread类,一些不同的点是,使用多进程的话,程序中必须要有:
if __name__ == '__main__':
如果多进程的程序没有这个入口的话程序会报错无法执行。并且多进程的各进程间的数据不共享,也就是说你用全局变量是没有用的。
不过Python提供了一些方法供进程间进行通讯。
队列是线程间交换数据非常常用的方法之一,实例化一个队列的时候,一般会用到它的一个参数,如:
q = multiprocessing.Queue(maxsize = 10)
设置maxsize指定队列中最多有几个数据,不设置的话默认队列无大小限制,队列的常用方法如下:
方法 | 作用 |
---|---|
put(data) | 将一个数据插入到队列当中,同时此方法有两个默认参数block 和timeout ,block为布尔值,当为True的时候(默认值),一旦队列满了,其会阻塞等待,如果设置为False,则若队列满,其会立即抛出异常,而timeout则是设置队列满了之后最大等待时间。时间到了如果还满就抛异常。默认为无限等待。 |
get(data) | 从队列中取出一个数据,参数和put一样,作用类比一下哈。 |
empty() | 布尔值,队列为空返回True |
full() | 布尔值,队列满返回True |
qsize() | 返回队列中当前项目的数量 |
close() | 关闭队列,后台进程继续写入已经入队了的数据,但不会再有别的数据入队。 |
join_thread() | 连接队列的后台进程。此方法用于在调用q.close()方法之后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用 |
cancel_join_thread() | 不会在进程退出时自动连接后台线程。可以防止join_thread()方法阻塞 |
和Queue不同,队列是生产者-消费者类型,对应多对多的情景,而Pipe(管道)常用来两个进程间进行通信,两个进程分别位于管道两端。常用方法是
send()
和recv()
即也是发送数据和接收数据,
实例化一个管道一般这样:
p1,p2 = multiprocessing.Pipe()
即用元组的形式创建一个管道,P1和P2分别为两个进程,代表管道的两端。而管道的发送和接收方法是没有block和timeout参数的,即默认为阻塞式的,不会报错,但满了会一直等待。
使用close()
方法关闭管道。
当有多个进程需要执行时,使用进程池能更方便的管理各个进程,使用如下方法建立一个进程池:
import multiprocessing as mp
ppool = mp.Pool(processes = 8)
其中的processes指的是创建n各进程的进程池供用户调用,如示例中创建了有8个进程的进程池,刚好匹配我电脑CPU的数量,当进程池内任务不满时,有新进程会被立即执行,如果进程池满了,新的任务会等待至池内有空位才被执行。像进程池内提交进程有两种方法:
apply_async(func, args=(), kwds={})
apply( func, args=(), kwds={})
多线程是共享内存的,即各个线程都可以对一个Global全局变量进行访问,而多进程不是这样,每个进程单独分配内存,各个内存里的数据只有该进程才可以访问,不过我们可以使用方法让各进程共享内存。
不过既然是多进程,我们还是尽量避免使用内存共享,单独存放最后汇总也可呀,避免出现很多问题。
ctypes
模块,如下:import multiprocessing def met1(value): value.value = 3.333 def met2(array): array[0] = 5 if __name__ == '__main__': value = multiprocessing.Value('d',0) array = multiprocessing.Array('i',range(10)) p1 = multiprocessing.Process(target=met1,args=(value,)) p2 = multiprocessing.Process(target=met2,args=(array,)) p1.start() p2.start() p1.join() p2.join() print(value.value) print(list(array)) #输出 3.333 [5, 1, 2, 3, 4, 5, 6, 7, 8, 9]
可以看到共享的数据被修改了。
#多导入这个包
import ctypes
#换两处
value = multiprocessing.Value(ctypes.c_double,0)
array = multiprocessing.Array(ctypes.c_int,range(10))
这样的话输出和上面使用'd'
和'i'
是一样的,因为这都对应着C语言的数据类型,大家可以查一下ctypes模块中的各个属性分别对应哪种数据类型,同时在Python中可以用哪个字符来代替。
在使用共享内存的时候,Value和Array只提供了简单的数据结构,服务进程管理器Manager
则可以支持list/dict/Lock/Rlock/Condition/Event/Queue/Value/Array等类型。服务进程管理器比共享内存对象更灵活,不过速度上更慢了。
如下一个示例:
import multiprocessing def met1(mlist): mlist.append('Yes') def met2(mdict): mdict['c'] = 'cool' if __name__ == '__main__': m = multiprocessing.Manager() mlist = m.list(['goldsun','cool']) mdict = m.dict({'g':'golusun'}) p1 = multiprocessing.Process(target=met1,args=(mlist,)) p2 = multiprocessing.Process(target=met2,args=(mdict,)) p1.start() p2.start() p1.join() p2.join() print(mlist) print(mdict) #输出 ['goldsun', 'cool', 'Yes'] {'g': 'golusun', 'c': 'cool'}
可以看到数据被各线程更改了,同时也可以使用其它类型哦,用到的时候再自己摸索吧。
在前边多线程的时候讲到了互斥锁Lock,也就是每把锁只有一个钥匙,保证同一时刻只能允许一个线程执行,其实互斥锁在多进程也有,其实在多线程中还有一个概念叫信号量
,其实这个概念在多进程也有,其实基本每个概念在多线程和多进程都有哈哈哈,这里把信号量放在多进程写是因为多进程毕竟是并行的,我觉得适合。
信号量是什么意思呢,简单来说如前问互斥锁,你要进一个房间的话只要1把钥匙只能一个人进,信号量其实可以立即为钥匙的数量,假如5个信号量,就等于这个房间有5把钥匙,可同时最多进去5个人。
举个例子30个人去商场买东西,但是只要5个收银台,也就是同时最多5个人结账,如下示例:
import time import multiprocessing as mp def pay(num,s): s.acquire() #申请收银台 time.sleep(1) #付款时间 print('{}号付款完毕!'.format(num)) s.release() #释放收银台 if __name__ == '__main__': s = mp.Semaphore(5) people = [] for i in range(30): p = mp.Process(target=pay,args=(i,s)) p.start() people.append(p) for i in people: i.join() print('所有人付款完毕!') #输出 1号付款完毕! 0号付款完毕! 2号付款完毕! 3号付款完毕! 4号付款完毕! 5号付款完毕! 6号付款完毕! 7号付款完毕! 8号付款完毕! 9号付款完毕! 10号付款完毕! 11号付款完毕! 12号付款完毕! 13号付款完毕! 14号付款完毕! 15号付款完毕! 16号付款完毕! 17号付款完毕! 18号付款完毕! 20号付款完毕! 19号付款完毕! 21号付款完毕! 22号付款完毕! 23号付款完毕! 24号付款完毕! 25号付款完毕! 27号付款完毕! 26号付款完毕! 28号付款完毕! 29号付款完毕! 所有人付款完毕!
如果在电脑上运行脚本你会发现是5个5个一组的付款完毕,这也就是信号量的作用。同时最高可供最大信号量数量的线程运行,当然,如果你的信号量设置的很高,高过了你的CPU数量,那么多进程还是最多运行8个子进程,但是多线程的话就可以高并发了,只不过效率会很低。
事件又是指什么呢,比方说我们大家去上课,有可能你去的比较早,上课铃声还没响呢,但你已经到了教室,这时候没有开始上课,你想干什么都可以,对应的就是程序中的阻塞,也有可能有的人去的比较晚,铃声已经响过了,那么他就是迟到了,在这个程序中,所说的事件也就是指响铃是否发生过,线程的下一步就是开始上课,也就是事件发生了,就直接开始上课,响铃没发生你就可以做别的事情,等待响铃发生上课。总的来说,Event是控制子进程执行还是阻塞的一个机制。示例如下:
import time import multiprocessing as mp def study(id, E): """进程函数""" print('{}号到了教室'.format(id)) if E.is_set(): # 响过铃声了 print('{}号迟到了'.format(id)) else: # 没响过 print('{}号跟别人聊天中...'.format(id)) E.wait() # 等上课铃声 print('{}号开始上课了...'.format(id)) time.sleep(10) # 上课3s后下课 print('{}号下课'.format(id)) if __name__ == '__main__': E = mp.Event() # 创建事件 E.clear() #清空事件,也就是让事件未发生过 people = [] #人员列表 for i in range(2): p = mp.Process(target=study,args=(i,E)) p.start() people.append(p) time.sleep(2)#2秒后响铃 E.set() #响铃 time.sleep(2)#响铃后2秒有个学生才到 p = mp.Process(target=study,args=(3,E)) p.start() people.append(p) for i in people: i.join() print('下课!') #输出 0号到了教室 0号跟别人聊天中... 1号到了教室 1号跟别人聊天中... 0号开始上课了... 1号开始上课了... 3号到了教室 3号迟到了 3号开始上课了... 1号下课 0号下课 3号下课 下课!
我们可以把条件(Condition)理解为一把高级的锁,它的功能比Lock、RLock更加高级,允许我们能够控制复杂的进程同步问题。而实际上Event是一个简化版本的Condition,一般只用来让主进程控制子进程。而Condition可以更复杂的控制进程。
一个普通的新条件如下:
C = multiprocessning.Condition()
常用方法:
名称 | 作用 |
---|---|
wait(timeout = None) | 将此进程挂起,收到notify通知后继续运行 |
notify(n=1) | 唤醒一个线程,默认唤醒第一个挂起的线程,可以指定唤醒某一线程 |
notifyAll() | 唤醒所有已经挂起的线程 |
acquire(block=True,timeout=None) | 和锁类似,获得锁 |
release() | 释放锁 |
至于使用方法就可以参考Event和Lock的使用方法了,实际上还是差不多的。
这些多线程/进程的知识只是个入门,深入学习的话需要提升自己的程序设计思想,一般来讲大项目的时候用多线程/进程比较合适,需要多看些代码提升自己思想方面的知识。
最后,既然都看到这儿了,亲给这篇文章点个赞吧,谢谢!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。