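For newcomers, the first step is to understand what a thread is and why multithreaded programming is needed. So what is a thread? The usual online definition goes like this: a thread is the smallest unit of execution that the operating system can schedule; it is contained within a process and is the unit that actually does the work in that process. Confused? To me that definition only makes sense to people who already know the answer, and it leaves beginners lost. So let's put it in plain language:
Python has two main ways to create multiple processes: the fork method in the os module and the multiprocessing module. The former works only on Unix/Linux systems, while the latter is cross-platform.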
- import os
-
- # Note: os.fork() is available only on Unix/Linux/macOS, not on Windows.
- pid = os.fork()
-
- if pid == 0:
-     print('haha 1')  # runs in the child process
- else:
-     print('haha 2')  # runs in the parent process
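Note: fork() runs only on Unix/Linux/macOS; it is not available on Windows.
Explanation:
Unix/Linux operating systems provide a fork() system call, which is quite special. An ordinary function returns once per call, but fork() is called once and returns twice: the operating system copies the current process (the parent) to create a new one (the child), and then returns in both the parent and the child. In the child, fork() always returns 0; in the parent, it returns the child's process ID.
The reason for this design is that a parent can fork many children, so it has to record each child's ID, while a child only needs to call getppid() to obtain its parent's ID. In Python, os.getpid() returns the current process ID and os.getppid() returns the parent's process ID.
So is there any ordering between the execution of the parent and the child? No. It depends entirely on the operating system's scheduling algorithm.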
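The return-value convention described above can be sketched as follows. This is a minimal illustration for Unix-like systems only; the helper name fork_and_wait and the exit code 7 are made up for this example, and os.waitpid() is used so the parent collects the child's exit status.

```python
import os


def fork_and_wait(exit_code=7):
    """Fork once, let the child exit with exit_code, and reap it in the parent.

    Returns (child_pid, child_exit_code) in the parent. Unix-like systems only.
    """
    parent_pid = os.getpid()
    pid = os.fork()
    if pid == 0:
        # Child: fork() returned 0, and getppid() gives the parent's ID.
        same_parent = os.getppid() == parent_pid
        # os._exit() ends the child immediately, so it never continues
        # into the code that belongs to the parent.
        os._exit(exit_code if same_parent else 1)
    # Parent: fork() returned the child's ID. Wait for the child to finish.
    _, status = os.waitpid(pid, 0)
    return pid, os.WEXITSTATUS(status)


if __name__ == '__main__':
    child_pid, code = fork_and_wait()
    print("Parent %d reaped child %d with exit code %d"
          % (os.getpid(), child_pid, code))
```

The child leaves through os._exit() rather than returning, which keeps the two branches cleanly separated.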
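Calling fork() repeatedly produces a tree of processes.
multiprocessing provides a Process class that represents a process object. To create a child process, pass in the function to run and its arguments, call start() to launch it, and call join() to wait for it to finish: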
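- import os
- from multiprocessing import Process
-
- def run_proc(name):
-     print("Child process (%s) (%s) running..." % (name, os.getpid()))
-
-
- if __name__ == '__main__':
-     print("Current process (%s) start..." % (os.getpid()))
-     procs = []
-     for i in range(5):
-         # args must be a tuple; a bare str(i) only works by accident for one character
-         p = Process(target=run_proc, args=(str(i),))
-         print("Process will start.")
-         p.start()
-         procs.append(p)
-     # Wait for every child, not just the last one started
-     for p in procs:
-         p.join()
-     print("Process end.")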
Output:
- Current process (26811) start...
- Process will start.
- Process will start.
- Process will start.
- Child process (0) (26872) running...
- Process will start.
- Child process (1) (26874) running...
- Process will start.
- Child process (2) (26876) running...
- Child process (3) (26882) running...
- Child process (4) (26885) running...
- Process end.
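The drawback of using Process directly is that you have to start a large number of child processes, so it only suits cases where the number of items to handle is small. Pool solves this problem. Put simply, a Pool lets you specify the number of worker processes (by default, the number of CPU cores), and at most that many processes run at the same time: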
- import os, time, random
- from multiprocessing import Pool
-
- def run_task(name):
-     print("Task %s (pid = %s) is running..." % (name, os.getpid()))
-     time.sleep(random.random() * 3)
-     print("Task %s end." % name)
-
- if __name__ == '__main__':
-     print("Current process (%s) start..." % (os.getpid()))
-     p = Pool(processes=3)
-     for i in range(5):
-         p.apply_async(run_task, args=(i, ))
-     print("Waiting for all subprocess done...")
-     p.close()
-     p.join()
-     print("All subprocess done.")
Output:
- Current process (4202) start...
- Waiting for all subprocess done...
- Task 0 (pid = 4255) is running...
- Task 1 (pid = 4256) is running...
- Task 2 (pid = 4257) is running...
- Task 0 end.
- Task 3 (pid = 4255) is running...
- Task 2 end.
- Task 4 (pid = 4257) is running...
- Task 4 end.
- Task 1 end.
- Task 3 end.
- All subprocess done.
Note that calling join() on a Pool waits for all worker processes to finish. You must call close() before join(), and once close() has been called no new tasks can be submitted to the pool.
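When the tasks return values, the close()-then-join() rule can be combined with Pool.map(), which collects the results in input order. The sketch below is one possible arrangement; the names square and squares_with_pool are invented for this example.

```python
from multiprocessing import Pool


def square(x):
    """Work done in a worker process; defined at module top level so it can be pickled."""
    return x * x


def squares_with_pool(numbers, processes=3):
    """Map square() over numbers using a pool of worker processes."""
    pool = Pool(processes=processes)
    try:
        # map() blocks until every result is ready and keeps the input order.
        results = pool.map(square, numbers)
    finally:
        pool.close()  # no more tasks may be submitted after this
        pool.join()   # wait for the worker processes to exit
    return results


if __name__ == '__main__':
    print(squares_with_pool(range(5)))
```

Unlike apply_async(), which returns immediately, map() waits for all results, so the caller does not need to track individual tasks.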
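Python offers several ways for processes to communicate. This article covers two of them: Queue and Pipe.
Queue is mainly used for communication among multiple processes. Its operations are as follows:
To illustrate communication through a Queue, consider this example: the parent process creates three child processes; two of them write data to the Queue, and the third reads data from it.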
- import os, time
- from multiprocessing import Process, Queue
-
- def proc_write(q, urls):
-     """Write to the queue."""
-     print("Process (%d) is writing..." % os.getpid())
-     for url in urls:
-         q.put(url)
-         print('Put %s to queue...' % url)
-         time.sleep(0.1)
-
- def proc_read(q):
-     """Read from the queue."""
-     print("Process (%d) is reading..." % (os.getpid()))
-     while True:
-         url = q.get(True)
-         print("Get %s from queue." % url)
-
- if __name__ == '__main__':
-     # The parent process creates the queue and the child processes
-     q = Queue()
-     writer1 = Process(target=proc_write, args=(q, ['张飞', '黄忠', "孙尚香"]))
-     writer2 = Process(target=proc_write, args=(q, ['马超', '关羽', "赵云"]))
-     reader = Process(target=proc_read, args=(q, ))
-     # Start the child processes
-     writer1.start()
-     writer2.start()
-     reader.start()
-     writer1.join()
-     writer2.join()
-     # The reader loops forever, so it has to be terminated forcibly
-     reader.terminate()
Output:
- Process (19307) is writing...
- Put 张飞 to queue...
- Process (19308) is writing...
- Put 马超 to queue...
- Process (19309) is reading...
- Get 张飞 from queue.
- Get 马超 from queue.
- Put 黄忠 to queue...
- Get 黄忠 from queue.
- Put 关羽 to queue...
- Get 关羽 from queue.
- Put 孙尚香 to queue...
- Get 孙尚香 from queue.
- Put 赵云 to queue...
- Get 赵云 from queue.
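Terminating the reader works, but it can cut the reader off in the middle of an item. A common alternative, shown here only as a sketch, is to send an end-of-stream marker once the writers are done. The SENTINEL value and the simplified proc_write and proc_read below are assumptions made for this illustration, not part of the Queue API.

```python
import multiprocessing as mp

SENTINEL = None  # hypothetical end-of-stream marker chosen for this sketch


def proc_write(q, urls):
    """Put every item on the queue."""
    for url in urls:
        q.put(url)


def proc_read(q):
    """Read items until the sentinel arrives, then return what was read."""
    items = []
    while True:
        item = q.get()  # blocks until an item is available
        if item is SENTINEL:
            break
        items.append(item)
    print("Reader got %d items" % len(items))
    return items


if __name__ == '__main__':
    q = mp.Queue()
    writers = [mp.Process(target=proc_write, args=(q, ['a', 'b'])),
               mp.Process(target=proc_write, args=(q, ['c', 'd']))]
    reader = mp.Process(target=proc_read, args=(q,))
    for p in writers + [reader]:
        p.start()
    for w in writers:
        w.join()
    q.put(SENTINEL)  # all writers are done, so tell the reader to stop
    reader.join()    # the reader returns on its own; no terminate() needed
```

Because the sentinel is sent only after both writers have been joined, the reader sees every item before it stops.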
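Pipe() returns a pair of connection objects that represent the two ends of the pipe. Each connection object has send() and recv() methods.
Note, however, that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. There is no risk of corruption when the processes use different ends of the pipe.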
- import os, time
- from multiprocessing import Process, Pipe
-
- def proc_send(p, urls):
-     print("Process (%d) is sending..." % os.getpid())
-     for url in urls:
-         p.send(url)
-         print('Send %s...' % url)
-         time.sleep(0.1)
-
- def proc_recv(p):
-     print("Process (%d) is receiving..." % (os.getpid()))
-     while True:
-         print("Receive %s" % p.recv())
-         time.sleep(0.1)
-
- if __name__ == '__main__':
-     # The parent process creates the pipe and the child processes
-     p = Pipe()
-     p1 = Process(target=proc_send, args=(p[0], ['张飞' + str(i) for i in range(3)]))
-     p2 = Process(target=proc_recv, args=(p[1], ))
-     p1.start()
-     p2.start()
-     p1.join()
-     # The receiver loops forever: give it a moment to drain the pipe, then stop it
-     p2.join(timeout=1)
-     p2.terminate()
Output:
- Process (39203) is sending...
- Send 张飞0...
- Process (39204) is receiving...
- Receive 张飞0
- Send 张飞1...
- Receive 张飞1
- Send 张飞2...
- Receive 张飞2
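Since Pipe() is duplex by default, each end can both send and receive. The sketch below illustrates that with a small request/reply worker; the names upper_worker and STOP are invented for this example, and using None as a stop marker is just one possible convention.

```python
import multiprocessing as mp

STOP = None  # hypothetical "no more requests" marker chosen for this sketch


def upper_worker(conn):
    """Reply to each received string with its upper-case form until STOP arrives."""
    while True:
        msg = conn.recv()      # blocks until the other end sends something
        if msg is STOP:
            break
        conn.send(msg.upper())  # reply on the same connection object


if __name__ == '__main__':
    parent_conn, child_conn = mp.Pipe()  # duplex by default
    worker = mp.Process(target=upper_worker, args=(child_conn,))
    worker.start()
    parent_conn.send('hello')
    print(parent_conn.recv())
    parent_conn.send(STOP)  # lets the worker leave its loop
    worker.join()           # returns normally because the worker exits
```

Here each process uses only its own end of the pipe, which is the safe pattern described above.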
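Multithreading is similar to running several different programs at the same time, and it has the following advantages:
The Python standard library provides two modules for threads: _thread and threading. _thread is the low-level module (it was named thread in Python 2), and threading is the high-level module that wraps it. In the vast majority of cases, threading is all we need. More methods of threading.Thread: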
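A few commonly used Thread members are name, daemon, start(), join() and is_alive(). The following sketch shows how they behave over the life of one thread; the helper names sleeper and inspect_thread are made up for this illustration.

```python
import threading
import time


def sleeper(delay):
    """Stand-in for real work: just sleep for a short while."""
    time.sleep(delay)


def inspect_thread(delay=0.05):
    """Start one thread and record a few of its attributes along the way."""
    t = threading.Thread(target=sleeper, name='demo', args=(delay,), daemon=True)
    info = {'name': t.name, 'daemon': t.daemon}
    info['alive_before_start'] = t.is_alive()  # not started yet
    t.start()
    info['alive_after_start'] = t.is_alive()   # still sleeping inside sleeper()
    t.join()                                   # block until the thread finishes
    info['alive_after_join'] = t.is_alive()
    return info


if __name__ == '__main__':
    for key, value in inspect_thread().items():
        print("%s: %s" % (key, value))
```

A daemon thread does not keep the program alive when the main thread exits, which is why join() is still called here before reading the final state.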
Method 1: pass a function to the Thread constructor to create an instance, then call start() to run it:
- import time, threading
-
- def thread_run(urls):
-     print("Current %s is running..." % threading.current_thread().name)
-     for url in urls:
-         print("%s --->>> %s" % (threading.current_thread().name, url))
-         time.sleep(0.1)
-     print("%s ended." % threading.current_thread().name)
-
- if __name__ == '__main__':
-     print("%s is running..." % threading.current_thread().name)
-     t1 = threading.Thread(target=thread_run, name='t1', args=(['唐僧', '孙悟空', '猪八戒'],))
-     t2 = threading.Thread(target=thread_run, name='t2', args=(['张飞', '关羽', '刘备'],))
-     t1.start()
-     t2.start()
-     # Wait for both threads (the original joined t1 twice)
-     t1.join()
-     t2.join()
-     print("%s ended." % threading.current_thread().name)
Output:
- MainThread is running...
- Current t1 is running...Current t2 is running...
-
- t2 --->>> 张飞
- t1 --->>> 唐僧
- t2 --->>> 关羽t1 --->>> 孙悟空
-
- t2 --->>> 刘备t1 --->>> 猪八戒
-
- t1 ended.t2 ended.
-
- MainThread ended.
Method 2: subclass threading.Thread, then override the __init__ and run methods:
- import time, threading
-
- class testThread(threading.Thread):
-
-     def __init__(self, name, urls):
-         threading.Thread.__init__(self, name=name)
-         self.urls = urls
-
-     def run(self):
-         print("Current %s is running..." % threading.current_thread().name)
-         for url in self.urls:
-             print("%s --->>> %s" % (threading.current_thread().name, url))
-             time.sleep(0.1)
-         print("%s ended." % threading.current_thread().name)
-
- if __name__ == '__main__':
-     print("%s is running..." % threading.current_thread().name)
-     t1 = testThread(name='t1', urls=['唐僧', '孙悟空', '猪八戒'])
-     t2 = testThread(name='t2', urls=['张飞', '关羽', '刘备'])
-     t1.start()
-     t2.start()
-     # Wait for both threads (the original joined t1 twice)
-     t1.join()
-     t2.join()
-     print("%s ended." % threading.current_thread().name)
Output:
- MainThread is running...
- Current t1 is running...Current t2 is running...
- t1 --->>> 唐僧
-
- t2 --->>> 张飞
- t1 --->>> 孙悟空t2 --->>> 关羽
-
- t1 --->>> 猪八戒t2 --->>> 刘备
-
- t1 ended.t2 ended.
-
- MainThread ended.
If several threads modify the same data at once, the result can be unpredictable. To keep the data correct, the threads must be synchronized. Specifically:
- import threading
-
- test_lock = threading.RLock()
- num = 0
-
- class testThread(threading.Thread):
-     def __init__(self, name):
-         threading.Thread.__init__(self, name=name)
-
-     def run(self):
-         global num
-         while True:
-             test_lock.acquire()
-             print("%s locked, Number: %d" % (threading.current_thread().name, num))
-             if num >= 4:
-                 test_lock.release()
-                 print("%s released, Number: %d" % (threading.current_thread().name, num))
-                 break
-             num += 1
-             print("%s released, Number: %d" % (threading.current_thread().name, num))
-             test_lock.release()
-
- if __name__ == '__main__':
-     t1 = testThread('Sun Wukong goes first')
-     t2 = testThread("Bajie's turn at last")
-     t1.start()
-     t2.start()
Output:
- Sun Wukong goes first locked, Number: 0
- Sun Wukong goes first released, Number: 1
- Sun Wukong goes first locked, Number: 1
- Sun Wukong goes first released, Number: 2
- Sun Wukong goes first locked, Number: 2
- Sun Wukong goes first released, Number: 3
- Sun Wukong goes first locked, Number: 3
- Sun Wukong goes first released, Number: 4
- Sun Wukong goes first locked, Number: 4
- Sun Wukong goes first released, Number: 4Bajie's turn at last locked, Number: 4
-
- Bajie's turn at last released, Number: 4
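The acquire()/release() pairs can also be written with a with statement, which releases the lock even if the protected code raises an exception. The sketch below uses that form to protect a shared counter; the names counter, add_many and run_counter are invented for this example.

```python
import threading

counter = 0
counter_lock = threading.Lock()


def add_many(n):
    """Increment the shared counter n times, one locked step at a time."""
    global counter
    for _ in range(n):
        with counter_lock:  # acquired here, released when the block exits
            counter += 1


def run_counter(num_threads=4, n=10000):
    """Run add_many() in several threads and return the final counter value."""
    global counter
    counter = 0
    threads = [threading.Thread(target=add_many, args=(n,))
               for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter


if __name__ == '__main__':
    print(run_counter())
```

With the lock in place, every increment is applied, so the final value equals num_threads * n.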
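Deadlock is a situation in which two or more processes or threads wait on each other while competing for resources, so that none of them can make progress without outside intervention. The system is then said to be deadlocked, and the processes that wait on each other forever are called deadlocked processes. The following example deadlocks: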
- from threading import Thread, Lock
- import time
-
- mutexA = Lock()
- mutexB = Lock()
-
- class testThread(Thread):
-     def run(self):
-         self.func1()
-         self.func2()
-
-     def func1(self):
-         mutexA.acquire()
-         print('%s acquired lock A' % self.name)
-
-         mutexB.acquire()
-         print('%s acquired lock B' % self.name)
-         mutexB.release()
-         mutexA.release()
-
-     def func2(self):
-         mutexB.acquire()
-         print('%s acquired lock B' % self.name)
-         time.sleep(2)
-
-         mutexA.acquire()
-         print('%s acquired lock A' % self.name)
-         mutexA.release()
-
-         mutexB.release()
-
- if __name__ == '__main__':
-     for i in range(5):
-         t = testThread()
-         t.start()
Output:
- Thread-1 acquired lock A
- Thread-1 acquired lock B
- Thread-1 acquired lock BThread-2 acquired lock A
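Here is how the code above deadlocks:
Five threads are started and each executes run(). Suppose thread1 grabs lock A first. While still holding A, it calls mutexB.acquire() and gets lock B without competition, because the other threads are all blocked waiting for A. thread1 then finishes func1, releasing B and then A, and moves on to func2, where it calls mutexB.acquire(), gets lock B, and sleeps for 2 seconds while still holding it. Meanwhile, as soon as thread1 released A at the end of func1, the remaining threads started competing for A in order to run func1. Suppose thread2 wins lock A; its next step is to acquire lock B. But thread1 is holding B inside func2, and when thread1 wakes up it will need lock A, which thread2 holds. thread1 holds B and waits for A, thread2 holds A and waits for B, and neither can proceed. That is a deadlock.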
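The analysis shows that the problem comes from func1 taking A then B while func2 takes B then A. One common remedy, which the original example does not use, is to make every code path acquire the locks in the same order. The sketch below assumes that approach; lock_a, lock_b and run_workers are names made up for this illustration.

```python
import threading
import time

lock_a = threading.Lock()
lock_b = threading.Lock()


def func1():
    with lock_a:
        with lock_b:
            pass  # work that needs both locks


def func2():
    # Same order as func1 (A, then B); the deadlocking version took B first.
    with lock_a:
        time.sleep(0.01)
        with lock_b:
            pass


def run_workers(count=5):
    """Run func1 then func2 in several threads; return how many finished."""
    finished = []

    def worker():
        func1()
        func2()
        finished.append(threading.current_thread().name)

    threads = [threading.Thread(target=worker) for _ in range(count)]
    for t in threads:
        t.start()
    for t in threads:
        t.join(timeout=5)  # a deadlock would show up as a timeout here
    return len(finished)


if __name__ == '__main__':
    print("threads finished:", run_workers())
```

With a single global order, no thread can hold B while waiting for A, so the circular wait described above cannot form.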
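Having analyzed the deadlock, how can it be resolved in Python? To support acquiring the same resource several times within a single thread, Python provides a reentrant lock, RLock. Internally, an RLock maintains a Lock and a counter; the counter records the number of acquire() calls, so the owning thread can acquire the resource repeatedly. Other threads can obtain the resource only after the owning thread has released every one of its acquires. If the example above uses a single RLock in place of the two Locks, the deadlock does not occur: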
- from threading import Thread, RLock
- import time
-
- # mutexA and mutexB are two names for the same reentrant lock
- mutexA = mutexB = RLock()
-
- class testThread(Thread):
-     def run(self):
-         self.f1()
-         self.f2()
-
-     def f1(self):
-         mutexA.acquire()
-         print('%s acquired lock A' % self.name)
-         mutexB.acquire()
-         print('%s acquired lock B' % self.name)
-         mutexB.release()
-         mutexA.release()
-
-     def f2(self):
-         mutexB.acquire()
-         print('%s acquired lock B' % self.name)
-         time.sleep(0.1)
-         mutexA.acquire()
-         print('%s acquired lock A' % self.name)
-         mutexA.release()
-         mutexB.release()
-
- if __name__ == '__main__':
-     for i in range(5):
-         t = testThread()
-         t.start()
Output:
- Thread-1 acquired lock A
- Thread-1 acquired lock B
- Thread-1 acquired lock B
- Thread-1 acquired lock A
- Thread-2 acquired lock A
- Thread-2 acquired lock B
- Thread-2 acquired lock B
- Thread-2 acquired lock A
- Thread-4 acquired lock A
- Thread-4 acquired lock B
- Thread-4 acquired lock B
- Thread-4 acquired lock A
- Thread-3 acquired lock A
- Thread-3 acquired lock B
- Thread-3 acquired lock B
- Thread-3 acquired lock A
- Thread-5 acquired lock A
- Thread-5 acquired lock B
- Thread-5 acquired lock B
- Thread-5 acquired lock A
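Explanation of the reentrant-lock code:
Because A and B are the same reentrant lock, when thread1 acquires "A" and then "B" the counter records two acquires. When f1 finishes, both acquires have been released and the lock is free again. At that point there are two possibilities: (1) thread1 grabs the lock again and runs f2, or (2) another thread grabs the lock and runs f1.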
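The difference between Lock and RLock that this explanation relies on can be checked directly with a non-blocking second acquire. This is a small sketch; the helper name can_reacquire is invented for the illustration.

```python
import threading


def can_reacquire(lock):
    """Return True if the thread already holding lock can acquire it again."""
    with lock:
        # Non-blocking attempt: returns at once instead of waiting forever.
        got_it = lock.acquire(blocking=False)
        if got_it:
            lock.release()  # undo the second acquire
        return got_it


if __name__ == '__main__':
    print("RLock:", can_reacquire(threading.RLock()))
    print("Lock: ", can_reacquire(threading.Lock()))
```

An RLock lets its owner in again and simply counts the extra acquire, while a plain Lock refuses the second acquire even from the thread that holds it.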