多线程是加速程序计算的有效方式,Python的多线程模块threading上手快速简单,学习莫烦多线程教程动手操作了一遍,这里记录一下。
1 Threading
1.1 添加线程
- import threading
- #获取已激活的线程数
- print(threading.active_count()) #1
-
- #查看所有线程信息
- print(threading.enumerate()) #[<_MainThread(MainThread, started 18496)>]
-
- #查看现在正在运行的线程
- print(threading.current_thread()) #<_MainThread(MainThread, started 18496)>
-
-
- import threading
-
- def thread_job():
- print('This is a thread of %s' % threading.current_thread())
-
- def runMain():
- thread = threading.Thread(target=thread_job,) #定义线程
- thread.start() #线程开始工作
-
- if __name__ == '__main__':
- runMain()
-
- #输出
- This is a thread of <Thread(Thread-1, started 12324)>
1.2 join功能
不加join功能,线程任务还未完成便输出all done。
- import threading
- import time
-
- def thread_job():
- print('T1 start.\n')
- for i in range(10):
- time.sleep(0.1) #任务间隔0.1秒
- print('T1 finish.\n')
-
- add_thread = threading.Thread(target=thread_job, name='T1')
- add_thread.start()
-
- print('all done.\n')
- #输出
- T1 start.
- all done.
- T1 finish.
若要遵循顺序,在启动线程后调用join , 使用join控制多个线程的执行顺序,效果如下。
- import threading
- import time
-
- def thread_job():
- print('T1 start.\n')
- for i in range(10):
- time.sleep(0.1) #任务间隔0.1秒
- print('T1 finish.\n')
-
- add_thread = threading.Thread(target=thread_job, name='T1')
- add_thread.start()
- add_thread.join()
-
- print('all done.\n')
- #输出
- T1 start.
- T1 finish.
- all done.
1.3 存储进程结果Queue
将数据列表中的数据传入,使用四个线程处理,将结果保存在Queue中,线程执行完后,从Queue中获取存储的结果
- #导入线程 队列的标准模块
- import threading
- import time
- from queue import Queue
定义一个被多线程调用的函数:函数的参数时一个列表l和一个队列q,函数的功能是对列表的每个元素进行平方计算,将结果保存在队列中
- def job(l,q):
- for i in range(len(l)):
- l[i] = l[i]**2
- q.put(l)
定义一个多线程函数:在多线程函数中定义一个Queue ,用来保存返回值 ,代替return ,定义一个多线程列表 ,初始化一个多维数据列表
- def mulithreading():
- q = Queue() #q中存放返回值 代替return的返回值
- threads = []
- data = [[1,2,3],[3,4,5],[4,4,4],[5,5,5]]
在多线程函数中定义四个线程,启动线程,将每个线程添加到多线程的列表中
- for i in range(4): #定义四个线程
- t = threading.Thread(target=job,args=(data[i],q))
- t.start()
- threads.append(t) #把每个线程append到线程列表中
分别join四个线程到主线程
- for thread in threads:
- thread.join()
定义一个空列表results 将四个线程运行后保存在队列中的结果返回给results
- results = []
- for _ in range(4):
- results.append(q.get()) #q.get()按顺序从q中拿出一个值
- print(results)
完整代码:
- #导入线程 队列的标准模块
- import threading
- import time
- from queue import Queue
-
- #定义一个被多线程调用的函数
- def job(l,q):
- for i in range(len(l)):
- l[i] = l[i]**2
- q.put(l)
- #定义一个多线程函数
- def mulithreading():
- q = Queue() #q中存放返回值 代替return的返回值
- threads = []
- data = [[1,2,3],[3,4,5],[4,4,4],[5,5,5]]
- for i in range(4): #定义四个线程
- t = threading.Thread(target=job,args=(data[i],q))
- t.start()
- threads.append(t) #把每个线程append到线程列表中
- #分别join四个线程到主线程
- for thread in threads:
- thread.join()
- #定义一个空列表results 将四个线程运行后保存在队列中的结果返回给results
- results = []
- for _ in range(4):
- results.append(q.get()) #q.get()按顺序从q中拿出一个值
- print(results)
- if __name__ == '__main__':
- mulithreading()
-
- #输出
- [[1, 4, 9], [9, 16, 25], [16, 16, 16], [25, 25, 25]]
1.4 GIL 不一定有效率
python 的多线程 threading 有时候并不是特别理想. 最主要的原因是就是, Python 的设计上, 有一个必要的环节, 就是 Global Interpreter Lock (GIL). 这个东西让 Python 还是一次性只能处理一个东西.
- import threading
- from queue import Queue
- import copy
- import time
-
- def job(l, q):
- res = sum(l)
- q.put(res)
-
- def multithreading(l):
- q = Queue()
- threads = []
- for i in range(4):
- t = threading.Thread(target=job, args=(copy.copy(l), q), name='T%i' % i)
- t.start()
- threads.append(t)
- [t.join() for t in threads]
- total = 0
- for _ in range(4):
- total += q.get()
- print(total)
-
- def normal(l):
- total = sum(l)
- print(total)
-
- if __name__ == '__main__':
- l = list(range(10000000))
- s_t = time.time()
- normal(l*4)
- print('normal: ',time.time()-s_t)
- s_t = time.time()
- multithreading(l)
- print('multithreading: ', time.time()-s_t)
- #输出
- 199999980000000
- normal: 1.7343778610229492
- 199999980000000
- multithreading: 2.218825340270996
程序 threading 和 Normal 运行了一样多次的运算. 但是我们发现 threading 却没有快多少, 按理来说, 我们预期会要快3-4倍, 因为有建立4个线程, 但是并没有. 这就是其中的 GIL 在作怪.
1.5 线程锁
不使用锁
- import threading
-
- def job1(): #全局变量A的值每次加1,循环10次
- global A
- for i in range(10):
- A += 1
- print('job1',A)
-
- def job2(): #全局变量A的值每次加10,循环10次
- global A
- for i in range(10):
- A += 10
- print('job2',A)
-
- if __name__ == '__main__':
- A = 0
- t1 = threading.Thread(target=job1)
- t2 = threading.Thread(target=job2)
- t1.start()
- t2.start()
- t1.join()
- t2.join()
-
- #输出 打印结果非常混乱
- job1 1
- job1 2
- job1 3
- job1 4
- job2 14
- job1 15
- job2 25
- job1 26
- job2 36
- job1 37
- job2 47
- job1 48
- job2 58
- job1 59
- job2 69
- job1 70
- job2 80
- job2 90
- job2 100
- job2 110
使用锁
lock在不同线程使用同一共享内存时,能够确保线程之间互不影响,使用lock的方法是, 在每个线程执行运算修改共享内存之前,执行lock.acquire()将共享内存上锁, 确保当前线程执行时,内存不会被其他线程访问,执行运算完毕后,使用lock.release()将锁打开, 保证其他的线程可以使用该共享内存。
- import threading
-
- def job1():
- global A;lock = threading.Lock()
- lock.acquire()
- for i in range(10):
- A += 1
- print('job1',A)
- lock.release()
-
- def job2():
- global A;lock = threading.Lock()
- lock.acquire()
- for i in range(10):
- A += 10
- print('job2',A)
- lock.release()
-
- if __name__ == '__main__':
- A = 0
- t1 = threading.Thread(target=job1)
- t2 = threading.Thread(target=job2)
- t1.start()
- t2.start()
- t1.join()
- t2.join()
-
- #输出 使用lock后 执行完一个线程后再执行另一个线程。使用lock和不使用lock,最后打印输出的结果是不同的。
- job1 1
- job1 2
- job1 3
- job1 4
- job1 5
- job1 6
- job1 7
- job1 8
- job1 9
- job1 10
- job2 20
- job2 30
- job2 40
- job2 50
- job2 60
- job2 70
- job2 80
- job2 90
- job2 100
- job2 110
2 Multiprocessing
多进程 Multiprocessing 和多线程 threading 类似, 都是在 python 中用来并行运算的。不过既然有了 threading, 为什么 Python 还要出一个 multiprocessing 呢? 因为要用来弥补 threading 的一些劣势, 比如在 threading 教程中提到的GIL, python 把 multiprocessing 和 threading 的使用方法做的几乎差不多,使用多线程发挥电脑多核系统的威力。
2.1添加Process
- #导入线程进程标准模块
- import multiprocessing as mp
- import threading as td
-
- #定义一个被线程和进程调用的函数
- def job(a,d):
- print('AA')
-
- #创建线程和进程
- t1=td.Thread(target=job,args=(1,2))
- p1=mp.Process(target=job,args=(1,2))
-
- #启动线程和进程
- t1.start()
- p1.start()
-
- #连接线程和进程
- t1.join()
- p1.join()
-
- #可以看出线程和进程的使用方式相似
完整代码
- #导入线程进程标准模块
- import multiprocessing as mp
- import threading as td
-
- #定义一个被进程调用的函数
- def job(a,d):
- print('AA')
-
- if __name__ == '__main__':
- p1 = mp.Process(target=job, args=(1, 2)) #创建进程
- p1.start() #启动进程
- p1.join() #连接进程
- #输出
- AA
2.2 存储进程输出 Queue
Queue的功能是将每个核或线程的运算结果放在队里中, 等到每个线程或核运行完毕后再从队列中取出结果, 继续加载运算。因为多线程调用的函数不能有返回值, 所以使用Queue存储多个线程运算的结果。
定义一个被多线程调用的函数,q 就像一个队列,用来保存每次函数运行的结果
- #定义一个多线程调用函数
- def job(q): #注:该函数没有返回值
- res = 0
- for i in range(1000):
- res += i+i**2+i**3
- q.put(res) #queue
定义两个线程函数,用来处理同一个任务, args 的参数只要一个值的时候,参数后面需要加一个逗号,表示args是可迭代的,后面可能还有别的参数,不加逗号会出错
- p1 = mp.Process(target=job,args=(q,))
- p2 = mp.Process(target=job,args=(q,))
完整代码实现
- import multiprocessing as mp
-
- #定义一个多线程调用函数
- def job(q): #注:该函数没有返回值
- res = 0
- for i in range(1000):
- res += i+i**2+i**3
- q.put(res) #queue
- if __name__ == '__main__':
- q=mp.Queue() #定义一个多线程队列 存储结果
- p1 = mp.Process(target=job, args=(q,))
- p2 = mp.Process(target=job, args=(q,))
- p1.start() #启动线程 分两批处理
- p2.start()
- p1.join() #连接线程
- p2.join()
- res1=q.get() #分两批输出 将结果分别保存
- res2=q.get()
- print(res1+res2)
- #输出
- 499667166000
2.3效率对比
对比下多进程,多线程和什么都不做时的消耗时间,看看哪种方式更有效率。
- import multiprocessing as mp
- def job(q):
- res=0
- for i in range(1000000):
- res += i + i**2 + i**3
- q.put(res)
-
- #由于多进程是多核运算 多进程代码命名为multicore()
- def multicore():
- q = mp.Queue()
- p1 = mp.Process(target=job, args=(q,))
- p2 = mp.Process(target=job, args=(q,))
- p1.start()
- p2.start()
- p1.join()
- p2.join()
- res1 = q.get()
- res2 = q.get()
- print('multicore:',res1 + res2)
-
- #创建多线程
- import threading as td
- def multithread():
- q = mp.Queue() # thread可放入process同样的queue中
- t1 = td.Thread(target=job, args=(q,))
- t2 = td.Thread(target=job, args=(q,))
- t1.start()
- t2.start()
- t1.join()
- t2.join()
- res1 = q.get()
- res2 = q.get()
- print('multithread:', res1 + res2)
-
- #创建普通函数
- def normal():
- res = 0
- for _ in range(2):
- for i in range(1000000):
- res += i + i**2 + i**3
- print('normal:', res)
-
- import time
- if __name__ == '__main__':
- st = time.time()
- normal()
- st1 = time.time()
- print('normal time:', st1 - st)
- multithread()
- st2 = time.time()
- print('multithread time:', st2 - st1)
- multicore()
- print('multicore time:', time.time() - st2)
-
- #输出
- normal: 499999666667166666000000
- normal time: 1.6875250339508057
- multithread: 499999666667166666000000
- multithread time: 3.1562907695770264
- multicore: 499999666667166666000000
- multicore time: 1.0937612056732178
这次运行时间依然是:多进程 < 普通 < 多线程。 发现多核/多进程最快,说明在同时间运行了多个任务。 而多线程的运行时间居然比什么都不做的程序还要慢一点,说明多线程还是有短板。
2.4 进程池Pool
进程池就是将所要运行的东西,放到池子里,Python会自行解决多进程的问题
2.4.1 进程池Pool()和map()
- #定义一个Pool
- pool = mp.Pool()
有了池子之后,就可以让池子对应某一个函数,向池子里丢数据,池子就会返回函数返回的值。 Pool和之前的Process的不同点是丢向Pool的函数有返回值,而Process的没有返回值。
接下来用map()获取结果,在map()中需要放入函数和需要迭代运算的值,然后它会自动分配给CPU核,返回结果 res = pool.map(job, range(10))
- import multiprocessing as mp
-
- def job(x):
- return x*x
-
- def multicore():
- pool = mp.Pool()
- res = pool.map(job,range(10))
- print(res)
-
- if __name__ == '__main__':
- multicore()
-
- #输出
- [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
2.4.2 自定义核数量
怎么知道Pool是否真的调用了多个核呢?可以把迭代次数增大些,然后打开CPU负载看下CPU运行情况
打开CPU负载:活动监视器 > CPU > CPU负载(单击一下即可)
Pool默认大小是CPU的核数,我们也可以通过在Pool中传入processes参数即可自定义需要的核数量,
- def multicore():
- pool = mp.Pool(processes=3) # 定义CPU核数量为3
- res = pool.map(job, range(10))
- print(res)
2.4.3 apply_async()
Pool除了map()外,还有可以返回结果的方式,就是apply_async()。apply_async()中只能传递一个值,它只会放入一个核进行运算,但是传入值时要注意是可迭代的,所以在传入值后需要加逗号, 同时需要用get()方法获取返回值
- def multicore():
- pool = mp.Pool()
- res = pool.map(job, range(10))
- print(res)
- res = pool.apply_async(job, (2,))
- # 用get获得结果
- print(res.get())
-
- #运行结果
- [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
- 4
2.4.4 apply_async()输出多个结果
在apply_async()中多传入几个值res = pool.apply_async(job, (2,3,4,)) #报错 TypeError: job() takes exactly 1 argument (3 given) 即apply_async()只能输入一组参数。
将apply_async() 放入迭代器中,定义一个新的multi_resmulti_res = [pool.apply_async(job, (i,)) for i in range(10)]
取出值时需要一个一个取出来print([res.get() for res in multi_res])
合并代码
- def multicore():
- pool = mp.Pool()
- res = pool.map(job, range(10))
- print(res)
- res = pool.apply_async(job, (2,))
- # 用get获得结果
- print(res.get())
- # 迭代器,i=0时apply一次,i=1时apply一次等等
- multi_res = [pool.apply_async(job, (i,)) for i in range(10)]
- # 从迭代器中取出
- print([res.get() for res in multi_res])
- #运行结果
-
- [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] # map()
- 4
- [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] # multi_res
- Pool默认调用是CPU的核数,传入processes参数可自定义CPU核数
- map() 放入迭代参数,返回多个结果
- apply_async()只能放入一组参数,并返回一个结果,如果想得到map()的效果需要通过迭代
2.5 共享内存 shared memory
2.5.1 Shared Value
使用Value数据存储在一个共享的内存表中。
- import multiprocessing as mp
-
- value1 = mp.Value('i', 0)
- value2 = mp.Value('d', 3.14)
其中d和i参数用来设置数据类型的,d表示一个双精浮点类型,i表示一个带符号的整型
2.5.2 Shared Array
在Python的mutiprocessing中,有还有一个Array类,可以和共享内存交互,来实现在进程之间共享数据array = mp.Array('i', [1, 2, 3, 4])
这里的Array和numpy中的不同,它只能是一维的,不能是多维的。同样和Value 一样,需要定义数据形式,否则会报错。
2.6 进程锁
2.6.1 不加锁
- import multiprocessing as mp
-
- def job(x):
- return x*x
-
- def multicore():
- pool = mp.Pool()
- res = pool.map(job, range(10))
- print(res)
- res = pool.apply_async(job, (2,))
- # 用get获得结果
- print(res.get())
- # 迭代器,i=0时apply一次,i=1时apply一次等等
- multi_res = [pool.apply_async(job, (i,)) for i in range(10)]
- # 从迭代器中取出
- print([res.get() for res in multi_res])
-
- if __name__ == '__main__':
- multicore()
- #输出
- 1
- 4
- 5
- 8
- 9
- 12
- 13
- 16
- 17
- 20
上面代码中定义了一个共享变量v,两个进程都可以对它进行操作。 在job()中我们想让v每隔0.1秒输出一次累加num的结果,但是在两个进程p1和p2 中设定了不同的累加值。所以来看下这两个进程是否会出现冲突。
2.6.2 加锁
- import multiprocessing as mp
- import time
-
- def job(v, num, l):
- l.acquire() # 锁住
- for _ in range(5):
- time.sleep(0.1)
- v.value += num # 获取共享内存
- print(v.value)
- l.release() # 释放
-
- def multicore():
- l = mp.Lock() # 定义一个进程锁
- v = mp.Value('i', 0) # 定义共享内存
- p1 = mp.Process(target=job, args=(v,1,l)) # 需要将lock传入
- p2 = mp.Process(target=job, args=(v,3,l))
- p1.start()
- p2.start()
- p1.join()
- p2.join()
-
- if __name__ == '__main__':
- multicore()
- #运行一下,看看是否还会出现抢占资源的情况
-
- 1
- 2
- 3
- 4
- 5
- 8
- 11
- 14
- 17
- 20
运行结果显示,进程锁保证了进程p1的完整运行,然后才进行了进程p2的运行