多道技术: (1)空间的复用:如内存中同时有多道程序 (2)时间上的复用:复用一个CPU的时间片,强调:遇到io切,占用CPU时间过长也切,核心在于切之前将进程的状态保存下来,这样才能保证下次切换回来时,能基于上次切走的位置继续运行
1.multiprocessing模块 multiprocess模块用来开启子进程,并在子进程中执行我们定制的任务(如函数),支持子进程、通信和共享数据,执行不同形式的同步,提供了Process、Queue、Pipe、Lock 等组件;与线程不同,进程没有任何共享状态,进程修改的数据,改动仅限于改进程内。 2.Process类 Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动) 强调: 1. 需要使用关键字的方式来指定参数 2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号。 参数介绍: group参数未使用,值始终为None target表示调用对象,即子进程要执行的任务 args表示调用对象的位置参数元组,args=(1,2,'egon',) kwargs表示调用对象的字典,kwargs={'name':'egon','age':18} name为子进程的名称 方法介绍: p.start():启动进程,并调用该子进程中的p.run() p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法 p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁 p.is_alive():如果p仍然运行,返回True p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程 属性介绍: p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置 p.name:进程的名称 p.pid:进程的pid 复制代码
from multiprocessing import Process
import time
import random
import os


def task(name):
    print("%s is running..." % name)
    time.sleep(random.randrange(1, 5))
    print("%s is done..." % name)


# 第二种创建子进程的方法
class MyProcess(Process):
    def __init__(self, name):
        super().__init__()  # 将父类的功能进行重用
        self.name = name

    # start将调用这个run方法
    def run(self) -> None:
        print("%s %s is running..., parent id is %s" % (self.name, os.getpid(), os.getppid()))
        time.sleep(3)
        print("%s %s is done-----, parent id is %s" % (self.name, os.getpid(), os.getppid()))


if __name__ == "__main__":
    # 第一种方式
    p1 = Process(target=task, args=("A", ))
    p2 = Process(target=task, args=("B", ))
    p3 = Process(target=task, args=("C", ))
    p4 = Process(target=task, args=("D", ))
    # 只是给操作系统发送了一个信号,由操作系统将父进程地址空间中的数据拷贝给子进程,作为子进程运行的初始状态,开启后再运行task
    p1.start()
    p2.start()
    p3.start()
    p4.start()
    print('主1......', os.getpid(), os.getppid())
    # 执行结果:执行结果因为执行进程会有一段时间,所以先做打印操作
    # --------------------
    # 主
    # 任务名称 is running...
    # 任务名称 is done...

    p = MyProcess('子进程2')
    p.start()
    # os.getpid() --- 查看子进程的id
    # os.getppid() --- 查看父进程的id
    print('主2......', os.getpid(), os.getppid())
(1)主进程和子进程之间的执行彼此独立,主进程的任务执行完毕后需要等待子进程执行完毕,然后统一回收资源; (2)主进程执行到某一阶段时,需要子进程执行完毕后再继续执行主进程的任务,这时候就需要检测(**join方法的作用就在这**)子进程什么时候执行完毕再执行主进程的任务,否则进程任务就会阻塞。 复制代码
join方法实现子进程的检测、实现子进程的并发和子进程的串行任务,我们可以通过is_alive()来判断子进程的存活状态,Process(target=join_task('task进程'), name="自定义的子进程的名称")定义进程的名称,默认为Process-1,使用p_task.name获取进程的名称。同时进程之间的空间是相互隔离的,即数据互不影响。【GitHub示例】
from multiprocessing import Process
import time
import os


def join_task(name):
    print("%s id: %s is running..., parent id is <%s>" % (name, os.getpid(), os.getppid()))
    time.sleep(3)
    print("%s id: %s has done===, parent id is <%s>" % (name, os.getpid(), os.getppid()))


# join并发
def join_concurrent(name, n):
    print("---%s id: %s is running..., parent id is <%s>---" % (name, os.getpid(), os.getppid()))
    time.sleep(n)
    # print("---%s id: %s has done===, parent id is <%s>---" % (name, os.getpid(), os.getppid()))


# join串行
def join_serial(name, n):
    print("$$$%s id: %s is running..., parent id is <%s>$$$" % (name, os.getpid(), os.getppid()))
    time.sleep(n)
    print("$$$%s id: %s has done===, parent id is <%s>$$$" % (name, os.getpid(), os.getppid()))


