当前位置:   article > 正文

Python(网络编程、并发编程)_python 网络编程 并行通信

python 网络编程 并行通信

目录

一、术语扫盲

二、网络编程

1、套接字通信socket

1.1、基于tcp协议

1.2、基于udp协议

2、粘包问题

3、套接字实现并发socketserver

3.1、基于tcp协议

3.2、基于udp协议

4、linux安装python

三、并发编程 

1、操作系统发展史

2、多道技术(单核实现并发的效果)

四、进程

1、进程理论

2、开启进程的两种方式

3、进程对象的方法

4、进程间数据相互隔离(通过第三方模块交互)

4.1、队列

4.2、IPC机制

4.3、生产者消费者模型

5、守护进程

五、线程

1、线程理论

2、开启线程的两种方式

3、线程对象的方法

4、同一进程内的多个线程数据共享

5、守护线程

六、锁

1、互斥锁

2、GIL全局解释器锁

3、死锁与递归锁

4、信号量

七、拓展

1、Event事件

2、多进程与多线程比较

3、进程池与线程池

4、协程

5、IO模型


一、术语扫盲

  1. 1、CS架构与BS架构
  2. CS即Client/Server(客户机/服务器)结构
  3. BS即Browser/Server(浏览器/服务器)结构
  4. 开发维护成本:cs开发维护成本高于bs
  5. 安全性:cs安全性高于bs
  6. 客户端负载:cs客户端负载大于bs
  7. 响应速度:cs相应速度高于bs
  8. 2、网络通信
  9. 网络=物理链接介质+互联网通信协议
  10. 网络存在的意义就是跨地域数据传输===>称之为通信
  11. 3、OSI七层协议
  12. 五层协议
  13.         应用层
  14.         传输层
  15.         网络层
  16.         数据链路层
  17.         物理层
  18. 协议:规定数据的组织格式(头部+数据部分)
  19. 3.1物理层
  20. 一组数据称为:位(电信号010101)
  21. 单纯的电信号毫无意义,必须对其进行分组
  22. 3.2数据链路层:ethernet以太网协议
  23. 一组数据称为:帧=头(源mac地址与目标mac地址)+数据(网络层发过来的整体内容)
  24. mac地址:但凡接入互联网的主机必须有一块网卡,每块网卡在出厂时都烧制好全世界独一无二的地址
  25. 计算机通信基本靠吼,即以太网协议的工作方式是广播
  26. 3.3网络层:IP协议
  27. 一组数据称为:包=头(源IP地址与目标IP地址)+数据(传输层发过来的整体内容)
  28. 要达到的目的:
  29. 划分广播域
  30. 每一个广播域但凡要接通外部,一定要有一个网关帮内部的计算机转发包到网上
  31. 网关与外界通信走的是路由协议
  32. 一个合法的ipv4地址=ip地址/子网掩码地址
  33. 3.4传输层:tcp\udp协议
  34. 一组数据称为:段=头(源port与目标port)+数据(应用层发过来的整体内容)
  35. 基于端口,端口范围0-655350-1023为系统占用端口
  36. ip+port ===> 标识全世界范围内独一无二的一个基于网络通信的应用程序
  37. 基于tcp协议通信之前:必须建立一个双向通信的链接
  38. 三次握手,四次挥手
  39. tcp是可靠传输的,发送数据必须等到对方确认后才算完成,才会将自己内存中的数据清除
  40. ps:当服务端大量处于TIME_WAIT状态时意味着服务端正在经历高并发
  41. 3.5应用层
  42. 可以自定义协议=》头部(对数据的描述信息发给谁数据的类型长度等)+数据部分
  43. http https ftp
  44. 总结:
  45. mac => 标识局域网内独一无二的一台计算机
  46. ip + mac => 标识全世界范围内独一无二的一台计算机
  47. ARP协议通过ip地址解析mac地址:
  48. 两台计算机在同一局域网内(源ip与目标ip网络地址一样),目标mac地址就是计算机2的mac地址
  49. 两台计算机不在同一局域网内(源ip与目标ip网络地址不一样),目标mac地址就是网关的mac地址
  50. ip => 标识全世界范围内独一无二的一台计算机
  51. ip + port => 标识全世界范围内独一无二的一台计算机上的一个基于网络通信的应用程序
  52. 应用层 数据
  53. 传输层 tcp
  54. 网络层 ip
  55. 数据链路层 ethernet
  56. 物理层
  57. (原mac地址,目标mac地址) (源ip地址,目标ip地址) (源port,目标port) 数据
  58. 在经由物理层转换为电信号

