前言
对于python来说,因为有DIL锁的存在,在同一个进程中,一个cpu中同一时刻只能运行一个线程,无法并行,只能并发,但是python可以创建多个进程,每个进程可以分别占用一个cpu来运行线程,实现并行,但是创建多个进程很占用内存。
并发:可以运行多个程序,不是同时进行进行
并行:可以同时运行多个程序
同步和异步:同步是一个任务遇到io等阻塞,一直等待用户完成操作,再执行下面的程序;异步如果遇到io等阻塞,先执行下面的程序,一旦等到用户完成操作,在回来执行之前的程序。
目录
线程创建和常用方法
方法一:
import time def Hi(name): print('你好:%s'%name) time.sleep(2) #睡两秒 if __name__ == '__main__': #创建了两个子线程 # target是目标函数,args是函数的参数(以元组的形式传入) t1=threading.Thread(target=Hi,args=("chen",)) t2=threading.Thread(target=Hi,args=('liu',)) t1.start() t2.start()
方法二:通过继承
import threading import time #通过类继承 class MyThread(threading.Thread): def __init__(self,name): threading.Thread.__init__(self) self.name = name def run(self): #开启线程后,默认执行run方法 print("你好:%s\n"%self.name) time.sleep(3) if __name__=='__main__': t1 = MyThread('chen') t2 = MyThread('liu') t1.start() t2.start() print('endind....')
join():一旦子线调用了这个方法,主线程需要等子线程运行完了再运行主线程,不让主线程和子线程是同时运行的。
setDaemon(True):一旦子线程调用这个方法,就将子线程设置为守护线程(随着主线程的结束而结束)。 注:要在start()之前调用
同步锁
当不同线程同时使用一个全局变量的时候,就会出现数据混乱,同步锁的作用是,给线程的一段代码使用同步锁,在这段代码执行时,同一时刻只有一个线程在运行,也就是说加了同步锁的这段代码是串行的
- import threading
- import time
-
- def sub():
-
- global num
-
- lock.acquire() #添加同步锁
- temp = num
- time.sleep(0.001)
- num = temp-1
- lock.release() #释放同步锁
-
- num = 100
- l = []
-
- lock = threading.Lock() #创建同步锁
-
- for i in range(100):
- t = threading.Thread(target=sub)
- t.start()
- l.append(t)
-
- for t in l:
- t.join()
-
- print(num)
递归锁
在同步锁中,会产生一个问题,如果在两个线程中,它们互相需要对方的资源,但是双方又不能先把资源给对方,这样就会一直卡住,这样就引入递归锁。
- import threading
- import time
-
-
- class MyThread(threading.Thread):
-
- def actionA(self):
-
- A.acquire() #请求A的同步锁
- print("%s gotA %s\n"%(self.name,time.ctime()))
- time.sleep(2)
-
- B.acquire() #请求B的同步锁
- print("%s gotB %s\n"%(self.name,time.ctime()))
- time.sleep(1)
- B.release() #释放A的同步锁
-
- A.release() #释放B的同步锁
-
-
- def actionB(self):
-
- B.acquire()#请求B的同步锁
- print("%s gotA %s\n"%(self.name,time.ctime()))
- time.sleep(2)
- A.acquire() #请求A的同步锁
- print("%s gotB %s\n"%(self.name,time.ctime()))
- time.sleep(1)
- A.release() #释放A的同步锁
- B.release() #释放B的同步锁
-
-
- def run(self):
- self.actionA()
- time.sleep(0.5)
- self.actionB()
-
- if __name__ == '__main__':
- A=threading.Lock()
- B=threading.Lock()
- L=[]
- for i in range(5):
- t=MyThread()
- t.start()
- L.append(t)
-
- for i in L:
- i.join()
-
- print("ending....")
-
- #######################################################
- Thread-1 gotA Sat May 4 12:05:52 2019
-
- Thread-1 gotB Sat May 4 12:05:54 2019
-
- Thread-1 gotA Sat May 4 12:05:55 2019
-
- Thread-2 gotA Sat May 4 12:05:55 2019
-
- 卡住......
线程1首先完成actionA()方法,然后别的线程开始执行,也就是说,这个时候,线程2的actionA()方法和线程1的actionB()方法同时执行,然后线程1中的actionA()的方法拿到A的同步锁,此时线程2中的actionB()拿到B的同步锁,之后线程1请求B的同步锁,但是B锁已经被线程1给拿走了,同样的线程1请求A的同步锁,但是A锁已经被线程2给拿走了,此时两个线程都等对方先释放锁,就卡住 了。
因此使用递归锁
- import threading
- import time
-
-
- class MyThread(threading.Thread):
-
- def actionA(self):
-
- r_lcok.acquire() #count=1
- print(self.name,"gotA",time.ctime())
- time.sleep(2)
- r_lcok.acquire() #count=2
- print(self.name, "gotB", time.ctime())
- time.sleep(1)
- r_lcok.release() #count=1
- r_lcok.release() #count=0
-
-
- def actionB(self):
-
- r_lcok.acquire()
- print(self.name, "gotB", time.ctime())
- time.sleep(2)
- r_lcok.acquire()
- print(self.name, "gotA", time.ctime())
- time.sleep(1)
- r_lcok.release()
- r_lcok.release()
-
- def run(self):
- self.actionA()
- self.actionB()
-
-
- if __name__ == '__main__':
-
- # A=threading.Lock()
- # B=threading.Lock()
- r_lcok=threading.RLock() #使用递归锁
- L=[]
-
- for i in range(5):
- t=MyThread()
- t.start()
- L.append(t)
-
- for i in L:
- i.join()
-
- print("ending....")
递归锁里内部维护这一个计数器count,初始值为0,当请求锁的时候就加1,释放锁的时候就减1,只要计数器大于零,就没有别的线程可以拿到这把锁
同步对象(event)
相当于一个线程标记(flag)
- #创建event对象
- event = threading.Event()
- event.wait() #相当于一个阻塞状态,直到别的线程event.set()
- event.set() #先当与告诉别的已经wait()的线程不用再阻塞了
- event.clear() #清除event,用来重新wait()
信号量
设置同一时刻线程的执行数目
- import threading,time
-
- class myThread(threading.Thread):
- def run(self):
-
- if semaphore.acquire(): #设置信号量
- print(self.name)
- time.sleep(3)
- semaphore.release() #释放信号量
-
- if __name__=="__main__":
- semaphore=threading.Semaphore(5) #设置5把锁,只有5个线程可以进入,同一时刻只能执行5个线程
-
- thrs=[]
- for i in range(100): #这里开启了100个子线程,如何不设置信号量,那100个子线程将同时执行
- thrs.append(myThread())
- for t in thrs:
- t.start()
线程队列
- import queue # 线程 队列
-
- q=queue.Queue() # 创建一个队列,可以设置队列里的最大存放个数
-
- q.put(12)
- q.put("hello")
- q.put({"name":"yuan"}) #在put时,如果队列的值是满的,就会卡住,直到别的线程将值取走,队列里有空位了,才将值放进队列里
-
- while 1:
- data=q.get() #如果这里的get()是里面没有值的话,就会一直卡住,直到比的线程往队列里面put()值
- print(data)
- print("----------")
put('chen',False) 可以设置如果队列里是满的,就报错,同样 get(block=False) 也是一样
q=queue.LifoQueue() 创建后进先出的队列,相当于栈
q=queue.PriorityQueue() 创建按照优先级决定进出的队列
- import queue # 线程 队列
-
- q=queue.PriorityQueue() # 创建一个优先级队列
-
- q.put([3,12]) #列表带着优先级,数字越小,优先级越高
- q.put([2,"hello"])
- q.put([4,{"name":"yuan"}])
-
- while 1:
- data=q.get()
- print(data[1])
- print("----------")
- ############################
- hello
- ----------
- 12
- ----------
- {'name': 'yuan'}
- ----------
q.qsize() 返回队列的大小
q.empty() 如果队列为空,返回true,如果队列不会空,返回false
q.get_nowait() 相当于 q.get(block=False) ,不等待,直接报错
q.task_done() 会给 q.join()发信号
q.join() 等待接收来自 q.task_done() 的信号,不然一直卡住
生产者和消费者模型
在多线程开发过程中,如果生产数据的线程处理的速度很快,但是消费数据的线程处理的很慢,那么就必须等到消费数据的线程处理完,才可以进行生产数据,反之,如果生产数据的处理速度小于消费数据的处理速度,那么消费线程就必须等待生产线程。
因此生产者和消费者模型是在彼此之间不做任何通信,通过阻塞队列来进行通信,生产线程生产完数据后,直接扔给阻塞队列,而消费者直接从阻塞队列里区,阻塞队列就相当于一个缓冲区。(解耦问题)


import time,random import queue,threading q = queue.Queue() def Producer(name): count = 0 while count <10: print("making........") time.sleep(5) q.put(count) print('Producer %s has produced %s baozi..' %(name, count)) count +=1 #q.task_done() q.join() print("ok......") def Consumer(name): count = 0 while count <10: time.sleep(random.randrange(4)) # if not q.empty(): # print("waiting.....") #q.join() data = q.get() print("eating....") time.sleep(4) q.task_done() #print(data) print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data)) # else: # print("-----no baozi anymore----") count +=1 p1 = threading.Thread(target=Producer, args=('A君',)) c1 = threading.Thread(target=Consumer, args=('B君',)) c2 = threading.Thread(target=Consumer, args=('C君',)) c3 = threading.Thread(target=Consumer, args=('D君',)) p1.start() c1.start() c2.start() c3.start()