当前位置:   article > 正文

Python基础学习(十六)--同步异步,消息队列,协程,UDP网络编程_python中socket中的udp使用异步读写

python中socket中的udp使用异步读写

一、同步异步的概念

(一)多线程开发可能遇到的问题

如下,该程序很明显想要将g_num这个变量自加2000000,每个函数加1000000次。

但结果并不是这样。

  1. import threading
  2. g_num=0
  3. def hs1():
  4. global g_num
  5. for i in range(1000000):
  6. g_num+=1
  7. print(g_num)
  8. def hs2():
  9. global g_num
  10. for i in range(1000000):
  11. g_num += 1
  12. print(g_num)
  13. if __name__ == '__main__':
  14. t1=threading.Thread(target=hs1)
  15. t2=threading.Thread(target=hs2)
  16. t1.start()
  17. t2.start()

问题产生的原因就是没有控制多个线程对同一资源的访问,对数据造成破坏,使得线程运行的结果不可预期。

这种现象称为“线程不安全”。

(二)同步

同步就是协同步调,按预定的前后次序进行运行。

不要误认为“同”是指一起动作,其实“同”是指协同、协助、互相配合的意思

如线程或进程同步,可理解为进程或线程A和B一块配合,A执行到一定程度时要依靠B的某个结果,于是停下来,示意B运行;B依言执行,再将结果给A;A再继续操作。

对于以上的线程不安全的问题,就可以通过线程同步来解决,使用互斥锁。

某个线程要更改共享数据时,先将其锁定,此时资源的状态为锁定状态,其他线程就不能更改,直到该线程将资源状态改为非锁定状态,也就是释放资源,其他的线程才能再次锁定资源。互斥锁保证了每次只有一个线程进入写入操作。从而保证了线程下数据的安全性。

  1. from threading import Thread,Lock
  2. g=0
  3. def hs1():
  4. global g
  5. for i in range(100000):
  6. l.acquire() # 锁定
  7. g+=1
  8. l.release() # 释放
  9. print('函数1:',g)
  10. def hs2():
  11. global g
  12. for i in range(100000):
  13. l.acquire()
  14. g += 1
  15. l.release()
  16. print('函数2:',g)
  17. if __name__ == '__main__':
  18. l=Lock()
  19. t1=Thread(target=hs1)
  20. t2=Thread(target=hs2)
  21. t1.start()
  22. t2.start()

经过上述加锁操作,最后结果就无误了,变成了2000000,但函数1的输出结果并不是1000000。

这是因为两线程互不干扰,各自执行,函数1先结束,先加满1000000次,函数2后结束。虽然函数2后结束,但函数1结束时,函数2也是执行了的,也对这个全局变量g_num进行了加的操作,只不过是还不够1000000次,所以函数1输出的g_num值不是1000000。

这样加锁就解决了这个问题。

  1. from threading import Thread,Lock
  2. g=0
  3. def hs1():
  4. global g
  5. l.acquire()
  6. for i in range(1000000):
  7. g+=1
  8. print('函数1:',g)
  9. l.release()
  10. def hs2():
  11. global g
  12. l.acquire()
  13. for i in range(1000000):
  14. g += 1
  15. print('函数2:',g)
  16. l.release()
  17. if __name__ == '__main__':
  18. l=Lock()
  19. t1=Thread(target=hs1)
  20. t2=Thread(target=hs2)
  21. t1.start()
  22. t2.start()

上锁过程

当一个线程调用锁的acquire()方法获得锁时,锁就进入‘locked’状态。

每次只有一个线程可以获得锁。如果此时另一个线程试图获得这个锁,该线程就会变为'blocked'状态,也就是阻塞态,直到拥有锁的线程调用锁的release()方法释放锁之后,锁进入‘unlocked’状态。

线程调度程序从处于同步阻塞状态的线程中选择一个来获得锁,并使得该线程进入运行状态。

(三)异步

同步调用就是 你喊朋友吃饭,你朋友在忙,你就一直在那儿等着,等你朋友忙完了,你们一起去。

异步调用就是你喊朋友去吃饭,你朋友说知道了,待会儿忙完去找你,你就去左别的了。

(四)死锁