二、网络编程

1、套接字通信socket

1.1、基于tcp协议

  1. import socket
  2. server=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
  3. server.bind(("127.0.0.1",8080))
  4. server.listen(5)
  5. # 链接循环
  6. while True:
  7. conn,addr=server.accept()
  8. # 通信循环
  9. while True:
  10. try:
  11. data=conn.recv(1024)
  12. conn.send(data.upper())
  13. except Exception:
  14. break
  15. conn.close()
  1. import socket
  2. client=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
  3. client.connect(("127.0.0.1",8080))
  4. while True:
  5. msg=input("请输入信息:").strip()
  6. if msg=="":continue
  7. client.send(msg.encode("utf-8"))
  8. data=client.recv(1024)
  9. print(data.decode("utf-8"))
  10. client.close()

1.2、基于udp协议

  1. import socket
  2. server=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
  3. server.bind(("127.0.0.1",8080))
  4. while True:
  5. data,addr=server.recvfrom(1024)
  6. server.sendto(data.upper(),addr)
  1. import socket
  2. server=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
  3. while True:
  4. msg=input("请输入:").strip()
  5. server.sendto(msg.encode("utf-8"),("127.0.0.1",8080))
  6. res=server.recvfrom(1024)
  7. print(res)

2、粘包问题

  1. import socket
  2. import os
  3. import json
  4. import struct
  5. server=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
  6. server.bind(("127.0.0.1",8080))
  7. server.listen(5)
  8. conn,addr=server.accept()
  9. while True:
  10. try:
  11. path=conn.recv(1024)
  12. path=path.decode("utf-8")
  13. filename=os.path.basename(path)
  14. size=os.path.getsize(filename)
  15. # 制作头:包含数据的一些信息
  16. header={
  17. "filename":filename,
  18. "size":size
  19. }
  20. header_str=json.dumps(header)
  21. header_bytes=header_str.encode("utf-8")
  22. # 先发送头的长度:4个字节
  23. x=struct.pack("i", len(header_bytes))
  24. conn.send(x)
  25. # 再发送头
  26. conn.send(header_bytes)
  27. # 再发送真实数据
  28. with open(path,"rb") as f:
  29. while True:
  30. msg = f.read(1024)
  31. if len(msg)==0:
  32. break
  33. conn.send(msg)
  34. except Exception:
  35. break
  36. conn.close()
  1. '''
  2. 粘包问题出现的原因
  3. 1、tcp是流式协议,数据像水流一样粘在一起没有边界区分
  4. 2、收数据没有收干净,残留部分会和下一次结果混在一起
  5. ps:
  6. 1、udp协议是报式协议,数据一报一报的一个recv对应一个send
  7. 2、收不干净就丢掉,所以没有粘包问题
  8. 解决粘包问题的思路:每次都收干净就行
  9. 1、拿到数据的总大小total_size
  10. 2、recv_size+=循环接收的数据,直至recv_size=total_size
  11. '''
  12. import socket
  13. import struct
  14. import json
  15. client=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
  16. client.connect(("127.0.0.1",8080))
  17. while True:
  18. msg=input("请输入文件路径:").strip()
  19. if msg=="":continue
  20. client.send(msg.encode("utf-8"))
  21. # 先接收4个字节,获取头的长度
  22. x=client.recv(4)
  23. header_len=struct.unpack("i",x)[0]
  24. # 根据头的长度,再接收头
  25. header_bytes=client.recv(header_len)
  26. header_str=header_bytes.decode("utf-8")
  27. header=json.loads(header_str)
  28. # 根据头的内容,再接收真实数据
  29. total_size=header["size"]
  30. recv_size=0
  31. while recv_size<total_size:
  32. msg=client.recv(1024)
  33. try:
  34. print(msg.decode("utf-8"),end="")
  35. except Exception:
  36. pass
  37. recv_size+=len(msg)
  38. else:
  39. print()
  40. client.close()

