当前位置:   article > 正文

python中3个线程并发实现_Python学习并发与多线程

pytest-parallel的并发3个线程同时运行

1、并发

1.1、并发与并行

并行,parallel,同一时刻,执行不同任务,并且相互没有干扰;

并发,concurrency,一段时间内,交替执行不同的任务;

串行,一个任务执行完成后执行下一个任务;

1.2、并发的解决方法

“高并发模型”:例如早高峰的北京地铁,在同一时刻,需要处理大量任务,可以理解为高并发模型;

解决方法:

(1)队列,缓冲区:将任务排队,形成队列,先进先出,就解决了资源的使用问题;形成的队列其实就是一个缓冲区域,假设排队有一种优先机制,例如女士优先,则次队列为优先对列;

(2)争抢:谁抢到资源就上锁,排他性的锁,其他任务只能等待;争抢是一种高并发的解决方案,但是这样不好,因为可能有任务长期霸占资源,有人一直抢不到资源;

(3)预处理:将各个任务会用到的热点数据提前缓存,这样就减少了任务的执行时间;

(4)并行:开辟多的资源,例如银行办理业务,排队的人多了,可以多增加业务窗口;此方案为水平扩展的思想;

(5)提速:简单的就是提高资源性能,提高cpu性能或者单个服务器安装更多的cpu,此方案为垂直扩展

(6)消息中间件:模型可以理解为北京地铁站外面的排队走廊,具有缓冲人流量的功能;常见的消息队列有:RabbitMQ、ActiveMQ、RocketMQ(阿里)、kafka等

2、进程与线程

# 进程与线程的关系:

程序是源码编译后的文件,而这些文件被存放在磁盘上;当程序被操作系统加载到内存中,就是进程,进程中存放着指令和数据(资源),进程是线程的容器;线程是操作系统进行运算调度的最小单位;

2.1、线程的状态

2.2、python中的进程与线程

# 进程会启动一个解释器进程,线程共享一个解释器进程;会有一个GIL,全局解释器锁,来分配具体线程任务执行;python的GIL保证同一时刻只有一个线程被执行;

3、Python的多线程开发

3.1、Thread类

# Python的线程开发使用标准款threading;

# Thread类的初始化方法

def __init__(self, group=None, target=None, name=None,

args=(), kwargs=None, *, daemon=None):

参数:

target  线程调用的对象,就是目标函数;

name   为线程起名

args   为目标函数传递位置参数,元祖;

kwargs  为目标函数传递关键字参数,字典

(1)线程的启动:

通过threading.Thread创建一个线程对象,target是目标函数,name指定线程名称,到此线程被创建完成,需要调用start() 方法来启动线程 ;

importthreadingdefwork():print('I m working')print('Finished')

t=threading.Thread(target=work,name='workerthread') # 创建线程t

t.start() # 启动线程t

(2)线程的退出:

# python没有提供线程的退出方法,但是线程在以下情况会退出:

①:线程函数内语句执行完成;

②:线程函数中抛出未处理异常;

#python的线程没有优先级、没有线程组的概念,也不能被销毁,停止,挂起,也没有恢复和中断;

(3)线程的传参

# 线程的传参本质上和函数没有区别

importthreadingdefadd(x,y):print(x+y)

t=threading.Thread(target=add,args=(4,5),name='workerthread') # args=(4,5)传递位置参数;

t.start()

(4)threading的属性和方法

# current_thread()  返回当前线程对象

# main_thread() 返回主线程对象

# active_count() 返回处于alive状态的线程数

# enumerate() 返回所有活着的线程列表,不包括已经终止的线程和未开始的线程

# get_ident() 返回当前线程的id,非0整数

(5)Thread实例的属性和方法

# name:线程名称

# ident:线程id,线程启动后才有id,否则为None

# is_alive():是否存活

# start()方法与run()方法

①:start() 启动线程,每一个线程必须并且只能执行一次该方法;

defstart(self):if notself._initialized:raise RuntimeError("thread.__init__() not called")ifself._started.is_set():raise RuntimeError("threads can only be started once")

with _active_limbo_lock:

_limbo[self]=selftry:

_start_new_thread(self._bootstrap, ())#启动一个线程

exceptException:

with _active_limbo_lock:del_limbo[self]raiseself._started.wait()

②:run()  并不启动新线程,只是在主线程中调用了一个函数,调用定义线程对象中的target函数;

defrun(self):try:ifself._target:

self._target(*self._args, **self._kwargs) #执行target函数

finally:#Avoid a refcycle if the thread is running a function with

#an argument that has a member that points to the thread.

del self._target, self._args, self._kwargs

3.2、多线程

#  多线程,一个进程中如果有多个线程,就是多线程,实现一种并发;

# 一个进程中至少有一个线程,并作为程序的入口,这个线程就是主线程;一个进程至少有一个主线程,其他线程为工作线程;

