当前位置:   article > 正文

python多进程与多线程_第十五章 Python多进程与多线程

from threading import thread, current_thread 就开始输出了

15.1 multiprocessing

multiprocessing是多进程模块,多进程提供了任务并发性,能充分利用多核处理器。避免了GIL(全局解释锁)对资源的影响。

有以下常用类:类描述

Process(group=None, target=None, name=None, args=(), kwargs={})派生一个进程对象,然后调用start()方法启动

Pool(processes=None, initializer=None, initargs=())返回一个进程池对象,processes进程池进程数量

Pipe(duplex=True)返回两个连接对象由管道连接

Queue(maxsize=0)返回队列对象,操作方法跟Queue.Queue一样

multiprocessing.dummy这个库是用于实现多线程

Process()类有以下些方法:run()

start()启动进程对象

join([timeout])等待子进程终止,才返回结果。可选超时。

name进程名字

is_alive()返回进程是否存活

daemon进程的守护标记,一个布尔值

pid返回进程ID

exitcode子进程退出状态码

terminate()终止进程。在unix上使用SIGTERM信号,在windows上使用TerminateProcess()。

Pool()类有以下些方法:apply(func, args=(), kwds={})等效内建函数apply()

apply_async(func, args=(), kwds={}, callback=None)异步,等效内建函数apply()

map(func, iterable, chunksize=None)等效内建函数map()

map_async(func, iterable, chunksize=None, callback=None)异步,等效内建函数map()

imap(func, iterable, chunksize=1)等效内建函数itertools.imap()

imap_unordered(func, iterable, chunksize=1)像imap()方法,但结果顺序是任意的

close()关闭进程池

terminate()终止工作进程,垃圾收集连接池对象

join()等待工作进程退出。必须先调用close()或terminate()

Pool.apply_async()和Pool.map_aysnc()又提供了以下几个方法:get([timeout])获取结果对象里的结果。如果超时没有,则抛出TimeoutError异常

wait([timeout])等待可用的结果或超时

ready()返回调用是否已经完成

successful()

举例:

1)简单的例子,用子进程处理函数from multiprocessing import Process

import os

def worker(name):

print name

print 'parent process id:', os.getppid()

print 'process id:', os.getpid()

if __name__ == '__main__':

p = Process(target=worker, args=('function worker.',))

p.start()

p.join()

print p.name

python test.py

function worker.

parent process id: 9079

process id: 9080

Process-1

Process实例传入worker函数作为派生进程执行的任务,用start()方法启动这个实例。

2)加以说明join()方法from multiprocessing import Process

import os

def worker(n):

print 'hello world', n

if __name__ == '__main__':

print 'parent process id:', os.getppid()

for n in range(5):

p = Process(target=worker, args=(n,))

p.start()

p.join()

print 'child process id:', p.pid

print 'child process name:', p.name

# python test.py

parent process id: 9041

hello world 0

child process id: 9132

child process name: Process-1

hello world 1

child process id: 9133

child process name: Process-2

hello world 2

child process id: 9134

child process name: Process-3

hello world 3

child process id: 9135

child process name: Process-4

hello world 4

child process id: 9136

child process name: Process-5

# 把p.join()注释掉再执行

# python test.py

parent process id: 9041

child process id: 9125

child process name: Process-1

child process id: 9126

child process name: Process-2

child process id: 9127

child process name: Process-3

child process id: 9128

child process name: Process-4

hello world 0

hello world 1

hello world 3

hello world 2

child process id: 9129

child process name: Process-5

hello world 4

可以看出,在使用join()方法时,输出的结果都是顺序排列的。相反是乱序的。因此join()方法是堵塞父进程,要等待当前子进程执行完后才会继续执行下一个子进程。否则会一直生成子进程去执行任务。

在要求输出的情况下使用join()可保证每个结果是完整的。

3)给子进程命名,方便管理from multiprocessing import Process

import os, time

def worker1(n):

print 'hello world', n

def worker2():

print 'worker2...'

if __name__ == '__main__':

print 'parent process id:', os.getppid()

for n in range(3):

p1 = Process(name='worker1', target=worker1, args=(n,))

p1.start()

p1.join()

print 'child process id:', p1.pid

print 'child process name:', p1.name

p2 = Process(name='worker2', target=worker2)

p2.start()

p2.join()

print 'child process id:', p2.pid