3、套接字实现并发socketserver

3.1、基于tcp协议

  1. import socketserver
  2. class MyRequestHandler(socketserver.BaseRequestHandler):
  3. # 通信循环:启动一个线程进行通信循环
  4. def handle(self):
  5. # self.request => 链接对象
  6. # self.client_address => 客户端ip、port
  7. while True:
  8. try:
  9. data = self.request.recv(1024)
  10. self.request.send(data.upper())
  11. except Exception:
  12. break
  13. self.request.close()
  14. # 链接循环:从半链接池中取出链接请求,与其建立双向链接
  15. s=socketserver.ThreadingTCPServer(("127.0.0.1",8080),MyRequestHandler)
  16. s.serve_forever()
  1. import socket
  2. client=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
  3. client.connect(("127.0.0.1",8080))
  4. while True:
  5. msg=input("请输入信息:").strip()
  6. if msg=="":continue
  7. client.send(msg.encode("utf-8"))
  8. data=client.recv(1024)
  9. print(data.decode("utf-8"))
  10. client.close()

3.2、基于udp协议

  1. import socketserver
  2. class MyRequestHandler(socketserver.BaseRequestHandler):
  3. def handle(self):
  4. # self.request => (b'hello', <socket.socket fd=360, family=AddressFamily.AF_INET, type=SocketKind.SOCK_DGRAM, proto=0, laddr=('127.0.0.1', 8080)>)
  5. # self.client_address => ('127.0.0.1', 52491)
  6. client_data=self.request[0]
  7. server=self.request[1]
  8. client_addr=self.client_address
  9. server.sendto(client_data.upper(),client_addr)
  10. s=socketserver.ThreadingUDPServer(("127.0.0.1",8080),MyRequestHandler)
  11. s.serve_forever()
  1. import socket
  2. server=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
  3. while True:
  4. msg=input("请输入:").strip()
  5. server.sendto(msg.encode("utf-8"),("127.0.0.1",8080))
  6. res=server.recvfrom(1024)
  7. print(res)

4、linux安装python

  1. cd /home
  2. wget https://www.python.org/ftp/python/3.7.6/Python-3.7.6.tgz
  3. tar -xvf Python-3.7.6.tgz
  4. cd Python-3.7.6
  5. ./configure --prefix=/usr/local/python3
  6. make && make install
  7. vim /etc/profile

三、并发编程 

1、操作系统发展史

参考博客:操作系统的发展史 - JasonJi - 博客园 (cnblogs.com)

2、多道技术(单核实现并发的效果)

并发与并行

  • 并发:看起来像同时运行的
  • 并行:真正意义上的同时执行

如何实现

  • 空间上的复用:多个程序共用一套计算机硬件
  • 时间上的复用:切换(一个程序遇到IO操作、长时间占用cpu)+保存状态

多道技术图解

四、进程

1、进程理论

程序与进程

  • 程序:躺在硬盘的一堆代码,是静态的。(程序想要被计算机运行,它的代码必须要先由硬盘读到内存,之后CPU取指令再执行)
  • 进程:程序的执行过程,是动态的。(创建进程就是在内存中申请一块空间,将需要运行的代码丢进去,一个进程对应一块独立的内存空间)

进程的调度

  • 先来先服务调度算法:对长作业有利,对短作业无利
  • 短作业优先调度算法:对短作业有利,对长作业无利
  • 时间片轮转法+多级反馈队列:

同步与异步(描述的是任务的提交方式)

  • 同步:任务提交后,原地等待任务的返回结果
  • 异步:任务提交之后,不原地等待任务的返回结果,任务的返回结果会有一个异步回调机制自动处理

阻塞与非阻塞 (描述的是程序的运行状态)

  • 阻塞:阻塞态
  • 非阻塞:就绪态、运行态

时间片轮转法+多级反馈队列

进程运行的三状态图