# 线程安全与线程不安全

defworker():for x in range(100):print('{} is runing'.format(threading.current_thread().name))for x in range(1,5):

name='work-{}'.format(x)

t= threading.Thread(target=worker,name=name)

t.start()

运行结果:

work-1 isruning

work-2 is runingwork-1 is runingwork-3 isruning

work-4 isruning

work-3 is runingwork-2 isruning

work-1 isruning

分析:

看代码,应该是一行行打印,但是很多字符串打印在了一起?

说明print函数被打断了,被线程切换打断了,print函数分2步,第一步打印字符串,第二步打印换行,就在2步之间被打断,发生了线程切换

每个线程是根据系统调用cpu分时计算的,可能出现线程1的函数还没有执行完成,则cpu资源被线程2给抢占,则去执行线程2的函数;

解决方法:字符串是不可变数据类型,它作为一个整体不可分割输出,end=''不再让print换行,但没有解决根本问题,应为print函数是线程不安全

3.3、daemon线程与non-daemon线程

# 注意:daemon不是linux中的守护进程,daemon属性必须在start方法之前设置好;

在主线程运行结束后会检查是否含有,没有执行完成的non-daemon线程,

如果有则等待non-daemon线程运行完成;

没有则直接结束线程;

源码中Thread的 __init__方法中

if daemon is notNone:

self._daemonic=daemon #设定daemon的布尔值 ,true为daemon,false为non-daemon

else:

self._daemonic=current_thread().daemon #如果为None则从父线程获取;

# 总结:

①:线程具有一个daemon属性,可以显示设置为True或False,也可以不设置,则默认为None;

②:如果不设置daemon,就取当前线程的daemon来设置它;

③:主线程是non-daemon,即daemon=False;

④:从主线程创建的所有线程不设置daemon,则默认daemon=False,也就是non-daemon线程;

⑤:Python程序在没有活的non-daemon线程运行时退出,也就是剩下的只能是daemon线程,主线程才能退出,否则主线程只能等待;

3.4、join方法

importtimeimportthreadingdeffoo(n):for i inrange(n):print(i)

time.sleep(1)

t1=threading.Thread(target=foo,args=(10,),daemon=True)

t1.start()

t1.join()#表示主线程,必须等到t1线程结束以后才结束;

print('main thread end')'''运行结果:

0

1

2

.

.

9

main thread end

分析:由于t1.join()方法后,将主线程阻塞,等待t1线程结束;'''

# join(timeout=None),是线程的标准方法之一;

# 一个线程中调用另外一个线程的join方法,调用着将被阻塞,直到被调用线程终止;

# 一个线程可以被join多次,timout参数指定调用则等待多久,没有设置超时时间,就一直等待到被调用者线程终止;设置超时时间如果被调线程没有结束,则调用者不等待,直接结束线程;

# 调用谁的join方法,就是join谁,就要等待谁;

4、threading.local类

# python提供的threading.local类,将这个类实例化得到一个全局对象,但是不同的线程使用这个对象存储数据其他线程看不见;

# 本质:threading.local类构建了一个大字典,存放所有线程相关的字典,定义如下:{ id(Thread) -> (ref(Thread), thread-local dict) }

# 每一个线程的id为key,元祖为value;

# value中2部分为:线程对象引用,每个线程自己的字典;

from threading importlocalimportthreadingimporttime

a=local()defworker():

a.x=0for i in range(10):#print(a.__dict__,current_thread().ident)

time.sleep(0.0001)

a.x+=1

print(threading.current_thread(),a.x,a.__dict__)for i in range(5):

threading.Thread(target=worker).start()'''运行结构:

10 {'x': 10}

10 {'x': 10}

10 {'x': 10}

10 {'x': 10}

10 {'x': 10}'''

5、定时器Timer

# Timer是线程Thread的子类,就是线程类,具有线程的属性和方法;

# 它的实例就是能延时执行目标函数的线程,在真正执行目标函数之前,都可以cancel它;

# cancel方法本质使用Event类实现;

# 自定义实现Timer类:

classMyTimer:def __init__(self,interval,func,args=(),kwargs={}):

self.interval=interval

self.func=func

self.args=args

self.kwargs=kwargs

self.event=Event()defcancel(self):print("call cancel------>")

self.event.set()defstart(self):

Thread(target=self._run).start() # 在这里起一个线程是为了避免在主线程被阻塞,执行cancel方法不生效的问题def_run(self):

self.event.wait(self.interval)if notself.event.is_set():

self.func(*self.args,**self.kwargs)#Thread(target=self.func,args=self.args,kwargs=self.kwargs).start()

self.event.set()defadd(x,y):print(3,'========>',x +y)

t1=MyTimer(3,add,(4,5))

t1.start()

t1.cancel()print('Main Thread end')

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/649480
推荐阅读
相关标签
  

闽ICP备14008679号