print 'child process name:', p2.name

# python test.py

parent process id: 9041

hello world 0

child process id: 9248

child process name: worker1

hello world 1

child process id: 9249

child process name: worker1

hello world 2

child process id: 9250

child process name: worker1

worker2...

child process id: 9251

child process name: worker2

4)设置守护进程,父进程退出也不影响子进程运行from multiprocessing import Process

def worker1(n):

print 'hello world', n

def worker2():

print 'worker2...'

if __name__ == '__main__':

for n in range(3):

p1 = Process(name='worker1', target=worker1, args=(n,))

p1.daemon = True

p1.start()

p1.join()

p2 = Process(target=worker2)

p2.daemon = False

p2.start()

p2.join()

5)使用进程池#!/usr/bin/python

# -*- coding: utf-8 -*-

from multiprocessing import Pool, current_process

import os, time, sys

def worker(n):

print 'hello world', n

print 'process name:', current_process().name  # 获取当前进程名字

time.sleep(1)    # 休眠用于执行时有时间查看当前执行的进程

if __name__ == '__main__':

p = Pool(processes=3)

for i in range(8):

r = p.apply_async(worker, args=(i,))

r.get(timeout=5)  # 获取结果中的数据

p.close()

# python test.py

hello world 0

process name: PoolWorker-1

hello world 1

process name: PoolWorker-2

hello world 2

process name: PoolWorker-3

hello world 3

process name: PoolWorker-1

hello world 4

process name: PoolWorker-2

hello world 5

process name: PoolWorker-3

hello world 6

process name: PoolWorker-1

hello world 7

process name: PoolWorker-2

进程池生成了3个子进程,通过循环执行8次worker函数,进程池会从子进程1开始去处理任务,当到达最大进程时,会继续从子进程1开始。

在运行此程序同时,再打开一个终端窗口会看到生成的子进程:# ps -ef |grep python

root      40244   9041  4 16:43 pts/3   00:00:00 python test.py

root      40245  40244  0 16:43 pts/3    00:00:00 python test.py

root      40246  40244  0 16:43 pts/3    00:00:00 python test.py

root      40247  40244  0 16:43 pts/3    00:00:00 python test.py

6)进程池map()方法

map()方法是将序列中的元素通过函数处理返回新列表。from multiprocessing import Pool

def worker(url):

return 'http://%s' % url

urls = ['www.baidu.com', 'www.jd.com']

p = Pool(processes=2)

r = p.map(worker, urls)

p.close()

print r

# python test.py

['http://www.baidu.com', 'http://www.jd.com']

7)Queue进程间通信

multiprocessing支持两种类型进程间通信:Queue和Pipe。

Queue库已经封装到multiprocessing库中,在第十章 Python常用标准库已经讲解到Queue库使用,有需要请查看以前博文。

例如:一个子进程向队列写数据,一个子进程读取队列数据#!/usr/bin/python

# -*- coding: utf-8 -*-

from multiprocessing import Process, Queue

# 写数据到队列

def write(q):

for n in range(5):

q.put(n)

print 'Put %s to queue.' % n

# 从队列读数据

def read(q):

while True:

if not q.empty():

value = q.get()

print 'Get %s from queue.' % value

else:

break

if __name__ == '__main__':

q = Queue()

pw = Process(target=write, args=(q,))

pr = Process(target=read, args=(q,))

pw.start()

pw.join()

pr.start()

pr.join()

# python test.py

Put 0 to queue.

Put 1 to queue.

Put 2 to queue.

Put 3 to queue.

Put 4 to queue.

Get 0 from queue.

Get 1 from queue.

Get 2 from queue.

Get 3 from queue.

Get 4 from queue.

8)Pipe进程间通信from multiprocessing import Process, Pipe

def f(conn):

conn.send([42, None, 'hello'])

conn.close()

if __name__ == '__main__':

parent_conn, child_conn = Pipe()

p = Process(target=f, args=(child_conn,))

p.start()

print parent_conn.recv()

p.join()

# python test.py

[42, None, 'hello']

Pipe()创建两个连接对象,每个链接对象都有send()和recv()方法,

9)进程间对象共享

Manager类返回一个管理对象,它控制服务端进程。提供一些共享方式:Value()、Array()、list()、dict()、Event()等