2、开启进程的两种方式

  1. from multiprocessing import Process
  2. import time
  3. def task(name):
  4. print("{} is running".format(name))
  5. time.sleep(3)
  6. print("{} is over".format(name))
  7. if __name__ == '__main__':
  8. # 1、创建一个进程
  9. p=Process(target=task,args=("Cang",))
  10. # 2、开启进程
  11. p.start()
  12. print("主")
  1. from multiprocessing import Process
  2. import time
  3. class MyProcess(Process):
  4. def run(self):
  5. print("aaa")
  6. time.sleep(3)
  7. print("bbb")
  8. if __name__ == '__main__':
  9. p=MyProcess()
  10. p.start()
  11. print("主")

3、进程对象的方法

  1. from multiprocessing import Process,current_process
  2. import os
  3. import time
  4. def func():
  5. # print("进程:",current_process().pid)
  6. print("子进程:",os.getpid()) # 查看当前进程的pid
  7. print("主进程:",os.getppid()) # 查看当前进程的父进程的pid
  8. if __name__ == '__main__':
  9. p=Process(target=func)
  10. p.start()
  11. # p.terminate() # 杀死当前进程,是告诉操作系统帮你杀死进程,但是需要一定的时间
  12. # print(p.is_alive()) # 判断当前进程是否存活
  13. p.join() # 等待子进程运行结束之后,再继续运行主进程,不影响其它子进程的运行

4、进程间数据相互隔离(通过第三方模块交互)

4.1、队列

  1. import queue
  2. q=queue.Queue(3) # 括号内可以传数字,标示生成的队列最大可以同时存放的数据量
  3. q.put(111)
  4. q.put(222)
  5. q.put(333)
  6. # q.put(444) # 当队列为满时,等待存值
  7. # print(q.empty()) # 判断队列是否为空
  8. # print(q.full()) # 判断队列是否为满
  9. v1=q.get()
  10. v2=q.get()
  11. v3=q.get()
  12. # v4=q.get() # 当队列为空时,等待取值
  13. # v4=q.get(timeout=3) # 只等3s,若还没有值报错queue.Empty
  14. # v4=q.get_nowait() # 等都不等,没值直接报错queue.Empty
  15. import queue
  16. q=queue.LifoQueue(3)
  17. q.put(111)
  18. q.put(222)
  19. q.put(333)
  20. print(q.get())
  21. print(q.get())
  22. print(q.get())
  23. import queue
  24. q=queue.PriorityQueue(3)
  25. q.put((3,111))
  26. q.put((5,222))
  27. q.put((-1,333))
  28. print(q.get())
  29. print(q.get())
  30. print(q.get())

4.2、IPC机制

  1. from multiprocessing import Process, Queue
  2. def producer(q):
  3. q.put(111)
  4. def consumer(q):
  5. print(q.get())
  6. if __name__ == '__main__':
  7. q=Queue()
  8. p1=Process(target=producer,args=(q,))
  9. p2=Process(target=consumer,args=(q,))
  10. p1.start()
  11. p2.start()

4.3、生产者消费者模型

  1. from multiprocessing import Process,JoinableQueue
  2. import time
  3. import random
  4. def producer(name,food,q):
  5. for i in range(3):
  6. data="{}生产{}{}".format(name,food,i)
  7. print(data)
  8. time.sleep(random.random())
  9. q.put(data)
  10. def consumer(name,q):
  11. while True:
  12. data=q.get()
  13. time.sleep(random.random())
  14. print("{}吃{}".format(name,data))
  15. q.task_done()
  16. if __name__ == '__main__':
  17. q=JoinableQueue()
  18. p1=Process(target=producer,args=("egon","包子",q))
  19. p2=Process(target=producer,args=("tank","泔水",q))
  20. p1.start()
  21. p2.start()
  22. p1.join()
  23. p2.join()
  24. c1=Process(target=consumer,args=("cang",q))
  25. c1.daemon=True
  26. c1.start()
  27. q.join()
  28. print("zhu")