n = 100  # 进程之间的空间是隔离的


def spatial_isolation():
    global n
    n = 0
    print("子进程内的n: ", n)


if __name__ == "__main__":
    # join检测子进程
    p_task = Process(target=join_task('task进程'), name="自定义的进程的名称")
    p_task.start()
    # 加入join方法后一定会等到子进程结束以后才会执行主进程
    print("子进程存活状态:", p_task.is_alive())
    p_task.join()
    print("子进程存活状态:", p_task.is_alive())
    print("主进程的id:%s, 及父进程的id:%s" % (os.getpid(), os.getppid()))
    print("验证了存在僵尸进程:", p_task.pid)
    print("子进程的名称:", p_task.name)

    # join并发
    start_time_concurrent = time.time()
    p_concurrent_1 = Process(target=join_concurrent, args=('join并发1', 3))
    p_concurrent_2 = Process(target=join_concurrent, args=('join并发2', 2))
    p_concurrent_3 = Process(target=join_concurrent, args=('join并发3', 3))
    p_concurrent_1.start()
    p_concurrent_2.start()
    p_concurrent_3.start()
    # 三个是以并发进行的,只是等待最长的程序执行完毕才算结束
    p_concurrent_1.join()
    p_concurrent_2.join()
    p_concurrent_3.join()
    print("join并发主进程总耗时:", (time.time() - start_time_concurrent))

    # join串行
    start_time_serial = time.time()
    p_serial_1 = Process(target=join_concurrent, args=('join串行1', 3))
    p_serial_2 = Process(target=join_concurrent, args=('join串行2', 2))
    p_serial_3 = Process(target=join_concurrent, args=('join串行3', 1))
    # 每个都是执行完以后再进行下一步
    p_serial_1.start()
    p_serial_1.join()
    p_serial_2.start()
    p_serial_2.join()
    p_serial_3.start()
    p_serial_3.join()
    print("join串行主进程总耗时:", (time.time() - start_time_serial))

    # 进程之间的空间是隔离的
    p_spatial_isolation = Process(target=spatial_isolation)
    p_spatial_isolation.start()
    print('主进程内n: ', n)
守护进程会在主进程代码结束后就结束了; 守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children。 复制代码
from multiprocessing import Process
import time
import os


def task1(name, n):
    print("%s id: %s is running..., parent id is <%s>" % (name, os.getpid(), os.getppid()))
    time.sleep(n)
    print("%s id: %s has done===, parent id is <%s>" % (name, os.getpid(), os.getppid()))
    # 字进程不允许创建子进程
    # p = Process(target=time.sleep, args=(2, 3))
    # p.start()


def task2(name, n):
    print("%s id: %s is running..., parent id is <%s>" % (name, os.getpid(), os.getppid()))
    time.sleep(n)
    print("%s id: %s has done===, parent id is <%s>" % (name, os.getpid(), os.getppid()))


if __name__ == "__main__":
    p_task1 = Process(target=task1, args=("task1", 3))
    p_task2 = Process(target=task2, args=("task2", 3))
    #一定要在p.start()前设置, 设置p为守护进程, 禁止p创建子进程, 并且父进程代码执行结束, p即终止运行
    p_task1.daemon = True
    # 未加守护进程的任务还会继续运行
    p_task1.start()
    p_task2.start()
    print("主程序%s......")
from multiprocessing import Process, Lock
import time
import json


# 未加互斥锁
def unlock(name):
    print("%s 1" % name)
    time.sleep(1)
    print("%s 2" % name)
    time.sleep(1)
    print("%s 3" % name)


# ------------------------------------------------------------
# 加互斥锁
def lock(name, mutex):
    mutex.acquire()
    print('%s 1' % name)
    time.sleep(1)
    print('%s 2' % name)
    time.sleep(1)
    print('%s 3' % name)
    mutex.release()


# ------------------------------------------------------------
# 模拟抢票过程
def search_ticket(name):
    time.sleep(1)
    dic = json.load(open('db.txt', 'r', encoding='utf-8'))
    print("<%s> 查看到剩余票数为【%s】" % (name, dic['count']))


