当前位置:   article > 正文

Python3并发编程-多线程threading_python3 multithreading 并发请求

python3 multithreading 并发请求

目录

1:线程的创建

1.1:Thread类创建线程

1.2:继承Thread类的子类创建

2:线程的同步

2.1:锁:Lock

2.2:死锁

2.3:递归锁:RLock

2.4:信号量:BoundedSemaphore

2.5:事件:Event

2.6:线程池

2.6.1:submit方法

2.6.2:map方法


同一进程的各个线程间可以共享主线程的地址空间和各种资源。

1:线程的创建

1.1:Thread类创建线程

  1. # -*- coding: utf-8 -*-
  2. from threading import Thread
  3. import os
  4. import time
  5. def func(index,dic):
  6. print(f'线程{index};进程id={os.getpid()}')
  7. dic['cnt'] += 1
  8. count = 0
  9. while True:
  10. time.sleep(1)
  11. if count > 2:
  12. break
  13. count += 1
  14. dic['cnt'] += 1
  15. print(f'线程{index};cnt=[{dic["cnt"]}]')
  16. print(f'线程{index};cnt=[{dic["cnt"]}],退出')
  17. def main():
  18. dic = {}
  19. dic['cnt'] = 0
  20. t_list = []
  21. for i in range(5):
  22. t = Thread(target=func, args=(i, dic,)) # 线程中数据是共享的,看dic中数据的变化
  23. # t.setDaemon(True) # 默认为False;设置为True时,主线程退出时,子线程也会退出
  24. t.start()
  25. # print(t.getName()) # 获取线程名称
  26. t_list.append(t)
  27. print(f'主线程,进程id={os.getpid()}')
  28. for t in t_list:
  29. t.join() # 等待线程的结束,非必须。
  30. print(f'主线程;cnt=[{dic["cnt"]}],退出')
  31. if __name__ == '__main__':
  32. main()

1.2:继承Thread类的子类创建

  1. # -*- coding: utf-8 -*-
  2. from threading import Thread
  3. import os
  4. import time
  5. class MyThread(Thread):
  6. def __init__(self, index,dic):
  7. super().__init__()
  8. self.index = index
  9. self.dic = dic
  10. def run(self): # 调用start()函数之后会自动调用此函数
  11. print(f'线程{self.index};进程id={os.getpid()}')
  12. self.dic['cnt'] += 1
  13. count = 0
  14. while True:
  15. time.sleep(1)
  16. if count > 2:
  17. break
  18. count += 1
  19. self.dic['cnt'] += 1
  20. print(f'线程{self.index};cnt=[{self.dic["cnt"]}]')
  21. print(f'线程{self.index};cnt=[{self.dic["cnt"]}],退出')
  22. def main():
  23. dic = {}
  24. dic['cnt'] = 0
  25. t_list = []
  26. for i in range(5):
  27. t = MyThread(i, dic)
  28. # t.setDaemon(True) # 默认为False;设置为True时,主线程退出时,子线程也会退出
  29. t.start()
  30. # print(t.getName()) # 获取线程名称
  31. t_list.append(t)
  32. print(f'主线程,进程id={os.getpid()}')
  33. for t in t_list:
  34. t.join() # 等待线程的结束,非必须。
  35. print(f'主线程;cnt=[{dic["cnt"]}],退出')
  36. if __name__ == '__main__':
  37. main()

2:线程的同步

同一进程下的线程资源是共享的,但是对共享数据的操作是不安全的,因此需要进行同步操作。

2.1:锁:Lock

  1. # -*- coding: utf-8 -*-
  2. from threading import Thread
  3. from multiprocessing import Lock
  4. import os
  5. import time
  6. def func(index,dic,lock):
  7. print(f'线程{index};进程id={os.getpid()}')
  8. while True:
  9. try:
  10. lock.acquire()
  11. cnt = dic['cnt']
  12. time.sleep(0.0001) # 加点延时,不然看不到效果
  13. if cnt > 0:
  14. dic['cnt'] -= 1
  15. print(f'线程{index};获取到:{cnt}号票;剩下={dic["cnt"]};')
  16. else:
  17. lock.release()
  18. break
  19. lock.release()
  20. except:
  21. break
  22. print(f'线程{index};cnt=[{dic["cnt"]}],退出')
  23. def main():
  24. lock = Lock()
  25. dic = {}
  26. dic['cnt'] = 20
  27. t_list = []
  28. for i in range(5):
  29. t = Thread(target=func, args=(i, dic,lock,)) # 线程中数据是共享的,看dic中数据的变化
  30. # t.setDaemon(True) # 默认为False;设置为True时,主线程退出时,子线程也会退出
  31. t.start()
  32. # print(t.getName()) # 获取线程名称
  33. t_list.append(t)
  34. print(f'主线程,进程id={os.getpid()}')
  35. for t in t_list:
  36. t.join() # 等待线程的结束,非必须。
  37. print(f'主线程;cnt=[{dic["cnt"]}],退出')
  38. if __name__ == '__main__':
  39. main()