5、守护进程

  1. from multiprocessing import Process
  2. import time
  3. def task(name):
  4. print("{} is running".format(name))
  5. time.sleep(3)
  6. print("{} is over".format(name))
  7. if __name__ == '__main__':
  8. p=Process(target=task,args=("cang",))
  9. p.daemon=True # 将进程p设置成守护进程,这一句一定要放在start方法上面才有效否则会报错
  10. p.start()
  11. print("主寿终正寝")

五、线程

1、线程理论

进程:资源单位(创建一个进程就是在内存中申请一块空间将需要运行的代码丢进去)
线程:执行单位(线程指得是代码的执行过程,执行过程中需要的资源向线程所在的进程索要)

为何要有线程:开设线程的开销要远远小于开设进程的开销(申请内存空间、拷贝代码)

2、开启线程的两种方式

  1. from threading import Thread
  2. import time
  3. def task(name):
  4. print("{} is running".format(name))
  5. time.sleep(1)
  6. print("{} is over".format(name))
  7. if __name__ == '__main__':
  8. t=Thread(target=task,args=("cang",))
  9. t.start()
  10. print("zhu")
  1. from threading import Thread
  2. import time
  3. class MyThread(Thread):
  4. def __init__(self,name):
  5. super().__init__()
  6. self.name=name
  7. def run(self):
  8. print("{} is running".format(self.name))
  9. time.sleep(1)
  10. print("{} is over".format(self.name))
  11. if __name__ == '__main__':
  12. t=MyThread("cang")
  13. t.start()
  14. print("zhu")

3、线程对象的方法

  1. from threading import Thread,current_thread,active_count
  2. import time
  3. import os
  4. def task(n):
  5. print(os.getpid())
  6. print(current_thread().name)
  7. time.sleep(n)
  8. if __name__ == '__main__':
  9. t1=Thread(target=task,args=(1,))
  10. t2=Thread(target=task,args=(2,))
  11. t1.start()
  12. t2.start()
  13. t1.join() # 等待子线程运行结束后再运行主线程,不影响其它子线程的运行
  14. print(os.getpid()) # 获取进程的pid
  15. print(current_thread().name) # 获取线程的名字
  16. print(active_count()) # 统计当前正在活跃的线程数

4、同一进程内的多个线程数据共享

  1. from threading import Thread
  2. money=100
  3. def task():
  4. global money
  5. money=666
  6. if __name__ == '__main__':
  7. t=Thread(target=task)
  8. t.start()
  9. t.join()
  10. print(money)

5、守护线程

  1. from threading import Thread
  2. import time
  3. def task():
  4. print("123")
  5. time.sleep(1)
  6. print("end123")
  7. def foo():
  8. print("456")
  9. time.sleep(3)
  10. print("end456")
  11. if __name__ == '__main__':
  12. t1=Thread(target=task)
  13. t2=Thread(target=foo)
  14. t1.daemon=True
  15. t1.start()
  16. t2.start()
  17. print("zhu")

六、锁

1、互斥锁

  1. '''
  2. 互斥锁:
  3. 多个进程/线程操作同一份数据的时候,会出现数据错乱的问题
  4. 加锁处理,将并发变成串行,牺牲效率但是保证了数据的安全性
  5. '''
  6. from multiprocessing import Process,Lock
  7. import json
  8. import time
  9. import random
  10. def search(i):
  11. with open("a.txt",mode="rt",encoding="utf-8") as f:
  12. res=f.read()
  13. dic=json.loads(res)
  14. print("用户{}查询票数:{}".format(i,dic.get("ticket_num")))
  15. def buy(i):
  16. with open("a.txt",mode="rt",encoding="utf-8") as f:
  17. res=f.read()
  18. dic=json.loads(res)
  19. time.sleep(random.randint(1,3))
  20. if dic.get("ticket_num")>0:
  21. dic["ticket_num"] -= 1
  22. with open("a.txt",mode="wt",encoding="utf-8") as f:
  23. res=json.dumps(dic)
  24. f.write(res)
  25. print("用户{}买票成功".format(i))
  26. else:
  27. print("用户{}买票失败".format(i))
  28. def run(i,mutex):
  29. search(i)
  30. # 给买票环节加锁处理
  31. # 枪锁
  32. mutex.acquire()
  33. buy(i)
  34. # 释放锁
  35. mutex.release()
  36. if __name__ == '__main__':
  37. # 在主进程中生成一把锁,让所有的子进程抢,谁先抢到谁买票
  38. mutex=Lock()
  39. for i in range(1,10):
  40. p=Process(target=run,args=(i,mutex))
  41. p.start()