def buy_ticket(name):
    time.sleep(1)
    dic = json.load(open('db.txt', 'r', encoding='utf-8'))
    if dic['count'] > 0:
        dic['count'] -= 1
        time.sleep(3)
        json.dump(dic, open('db.txt', 'w', encoding='utf-8'))
        print('<%s> 购票成功' % name)
    else:
        print('<%s> 暂无余票' % name)


def ticket_task(name, mutex):
    # 查票每个人都可以执行,是并发执行操作
    search_ticket(name)
    mutex.acquire()  # 添加互斥锁,达到串行执行,使得只有一个人购票成功
    buy_ticket(name)
    mutex.release()


if __name__ == "__main__":
    # for i in range(3):
    #     p_unlock = Process(target=unlock, args=("未进程%s" % i,))
    #     p_unlock.start()
    # 执行结果:
    # ------------------------
    # 进程0 1
    # 进程1 1
    # 进程2 1
    # 进程0 2
    # 进程1 2
    # 进程2 2
    # 进程0 3
    # 进程1 3
    # 进程2 3

    # mutex = Lock()
    # for i in range(3):
    #     p_lock = Process(target=lock, args=("加锁进程%s" % i, mutex))
    #     p_lock.start()
    # 执行结果:
    # ------------------------
    # 加锁进程0 1
    # 加锁进程0 2
    # 加锁进程0 3
    # 加锁进程1 1
    # 加锁进程1 2
    # 加锁进程1 3
    # 加锁进程2 1
    # 加锁进程2 2
    # 加锁进程2 3

    # 模拟抢票
    mutex = Lock()
    for i in range(10):
        p_ticket = Process(target=ticket_task, args=("乘客%s" % i, mutex))
        p_ticket.start()
        # 利用互斥锁,可以通过添加互斥锁的位置,实现部分程序执行达到串行的效果,其他程序仍然可以并行执行,而添加了join只能执行完了之后才能执行下一个,若作为每项都添加一个join,则都要串行执行。从而大大降低了效率。
        p_ticket.join()
队列的意义:解决大家共享硬盘文件,效率低以及使用内存解决加锁这个繁琐的步骤。multiprocessing模块提供了IPC(internet process communicate)进程之间的通信,队列以及管道,这两种方式都是使用消息传递的,队列就是管道加锁的实现。【GitHub示例】
参数: maxsize---队列中最大的项数,可以放置任意类型的数据,省略则无大小限制。 --- 但是: 队列内存放的是消息而非大数据 队列占用的是内存空间,因而maxsize即便是无大小限制也受限于内存大小 方法: q.put --- 用以插入数据到队列中,数据不宜过大 q.get --- 可以从队列读取并删除一个元素 复制代码
from multiprocessing import Process, Queue

q = Queue(3)
q.put(1)
q.put(2)
print("当前队列是否已满:", q.full())
q.put(3)
# 判断队列是否满了
print("当前队列是否已满:", q.full())
try:
    q.put(4, block=True, timeout=3)
except:
    print("队列已满,当前队列的深度为:%s" % q.qsize())

print("--------华丽的分割线--------")

print(q.get())
print(q.get())
print("当前队列是否已空:%s" % q.empty())
print(q.get())
# 判断队列是否为空
print("当前队列是否已空:%s" % q.empty())
try:
    q.get(block=True, timeout=3)
except:
    print("队列已空,当前队列的深度为:%s" % q.qsize())

print("--------华丽的分割线--------")

if __name__ == "__main__":
    pass
from multiprocessing import Process, Queue
import time


def producer(q, name):
    for i in range(3):
        res = "包子%s" % i
        time.sleep(0.5)
        print("%s 生产了 %s" % (name, res))
        q.put(res)


def consumer(q, name):
    while True:
        res = q.get()
        if res is None:
            break
        time.sleep(1)
        print("%s 吃了%s" % (name, res))


if __name__ == "__main__":
    # 队列容器
    q = Queue()
    # 生产者们
    p1 = Process(target=producer, args=(q, '生产者1'))
    p2 = Process(target=producer, args=(q, '生产者2'))
    p3 = Process(target=producer, args=(q, '生产者3'))
    # 消费者们
    c1 = Process(target=consumer, args=(q, '消费者1'))
    c2 = Process(target=consumer, args=(q, '消费者2'))

    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    # 生产完了才开始吃
    p1.join()
    p2.join()
    p3.join()
    q.put(None)
    q.put(None)
    print("主进程")