在多个线程共享资源的时候,如果两个县城分别战友一部分资源,并且同时等待对方的资源,就会造成死锁现象。

  1. from threading import Thread,Lock
  2. import time
  3. def hs1():
  4. global g
  5. for i in range(100000):
  6. lock1.acquire()
  7. print('函数1得到锁1,请求锁2')
  8. time.sleep(1)
  9. lock2.acquire()
  10. print('11111111111111111111')
  11. g+=1
  12. lock2.release()
  13. lock1.release()
  14. print('函数1:',g)
  15. def hs2():
  16. global g
  17. for i in range(100000):
  18. lock2.acquire()
  19. print('函数2得到锁2,请求锁1')
  20. time.sleep(1)
  21. lock1.acquire()
  22. print('22222222222222222222')
  23. g += 1
  24. lock1.release()
  25. lock2.release()
  26. print('函数2:',g)
  27. if __name__ == '__main__':
  28. lock1=Lock()
  29. lock2=Lock()
  30. t1=Thread(target=hs1)
  31. t2=Thread(target=hs2)
  32. t1.start()
  33. t2.start()

(五)队列

昨天说到的进程通信使用的队列是进程消息队列,即from multiprocessing import Queue

今天所说的是线程通信使用的队列,即from queue import Queue

它有和进程消息队列一样的方法。

  1. import queue
  2. q=queue.Queue(3)
  3. q.put(10)
  4. q.put(20)
  5. q.put(30)
  6. print('-------',q.full())
  7. while q.qsize()>0:
  8. print(q.get())
  9. print('结束')

还有它特有的优先级队列和后进先出队列。

PriorityQueue--优先级队列

数字越小的优先级越高

  1. import queue
  2. q=queue.PriorityQueue(3)
  3. q.put((2,'西瓜'))
  4. q.put((-1,'芒果'))
  5. q.put((3,'香蕉'))
  6. while not q.empty():
  7. print(q.get())

LifoQueue--后进先出队列

  1. import queue
  2. q=queue.LifoQueue()
  3. q.put(10)
  4. q.put(20)
  5. q.put(30)
  6. q.put(40)
  7. while q.qsize()>0:
  8. print(q.get())

(六)生产者消费者

  1. from threading import Thread
  2. import queue,time
  3. def producer(q,name):
  4. count=1
  5. while True:
  6. bz=name+'生产包子%d'%count
  7. print(bz)
  8. q.put(bz)
  9. time.sleep(1)
  10. count+=1
  11. def consumer(q,name):
  12. while True:
  13. bz=q.get()
  14. print(name+'吃'+bz)
  15. if __name__ == '__main__':
  16. q=queue.Queue(5)
  17. p1=Thread(target=producer,args=(q,'张三'))
  18. p2=Thread(target=producer,args=(q,'李四'))
  19. c1=Thread(target=consumer,args=(q,'111'))
  20. c2=Thread(target=consumer,args=(q,'222'))
  21. c3=Thread(target=consumer,args=(q,'333'))
  22. p1.start()
  23. p2.start()
  24. c1.start()
  25. c2.start()
  26. c3.start()

(七)threadLocal

先给出一段代码

  1. from threading import Thread
  2. import threading,time
  3. class A():
  4. pass
  5. def hanshu1(name):
  6. A.name=name
  7. hanshu2()
  8. def hanshu2():
  9. time.sleep(1)
  10. print(threading.current_thread(),A.name)
  11. if __name__ == '__main__':
  12. t1=Thread(target=hanshu1,args=('张三',),name='线程帅哥')
  13. t2=Thread(target=hanshu1,args=('李四',),name='线程美女')
  14. t1.start()
  15. t2.start()

它的结果是:

<Thread(线程帅哥, started 6180)> 李四
<Thread(线程美女, started 3248)> 李四

因为线程2在线程1调用hanshu2前,就启动了,完成了赋值,所以覆盖了线程1对类属性的赋值。

这时threadLocal的方法应运而生。

一个threadLocal变量虽然是全局变量,但每个线程都只能读写自己线程的独立副本,互不干扰。

threadLocal解决了参数在一个线程中的各个函数之间互相传递的问题。

  1. from threading import Thread
  2. import threading,time
  3. local=threading.local()
  4. def hanshu1(name):
  5. local.name=name
  6. hanshu2()
  7. def hanshu2():
  8. time.sleep(1)
  9. print(threading.current_thread(),local.name)
  10. if __name__ == '__main__':
  11. t1=Thread(target=hanshu1,args=('张三',),name='线程帅哥')
  12. t2=Thread(target=hanshu1,args=('李四',),name='线程美女')
  13. t1.start()
  14. t2.start()

