当前位置:   article > 正文

【Python】python中多进程处理_python 多进程处理

python 多进程处理


活动地址:CSDN21天学习挑战赛

**

学习日记 Day7

**

一、多进程的概念

1. 进程

程序,比如xxx.py是一个程序,是静态文件,占用的资源是文件本身的大小。
进程,一个程序运行起来后,代码+用到的资源称为进程,它是操作系统分配资源的基本单元。不仅可以通过线程完成多任务,进程也是可以的。

进程的状态
工作中,任务数往往是大于CPU的核数,即一定有一些任务正在执行,而另外一些任务在等待CPU进行执行,因此导致有了不同的状态。
进程的各种状态

  • 就绪态,运行的条件都已经就绪,正在等待CPU执行
  • 执行态,CPU正在执行
  • 等待态:等待某些条件满足,例如一个程序sleep,那么就处于等待态

二、multiprocessing模块的多进程处理

1. multiprocessing模块的Process类

multiprocessing模块通过创建Process对象,然后调用start()方法来生成进程,Process与threading.Thread的API相同。
multiprocessing.Process类的语法格式:

multiprocessing.Process(
	group=None, 
	target=None, 
	name=None, 
	args=(),
	kwargs={}, 
	*, 
	daemon=None)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 参数 group 指定进程组,大多数情况用不到该参数
  • 参数 target 如果传递了函数的引用,可以认为子进程执行该函数里的代码
  • 参数 name 给进程设定一个名字,可以不设定
  • 参数 aegs 给target指定的函数传递的参数,以元组的方式传递
  • 参数 kwargs 给target指定的函数传递命名参数

multiprocessing.Process类有如下方法和属性

方法名/属性说明
run()进程具体执行的方法
start()启动子进程实例,即创建子进程
join([timeout])timeout=None,阻塞至调用join()方法的进程终止;如果timeout是一个正数,则最多会阻塞timeout秒
is_alive()判断进程/子进程是否还在活着
terminate()不管任务是否完成,立即终止子进程
kill()与terminate()相同,在UNIX上使用SIGKILL信号
close()关闭Process对象,释放与之关联的所有资源
name当前进程的别名,默认为Process-N,N为从1开始递增的整数
pid当前进程的pid,进程号
exitcode子进程的退出代码
daemon进程的守护标志,是一个布尔值
authkey进程的身份验证密钥
sentinel系统对象的数字句柄,当进程结束时将变为ready

2. 子进程使用示例

创建子进程时,需要传入一个执行函数和函数的参数,创建一个Process实例,用start()方法启动。

# 子进程运行实例
from multiprocessing import Process
import time


def run_proc():
    '''
    子进程要执行的代码
    '''
    while True:
        print("---- 2 ---")
        time.sleep(1)


if __name__ == "__main__":
    p = Process(target=run_proc)
    p.start()
    while True:
        print("--- 1 ---")
        time.sleep(1)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

显示结果:
子进程运行显示结果

3. 获取进程的pid

使用os模块的getpid()获取当前进程的进程号

import os
os.getpid()
  • 1
  • 2

使用示例:

# -*- coding:utf-8 -*-
from multiprocessing import Process
import os
import time


def run_proc():
    print("子进程运行,pid = %d" % os.getpid())
    print("子进程将要结束")
    time.sleep(1)


if __name__ == "__main__":
    print("父进程执行中,父进程pid = %d" % os.getpid())
    p = Process(target=run_proc)
    p.start()
    time.sleep(1)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

显示结果:
获取进程pid

4. 给子进程指定的函数传递参数

在创建进程时,设置参数args和kwargs可以实现创建子进程时给特定的函数传递参数。
使用示例:

# 创建子进程时给指定的函数传递参数
from multiprocessing import Process
import os
import time


def run_proc(name,age,**kwargs):
    for i in range(10):
        print("子进程运行中--%d--,name=%s, age=%d, pid=%d" % (i,name,age,os.getpid()))
        print(kwargs)
        time.sleep(0.3)