参数 maxsize --- 队列中允许的最大项数,省略则无大小限制; 方法 JoinableQueue的方法共用与Queue的方法 q.task_done() --- 使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常 q.join() --- 生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止 复制代码
from multiprocessing import Process, Queue, JoinableQueue
import time


def producer_j(qj, name):
    for i in range(3):
        res = "
第一种:调用Thread类的方法; 第二种:继承Thread类 【GitHub示例】
#!/usr/bin/env python # -*- encoding: utf-8 -*- """ @File : 72_Thread.py @Time : 2019/10/12 12:18 @Author : Crisimple @Github : https://crisimple.github.io/ @Contact : Crisimple@foxmail.com @License : (C)Copyright 2017-2019, Micro-Circle @Desc : None """ from threading import Thread import time import random def thread_one(name): print("%s is running." % name) time.sleep(random.randrange(1, 3)) print("%s run end====" % name) class ThreadTwo(Thread): def __init__(self, name): super().__init__() self.name = name def run(self): print("%s is running..." % self.name) time.sleep(random.randrange(1, 3)) print("%s run end two====" % self.name) if __name__ == "__main__": # 创建线程第一种方式 to = Thread(target=thread_one, args=("第一种创建线程的方式...", )) to.start() print("主线程一") # 创建线程第二种方式 tt = ThreadTwo('第二种创建线程的方式...') tt.start() print("主线程二") 复制代码
【GitHub示例】 区别一:开启速度:
(1)t1.start() 的同时,线程就开启了(先打印t1 running 肉眼可观察的速度) (2)p1.start() 将开启进程的信号发给操作系统后,操作系统要申请内存空间,让好拷贝父进程地址空间到子进程,开销远大于线程。 复制代码
# 1. 开启速度 # 1.1 在主进程下开启线程 from threading import Thread from multiprocessing import Process import time def run(name): print("%s thread is running..." % name) time.sleep(2) print("%s thread run done===" % name) # 1.2 在主进程下开启子进程 def child_process(name): print("%s process is running..." % name) time.sleep(2) print("%s process run done===" % name) if __name__ == "__main__": t1 = Thread(target=run, args=('主进程下的线程', )) t1.start() print("\n主进程下有线程") p1 = Process(target=child_process, args=("主进程下的进程", )) p1.start() print("\n主进程下有子进程") 复制代码
(1)同一进程下的线程的pid跟主进程的pid一样 (2)开多个进程,每个进程都有不同的pid 复制代码
# 2.pid不同 # 2.1 在主进程下开启多个线程,每个线程都跟主进程的pid一样【pid(process id:进程的id号)】 import os def pid_thread_task(name): # 一个进程内多个线程是平级别的 print("%s 子进程:%s" % (name, os.getpid())) # 2.2 开多个进程,每个进程都有不同的pid def pid_process_task(name): print("%s 子进程的pid: %s, 父进程的pid: %s" %(name, os.getpid(), os.getppid())) if __name__ == "__main__": # 2.1 pid_thread_task_1 = Thread(target=pid_thread_task, args=(' 线程1', )) pid_thread_task_2 = Thread(target=pid_thread_task, args=(' 线程2', )) pid_thread_task_1.start() pid_thread_task_2.start() print("\n 2.1的主进程:", os.getpid()) # 2.2 pid_process_task_1 = Process(target=pid_process_task, args=(' 进程1', )) pid_process_task_2 = Process(target=pid_process_task, args=(' 进程1', )) pid_process_task_1.start() pid_process_task_2.start() print("\n 2.2的主进程:", os.getpid()) 复制代码
# 3.同一进程内的多个线程共享该进程的地址空间,父进程与子进程不共享地址空间,进程之间的地址空间是隔离的 n = 100 def task3(name, num1): global n n = 0 print("=====%s=====" % name) n = n + num1 print("-----%s-----" % n) if __name__ == "__main__": # 3 p3 = Process(target=task3, args=("进程31", 100)) t31 = Thread(target=task3, args=("线程31", 100)) t32 = Thread(target=task3, args=("线程32", 200)) # p3.start() # p3.join() t31.start() t32.start() print("\n\n【p3主进程】", n) 复制代码
汇总下两者的区别: (1)启动线程的速度要比启动进程的速度快很多,启动进程进程的开销更大 (2)在主进程下面开启的多个线程,每个线程都和主进程的pid(进程的id)一致;在主进程下开启多个子进程,每个进程都有不一样的pid (3)统一进程内的多个线程共享该进程的地址空间;父进程与子进程不共享地址空间,表明进程之间的地址空间是隔离的
【GitHub示例】 Thread实例对象的方法:
isAlive():返回线程是否活动的; getName():返回线程名; setName():设置线程名 复制代码
from threading import Thread import time def task(): # 获取当前线程的名字 print("%s is running" % currentThread().getName()) time.sleep(1) print("%s is done ======" % currentThread().getName()) if __name__ == "__main__": t = Thread(target=task, name='子线程1') t.start() t.setName("改名为新线程名称1") t.join() print("新的子线程的名称为:", t.getName()) print("线程是否存活:", t.isAlive()) 复制代码
threading.currentThread():返回当前的线程变量 threading.enumerate():返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。 threading.activeCount():返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。 复制代码
from threading import Thread, currentThread, active_count, enumerate import time def task(): # 获取当前线程的名字 print("%s is running" % currentThread().getName()) time.sleep(1) print("%s is done ======" % currentThread().getName()) if __name__ == "__main__": currentThread().setName("主线程新名字") print("主线程的名字:", currentThread().getName()) print("线程是否存活:", t.isAlive()) t.join() print("查看活跃的线程数:", active_count()) print("将当前活跃的线程显示出来:", enumerate()) 复制代码
无论是进程还是线程,都遵循:守护XXXX非等待主XXXX运行完毕后被销毁 对于程序运行完毕的一点补充:
(1)对主进程来说,运行完毕指的是主进程代码运行完毕;主进程在代码结束以后就算运行完毕了(守护进程在此时就别回收了),然后主进程会一直等非守护的子进程运行完毕后回收子进程的资源,否则非产生僵尸进程,才会结束。 (2)对主线程来说,运行完毕子的是主线程所在的进程内所有非守护线程统统运行完毕,主线程才算运行完毕。主线程在其他非守护线程运行完毕才算运行完毕(守护线程在此时会被回收)。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程保证非守护线程都运行完毕后才结束。 复制代码
#!/usr/bin/env python # -*- encoding: utf-8 -*- """ @File : 72_Thread_protect.py @Time : 2019/10/14 17:37 @Author : Crisimple @Github : https://crisimple.github.io/ @Contact : Crisimple@foxmail.com @License : (C)Copyright 2017-2019, Micro-Circle @Desc : None """ from threading import Thread import time def run(name): time.sleep(2) print("%s is running..." % name) def thread1(): print(123) time.sleep(1) print("end123") def thread2(): print(456) time.sleep(0.5) print("end456") if __name__ == "__main__": #t = Thread(target=run, args=("线程1", )) # # t.setDaemon(True) 等价于 t.daemon = True # # t.setDaemon(True) # t.daemon = True # t.start() # print("主进程") # print(t.is_alive()) # 输出结果: # ---------------------因为主进程结束,守护主线程也就跟着结束了,所以不打印守护线程的语句 # 主进程 t1 = Thread(target=thread1) t2 = Thread(target=thread2) t1.daemon = True t1.start() t2.start() print("主进程2") 复制代码
#!/usr/bin/env python # -*- encoding: utf-8 -*- """ @File : 72_Thread_lock.py @Time : 2019/10/14 19:28 @Author : Crisimple @Github : https://crisimple.github.io/ @Contact : Crisimple@foxmail.com @License : (C)Copyright 2017-2019, Micro-Circle @Desc : None """ from threading import Thread, Lock import time n1 = 100 n2 = 100 def unlock_task(): # 未加锁之前 global n1 temp = n1 # 未加锁之前,100个线程都停留在这并且temp都等于100 time.sleep(0.1) n1 = temp - 1 def lock_task(): global n2 # 开始加锁 mutex.acquire() temp = n2 time.sleep(0.1) n2 = temp - 1 # 解锁 mutex.release() if __name__ == "__main__": # 未加锁之前 t_1 = [] for i in range(100): t = Thread(target=unlock_task) t_1.append(t) t.start() for t in t_1: t.join() print('未加锁的主进程', n1) # ----- 执行结果 ----- # 主进程 100 # 加锁 mutex = Lock() t_2 = [] for i2 in range(100): t2 = Thread(target=lock_task) t_2.append(t2) t2.start() for t2 in t_2: t2.join() print("加锁后的主进程", n2) 复制代码
# 1. 信号量(Semaphore) from threading import Thread, Semaphore, currentThread # 信号量大小,也就是同一时间3个任务去拿锁 sm = Semaphore(3) def semaphore_task(): sm.acquire() print("%s in" % currentThread().getName()) sm.release() if __name__ == "__main__": for i1 in range(10): t1 = Thread(target=semaphore_task) t1.start() 复制代码
event.isSet():返回event的状态值; event.wait():如果 event.isSet()==False将阻塞线程; event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度; event.clear():恢复event的状态值为False。 复制代码
# 有多个工作线程尝试链接MySQL,我们想要在连接前确保MySQL服务正常才让那些工作线程去连接MySQL服务器, # 如果连接不成功,都会去尝试重新连接。那么我们就可以采用threading.Event机制来协调各个工作线程的连接操作 from threading import Event, Thread, currentThread import time event = Event() def conn_mysql(): n = 0 while not event.is_set(): # print("event.is_set(): ", event.is_set()) if n == 3: print("%s try many times" % currentThread().getName()) return print("%s try %s" % (currentThread().getName(), n)) event.wait(0.5) n += 1 print("%s is connected" % currentThread().getName()) def check(): print("%s is checking" % currentThread().getName()) time.sleep(1) event.set() if __name__ == "__main__": # 2.Event for i2 in range(3): t = Thread(target=conn_mysql) t.start() t = Thread(target=check) t.start() 复制代码
定时器: 定时器,指定n秒后执行某操作
from threading import Timer def after_run(): print("定时器指定时长后再执行程序") if __name__ == "__main__": # 3. Timer t3 = Timer(2, after_run) t3.start() 复制代码
# 利用定时器完成验证码的实现 pass 复制代码
#!/usr/bin/env python # -*- encoding: utf-8 -*- """ @File : 72_Thread_queue.py @Time : 2019/10/14 21:27 @Author : Crisimple @Github : https://crisimple.github.io/ @Contact : Crisimple@foxmail.com @License : (C)Copyright 2017-2019, Micro-Circle @Desc : None """ import queue # 1. 先进先出 first_in_first_out = queue.Queue(5) first_in_first_out.put('first') first_in_first_out.put(2) first_in_first_out.put('third') # block = True时阻塞,timeout=3,等待三秒后如果还没有从里面取出数据,则阻塞 first_in_first_out.put(4, block=True, timeout=3) print(first_in_first_out.get()) print(first_in_first_out.get()) print(first_in_first_out.get()) print(first_in_first_out.get(block=False)) print("\n") # 2. 后进先出 last_in_first_out = queue.LifoQueue(3) last_in_first_out.put("L1") last_in_first_out.put("L2") last_in_first_out.put("L3") print(last_in_first_out.get()) print(last_in_first_out.get()) print(last_in_first_out.get()) print("\n") # 3. 优先级队列(存储数据时可设置优先级的队列) priority = queue.PriorityQueue(3) priority.put((10, 'one')) priority.put((40, 'two')) priority.put((20, 'three')) print(priority.get()) print(priority.get()) print(priority.get()) 复制代码
concurrent.futures 模块提供了高度封装的异步调用接口 ThreadPoolExecutor:线程池,提供异步调用 ProcessPoolExecutor: 进程池,提供异步调用 基本方法: submit(fn, *args, **kwargs) --- 异步提交任务 map(func, *iterables, timeout=None, chunksize=1) --- 取代for循环submit的操作 shutdown(wait=True) --- 相当于进程池的pool.close()+pool.join()操作 wait=True,等待池内所有任务执行完毕回收完资源后才继续 wait=False,立即返回,并不会等待池内的任务执行完毕 但不管wait参数为何值,整个程序都会等到所有任务执行完毕 submit和map必须在shutdown之前 result(timeout=None) --- 取得结果 add_done_callback(fn) --- 回调函数 复制代码
#!/usr/bin/env python # -*- encoding: utf-8 -*- """ @File : 72_ThreadPool_ProcessPool.py @Time : 2019/10/15 10:20 @Author : Crisimple @Github : https://crisimple.github.io/ @Contact : Crisimple@foxmail.com @License : (C)Copyright 2017-2019, Micro-Circle @Desc : None """ from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor import time import os import random def task(name): print("name: %s pid: %s run" % (name, os.getpid())) time.sleep(random.randrange(1, 3)) if __name__ == "__main__": # 设置最大同时运行的进程数 # p = ProcessPoolExecutor(4) p = ThreadPoolExecutor(4) for i in range(10): # 异步提交任务,提交后不用管进程是否执行 p.submit(task, '任务 %s' % i) # 将进程池的入口关闭,等待任务提交结束后才执行后面的任务, 默认wait=True p.shutdown(wait=True) print("主进程") 复制代码
异步调用:提交完任务后,不等待任务执行完毕。 【GitHub示例】
#!/usr/bin/env python # -*- encoding: utf-8 -*- """ @File : 72_Thread_Asynchronous_call_recall.py @Time : 2019/10/15 11:12 @Author : Crisimple @Github : https://crisimple.github.io/ @Contact : Crisimple@foxmail.com @License : (C)Copyright 2017-2019, Micro-Circle @Desc : None """ from concurrent.futures import ThreadPoolExecutor import time import random def swim(name): print("%s is swimming" % name) time.sleep(random.randint(1, 5)) swim_len = random.randint(10, 20) return {"name": name, 'swim_len': swim_len} def distance(swim_res): # swim_res = swim_res.result() name = swim_res['name'] s_length = swim_res['swim_len'] print("%s 游了 %s km" % (name, s_length)) if __name__ == '__main__': # 1. 同步调用 pool = ThreadPoolExecutor(13) swim_res1 = pool.submit(swim, 'A').result() distance(swim_res1) swim_res2 = pool.submit(swim, 'B').result() # result取得结果 distance(swim_res2) swim_res3 = pool.submit(swim, 'C').result() distance(swim_res3) # 2. 异步调用 # pool = ThreadPoolExecutor(13) # pool.submit(swim, 'AA').add_done_callback(distance) # pool.submit(swim, 'BB').add_done_callback(distance) # pool.submit(swim, 'CC').add_done_callback(distance) print("#############################") 复制代码
(1)协程的切换是子程序切换,是由程序自身控制,没有线程切换的开销,和多线程比线程的数量越多,协程的性能优势越明显 (2)不需要多线程锁的机制,因为只有一个线程,也不存在同时写变量的冲突。在协程中共享资源不需要加锁,只需要判断状态就好了。所以执行效率比多线程好。 如何利用多核CPU呢?最简单的方法是多进程+协程,既充分利用多核,又充分发挥协程的高效率,可获得极高的性能。 复制代码
#!/usr/bin/env python # -*- encoding: utf-8 -*- """ @File : 73_Coroutine_yeild.py @Time : 2019/10/15 12:38 @Author : Crisimple @Github : https://crisimple.github.io/ @Contact : Crisimple@foxmail.com @License : (C)Copyright 2017-2019, Micro-Circle @Desc : None """ import time def task1(name): for i in range(3): print("%s is running...%s" % (name, i)) yield time.sleep(1) def task2(name): for i in range(3): print("%s is running...%s" % (name, i)) yield time.sleep(1) def main(): g1 = task1('任务1') g2 = task2('任务2') for i in range(3): next(g1) next(g2) print("执行完毕") if __name__ == "__main__": main() 复制代码
当数据的引用数变成0的时候,python解释器就认为这个数据是来及,进行垃圾回收,释放空间 复制代码
通过对象存在的时间不同,采用不同的算法来回收垃圾,形象的比喻, 三个链表,零代链表上的对象(新创建的对象都加入到零代链表),引用数都是一,每增加一个指针,引用加一,随后python会检测列表中的互相引用的对象,根据规则减掉其引用计数. GC算法对链表一的引用减一,引用为0的,清除,不为0的到链表二,链表二也执行GC算法,链表三一样. 存在时间越长的数据,越是有用的数据.复制代码