创建Manger对象存放资源,其他进程通过访问Manager获取。from multiprocessing import Process, Manager

def f(v, a, l, d):

v.value = 100

a[0] = 123

l.append('Hello')

d['a'] = 1

mgr = Manager()

v = mgr.Value('v', 0)

a = mgr.Array('d', range(5))

l = mgr.list()

d = mgr.dict()

p = Process(target=f, args=(v, a, l, d))

p.start()

p.join()

print(v)

print(a)

print(l)

print(d)

# python test.py

Value('v', 100)

array('d', [123.0, 1.0, 2.0, 3.0, 4.0])

['Hello']

{'a': 1}

10)写一个多进程的例子

比如:多进程监控URL是否正常from multiprocessing import Pool, current_process

import urllib2

urls = [

'http://www.baidu.com',

'http://www.jd.com',

'http://www.sina.com',

'http://www.163.com',

]

def status_code(url):

print 'process name:', current_process().name

try:

req = urllib2.urlopen(url, timeout=5)

return req.getcode()

except urllib2.URLError:

return

p = Pool(processes=4)

for url in urls:

r = p.apply_async(status_code, args=(url,))

if r.get(timeout=5) == 200:

print "%s OK" %url

else:

print "%s NO" %url

# python test.py

process name: PoolWorker-1

http://www.baidu.com OK

process name: PoolWorker-2

http://www.jd.com OK

process name: PoolWorker-3

http://www.sina.com OK

process name: PoolWorker-4

http://www.163.com OK

博客地址:http://lizhenliang.blog.51cto.com

QQ群:323779636(Shell/Python运维开发群)

15.2 threading

threading模块类似于multiprocessing多进程模块,使用方法也基本一样。threading库是对thread库进行二次封装,我们主要用到Thread类,用Thread类派生线程对象。

1)使用Thread类实现多线程from threading import Thread, current_thread

def worker(n):

print 'thread name:', current_thread().name

print 'hello world', n

for n in range(5):

t = Thread(target=worker, args=(n, ))

t.start()

t.join()  # 等待主进程结束

# python test.py

thread name: Thread-1

hello world 0

thread name: Thread-2

hello world 1

thread name: Thread-3

hello world 2

thread name: Thread-4

hello world 3

thread name: Thread-5

hello world 4

2)还有一种方式继承Thread类实现多线程,子类可以重写__init__和run()方法实现功能逻辑。#!/usr/bin/python

# -*- coding: utf-8 -*-

from threading import Thread, current_thread

class Test(Thread):

# 重写父类构造函数,那么父类构造函数将不会执行

def __init__(self, n):

Thread.__init__(self)

self.n = n

def run(self):

print 'thread name:', current_thread().name

print 'hello world', self.n

if __name__ == '__main__':

for n in range(5):

t = Test(n)

t.start()

t.join()

# python test.py

thread name: Thread-1

hello world 0

thread name: Thread-2

hello world 1

thread name: Thread-3

hello world 2

thread name: Thread-4

hello world 3

thread name: Thread-5

hello world 4

3)Lockfrom threading import Thread, Lock, current_thread

lock = Lock()

class Test(Thread):

# 重写父类构造函数,那么父类构造函数将不会执行

def __init__(self, n):

Thread.__init__(self)

self.n = n

def run(self):

lock.acquire()  # 获取锁

print 'thread name:', current_thread().name

print 'hello world', self.n

lock.release()  # 释放锁

if __name__ == '__main__':

for n in range(5):

t = Test(n)

t.start()

t.join()

众所周知,Python多线程有GIL全局锁,意思是把每个线程执行代码时都上了锁,执行完成后会自动释放GIL锁,意味着同一时间只有一个线程在运行代码。由于所有线程共享父进程内存、变量、资源,很容易多个线程对其操作,导致内容混乱。

当你在写多线程程序的时候如果输出结果是混乱的,这时你应该考虑到在不使用锁的情况下,多个线程运行时可能会修改原有的变量,导致输出不一样。

由此看来Python多线程是不能利用多核CPU提高处理性能,但在IO密集情况下,还是能提高一定的并发性能。也不必担心,多核CPU情况可以使用多进程实现多核任务。Python多进程是复制父进程资源,互不影响,有各自独立的GIL锁,保证数据不会混乱。能用多进程就用吧!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/74911
推荐阅读
相关标签
  

闽ICP备14008679号