if __name__ == "__main__":
    p = Process(target=run_proc,args=("tom",12),kwargs={"jerry":10})
    p.start()
    time.sleep(1)
    p.terminate()  # 1s后,终止子进程
    p.join()

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

显示结果:
给子进程指定的函数传递参数

5. 进程间不同享全局变量

from multiprocessing import Process
import os
import time

# 全局变量
nums = [11,22]


# 子进程1
def work1():
    print("in process1 pid = %d, nums = %s" % (os.getpid(),nums))
    for i in range(3):
        nums.append(i)
        time.sleep(1)
        print("in process1 pid = %d, nums = %s" % (os.getpid(),nums))


# 子进程2
def work2():
    print("in process2 pid = %d, nums = %s" % (os.getpid(),nums))


if __name__ == "__main__":
    p1 = Process(target=work1)
    p1.start()
    p1.join()

    p2 = Process(target=work2)
    p2.start()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

显示结果,可以看出来全局变量nums在work1中改变,但在work2显示的仍然是初始的值。
在这里插入图片描述

三、进程间同步

进程之间有时需要通信,操作系统提供了很多机制来实现进程间的通信,如Queue。

  1. Queue类语法说明
    Queue类常用的方法如下:
方法名说明
q = Queue()构造函数,初始化Queue()对象 ,如果括号中没有指定最大可接受消息数量,或数量为负,表示可接受的消息数量没有上限直至内存耗尽
Queue.qsize()返回当前队列包含的消息数量
Queue.empty()如果队列为空,返回True,反之False
Queue.full()如果队列满了,返回True,反之False
Queue.get([block[,timeout]])获取队列中的一条消息,然后将其从列队中移除,block默认为True。有两种情况:1. 如果block使用默认值,且没有设置timeout(单位秒),消息列队如果为空,此时程序将被阻塞(停在读取状态),知道从消息队列读取到消息为止;如果设置了timeout,会等待timeout秒,如果还没有读取到任何消息,抛出Queue.Empty异常。2. 如果block为False,消息队列如果为空,会立刻抛出Queue.Empty异常。
Queue.get_nowait()相当与Queue.get(False)
Queue.put(item[,block[,timeout]])将item消息写入到队列中,block默认值为True。有两种情况:1. 如果block为默认值,且没有设置timeout(单位秒),消息队列如果已没有空间写入,此时程序将被阻塞(停在写入状态),直到消息队列腾出空间为止;如果设置了timeout,会等待timeout秒,如果还没有空间,抛出Queue.Full异常。2. 如果block为False,消息队列如果没有空间可写入,会立刻抛出Queue.Full异常。
Queue.put_nowait(item)相当于Queue.put(item,False)
  1. Queue类使用示例1
from multiprocessing import Queue
import time

# 初始化Queue对象,最多可以接收3条信息
q = Queue(3)
try:
    # 将消息写入队列中
    q.put("消息1")
    q.put("消息2")
    # 检查消息队列是否已满
    print("%s\t消息队列满了吗?:%s" % (time.ctime(),q.full()))
    # 将消息3写入队列
    q.put("消息3")
    # 检查消息队列是否已满
    print("%s\t消息队列满了吗?:%s" % (time.ctime(),q.full()))

    # 队列中插入消息4
    q.put("消息4",block=True,timeout=2)

except:
    print("%s\t消息队列已满,现有消息数量:%d" % (time.ctime(),q.qsize()))

try:
    q.put_nowait("消息4")
except:
    print("%s\t消息队列已满,现有消息数量:%d" % (time.ctime(),q.qsize()))

# 如果消息队列不空,插入消息
if not q.full():
    q.put("消息4")

# 读取消息时,先判断消息是不是空
if not q.empty():
    for i in range(q.qsize()):
        print(q.get_nowait())

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

显示结果:
在这里插入图片描述

3. Queue使用示例2

# 在父进程中创建两个子进程,一个写数据,一个读数据
from multiprocessing import Queue,Process
import time,os,random


# 写数据
def write(q:Queue):
    for val in ['A','B','C']:
        q.put(val)
        time.sleep(random.random())