2、GIL全局解释器锁

  1. '''
  2. GIL全局解释器锁:
  3. 是Cpython解释器的特点,而不是python的特点
  4. 保证的是解释器级别的数据安全,针对不同的数据还是需要加不同的锁来处理
  5. 会导致同一个进程下的多个线程无法同时执行,也就无法利用多核优势(解释型语言的通病)
  6. '''
  7. from threading import Thread, Lock
  8. import time
  9. money = 100
  10. def task(mutex):
  11. global money
  12. mutex.acquire()
  13. tem = money
  14. time.sleep(0.01)
  15. money = tem - 1
  16. mutex.release()
  17. if __name__ == '__main__':
  18. mutex = Lock()
  19. t_list = []
  20. for i in range(100):
  21. t = Thread(target=task, args=(mutex,))
  22. t.start()
  23. t_list.append(t)
  24. for t in t_list:
  25. t.join()
  26. print(money)
  27. '''
  28. 100个线程起起来之后,去抢GIL
  29. 我抢到了GIL,又抢到了互斥锁(无人争抢,其它线程还在等着抢GIL)
  30. 我进入IO 自动释放GIL,其它线程虽然抢到了GIL但是抢不到互斥锁,最后还是乖乖释放GIL
  31. 我又抢到了GIL,释放互斥锁,释放GIL
  32. '''

3、死锁与递归锁

  1. '''
  2. 死锁:
  3. 我手上有锁A,然后去抢锁B
  4. 你手上有锁B,然后去抢锁A
  5. 递归锁:
  6. 可以被连续的acquire和release
  7. 每acquire一次计数加一,每release一次计数减一
  8. 只要计数不为0,那么其他人都无法抢到该锁
  9. '''
  10. from threading import Thread, RLock
  11. import time
  12. mutexB = mutexA = RLock()
  13. class MyThread(Thread):
  14. def __init__(self, name):
  15. super().__init__()
  16. self.name = name
  17. def run(self):
  18. self.task1()
  19. self.task2()
  20. def task1(self):
  21. mutexA.acquire()
  22. print("线程{}抢到锁A".format(self.name))
  23. mutexB.acquire()
  24. print("线程{}抢到锁B".format(self.name))
  25. mutexB.release()
  26. print("线程{}释放锁B".format(self.name))
  27. mutexA.release()
  28. print("线程{}释放锁A".format(self.name))
  29. def task2(self):
  30. mutexB.acquire()
  31. print("线程{}抢到锁B".format(self.name))
  32. time.sleep(1)
  33. mutexA.acquire()
  34. print("线程{}抢到锁A".format(self.name))
  35. mutexA.release()
  36. print("线程{}释放锁A".format(self.name))
  37. mutexB.release()
  38. print("线程{}释放锁B".format(self.name))
  39. if __name__ == '__main__':
  40. for i in range(10):
  41. t = MyThread(i)
  42. t.start()

4、信号量

  1. '''
  2. 信号量:
  3. 在不同的阶段可能对应不同的技术点,在并发编程中指的是锁
  4. 如果将互斥锁比成一个坑的话,那么信号量就相当于多个坑
  5. '''
  6. from threading import Thread, Semaphore
  7. import time
  8. import random
  9. sp = Semaphore(5) # 坑位
  10. def task(name):
  11. sp.acquire()
  12. print("线程{}在蹲坑".format(name))
  13. time.sleep(random.randint(1,3))
  14. sp.release()
  15. if __name__ == '__main__':
  16. for i in range(20):
  17. t=Thread(target=task,args=(i,))
  18. t.start()

七、拓展