把锁注释掉,看运行结果:

  1. # -*- coding: utf-8 -*-
  2. from threading import Thread
  3. from multiprocessing import Lock
  4. import os
  5. import time
  6. def func(index,dic,lock):
  7. print(f'线程{index};进程id={os.getpid()}')
  8. while True:
  9. try:
  10. # lock.acquire()
  11. cnt = dic['cnt']
  12. time.sleep(0.0001) # 加点延时,不然看不到效果
  13. if cnt > 0:
  14. dic['cnt'] -= 1
  15. print(f'线程{index};获取到:{cnt}号票;剩下={dic["cnt"]};')
  16. else:
  17. # lock.release()
  18. break
  19. # lock.release()
  20. except:
  21. break
  22. print(f'线程{index};cnt=[{dic["cnt"]}],退出')
  23. def main():
  24. lock = Lock()
  25. dic = {}
  26. dic['cnt'] = 20
  27. t_list = []
  28. for i in range(5):
  29. t = Thread(target=func, args=(i, dic,lock,)) # 线程中数据是共享的,看dic中数据的变化
  30. # t.setDaemon(True) # 默认为False;设置为True时,主线程退出时,子线程也会退出
  31. t.start()
  32. # print(t.getName()) # 获取线程名称
  33. t_list.append(t)
  34. print(f'主线程,进程id={os.getpid()}')
  35. for t in t_list:
  36. t.join() # 等待线程的结束,非必须。
  37. print(f'主线程;cnt=[{dic["cnt"]}],退出')
  38. if __name__ == '__main__':
  39. main()

2.2:死锁

死锁就是有多个锁,多个线程中互相等待对方的锁。

比如:

有两个锁:锁A,锁B;

有两个线程:线程A,线程B

在线程A中已获取锁A,在线程B中已获取锁B,然后在线程A中要获取锁B,而此时锁B已在线程B中被占有,线程A中只能等待;在线程B中,又要获取锁A,而锁A已在线程A中被占有,这时线程B只能等待;就这样形成了,线程A,线程B分别在等待对方释放自己所要的锁。
 

  1. # -*- coding: utf-8 -*-
  2. from threading import Thread
  3. from multiprocessing import Lock
  4. import os
  5. import time
  6. from threading import Thread,Lock
  7. import time
  8. def funcA(lockA,lockB):
  9. print(" funcA:需要A锁")
  10. lockA.acquire()
  11. time.sleep(0.2)
  12. print(" funcA:获得了:A锁")
  13. print(" funcA:需要B锁")
  14. lockB.acquire() # B锁 在funcB中被占有,而在funcB中又在等待A锁的释放
  15. print(" funcA:获得了:B锁")
  16. lockB.release()
  17. lockA.release()
  18. def funcB(lockA,lockB):
  19. print(" funcB:需要B锁")
  20. lockB.acquire()
  21. time.sleep(0.2)
  22. print(" funcB:获得了:B锁")
  23. print(" funcB:需要A锁") # A锁 在funcA
  24. lockA.acquire() # A锁 在funcA中被占有,而在funcA中又在等待B锁的释放
  25. print(" funcB:获得了:A锁")
  26. lockA.release()
  27. lockB.release()
  28. def main():
  29. lockA = Lock()
  30. lockB = Lock()
  31. t_a = Thread(target=funcA, args=(lockA, lockB,))
  32. t_a.start()
  33. t_b = Thread(target=funcB, args=(lockA, lockB,))
  34. t_b.start()
  35. print("结束主线程")
  36. if __name__ == "__main__":
  37. main()