# 读数据
def read(q:Queue):
    while True:
        if not q.empty():
            val = q.get(True)
            print("Get %s from queue, current queue has %d msgs" %(val,q.qsize()))
            time.sleep(random.random())
        else:
            break


if __name__ == "__main__":
    q = Queue()
    pw = Process(target=write,args=(q,))
    pr = Process(target=read,args=(q,))
    pw.start()
    pw.join()

    pr.start()
    pr.join()
    print("所有数据都写入并读完")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

显示结果:
在这里插入图片描述

四、进程间同步锁-Lock

锁是为了确保数据一致性。比如读写锁,每个进程给一个变量增加1,但如果在一个进程读取后还没有写入前,另一个进程读取了该值并写入该值,那么前一个进程写入的数据就是错误的,这时需要加锁使该值保持一致性。
可以通过multiprocession.Lock类控制一段代码在同一时间只能被一个进程执行。Lock类的两个方法:
acquire()用来获取锁;release()用来释放锁。
如果锁的状态是unlocked,当一个进程调用acquire(),会立即修改为locked()并返回,该进程获得锁;如果锁的状态是locked,当一个进程调用acquire(),该进程会阻塞。
Lock的使用:

  • lock = multiprocessing.Lock() # 创建一个锁
  • lock.acquire() # 获取锁,加锁
  • lock.release() # 释放锁,解锁
  • with lock # 自动获取、释放锁,类似与 with open() as f:

当没有加锁时

import multiprocessing
import time,os


def add(num,value):
    print("current pid = {0},add{1}:num = {2}".format(os.getpid(),value,num))
    for i in range(3):
        num += value
        print("current pid = {0},add{1}:num = {2}".format(os.getpid(), value, num))
        time.sleep(1)


if __name__ == "__main__":
    num=0
    p1 = multiprocessing.Process(target=add, args=(num, 1))
    p2 = multiprocessing.Process(target=add, args=(num, 2))
    p1.start()
    p2.start()

    print("proc end")

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

显示结果,进程执行时没有顺序,交替运行
在这里插入图片描述
当加锁时,其中一个进程执行完成后,另一个进程才会去执行,谁先抢到锁谁先执行

import multiprocessing
import time,os


def add(num, value, lock):
    try:
        lock.acquire()
        print("current pid = {0},add{1}:num = {2}".format(os.getpid(),value,num))
        for i in range(3):
            num += value
            print("current pid = {0},add{1}:num = {2}".format(os.getpid(), value, num))
            time.sleep(1)
    except Exception as err:
        raise err
    finally:
        lock.release()


if __name__ == "__main__":
    lock = multiprocessing.Lock()
    num=0

    p1 = multiprocessing.Process(target=add, args=(num, 1, lock))
    p2 = multiprocessing.Process(target=add, args=(num, 2, lock))
    p1.start()
    p2.start()

    print("proc end")

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

如下是两次执行的结果:
在这里插入图片描述
在这里插入图片描述

五、进程池Pool类

当需要创建的子进程数量不多时,可以使用multiprocessing的Process类动态生成多个进程,但是如果需要的子进程特比多,成百上千个时,手动创建就不太现实,此时可以使用multiprocessing模块的Pool类。

1.Pool类语法说明

语法格式:

multiprocessing.pool.Pool([processes[,initializer[,initargs[,maxtasksperchild[,context]]]]])
  • 1

参数说明:

processes:工作进程数目,如果processes为None,则使用os.cpu_count()返回的值;
initializer:如果initializer不为None,则每个进程会在启动时调用initializer(*initargs);
maxtasksperchild:一个工作进程在它退出或被一个新的工作进程代替之前能完成的任务数量,为了释放未使用的资源;
context:用于指定启动的工作进程的上下文。

有两种方式向进程池提交任务:

  1. apply(func[,args[,kwds]]):阻塞方式。
  2. apply_async(func[args[,kwds]]):非阻塞方式。使用非阻塞方式调用func(并行执行,堵塞方式必须等待上一个进程退出才能执行下一个进程),args为传递给func的参数列表,kwds为传递给func的关键字参数列表。