1、Event事件

  1. '''
  2. Event事件:
  3. 一些进程/线程需要等待另外一些进程/线程运行完毕之后才能运行,类似发射信号
  4. '''
  5. from threading import Thread, Event
  6. import time
  7. sub=Event()
  8. def light():
  9. print("红灯亮")
  10. time.sleep(3)
  11. print("绿灯亮")
  12. sub.set()
  13. def car(name):
  14. print("{}等待绿灯".format(name))
  15. sub.wait()
  16. print("{}加油门飙起来".format(name))
  17. if __name__ == '__main__':
  18. t=Thread(target=light)
  19. t.start()
  20. for i in range(20):
  21. t=Thread(target=car,args=(i,))
  22. t.start()

2、多进程与多线程比较

  1. '''
  2. 同一个进程下的多个线程无法利用多核优势,那么多线程是不是没有用了
  3. 计算密集型(多核四个任务)
  4. 多进程:总耗时10+
  5. 多线程:总耗时40+
  6. IO密集型(多核四个任务)
  7. 多进程:相对浪费资源
  8. 多线程:相对节省资源
  9. 总结:多进程和多线程都有各自的优势,通常使用多进程下开设多线程,这样既可以利用多核也可以减少资源消耗
  10. '''
  11. from multiprocessing import Process
  12. from threading import Thread
  13. import time
  14. import os
  15. def task():
  16. # m = 1
  17. # for i in range(1, 100000):
  18. # m *= i
  19. time.sleep(0.01)
  20. if __name__ == '__main__':
  21. start_time=time.time()
  22. print(os.cpu_count())
  23. p_list = []
  24. for i in range(10):
  25. # p = Process(target=task)
  26. # p.start()
  27. # p_list.append(p)
  28. t=Thread(target=task)
  29. t.start()
  30. p_list.append(t)
  31. for p in p_list:
  32. p.join()
  33. print(time.time()-start_time)

3、进程池与线程池

  1. '''
  2. 进程池与线程池:
  3. 在保证计算机安全的情况下,最大限度的利用计算机
  4. 虽然降低了效率,但是它保证了计算机的安全
  5. '''
  6. from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
  7. import time
  8. def task(n):
  9. print(n)
  10. time.sleep(2)
  11. return "===>:{}".format(n ** 2)
  12. def call_back(res):
  13. print(res.result())
  14. if __name__ == '__main__':
  15. pool = ThreadPoolExecutor(5)
  16. for i in range(20):
  17. res = pool.submit(task, i)
  18. res.add_done_callback(call_back)

4、协程

  1. '''
  2. 进程:资源单位
  3. 线程:执行单位
  4. 协程:单线程实现并发
  5. 本来cpu遇到IO或者长时间被占用,需要进行切换
  6. 但是我们程序员自己在代码层面上检测所以的IO操作,一旦遇到IO了我们在代码级别完成切换
  7. 这样给cpu的感觉就是没有遇到IO,从而提高了程序的运行效率
  8. '''
  9. # import time
  10. # def task1():
  11. # for i in range(10000000):
  12. # i+1
  13. # def task2():
  14. # for i in range(10000000):
  15. # i+1
  16. #
  17. # start_time=time.time()
  18. # task1()
  19. # task2()
  20. # print(time.time()-start_time) # 1.5311427116394043
  21. # import time
  22. # def task1():
  23. # for i in range(10000000):
  24. # i + 1
  25. # yield
  26. # def task2():
  27. # t = task1()
  28. # for i in range(10000000):
  29. # i + 1
  30. # next(t)
  31. #
  32. # start_time = time.time()
  33. # task2()
  34. # print(time.time() - start_time) # 2.8123199939727783
  35. # 总结:计算密集型任务,切换会降低效率
  36. # import time
  37. #
  38. # def task1():
  39. # print("hello")
  40. # time.sleep(2)
  41. # def task2():
  42. # print("world")
  43. # time.sleep(3)
  44. # start_time = time.time()
  45. # task1()
  46. # task2()
  47. # print(time.time() - start_time) # 5.0310328006744385
  48. from gevent import monkey;monkey.patch_all()
  49. from gevent import spawn
  50. import time
  51. def task1():
  52. print("hello")
  53. time.sleep(2)
  54. def task2():
  55. print("world")
  56. time.sleep(3)
  57. start_time = time.time()
  58. s1=spawn(task1)
  59. s2=spawn(task2)
  60. s1.join()
  61. s2.join()
  62. print(time.time() - start_time) # 3.031080722808838

