赞
踩
一、线程
1、创建线程
#创建线程
importthreading,timedeftask1():for i in range(5):print('task1 -- 任务:%s' %i)
time.sleep(1)deftask2():for j in range(10):print('task2 -- 任务:%s' %j)
time.sleep(1)defmain():#创建线程
t1 = threading.Thread(target=task1)
t2= threading.Thread(target=task2)#执行线程
t1.start()
t2.start()if __name__ == '__main__':
main()
View Code
2、查看线程数量
importthreading,time#创建线程
importthreading,timedeftask1():for i in range(5):print('task1 -- 任务:%s' %i)
time.sleep(1)deftask2():for j in range(5):print('task2 -- 任务:%s' %j)
time.sleep(1)defmain():#创建线程
t1 = threading.Thread(target=task1)
t2= threading.Thread(target=task2)#执行线程
t1.start()
t2.start()whileTrue:#打印当前线程情况
thread_num =len(threading.enumerate())print("当前线程的数量为:" +str(thread_num))print('线程详情:')print(threading.enumerate())if thread_num <= 1:#当剩下一个线程的情况下(也就是只剩下主线程的情况下,退出)
breaktime.sleep(0.5)if __name__ == '__main__':
main()
View Code
3、通过继承Thread来创建线程
importthreading,timeclassMyThread(threading.Thread):#当线程对象调用start()方法的时候,就会自动调用run()方法
defrun(self):for i in range(5):#self.name:当前线程的名字
print("I'm" + self.name + '@' +str(i))
time.sleep(1)if __name__ == '__main__':#创建线程
t1 =MyThread()
t2=MyThread()#运行线程
t1.start()
t2.start()
View Code
4、多线程共享全局变量
importthreading,time#定义全局变量
g_num = 100
deftask1():globalg_num
g_num+= 1
print("---- in task1: g_num = %d" %g_num)deftask2():print("---- in task2: g_num = %d" %g_num)defmain():
t1= threading.Thread(target=task1)
t2= threading.Thread(target=task2)
t1.start()
time.sleep(1)
t2.start()
time.sleep(1)print("---- in main: g_num = %d" %g_num)if __name__ == '__main__':
main()
View Code
5、args 参数
importthreading,time#定义全局变量
g_nums = [10, 20]deftask1(temp):
temp.append(30)print("---- in task1: temp = %s" %str(temp))deftask2(temp):print("---- in task1: temp = %s" %str(temp))defmain():#args 参数:指定调用函数的时候,传递什么参数过去(args 是元组类型)
t1 = threading.Thread(target=task1, args=(g_nums,))
t2= threading.Thread(target=task2, args=(g_nums,))
t1.start()
time.sleep(1)
t2.start()
time.sleep(1)print("---- in task1: g_nums = %s" %str(g_nums))if __name__ == '__main__':
main()
View Code
6、互斥锁解决资源竞争
importthreading,time#定义一个全局变量
g_num =0#创建一个互斥锁
mutex =threading.Lock()deftask1(num):globalg_numfor i inrange(num):#上锁
mutex.acquire()
g_num+= 1
#解锁
mutex.release()print('---- in stak1: g_num = %d' %g_num)deftask2(num):globalg_numfor i inrange(num):#上锁
mutex.acquire()
g_num+= 1
#解锁
mutex.release()print('---- in stak2: g_num = %d' %g_num)if __name__ == '__main__':#创建线程
t1 = threading.Thread(target=task1, args=(1000000,))
t2= threading.Thread(target=task2, args=(1000000,))#执行线程
t1.start()
t2.start()
time.sleep(2)print('---- in main: g_num = %d' % g_num)
View Code
7、多线程udp 聊天器
importthreading,socketdefrecv_msg(udp_socket):#接收数据
whileTrue:
recv_data= udp_socket.recvfrom(1024)print('接收到的数据为:%s' %str(recv_data))defsend_msg(udp_socket, dest_ip, dest_port):whileTrue:
send_data= input('请输入你要发送的数据:')
udp_socket.sendto(send_data.encode('utf-8'), (dest_ip, dest_port))defmain():#创建套接字
udp_socket =socket.socket(socket.AF_INET, socket.SOCK_DGRAM)#绑定本地信息
udp_socket.bind(("", 8899))#发送数据
dest_ip = "192.168.1.9"dest_port= 8899
#创建线程
t_recv = threading.Thread(target=recv_msg, args=(udp_socket,))
t_send= threading.Thread(target=send_msg, args=(udp_socket,dest_ip,dest_port))
t_recv.start()
t_send.start()if __name__ == '__main__':
main()
View Code
8、总结
"""1、什么是线程?
线程操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。
一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。
2、线程运行:
通过 t = threading.Thread(target=xxx) 来创建线程。
通过 t.start() 来运行线程。如果没有调用 start() 方法,线程是不会执行的。
3、查看线程数量
可通过 threading.enumerate() 来查看线程数量
4、通过继承Thread来创建线程时,需要重写run()方法。子线程对象在调用start()方法的时候,run()方法就会自动调用。
5、共享全局变量:
1)在一个进程内的所有线程共享全局变量,很方便在多个线程间共享数据
2)缺点就是,线程是对全局变量随意遂改可能造成多线程之间对全局变量的混乱(即线程非安全)
6、传递参数:
创建线程的时候,可以设置参数 args,来指定调用函数的时候,需要传递什么参数。args 的数据类型是元组。
7、互斥锁
某个线程要更改共享数据时,先将其锁定,此时资源的状态为“锁定”,其他线程不能更改;直到该线程释放资源,将资源的状态变成“非锁定”,其他的线程才能再次锁定该资源。
互斥锁保证了每次只有一个线程进行写入操作,从而保证了多线程情况下数据的正确性。
8、死锁
在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁。
避免死锁:
1)程序设计时要尽量避免(银行家算法)
2)添加超时时间等"""
二、进程
1、创建进程
importmultiprocessing,timedeftask1():whileTrue:print('---- task1 ----')
time.sleep(1)deftask2():whileTrue:print('---- task2 ----')
time.sleep(1)defmain():#创建进程对象
p1 = multiprocessing.Process(target=task1)
p2= multiprocessing.Process(target=task2)#开启进程
p1.start()
p2.start()if __name__ == '__main__':
main()
View Code
2、通过队列完成进程间通信
importmultiprocessingdefdownload_data(q):"""模拟下载数据"""data= [11, 22, 33, 44]#向队列中写入数据
for temp indata:print('写入数据到队列:' +str(temp))
q.put(temp)print('------下载器已下载完并写入到队列当中了------')defanalysis_data(q):"""模拟数据处理"""waiting_data=list()whileTrue:#从队列中获取数据
temp =q.get()print('从队列中获取数据:' +str(temp))
waiting_data.append(temp)#判断队列是否为空
ifq.empty():break
print('----数据已经分析完毕----')print(waiting_data)defmain():#创建队列
q =multiprocessing.Queue()#创建进程
p1 = multiprocessing.Process(target=download_data, args=(q,))
p2= multiprocessing.Process(target=analysis_data, args=(q,))#开启进程
p1.start()
p2.start()if __name__ == '__main__':
main()
View Code
3、进程池
importmultiprocessingimportos,time,randomdefworker(index):
t_start=time.time()print("{}开始执行,进程号为:{}".format(index, os.getpid()))
time.sleep(random.random()*2)
t_stop=time.time()print("{} 执行完毕,耗时{}".format(index, round(t_stop-t_start, 2)))defmain():#定义一个进程池,最大进程数量为 3
po = multiprocessing.Pool(3)for i in range(10):#Pool().apply_async(要调用的目标,(传递给目标的参数元祖,))
#每次循环将会用空闲出来的子进程去调用目标
po.apply_async(func=worker, args=(i,))print('-'*10 + 'start' + '-'*10)#关闭进程池,关闭后po不再接收新的请求
po.close()#等待po中所有子进程执行完成,必须放在close语句之后
po.join() #保证进程池中的所有进程都执行完毕后,主进程才会结束
print('-'*10 + 'end' + '-'*10)if __name__ == '__main__':
main()
View Code
4、多任务拷贝文件夹
importmultiprocessing,os#拷贝文件函数
defcopy_file(q, file_name, old_folder_name, new_folder_name):
old_file=os.path.join(old_folder_name, file_name)
new_file=os.path.join(new_folder_name, file_name)
with open(old_file,'rb') as f:whileTrue:#最多一次取出1024个字节
content = f.read(1024)ifcontent:
with open(new_file,'ab') as f2:
f2.write(content)else:break
#如果拷贝完了文件,那么就放入队列中,表示文件已拷贝完成
q.put(file_name)defmain():#1、获取用户要拷贝的文件夹名称
old_folder_name = input("请输入你要拷贝的文件夹名称:")#2、创建一个新的文件夹
new_folder_name = "[复制]" +old_folder_name;try:
os.mkdir(new_folder_name)exceptFileExistsError:pass
#3、获取文件夹中所有的文件名称
file_names =os.listdir(old_folder_name)#4、创建进程池
po = multiprocessing.Pool(5)#5、创建进程池队列,通过队列来计算进度(进程间通信)
q =multiprocessing.Manager().Queue()#6、向进程池中添加复制文件的任务
for file_name infile_names:
po.apply_async(func=copy_file, args=(q, file_name, old_folder_name, new_folder_name))
po.close()#po.join()
#7、获取队列中的内容,计算已经完成拷贝的文件数量
all_file_num =len(file_names)
copy_file_num=0whileTrue:
file_name=q.get()
copy_file_num+= 1
print("\r当前拷贝文件夹的进度为:%.2f %%"%(copy_file_num*100/all_file_num), end='')if copy_file_num >=all_file_num:print()break
print('Done.')if __name__ == '__main__':
main()
View Code
总结:
"""1、什么是进程
程序:例如xxx.py这是程序,是一个静态的
进程:一个程序运行起来后,代码+用到的资源 称之为进程,它是操作系统分配资源的基本单元。
不仅可以通过线程完成多任务,进程也是可以的
2、进程的状态:
就绪、执行、等待
3、如何创建
需使用multiprocessing模块来创建,创建的时候,和线程一样,需要指定要执行的任务是什么,例如:
p1 = multiprocessing.Process(target=task1)
4、进程间如何通信?
进程间可通过队列Queue 来进行通信。
5、进程池
当需要创建的子进程数量不多时,可以直接利用multiprocessing中的Process动态成生多个进程。
但如果是上百甚至上千个目标,手动的去创建进程的工作量巨大,此时就可以用到multiprocessing模块提供的Pool方法。
6、进程和线程的对比
进程,能够完成多任务,比如 在一台电脑上能够同时运行多个QQ
线程,能够完成多任务,比如 一个QQ中的多个聊天窗口
进程是系统进行资源分配和调度的一个独立单位。
线程是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。
线程自己基本上不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器,一组寄存器和栈),但是它可与同属一个进程的其他的线程共享进程所拥有的全部资源。
区别:
1)一个程序至少有一个进程,一个进程至少有一个线程。
2)线程的划分尺度小于进程(资源比进程少),使得多线程程序的并发性高
3)进程在执行过程中拥有独立的内存单元,而多个线程共享内存,从而极大地提高了程序的运行效率
4)线线程不能够独立执行,必须依存在进程中
5)线程和进程在使用上各有优缺点:线程执行开销小,但不利于资源的管理和保护;而进程正相反。"""
三、协程
1、迭代器
"""迭代是访问集合元素的一种方式。
迭代器是一个可以记住遍历的位置的对象。
迭代器对象从集合的第一个元素开始访问,直到所有的元素被访问完结束。迭代器只能往前不会后退。
总结:
1、如果想要一个对象成为 一个可迭代的对象,即可以使用for循环,那么必须实现 __iter__ 方法
2、要构造一个迭代器,就必须实现 __iter__ 方法和 __next__ 方法
3、迭代器抛出 StopIteration 异常,for 循环就会自动停止"""
from collections.abc importIterable,IteratorimporttimeclassClassmate(object):def __init__(self):
self.names=list()
self.current_index=0defadd(self, name):
self.names.append(name)def __iter__(self):#如果想要一个对象成为 一个可迭代的对象,即可以使用for循环,那么必须实现 __iter__ 方法
returnselfdef __next__(self):#迭代器:要构造一个迭代器,就必须实现 __iter__ 方法和 __next__ 方法
if self.current_index
result=self.names[self.current_index]
self.current_index+= 1
returnresultelse:#抛出 StopIteration 异常,for 循环就会自动停止
raiseStopIteration
classmate=Classmate()#判断对象是否可迭代
print('对象是否可迭代:', isinstance(classmate, Iterable))#获取对象的迭代器
classmate_iter =iter(classmate)print('对象是否是迭代器:', isinstance(classmate_iter, Iterator))
classmate.add('张三')
classmate.add('李四')
classmate.add('王五')
classmate.add('赵六')for name inclassmate:#classmate:可迭代,并且迭代器就是本身
#name:打印name,其实质是调用迭代器的 __next__ 方法
print(name)
time.sleep(1)
View Code
2、迭代器应用——斐波那契数列
#斐波那契数列:0,1,1,2,3,5,....,a,b,a+b
classFibonacci(object):def __init__(self, count):
self.count=count
self.index=0
self.a=0
self.b= 1
def __iter__(self):returnselfdef __next__(self):if self.index
result=self.a
self.a, self.b= self.b, self.a +self.b
self.index+= 1
returnresultelse:raiseStopIteration
fibo= Fibonacci(10)for num infibo:print(num)
View Code
3、迭代器的其他应用
"""list、tuple 也可接收可迭代的对象
比如:list(a):先生成一个空的列表,然后根据a获取 a 的迭代器,再通过next方法获取各个值,最后返回列表"""
classFibonacci(object):def __init__(self, count):
self.count=count
self.index=0
self.a=0
self.b= 1
def __iter__(self):returnselfdef __next__(self):if self.index
result=self.a
self.a, self.b= self.b, self.a +self.b
self.index+= 1
returnresultelse:raiseStopIteration
fibo= Fibonacci(10)
list1=list(fibo)print(list1)
tuple1= (1,2,3,4)
list2=list(tuple1)print(list2)
list3= [1,2,3,4]
tuple2=tuple(list3)print(tuple2)
View Code
4、生成器——创建方式 1
"""生成器是一种特殊的迭代器
创建方式一:只要把一个列表生成式的 [ ] 改成 ( )"""list1= [x*2 for x in range(10)]print(list1) #[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
list2= (x*2 for x in range(10))print(list2) # at 0x101bafad0>
for num inlist2:print(num) #0 / 2 / 4 / 6 / 8 / 10 / 12 / 14 / 16 / 18
View Code
5、生成器——创建方式 2
"""生成器创建方式二:使用 yield
如果一个函数中有yield 语句,那么这个就不再是一个函数,而是一个生成器。
执行的程序遇到yield:
1)保存当前运行状态(断点),然后暂停执行,即将生成器(函数)挂起
2)将yield关键字后面表达式的值作为返回值返回,此时可以理解为起到了return的作用
下次再执行的时候,就会从yield 语句挂起那开始执行,不再是在函数的最开始位置执行。
可以使用next()函数让生成器从断点处继续执行,即唤醒生成器(函数)。"""
#打印斐波那契数列
defcreate_fibonacci(count):
a, b= 0, 1index=0while index
#执行的程序遇到yield,则会在 yield 语句中暂停,并且把这个值返回。
#下次再执行的时候,就会从yield 语句后面开始执行,不再是在函数的最开始位置执行。
yielda
a, b= b, a+b
index+= 1
return "ok."obj= create_fibonacci(10)print(obj) #
print(type(obj)) #:类型是生成器,生成器是一个特殊的迭代器,那么,就可以通过for 循环来获取值
for num inobj:print(num) #0 / 1 / 1 / 2 / 3 / 5 / 8 / 13 / 21 / 34
print('='*40)
obj2= create_fibonacci(2)whileTrue:try:
num=next(obj2)print(num)exceptException as ex:#ex.value:获取生成器上的"ok."字符串
print(ex.value)break
#print(next(obj))#print(next(obj))#print(next(obj))#print('='*40)#obj2 = create_fibonacci(2)#print(next(obj2))#print(next(obj2))
View Code
6、使用send()方法来唤醒生成器
"""我们除了可以使用next()函数来唤醒生成器继续执行外,还可以使用send()函数来唤醒执行。
使用send()函数的一个好处是可以在唤醒的同时向断点处传入一个附加数据。"""
#打印斐波那契数列
defcreate_fibonacci(count):
a, b= 0, 1index=0while index
ret2= yieldaprint(">>>ret>>>", ret2)
a, b= b, a+b
index+= 1obj= create_fibonacci(10)#第一次运行,执行到 yield 的时候,将 yield 后面的表达式返回,即将 a 返回。#所以就打印出:0
ret =next(obj)print(ret) #0
#通过 send 方法传递参数来唤醒 yield 时,程序将在上次断点的位置开始执行,即:ret2 = yield a。#这个时候,传递过来的参数,将会将 yield a 替换成传递过来的参数 'hello world!',然后返回给 ret2。#所以就接着打印出:>>>ret>>> hello world!#然后程序会再次循环到 yield,将 yield 后面的表达式返回,即将 a 返回。所以得到的结果是:1#所以就打印出:1
ret = obj.send('hello world!')print(ret)
View Code
7、使用 yield 实现多任务
"""了解"""
importtimedeftask1():whileTrue:print('-'*10 + '1' + '-'*10)
time.sleep(0.5)#使用yield,此时这个函数已经不再是函数,而是一个生成器
yield
deftask2():whileTrue:print('-'*15 + '2' + '-'*15)
time.sleep(0.5)yield
defmain():
count=0
t1=task1()
t2=task2()whileTrue:
count+= 1
#这是一个假的多任务
#实现了 2 个任务交替执行,也就是并发执行的。
next(t1)
next(t2)if count > 5:break
if __name__ == '__main__':
main()
View Code
8、使用 greenlet 实现多任务
"""了解:greenlet相当于对 yield 进行了封装,不再需要使用 yield 来创建生成器"""
from greenlet importgreenletimporttimedeftask1():whileTrue:print('-'*10 + '1' + '-'*10)#切换到gr2中运行
gr2.switch()
time.sleep(0.5)deftask2():whileTrue:print('-'*15 + '2' + '-'*15)#切换到gr1中运行
gr1.switch()
time.sleep(0.5)
gr1=greenlet(task1)
gr2=greenlet(task2)#切换到gr1中运行
gr1.switch()
View Code
9、使用gevent协程完成多任务
"""gevent 相当于是对 greenlet 再次进行了封装,即:greenlet 对yield 进行了封装,gevent 对 greenlet 进行了封装"""
importgevent,timefrom gevent importmonkey#1、将程序中耗时的代码,换为 gevent中自己实现的代码
monkey.patch_all()defwork(current_name):for i in range(5):print(current_name, i)
time.sleep(0.5)#2.等待gevent.joinall这个方法里面的所有协程列表都执行完毕,程序才会到 joinall后面的方法
gevent.joinall([#创建协程
gevent.spawn(work, 'work1'),
gevent.spawn(work,'work2'),
])print('Done.')
View Code
10、案例:通过gevent实现图片下载器
importgeventfrom gevent importmonkeyimportuuid, hashlib#1、将程序中耗时的代码,换为 gevent中自己实现的代码
monkey.patch_all()#import requests需放在key.patch_all()之后,不然会有警告!
importrequests#不重复名称的文件名
defget_unique_str():
uuid_str=str(uuid.uuid4())
md5=hashlib.md5()
md5.update(uuid_str.encode('utf-8'))returnmd5.hexdigest()#下载图片
defdownload_image(image_url):print('准备请求图片:' +image_url)#图片名称
imagename = get_unique_str() + "." + image_url.split('.')[-1]
res=requests.get(image_url)if res.status_code == 200:
content=res.content#保存图片
with open('./images/' + imagename, 'wb') as f:
f.write(content)print('图片已下载到本地:' +imagename)else:print('图片请求失败:' +image_url)defmain():
imgurls=["https://ss1.bdstatic.com/70cFuXSh_Q1YnxGkpoWK1HF6hhy/it/u=3865738611,3013226268&fm=26&gp=0.jpg","https://ss0.bdstatic.com/70cFvHSh_Q1YnxGkpoWK1HF6hhy/it/u=3057356668,282499874&fm=26&gp=0.jpg"]
tasks=[]for imgur inimgurls:#添加协程
tasks.append(gevent.spawn(download_image, imgur))#执行任务
gevent.joinall(tasks)print('Done.')if __name__ == '__main__':
main()
View Code
11、案例:爬取百度图片
from gevent importmonkey#将程序中耗时的代码,换为 gevent中自己实现的代码
monkey.patch_all()from gevent.queue importQueueimportgeventimportrequestsimportreimportuuid, hashlibimportos#图片列表队列
works =Queue()defget_unique_str():"""不重复名称的文件名"""uuid_str=str(uuid.uuid4())
md5=hashlib.md5()
md5.update(uuid_str.encode('utf-8'))returnmd5.hexdigest()defdownload_image():"""下载图片"""
while notworks.empty():
image_url=works.get_nowait()print('准备请求图片:' +image_url)#图片名称
imagename = get_unique_str() + "." + image_url.split('.')[-1]
res=requests.get(image_url)if res.status_code == 200:
content=res.content
image_folder= "./images/"
if notos.path.isdir(image_folder):
os.makedirs(image_folder)#保存图片
with open('./images/' + imagename, 'ab') as f:
f.write(content)print('图片已下载到本地:' +imagename)else:print('图片请求失败:' +image_url)defmain():#1.获取图片地址
word = "美女"url= "http://image.baidu.com/search/flip?tn=baiduimage&ie=utf-8&word={}&pn=0".format(word)print('正请求图片地址...')
res=requests.get(url)if res.status_code == 200:
html=res.text#利用正则表达式找到图片url
pattern = '"objURL":"([^\s]*?(jpge|jpg|png|PNG|JPG))"'pic_urls=re.findall(pattern, html, re.S)for pic_url inpic_urls:#print(pic_url)
works.put_nowait(pic_url[0])else:print('图片列表访问失败!')#设置协程去下载图片
tasks =list()for i in range(5):
tasks.append(gevent.spawn(download_image))#执行任务
gevent.joinall(tasks)print('Done.')if __name__ == '__main__':
main()
View Code
12、总结
"""协程:在处理等待某些资源的同时,去做其他的任务。
进程、线程、协程对比:
进程是资源分配的单位
线程是操作系统调度的单位
进程切换需要的资源很最大,效率很低
线程切换需要的资源一般,效率一般(当然了在不考虑GIL的情况下)
协程切换任务资源很小,效率高
多进程、多线程根据cpu核数不一样可能是并行的,但是协程是在一个线程中 所以是并发"""
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。