multiprocessing.Pool常用的函数:

方法名说明
close()关闭Pool,使其不再接受新的任务
terminate()不管任务是否完成,立即终止
join()主进程阻塞,等待子进程的退出,必须在close()或terminate()之后使用

2. Pool使用示例

初始化Pool时,可以指定最大进程数,
当有新的请求提交到Pool中,如果池还没有满,那么就会创建一个新的进程来执行该请求;
如果池中的进程数已经达到指定的最大值,那么该请求就会等待,直到池中有进程结束,才会用之前的进程来执行新任务。

线程池使用示例:

from multiprocessing import Pool
import os,time,random


def worker(msg):
    t_start = time.time()
    print("{0} start run, pid = {1}".format(msg,os.getpid()))
    time.sleep(random.random()*2)  # random() 随机生成0~1之间的浮点数
    t_stop = time.time()
    print("%d stop, use time:%.2f" % (msg,t_stop-t_start))


if __name__ == "__main__":
    po = Pool(3)  # 定义进程池,参数3表示最大进程数为3
    for i in range(10):
        # allpy_sync(要调用的目标,传递给目标的参数元组)
        po.apply_async(worker, args=(i,))  # 每次循环会用空闲的子进程去调用目标

    print("----start-----")
    po.close()  # 关闭线程池后,po不再接受新的请求
    po.join()  # 等待线程池po中的所有子进程执行完毕,
    print("----stop-------")

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

运行结果:
在这里插入图片描述

3. 进程池中的Queue通信

如果要使用Pool创建进程,就需要使用multiprocessing.Manager中的Queue()。
使用示例:

from multiprocessing import Pool,Manager
import os,time,random


def reader(q):
    '''
    读取消息队列q中的消息
    :param q: 消息队列
    :return: 无
    '''
    print(f"reader start(pid={os.getpid()}), father pid = {os.getppid()}.")
    for i in range(q.qsize()):
        print(f"reade msg from Queue is : {q.get(True)}")


def  writer(q):
    '''
    向消息队列中写入消息
    :param q: 消息队列
    :return: 无
    '''
    print(f"writer start(pid={os.getpid()}, father pid={os.getppid()}.")
    for i in 'itcast':
        print("write %c"%i)
        q.put(i)


if __name__ == "__main__":
    print(f"Process pid={os.getpid()} start.")
    q=Manager().Queue()
    po = Pool()

    po.apply_async(writer,args=(q,))  # 向进程池提交任务
    time.sleep(random.random())

    po.apply_async(reader,args=(q,))

    po.close()  # 不再接受新的请求
    po.join()  # 阻塞至进程池中的任务完成

    print(f"Process pi={os.getpid()} end!")

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

显示结果:

在这里插入图片描述

六、进程与线程的对比

  • 从功能上看
  • 进程能够完成多任务,比如一台电脑上能够同时运行多个QQ。
  • 线程能够完成多任务,比如一个QQ中有多个聊天窗口。
  • 从定义上看
    • 进程是系统进行资源分配和调度的一个独立单位。
    • 线程是进程的一个实体,是CPU调度和分派的基本单位,是比进程更小的能独立运行的基本单位。线程自己基本不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器,一组寄存器和栈),但是一个线程可与同属于一个进程的其它线程共享进程拥有的全部资源。

进程和线程的区别:

  • 一个程序至少有一个进程,一个进程至少有一个线程;
    • 线程的划分尺度小于进程,多线程程序的并发性高。
    • 进程在执行过程中拥有独立的内存单元,多个线程共享内存,可以极大的提高程序的运行效率。
  • 线程不能独立执行,必须依存在进程中。

线程执行开销小,但不利于资源的管理和保护;
进程执行开销大,但利于资源的管理和保护。

全文参考:https://blog.csdn.net/yuan2019035055/article/details/124780526

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/130807
推荐阅读
相关标签
  

闽ICP备14008679号