5、IO模型

5.1、阻塞模型

  1. import socket
  2. server=socket.socket()
  3. server.bind(("127.0.0.1",8080))
  4. server.listen(5)
  5. while True:
  6. conn,addr=server.accept()
  7. while True:
  8. try:
  9. data=conn.recv(1024)
  10. conn.send(data.upper())
  11. except Exception:
  12. break
  13. conn.close()
  1. import socket
  2. client=socket.socket()
  3. client.connect(("127.0.0.1",8080))
  4. while True:
  5. client.send("hello".encode("utf-8"))
  6. data=client.recv(1024)
  7. print(data.decode("utf-8"))

5.2、非阻塞模型

  1. '''
  2. 虽然非阻塞IO模型给你的感觉很牛逼
  3. 但是该模型会长时间占用cpu而不干活
  4. '''
  5. import socket
  6. server=socket.socket()
  7. server.bind(("127.0.0.1",8080))
  8. server.listen(5)
  9. server.setblocking(False)
  10. r_list=[]
  11. del_list=[]
  12. while True:
  13. try:
  14. conn,addr=server.accept()
  15. r_list.append(conn)
  16. except BlockingIOError:
  17. for conn in r_list:
  18. try:
  19. data = conn.recv(1024)
  20. conn.send(data.upper())
  21. except BlockingIOError:
  22. continue
  23. except ConnectionRefusedError:
  24. conn.close()
  25. del_list.append(conn)
  26. continue
  27. for conn in del_list:
  28. r_list.remove(conn)
  29. del_list.clear()
  1. import socket
  2. client=socket.socket()
  3. client.connect(("127.0.0.1",8080))
  4. while True:
  5. client.send("hello".encode("utf-8"))
  6. data=client.recv(1024)
  7. print(data.decode("utf-8"))

5.3、IO多路复用

  1. '''
  2. select机制 windows、linux都有
  3. poll机制 只有linux有
  4. select和poll都可以监管多个对象,但是poll监管的数量更多
  5. 上述两个机制对于监管特别多的对象时,会出现相当大的延时响应
  6. epoll机制 只有linux有
  7. 它给每个对象都绑定了一个回调机制,一旦有响应回调机制立刻发起提醒
  8. 针对不同的操作系统还要考虑不同的监管机制,书写代码太过繁琐
  9. 这是有一个人跳出来了(selectors模块)能够根据你跑的平台不同,自动帮你选择对应的监管机制
  10. '''
  11. import socket
  12. import select
  13. server=socket.socket()
  14. server.bind(("127.0.0.1",8080))
  15. server.listen(5)
  16. server.setblocking(False)
  17. read_list = [server] # 将server对象添加到监管列表中
  18. while True:
  19. r_list,w_list,x_list=select.select(read_list,[],[]) # 操作系统会监管这些对象,返回可执行的对象
  20. for i in r_list: # 针对不同的对象作不同的操作
  21. if i is server: # 针对可执行的server对象,建立链接,并将该链接对象conn添加到监管列表中
  22. conn,addr=i.accept()
  23. read_list.append(conn)
  24. else: # 针对可执行的conn对象,进行接收数据操作
  25. try:
  26. data=i.recv(1024)
  27. conn.send(data.upper())
  28. except Exception:
  29. i.close()
  30. read_list.remove(i)
  1. import socket
  2. client=socket.socket()
  3. client.connect(("127.0.0.1",8080))
  4. while True:
  5. client.send("hello".encode("utf-8"))
  6. data=client.recv(1024)
  7. print(data.decode("utf-8"))

5.4、异步IO

异步IO模型是所有模型中效率最高的,也是应用最广泛的

相关的模块与框架

        模块:asyncio

        框架:sanic        tronado        twisted

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/197651
推荐阅读
相关标签
  

闽ICP备14008679号