赞
踩
多线程执行-Thread
import threading
import time
def saySorry():
print("亲爱的,我错了,我能吃饭了吗?")
time.sleep(1)
if __name__ == "__main__":
for i in range(5):
t = threading.Thread(target=saySorry)
t.start() #启动线程,即让线程开始执行
# 运行结果
亲爱的,我错了,我能吃饭了吗?
亲爱的,我错了,我能吃饭了吗?
亲爱的,我错了,我能吃饭了吗?
亲爱的,我错了,我能吃饭了吗?
亲爱的,我错了,我能吃饭了吗?
使用线程完成多任务
from threading import Thread
import time
#如果多个线程执行的都是同一个函数的话,各自之间不会有影响,各是个的
def test():
print("----test----")
time.sleep(1)
def main():
for i in range(5):
t = Thread(target=test) #创建子线程
t.start()
if __name__ == "__main__":
main()
# 运行结果
----test----
----test----
----test----
----test----
----test----
使用线程的第二种方式
from threading import Thread
import time
class MyThread(Thread):
def run(self):
for i in range(3):
time.sleep(1)
msg = "I'm "+self.name+' @ '+str(i) #name属性中保存的是当前线程的名字
print(msg)
def main():
t = MyThread()#创建一个实列对象
t.start()#创建一个线程 这个线程执行run方法
#各自是各自的run方法互不影响 除非你用__new__方法创建了单列
t2 = MyThread()#在创建一个实列对象
t2.start()#在创建一个线程 这个线程执行run方法
if __name__ == "__main__":
main()
# 运行结果
I'm Thread-1 @ 0
I'm Thread-2 @ 0
I'm Thread-1 @ 1
I'm Thread-2 @ 1
I'm Thread-1 @ 2
I'm Thread-2 @ 2
线程的执行顺序
import threading
import time
#那个线程都执行 这是不确定的 这要看操作系统的调度算法 怎么决定
#进程也是这样 谁先执行这也是不确定的 这要看操作系统的调度算法 怎么决定
class MyThread(threading.Thread):
def run(self):
for i in range(3):
time.sleep(1)
msg = "I'm "+self.name+' @ '+str(i)
print(msg)
def test():
for i in range(5):
t = MyThread() #创建了5个线程
t.start()
if __name__ == '__main__':
test()
#从代码和执行结果我们可以看出,多线程程序的执行顺序是不确定的。当执行到sleep语句时,线程将被阻塞(Blocked),到sleep结束后,线程进入就绪(Runnable)状态,等待调度。而线程调度将自行选择一个线程执行。上面的代码中只能保证每个线程都运行完整个run函数,但是线程的启动顺序、run函数中每次循环的执行顺序都不能确定。
# 运行结果
I'm Thread-2 @ 0
I'm Thread-1 @ 0
I'm Thread-4 @ 0
I'm Thread-5 @ 0
I'm Thread-3 @ 0
I'm Thread-1 @ 1
I'm Thread-2 @ 1
I'm Thread-4 @ 1
I'm Thread-3 @ 1
I'm Thread-5 @ 1
I'm Thread-2 @ 2
I'm Thread-4 @ 2
I'm Thread-5 @ 2
I'm Thread-1 @ 2
I'm Thread-3 @ 2
多线程共享全局变量
from threading import Thread
import time
#线程之间共享全局变量
g_num = 100
def work1():
global g_num
for i in range(3):
g_num += 1
print("----in work1, g_num is %d---"%g_num)
def work2():
global g_num
print("----in work2, g_num is %d---"%g_num)
print("---线程创建之前g_num is %d---"%g_num)
t1 = Thread(target=work1)
t1.start()
#延时一会,保证t1线程中的事情做完
time.sleep(1)
t2 = Thread(target=work2)
t2.start()
# 运行结果
---线程创建之前g_num is 100---
----in work1, g_num is 103---
----in work2, g_num is 103---
线程共享全局变量的问题
from threading import Thread
import time
g_num = 0
def test1():
global g_num
for i in range(1000000):
g_num += 1
print("---test1---g_num=%d"%g_num)
def test2():
global g_num
for i in range(1000000):
g_num += 1
print("---test2---g_num=%d"%g_num)
p1 = Thread(target=test1)
p1.start()
#time.sleep(3) #取消屏蔽之后 再次运行程序,结果会不一样,,,为啥呢?
p2 = Thread(target=test2)
p2.start()
print("---g_num=%d---"%g_num)
# 运行结果
---g_num=195989---
---test1---g_num=1196445
---test2---g_num=1396538
列表传递给线程
from threading import Thread
import time
def work1(nums):
nums.append(44)
print("----in work1---",nums)
def work2(nums):
#延时一会,保证t1线程中的事情做完
time.sleep(1)
print("----in work2---",nums)
g_nums = [11,22,33]
t1 = Thread(target=work1, args=(g_nums,))
t1.start()
t2 = Thread(target=work2, args=(g_nums,))
t2.start()
# 运行结果
----in work1--- [11, 22, 33, 44]
----in work2--- [11, 22, 33, 44]
避免多线程对共享数据出错的方式
from threading import Thread
import time
g_num = 0
g_flag = 1
def test1():
global g_num
global g_flag
if g_flag == 1:
for i in range(1000000):
g_num += 1
g_flag = 0
print("---test1---g_num=%d"%g_num)
def test2():
global g_num
#轮询
while True:
if g_flag != 1:
for i in range(1000000):
g_num += 1
break
print("---test2---g_num=%d"%g_num)
p1 = Thread(target=test1)
p1.start()
#time.sleep(3) #取消屏蔽之后 再次运行程序,结果会不一样,,,为啥呢?
p2 = Thread(target=test2)
p2.start()
print("---g_num=%d---"%g_num)
# 运行结果
---g_num=159601---
---test1---g_num=1000000
---test2---g_num=2000000
使用互斥锁
from threading import Thread,Lock
import time
g_num = 0
def test1():
global g_num
#上锁
mutex.acquire() #这个线程和test2线程都在抢着 对这个锁 进行上锁, 如果有一方成功的上锁,那么导致另外
#一方会堵塞 (一直等待) 到这个锁被解开为止
for i in range(1000000):
g_num += 1
#解锁
mutex.release()#用来对mutex指向的这个锁 进行解锁 , , , 只要开了锁,那么接下来会让所有因为
#这个锁 被上了锁 而堵塞的线程 进行抢着上锁
print("---test1---g_num=%d"%g_num)
def test2():
global g_num
mutex.acquire()
for i in range(1000000):
g_num += 1
mutex.release()
print("---test2---g_num=%d"%g_num)
mutex = Lock()
p1 = Thread(target=test1)
p1.start()
#time.sleep(3) #取消屏蔽之后 再次运行程序,结果会不一样,,,为啥呢?
p2 = Thread(target=test2)
p2.start()
print("---g_num=%d---"%g_num)
# 运行结果
---g_num=203219---
---test1---g_num=1000000
---test2---g_num=2000000
上锁放到for里面
from threading import Thread,Lock
import time
g_num = 0
def test1():
global g_num
for i in range(1000000):
mutex.acquire()
g_num += 1
mutex.release()
print("---test1---g_num=%d"%g_num)
def test2():
global g_num
for i in range(1000000):
mutex.acquire()
g_num += 1
mutex.release()
print("---test2---g_num=%d"%g_num)
mutex = Lock()
p1 = Thread(target=test1)
p1.start()
#time.sleep(3) #取消屏蔽之后 再次运行程序,结果会不一样,,,为啥呢?
p2 = Thread(target=test2)
p2.start()
print("---g_num=%d---"%g_num)
# 运行结果
---g_num=33913---
---test1---g_num=1890722
---test2---g_num=2000000
多个线程使用非全局变量
from threading import Thread
import threading
import time
#线程里共享数据全局变量 非共享数据函数里的变量
#在多线程开发中,全局变量是多个线程都共享的数据,而局部变量等是各自线程的,是非共享的
def test1():
g_num = 100
name = threading.current_thread().name
print("thread name is %s"%name)
if name == "Thread-1":
g_num += 1
else:
time.sleep(2)
print("---thread name is %s---g_num=%d"%(name,g_num))
p1 = Thread(target=test1)
p1.start()
p2 = Thread(target=test1)
p2.start()
# 运行结果
thread name is Thread-1
---thread name is Thread-1---g_num=101
thread name is Thread-2
---thread name is Thread-2---g_num=100
import threading
import time
#避免死锁的放法
#1.程序设计时要尽量避免(银行家算法)
#2.添加超时时间等
class MyThread1(threading.Thread):
def run(self):
if mutexA.acquire():
print(self.name+'----do1---up----')
time.sleep(1)
if mutexB.acquire(timeout=2): #添加超时时间 timeout=2
#我在这里等两秒,如果两秒后你还没满足这个条件
#这个锁我不上了,直接执行下面的 mutexA.release()开锁功能
print(self.name+'----do1---down----')
mutexB.release()
mutexA.release()
class MyThread2(threading.Thread):
def run(self):
if mutexB.acquire():
print(self.name+'----do2---up----')
time.sleep(1)
if mutexA.acquire(timeout=2):
print(self.name+'----do2---down----')
mutexA.release()
mutexB.release()
mutexA = threading.Lock() #这里的类实列是全局变量 所以在这个类里可以直接使用另一个类的实列
mutexB = threading.Lock() #这个实列也是全局变量 所以在类里他们可以相互使用
if __name__ == '__main__':
t1 = MyThread1()
t2 = MyThread2()
t1.start()
t2.start()
# 运行结果
Thread-1----do1---up----
Thread-2----do2---up----
同步应用
from threading import Thread,Lock
from time import sleep
#程序第一圈
#先跟lock1上锁,后面两个上不了锁,因为主线程已经跟他们上了锁
#lock1执行完,跟lock2解锁了,这时候lock1,lock3都是上锁的,所以
#跟lock2上锁,lock2执行完了后跟lock3解锁了,这时候lock1,lock2
#都是上锁的,只有lock3是开锁的,所以这时候跟lock3上锁,执行完
#跟lock1解锁,这时候lock2,lock3是上锁的,现在跟lock1上锁...
#一直这样循环下去......
#锁只要上了一回 在运行到上锁的位置是等待的上不了 只能解锁了 在上锁
class Task1(Thread):
def run(self):
while True:
if lock1.acquire():
print("------Task 1 -----")
sleep(0.5)
lock2.release()
class Task2(Thread):
def run(self):
while True:
if lock2.acquire():
print("------Task 2 -----")
sleep(0.5)
lock3.release()
class Task3(Thread):
def run(self):
while True:
if lock3.acquire():
print("------Task 3 -----")
sleep(0.5)
lock1.release()
#使用Lock创建出的锁默认没有“锁上”
lock1 = Lock()
#创建另外一把锁,并且“锁上”
lock2 = Lock()
lock2.acquire()
#创建另外一把锁,并且“锁上”
lock3 = Lock()
lock3.acquire()
t1 = Task1()
t2 = Task2()
t3 = Task3()
t1.start()
t2.start()
t3.start()
# 运行结果
------Task 1 -----
------Task 2 -----
------Task 3 -----
------Task 1 -----
------Task 2 -----
------Task 3 -----
------Task 1 -----
------Task 2 -----
------Task 3 -----
------Task 1 -----
------Task 2 -----
------Task 3 -----
------Task 1 -----
------Task 2 -----
------Task 3 -----
------Task 1 -----
死循环
生产者与消费者模式
#encoding=utf-8
import threading
import time
#python2中
#from Queue import Queue
#python3中
from queue import Queue #这是线程当中的队列不能在进程当中用 进程当中的队列线程当中也不能用 进程里面的队列用法跟线程里面的队列用法一样
class Producer(threading.Thread):
def run(self):
global queue
count = 0
while True:
if queue.qsize() < 1000:
for i in range(100):
count = count +1
msg = '生成产品'+str(count)
queue.put(msg)
print(msg)
time.sleep(0.5)
class Consumer(threading.Thread):
def run(self):
global queue
while True:
if queue.qsize() > 100:
for i in range(3):
msg = self.name + '消费了 '+queue.get()
print(msg)
time.sleep(1)
if __name__ == '__main__':
queue = Queue()
for i in range(500):
queue.put('初始产品'+str(i))
for i in range(2):
p = Producer()
p.start()
for i in range(5):
c = Consumer()
c.start()
# 运行结果
生成产品1
生成产品2
生成产品1
生成产品3
生成产品2
Thread-4消费了 初始产品1
Thread-3消费了 初始产品0
生成产品3
Thread-6消费了 初始产品3
Thread-5消费了 初始产品2
Thread-3消费了 初始产品6
生成产品4
Thread-4消费了 初始产品4
Thread-7消费了 初始产品5
Thread-3消费了 初始产品9
生成产品5
Thread-4消费了 初始产品10
Thread-7消费了 初始产品11
Thread-6消费了 初始产品7
生成产品6
生成产品7
生成产品4
生成产品5
Thread-7消费了 初始产品12
生成产品8
Thread-6消费了 初始产品13
生成产品9
Thread-5消费了 初始产品8
生成产品10
生成产品6
生成产品11
生成产品7
生成产品12
Thread-5消费了 初始产品14
生成产品13
生成产品8
生成产品14
生成产品9
生成产品15
生成产品10
生成产品16
生成产品11
生成产品17
生成产品12
生成产品18
生成产品13
生成产品19
生成产品14
生成产品20
生成产品15
生成产品21
生成产品16
生成产品22
生成产品17
生成产品23
生成产品18
生成产品24
生成产品19
生成产品25
生成产品20
生成产品26
import threading
# 创建全局ThreadLocal对象:
local_school = threading.local() #你用线程给local_school添加了属性 调用这个属性的时候只会是这个线程的属性值
#不会因为多个线程对它的同一个属性添加 值而改变 是那个线程添加的调用的时候
#就是那个线程的值
#这个好处不需要函数间的传递,也不会因为多个线程的调用全局变量那样发生数据错乱
def process_student():
# 获取当前线程关联的student:
std = local_school.student
print('Hello, %s (in %s)' % (std, threading.current_thread().name))
def process_thread(name):
# 绑定ThreadLocal的student:
local_school.student = name
process_student()
t1 = threading.Thread(target= process_thread, args=('dongGe',), name='Thread-A')
t2 = threading.Thread(target= process_thread, args=('老王',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()
# 运行结果
Hello, dongGe (in Thread-A)
Hello, 老王 (in Thread-B)
异步
#同步调用就是你 喊 你朋友吃饭 ,你朋友在忙 ,你就一直在那等,等你朋友忙完了 ,你们一起去
#异步调用就是你 喊 你朋友吃饭 ,你朋友说知道了 ,待会忙完去找你 ,你就去做别的了。
from multiprocessing import Pool
import time
import os
def test():
print("---进程池中的进程---pid=%d,ppid=%d--"%(os.getpid(),os.getppid()))
for i in range(3):
print("----%d---"%i)
time.sleep(1)
return "hahah"
def test2(args):
print("---callback func--pid=%d"%os.getpid())
print("---callback func--args=%s"%args)
pool = Pool(3)
pool.apply_async(func=test,callback=test2) #子进程执行test子进程执行完了结束了,父进程原本卡在
#time.sleep(5)操作系统告诉主进程你儿子死了,主进程
#查看callback = 谁 主进程就去执行它
#args接收子进程的返回值
#time.sleep(5)
#print("----主进程-pid=%d----"%os.getpid())
#异步 是我正做这一件事,不知道什么时候能做完,让我去做另外一件事
while True:
time.sleep(1)
print("----主进程-pid=%d----"%os.getpid())
# 运行结果
---进程池中的进程---pid=2350,ppid=2349--
----0---
----1---
----主进程-pid=2349----
----2---
----主进程-pid=2349----
---callback func--pid=2349
---callback func--args=hahah
----主进程-pid=2349----
----主进程-pid=2349----
----主进程-pid=2349----
----主进程-pid=2349----
----主进程-pid=2349----
----主进程-pid=2349----
----主进程-pid=2349----
----主进程-pid=2349----
死循环
互斥锁
互斥锁
当多个线程几乎同时修改某一个共享数据的时候,需要进行同步控制
线程同步能够保证多个线程安全访问竞争资源,最简单的同步机制是引入互斥锁。
互斥锁为资源引入一个状态:锁定/非锁定。
某个线程要更改共享数据时,先将其锁定,此时资源的状态为“锁定”,其他线程不能更改;直到该线程释放资源,将资源的状态变成“非锁定”,其他的线程才能再次锁定该资源。互斥锁保证了每次只有一个线程进行写入操作,从而保证了多线程情况下数据的正确性。
threading模块中定义了Lock类,可以方便的处理锁定:
#创建锁
mutex = threading.Lock()
#锁定
mutex.acquire([blocking])
#释放
mutex.release()
其中,锁定方法acquire可以有一个blocking参数。
如果设定blocking为True,则当前线程会堵塞,直到获取到这个锁为止(如果没有指定,那么默认为True)
如果设定blocking为False,则当前线程不会堵塞
上锁解锁过程
当一个线程调用锁的acquire()方法获得锁时,锁就进入“locked”状态。
每次只有一个线程可以获得锁。如果此时另一个线程试图获得这个锁,该线程就会变为“blocked”状态,称为“阻塞”,直到拥有锁的线程调用锁的release()方法释放锁之后,锁进入“unlocked”状态。
线程调度程序从处于同步阻塞状态的线程中选择一个来获得锁,并使得该线程进入运行(running)状态。
总结
锁的好处:
确保了某段关键代码只能由一个线程从头到尾完整地执行
锁的坏处:
阻止了多线程并发执行,包含锁的某段代码实际上只能以单线程模式执行,效率就大大地下降了
由于可以存在多个锁,不同的线程持有不同的锁,并试图获取对方持有的锁时,可能会造成死锁
进程与线程的对比
进程 vs 线程
功能:
1.进程,能够完成多任务,比如 在一台电脑上能够同时运行多个QQ
2.线程,能够完成多任务,比如 一个QQ中的多个聊天窗口
定义的不同
进程是系统进行资源分配和调度的一个独立单位.
线程是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位.线程自己基本上不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器,一组寄存器和栈),但是它可与同属一个进程的其他的线程共享进程所拥有的全部资源.
区别
一个程序至少有一个进程,一个进程至少有一个线程.
线程的划分尺度小于进程(资源比进程少),使得多线程程序的并发性高。
进程在执行过程中拥有独立的内存单元,而多个线程共享内存,从而极大地提高了程序的运行效率
线线程不能够独立执行,必须依存在进程中
优缺点
线程和进程在使用上各有优缺点:线程执行开销小,但不利于资源的管理和保护;而进程正相反。
生产者与消费者
Queue的说明
对于Queue,在多线程通信之间扮演重要的角色
添加数据到队列中,使用put()方法
从队列中取数据,使用get()方法
判断队列中是否还有数据,使用qsize()方法
生产者消费者模式的说明
为什么要使用生产者和消费者模式
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
什么是生产者消费者模式
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
这个阻塞队列就是用来给生产者和消费者解耦的。纵观大多数设计模式,都会找一个第三者出来进行解耦,
同步和异步
同步的概念
结合程序 06 看
1. 多线程开发可能遇到的问题
假设两个线程t1和t2都要对num=0进行增1运算,t1和t2都各对num修改10次,num的最终的结果应该为20。
但是由于是多线程访问,有可能出现下面情况:
在num=0时,t1取得num=0。此时系统把t1调度为”sleeping”状态,把t2转换为”running”状态,t2也获得num=0。然后t2对得到的值进行加1并赋给num,使得num=1。然后系统又把t2调度为”sleeping”,把t1转为”running”。线程t1又把它之前得到的0加1后赋值给num。这样,明明t1和t2都完成了1次加1工作,但结果仍然是num=1。
2. 什么是同步
同步就是协同步调,按预定的先后次序进行运行。如:你说完,我再说。
"同"字从字面上容易理解为一起动作
其实不是,"同"字应是指协同、协助、互相配合。
如进程、线程同步,可理解为进程或线程A和B一块配合,A执行到一定程度时要依靠B的某个结果,于是停下来,示意B运行;B依言执行,再将结果给A;A再继续操作。
3. 解决问题的思路
对于本小节提出的那个计算错误的问题,可以通过线程同步来进行解决
思路,如下:
1.系统调用t1,然后获取到num的值为0,此时上一把锁,即不允许其他现在操作num
2.对num的值进行+1
3.解锁,此时num的值为1,其他的线程就可以使用num了,而且是num的值不是0而是1
4.同理其他线程在对num进行修改时,都要先上锁,处理完后再解锁,在上锁的整个过程中不允许其他线程访问,就保证了数据的正确性
异步
异步是没有先后顺序的,不确定的
总结
总结
1.每个线程一定会有一个名字,尽管上面的例子中没有指定线程对象的name,但是python会自动为线程指定一个名字。
2.当线程的run()方法结束时该线程完成。
3.无法控制线程调度程序,但可以通过别的方式来影响线程调度的方式。
4.线程的几种状态
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。