2.3:递归锁:RLock

  1. # -*- coding: utf-8 -*-
  2. from threading import Thread
  3. from multiprocessing import Lock
  4. import os
  5. import time
  6. from threading import Thread,RLock # 递归锁
  7. import time
  8. def funcA(lockA,lockB):
  9. print(" funcA:需要A锁")
  10. lockA.acquire()
  11. time.sleep(0.2)
  12. print(" funcA:获得了:A锁")
  13. print(" funcA:需要B锁")
  14. lockB.acquire()
  15. print(" funcA:获得了:B锁")
  16. lockB.release()
  17. lockA.release()
  18. def funcB(lockA,lockB):
  19. print(" funcB:需要B锁")
  20. lockB.acquire()
  21. time.sleep(0.2)
  22. print(" funcB:获得了:B锁")
  23. print(" funcB:需要A锁")
  24. lockA.acquire()
  25. print(" funcB:获得了:A锁")
  26. lockA.release()
  27. lockB.release()
  28. def main():
  29. lockA = lockB = RLock()
  30. t_a = Thread(target=funcA, args=(lockA, lockB,))
  31. t_a.start()
  32. t_b = Thread(target=funcB, args=(lockA, lockB,))
  33. t_b.start()
  34. print("结束主线程")
  35. if __name__ == "__main__":
  36. main()

2.4:信号量:BoundedSemaphore

互斥锁同时只允许一个线程更改数据,而信号量是同时允许多少个线程同时运行。

  1. # -*- coding: utf-8 -*-
  2. from threading import Thread,BoundedSemaphore,active_count
  3. import time
  4. def func(index, semaphore):
  5. semaphore.acquire() #加锁
  6. print(f"线程:{index} 运行中...,当前活动线程数:{active_count()}")
  7. time.sleep(5) # 为了看到效果,休眠久点
  8. semaphore.release() #释放
  9. def main():
  10. semaphore = BoundedSemaphore(3) # 最多允许3个线程同时运行
  11. for i in range(20):
  12. t = Thread(target=func, args=(i, semaphore))
  13. t.start()
  14. print(f"主线程:当前活动线程数:{active_count()}")
  15. if __name__ == '__main__':
  16. main()

2.5:事件:Event

  1. # -*- coding: utf-8 -*-
  2. import time
  3. from threading import Thread,Event
  4. def light(e):
  5. while 1:
  6. print('现在是红灯:')
  7. time.sleep(5)
  8. e.set() # 设置event的状态值为True ;
  9. print('现在是绿灯:')
  10. time.sleep(3)
  11. e.clear() # 恢复event的状态值为False。
  12. def car(index,e):
  13. if e.is_set(): # 返回event的状态值;
  14. # 状态为True
  15. print(f' 现在是绿灯,{index}:过马路中!')
  16. else:
  17. print(f' 现在是红灯{index}:等待中!')
  18. e.wait() # 如果 event.isSet()==False将阻塞
  19. print(f' 红灯变绿灯,{index}:可以走了!')
  20. def main():
  21. e = Event()
  22. lgh = Thread(target=light, args=(e,))
  23. lgh.start()
  24. cnt = 0
  25. while 1:
  26. time.sleep(1) # 每隔1秒来一辆car
  27. t1 = Thread(target=car, args=(cnt, e,))
  28. t1.start()
  29. cnt += 1
  30. if __name__ == '__main__':
  31. main()

2.6:线程池

2.6.1:submit方法

  1. # -*- coding: utf-8 -*-
  2. from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
  3. import time
  4. from threading import current_thread
  5. def func(index):
  6. # print(index,current_thread().ident)
  7. time.sleep(0.1)
  8. return [index,index**2]
  9. if __name__ == "__main__":
  10. t_p = ThreadPoolExecutor(max_workers=6)
  11. t_ret_list = []
  12. for i in range(20):
  13. t = t_p.submit(func, i)
  14. t_ret_list.append(t)
  15. for ret in t_ret_list:
  16. print(ret.result())

2.6.2:map方法

  1. # -*- coding: utf-8 -*-
  2. from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
  3. import time
  4. from threading import current_thread
  5. def func(index):
  6. # print(index,current_thread().ident)
  7. time.sleep(0.1)
  8. return [index,index**2]
  9. if __name__ == "__main__":
  10. t_p = ThreadPoolExecutor(max_workers=6)
  11. map_ret = t_p.map(func,range(20))
  12. print(map_ret)
  13. for ret in map_ret:
  14. print(ret)

 

本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号