这样代码就不会出现上述情况了。

(八)全局解释器

下面两段代码,一个是一个线程执行200000000次,一个是两个线程各自执行100000000,运行时间居然是相似的,并没有像想象中的那样相差一半左右。

  1. import threading,time
  2. def hanshu1():
  3. s=time.time()
  4. count=0
  5. for i in range(200000000):
  6. count+=1
  7. e=time.time()
  8. print(e-s)
  9. if __name__ == '__main__':
  10. t1=threading.Thread(target=hanshu1)
  11. t1.start()
  1. def hanshu2():
  2. s=time.time()
  3. count=0
  4. for i in range(100000000):
  5. count+=1
  6. e=time.time()
  7. print(e-s)
  8. if __name__ == '__main__':
  9. t1=threading.Thread(target=hanshu2)
  10. t2=threading.Thread(target=hanshu2)
  11. t1.start()
  12. t2.start()

因为python代码的执行是有python虚拟机(又名解释器主循环)进行控制的。

python在设计时是这样考虑的,主循环中同时只能有一个控制线程在执行,就像单核CPU系统中的多进程一样。

python解释器同内存一样,它可以运行多个线程,但是在任意给定时刻只能有一个线程会被解释器执行。

对python虚拟机的访问是由全局解释器锁(GIL)控制的。

二、协程

(一)什么是协程

协程,又称微线程,纤程。

在一个线程中的某个函数,可以在任何地方保存当前函数的临时变量等信息,然后切换到另外一个函数中执行,注意不是通过函数调用的方式做到的并且切换的次数以及什么时候在切换到原来的函数都由开发者确定

(二)协程和线程的差异

线程切换时,操作系统为了程序运行的高效性每个线程都有自己缓存Cache等数据,操作系统还会帮你做这些数据的恢复操作。所以线程的切换非常耗性能。协程的切换只是单纯的操作CPU的上下文,所以一秒钟切换个上百万次也扛得住。

(三)协程的问题

系统并不感知,所以操作系统不会帮你做切换。切换是由开发者确定的。

(四)好处

尤其适用于IO密集型的程序。

  1. def hanshu1():
  2. for i in range(3):
  3. print('函数1',i)
  4. yield
  5. def hanshu2(a):
  6. for i in range(3):
  7. next(a)
  8. print('函数2',i)
  9. h=hanshu1()
  10. hanshu2(h)

三、网络编程--UDP

客户端:

  1. # 不同机器,端口号可相同
  2. from socket import *
  3. import threading
  4. udp_s = socket(AF_INET, SOCK_DGRAM)
  5. dest_address = ('10.10.107.240', 9999) # 对方
  6. udp_s.bind(('10.10.107.68', 8899)) # 我
  7. def send():
  8. while True:
  9. data = input()
  10. udp_s.sendto(data.encode('utf-8'), dest_address)
  11. def get():
  12. while True:
  13. s, addr = udp_s.recvfrom(124) # 一次接收124字节的字符
  14. print(s.decode('utf-8'))
  15. if __name__ == '__main__':
  16. t1 = threading.Thread(target=send)
  17. t2 = threading.Thread(target=get)
  18. t1.start()
  19. t2.start()

服务器端:

  1. import threading
  2. from socket import *
  3. udp_s=socket(AF_INET,SOCK_DGRAM)
  4. local_address=('10.10.107.240',9999)
  5. udp_s.bind(local_address) # 我
  6. dest_address=('10.10.107.68',8899) # 对方
  7. def send():
  8. while True:
  9. data = input()
  10. udp_s.sendto(data.encode('utf-8'), dest_address)
  11. def get():
  12. while True:
  13. s, addr = udp_s.recvfrom(124)
  14. print(s.decode('utf-8'))
  15. if __name__ == '__main__':
  16. t1=threading.Thread(target=send)
  17. t2=threading.Thread(target=get)
  18. t1.start()
  19. t2.start()

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/85325
推荐阅读
相关标签
  

闽ICP备14008679号