当前位置:   article > 正文

python协程处理多个文件_python:多任务(线程、进程、协程)

pyhon multiprocessing 处理多个文件

一、线程

1、创建线程

#创建线程

importthreading,timedeftask1():for i in range(5):print('task1 -- 任务:%s' %i)

time.sleep(1)deftask2():for j in range(10):print('task2 -- 任务:%s' %j)

time.sleep(1)defmain():#创建线程

t1 = threading.Thread(target=task1)

t2= threading.Thread(target=task2)#执行线程

t1.start()

t2.start()if __name__ == '__main__':

main()

View Code

2、查看线程数量

importthreading,time#创建线程

importthreading,timedeftask1():for i in range(5):print('task1 -- 任务:%s' %i)

time.sleep(1)deftask2():for j in range(5):print('task2 -- 任务:%s' %j)

time.sleep(1)defmain():#创建线程

t1 = threading.Thread(target=task1)

t2= threading.Thread(target=task2)#执行线程

t1.start()

t2.start()whileTrue:#打印当前线程情况

thread_num =len(threading.enumerate())print("当前线程的数量为:" +str(thread_num))print('线程详情:')print(threading.enumerate())if thread_num <= 1:#当剩下一个线程的情况下(也就是只剩下主线程的情况下,退出)

breaktime.sleep(0.5)if __name__ == '__main__':

main()

View Code

3、通过继承Thread来创建线程

importthreading,timeclassMyThread(threading.Thread):#当线程对象调用start()方法的时候,就会自动调用run()方法

defrun(self):for i in range(5):#self.name:当前线程的名字

print("I'm" + self.name + '@' +str(i))

time.sleep(1)if __name__ == '__main__':#创建线程

t1 =MyThread()

t2=MyThread()#运行线程

t1.start()

t2.start()

View Code

4、多线程共享全局变量

importthreading,time#定义全局变量

g_num = 100

deftask1():globalg_num

g_num+= 1

print("---- in task1: g_num = %d" %g_num)deftask2():print("---- in task2: g_num = %d" %g_num)defmain():

t1= threading.Thread(target=task1)

t2= threading.Thread(target=task2)

t1.start()

time.sleep(1)

t2.start()

time.sleep(1)print("---- in main: g_num = %d" %g_num)if __name__ == '__main__':

main()

View Code

5、args 参数

importthreading,time#定义全局变量

g_nums = [10, 20]deftask1(temp):

temp.append(30)print("---- in task1: temp = %s" %str(temp))deftask2(temp):print("---- in task1: temp = %s" %str(temp))defmain():#args 参数:指定调用函数的时候,传递什么参数过去(args 是元组类型)

t1 = threading.Thread(target=task1, args=(g_nums,))

t2= threading.Thread(target=task2, args=(g_nums,))

t1.start()

time.sleep(1)

t2.start()

time.sleep(1)print("---- in task1: g_nums = %s" %str(g_nums))if __name__ == '__main__':

main()

View Code

6、互斥锁解决资源竞争

importthreading,time#定义一个全局变量

g_num =0#创建一个互斥锁

mutex =threading.Lock()deftask1(num):globalg_numfor i inrange(num):#上锁

mutex.acquire()

g_num+= 1

#解锁

mutex.release()print('---- in stak1: g_num = %d' %g_num)deftask2(num):globalg_numfor i inrange(num):#上锁

mutex.acquire()

g_num+= 1

#解锁

mutex.release()print('---- in stak2: g_num = %d' %g_num)if __name__ == '__main__':#创建线程

t1 = threading.Thread(target=task1, args=(1000000,))

t2= threading.Thread(target=task2, args=(1000000,))#执行线程

t1.start()

t2.start()

time.sleep(2)print('---- in main: g_num = %d' % g_num)

View Code

7、多线程udp 聊天器

importthreading,socketdefrecv_msg(udp_socket):#接收数据

whileTrue:

recv_data= udp_socket.recvfrom(1024)print('接收到的数据为:%s' %str(recv_data))defsend_msg(udp_socket, dest_ip, dest_port):whileTrue:

send_data= input('请输入你要发送的数据:')

udp_socket.sendto(send_data.encode('utf-8'), (dest_ip, dest_port))defmain():#创建套接字

udp_socket =socket.socket(socket.AF_INET, socket.SOCK_DGRAM)#绑定本地信息

udp_socket.bind(("", 8899))#发送数据

dest_ip = "192.168.1.9"dest_port= 8899

#创建线程

t_recv = threading.Thread(target=recv_msg, args=(udp_socket,))

t_send= threading.Thread(target=send_msg, args=(udp_socket,dest_ip,dest_port))

t_recv.start()

t_send.start()if __name__ == '__main__':

main()

View Code

8、总结

"""1、什么是线程?

线程操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。

一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。

2、线程运行:

通过 t = threading.Thread(target=xxx) 来创建线程。

通过 t.start() 来运行线程。如果没有调用 start() 方法,线程是不会执行的。

3、查看线程数量

可通过 threading.enumerate() 来查看线程数量

4、通过继承Thread来创建线程时,需要重写run()方法。子线程对象在调用start()方法的时候,run()方法就会自动调用。

5、共享全局变量:

1)在一个进程内的所有线程共享全局变量,很方便在多个线程间共享数据

2)缺点就是,线程是对全局变量随意遂改可能造成多线程之间对全局变量的混乱(即线程非安全)

6、传递参数:

创建线程的时候,可以设置参数 args,来指定调用函数的时候,需要传递什么参数。args 的数据类型是元组。

7、互斥锁

某个线程要更改共享数据时,先将其锁定,此时资源的状态为“锁定”,其他线程不能更改;直到该线程释放资源,将资源的状态变成“非锁定”,其他的线程才能再次锁定该资源。

互斥锁保证了每次只有一个线程进行写入操作,从而保证了多线程情况下数据的正确性。

8、死锁

在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁。

避免死锁:

1)程序设计时要尽量避免(银行家算法)

2)添加超时时间等"""

二、进程

1、创建进程

importmultiprocessing,timedeftask1():whileTrue:print('---- task1 ----')

time.sleep(1)deftask2():whileTrue:print('---- task2 ----')

time.sleep(1)defmain():#创建进程对象

p1 = multiprocessing.Process(target=task1)

p2= multiprocessing.Process(target=task2)#开启进程

p1.start()

p2.start()if __name__ == '__main__':

main()

View Code

2、通过队列完成进程间通信

importmultiprocessingdefdownload_data(q):"""模拟下载数据"""data= [11, 22, 33, 44]#向队列中写入数据

for temp indata:print('写入数据到队列:' +str(temp))

q.put(temp)print('------下载器已下载完并写入到队列当中了------')defanalysis_data(q):"""模拟数据处理"""waiting_data=list()whileTrue:#从队列中获取数据

temp =q.get()print('从队列中获取数据:' +str(temp))

waiting_data.append(temp)#判断队列是否为空

ifq.empty():break

print('----数据已经分析完毕----')print(waiting_data)defmain():#创建队列

q =multiprocessing.Queue()#创建进程

p1 = multiprocessing.Process(target=download_data, args=(q,))

p2= multiprocessing.Process(target=analysis_data, args=(q,))#开启进程

p1.start()

p2.start()if __name__ == '__main__':

main()

View Code

3、进程池

importmultiprocessingimportos,time,randomdefworker(index):

t_start=time.time()print("{}开始执行,进程号为:{}".format(index, os.getpid()))

time.sleep(random.random()*2)

t_stop=time.time()print("{} 执行完毕,耗时{}".format(index, round(t_stop-t_start, 2)))defmain():#定义一个进程池,最大进程数量为 3

po = multiprocessing.Pool(3)for i in range(10):#Pool().apply_async(要调用的目标,(传递给目标的参数元祖,))

#每次循环将会用空闲出来的子进程去调用目标

po.apply_async(func=worker, args=(i,))print('-'*10 + 'start' + '-'*10)#关闭进程池,关闭后po不再接收新的请求

po.close()#等待po中所有子进程执行完成,必须放在close语句之后

po.join() #保证进程池中的所有进程都执行完毕后,主进程才会结束

print('-'*10 + 'end' + '-'*10)if __name__ == '__main__':

main()

View Code

4、多任务拷贝文件夹

importmultiprocessing,os#拷贝文件函数

defcopy_file(q, file_name, old_folder_name, new_folder_name):

old_file=os.path.join(old_folder_name, file_name)

new_file=os.path.join(new_folder_name, file_name)

with open(old_file,'rb') as f:whileTrue:#最多一次取出1024个字节

content = f.read(1024)ifcontent:

with open(new_file,'ab') as f2:

f2.write(content)else:break

#如果拷贝完了文件,那么就放入队列中,表示文件已拷贝完成

q.put(file_name)defmain():#1、获取用户要拷贝的文件夹名称

old_folder_name = input("请输入你要拷贝的文件夹名称:")#2、创建一个新的文件夹

new_folder_name = "[复制]" +old_folder_name;try:

os.mkdir(new_folder_name)exceptFileExistsError:pass

#3、获取文件夹中所有的文件名称

file_names =os.listdir(old_folder_name)#4、创建进程池

po = multiprocessing.Pool(5)#5、创建进程池队列,通过队列来计算进度(进程间通信)

q =multiprocessing.Manager().Queue()#6、向进程池中添加复制文件的任务

for file_name infile_names:

po.apply_async(func=copy_file, args=(q, file_name, old_folder_name, new_folder_name))

po.close()#po.join()

#7、获取队列中的内容,计算已经完成拷贝的文件数量

all_file_num =len(file_names)

copy_file_num=0whileTrue:

file_name=q.get()

copy_file_num+= 1

print("\r当前拷贝文件夹的进度为:%.2f %%"%(copy_file_num*100/all_file_num), end='')if copy_file_num >=all_file_num:print()break

print('Done.')if __name__ == '__main__':

main()

View Code

总结:

"""1、什么是进程

程序:例如xxx.py这是程序,是一个静态的

进程:一个程序运行起来后,代码+用到的资源 称之为进程,它是操作系统分配资源的基本单元。

不仅可以通过线程完成多任务,进程也是可以的

2、进程的状态:

就绪、执行、等待

3、如何创建

需使用multiprocessing模块来创建,创建的时候,和线程一样,需要指定要执行的任务是什么,例如:

p1 = multiprocessing.Process(target=task1)

4、进程间如何通信?

进程间可通过队列Queue 来进行通信。

5、进程池

当需要创建的子进程数量不多时,可以直接利用multiprocessing中的Process动态成生多个进程。

但如果是上百甚至上千个目标,手动的去创建进程的工作量巨大,此时就可以用到multiprocessing模块提供的Pool方法。

6、进程和线程的对比

进程,能够完成多任务,比如 在一台电脑上能够同时运行多个QQ

线程,能够完成多任务,比如 一个QQ中的多个聊天窗口

进程是系统进行资源分配和调度的一个独立单位。

线程是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。

线程自己基本上不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器,一组寄存器和栈),但是它可与同属一个进程的其他的线程共享进程所拥有的全部资源。

区别:

1)一个程序至少有一个进程,一个进程至少有一个线程。

2)线程的划分尺度小于进程(资源比进程少),使得多线程程序的并发性高

3)进程在执行过程中拥有独立的内存单元,而多个线程共享内存,从而极大地提高了程序的运行效率

4)线线程不能够独立执行,必须依存在进程中

5)线程和进程在使用上各有优缺点:线程执行开销小,但不利于资源的管理和保护;而进程正相反。"""

三、协程

1、迭代器

"""迭代是访问集合元素的一种方式。

迭代器是一个可以记住遍历的位置的对象。

迭代器对象从集合的第一个元素开始访问,直到所有的元素被访问完结束。迭代器只能往前不会后退。

总结:

1、如果想要一个对象成为 一个可迭代的对象,即可以使用for循环,那么必须实现 __iter__ 方法

2、要构造一个迭代器,就必须实现 __iter__ 方法和 __next__ 方法

3、迭代器抛出 StopIteration 异常,for 循环就会自动停止"""

from collections.abc importIterable,IteratorimporttimeclassClassmate(object):def __init__(self):

self.names=list()

self.current_index=0defadd(self, name):

self.names.append(name)def __iter__(self):#如果想要一个对象成为 一个可迭代的对象,即可以使用for循环,那么必须实现 __iter__ 方法

returnselfdef __next__(self):#迭代器:要构造一个迭代器,就必须实现 __iter__ 方法和 __next__ 方法

if self.current_index

result=self.names[self.current_index]

self.current_index+= 1

returnresultelse:#抛出 StopIteration 异常,for 循环就会自动停止

raiseStopIteration

classmate=Classmate()#判断对象是否可迭代

print('对象是否可迭代:', isinstance(classmate, Iterable))#获取对象的迭代器

classmate_iter =iter(classmate)print('对象是否是迭代器:', isinstance(classmate_iter, Iterator))

classmate.add('张三')

classmate.add('李四')

classmate.add('王五')

classmate.add('赵六')for name inclassmate:#classmate:可迭代,并且迭代器就是本身

#name:打印name,其实质是调用迭代器的 __next__ 方法

print(name)

time.sleep(1)

View Code

2、迭代器应用——斐波那契数列

#斐波那契数列:0,1,1,2,3,5,....,a,b,a+b

classFibonacci(object):def __init__(self, count):

self.count=count

self.index=0

self.a=0

self.b= 1

def __iter__(self):returnselfdef __next__(self):if self.index

result=self.a

self.a, self.b= self.b, self.a +self.b

self.index+= 1

returnresultelse:raiseStopIteration

fibo= Fibonacci(10)for num infibo:print(num)

View Code

3、迭代器的其他应用

"""list、tuple 也可接收可迭代的对象

比如:list(a):先生成一个空的列表,然后根据a获取 a 的迭代器,再通过next方法获取各个值,最后返回列表"""

classFibonacci(object):def __init__(self, count):

self.count=count

self.index=0

self.a=0

self.b= 1

def __iter__(self):returnselfdef __next__(self):if self.index

result=self.a

self.a, self.b= self.b, self.a +self.b

self.index+= 1

returnresultelse:raiseStopIteration

fibo= Fibonacci(10)

list1=list(fibo)print(list1)

tuple1= (1,2,3,4)

list2=list(tuple1)print(list2)

list3= [1,2,3,4]

tuple2=tuple(list3)print(tuple2)

View Code

4、生成器——创建方式 1

"""生成器是一种特殊的迭代器

创建方式一:只要把一个列表生成式的 [ ] 改成 ( )"""list1= [x*2 for x in range(10)]print(list1) #[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

list2= (x*2 for x in range(10))print(list2) # at 0x101bafad0>

for num inlist2:print(num) #0 / 2 / 4 / 6 / 8 / 10 / 12 / 14 / 16 / 18

View Code

5、生成器——创建方式 2

"""生成器创建方式二:使用 yield

如果一个函数中有yield 语句,那么这个就不再是一个函数,而是一个生成器。

执行的程序遇到yield:

1)保存当前运行状态(断点),然后暂停执行,即将生成器(函数)挂起

2)将yield关键字后面表达式的值作为返回值返回,此时可以理解为起到了return的作用

下次再执行的时候,就会从yield 语句挂起那开始执行,不再是在函数的最开始位置执行。

可以使用next()函数让生成器从断点处继续执行,即唤醒生成器(函数)。"""

#打印斐波那契数列

defcreate_fibonacci(count):

a, b= 0, 1index=0while index

#执行的程序遇到yield,则会在 yield 语句中暂停,并且把这个值返回。

#下次再执行的时候,就会从yield 语句后面开始执行,不再是在函数的最开始位置执行。

yielda

a, b= b, a+b

index+= 1

return "ok."obj= create_fibonacci(10)print(obj) #

print(type(obj)) #:类型是生成器,生成器是一个特殊的迭代器,那么,就可以通过for 循环来获取值

for num inobj:print(num) #0 / 1 / 1 / 2 / 3 / 5 / 8 / 13 / 21 / 34

print('='*40)

obj2= create_fibonacci(2)whileTrue:try:

num=next(obj2)print(num)exceptException as ex:#ex.value:获取生成器上的"ok."字符串

print(ex.value)break

#print(next(obj))#print(next(obj))#print(next(obj))#print('='*40)#obj2 = create_fibonacci(2)#print(next(obj2))#print(next(obj2))

View Code

6、使用send()方法来唤醒生成器

"""我们除了可以使用next()函数来唤醒生成器继续执行外,还可以使用send()函数来唤醒执行。

使用send()函数的一个好处是可以在唤醒的同时向断点处传入一个附加数据。"""

#打印斐波那契数列

defcreate_fibonacci(count):

a, b= 0, 1index=0while index

ret2= yieldaprint(">>>ret>>>", ret2)

a, b= b, a+b

index+= 1obj= create_fibonacci(10)#第一次运行,执行到 yield 的时候,将 yield 后面的表达式返回,即将 a 返回。#所以就打印出:0

ret =next(obj)print(ret) #0

#通过 send 方法传递参数来唤醒 yield 时,程序将在上次断点的位置开始执行,即:ret2 = yield a。#这个时候,传递过来的参数,将会将 yield a 替换成传递过来的参数 'hello world!',然后返回给 ret2。#所以就接着打印出:>>>ret>>> hello world!#然后程序会再次循环到 yield,将 yield 后面的表达式返回,即将 a 返回。所以得到的结果是:1#所以就打印出:1

ret = obj.send('hello world!')print(ret)

View Code

7、使用 yield 实现多任务

"""了解"""

importtimedeftask1():whileTrue:print('-'*10 + '1' + '-'*10)

time.sleep(0.5)#使用yield,此时这个函数已经不再是函数,而是一个生成器

yield

deftask2():whileTrue:print('-'*15 + '2' + '-'*15)

time.sleep(0.5)yield

defmain():

count=0

t1=task1()

t2=task2()whileTrue:

count+= 1

#这是一个假的多任务

#实现了 2 个任务交替执行,也就是并发执行的。

next(t1)

next(t2)if count > 5:break

if __name__ == '__main__':

main()

View Code

8、使用 greenlet 实现多任务

"""了解:greenlet相当于对 yield 进行了封装,不再需要使用 yield 来创建生成器"""

from greenlet importgreenletimporttimedeftask1():whileTrue:print('-'*10 + '1' + '-'*10)#切换到gr2中运行

gr2.switch()

time.sleep(0.5)deftask2():whileTrue:print('-'*15 + '2' + '-'*15)#切换到gr1中运行

gr1.switch()

time.sleep(0.5)

gr1=greenlet(task1)

gr2=greenlet(task2)#切换到gr1中运行

gr1.switch()

View Code

9、使用gevent协程完成多任务

"""gevent 相当于是对 greenlet 再次进行了封装,即:greenlet 对yield 进行了封装,gevent 对 greenlet 进行了封装"""

importgevent,timefrom gevent importmonkey#1、将程序中耗时的代码,换为 gevent中自己实现的代码

monkey.patch_all()defwork(current_name):for i in range(5):print(current_name, i)

time.sleep(0.5)#2.等待gevent.joinall这个方法里面的所有协程列表都执行完毕,程序才会到 joinall后面的方法

gevent.joinall([#创建协程

gevent.spawn(work, 'work1'),

gevent.spawn(work,'work2'),

])print('Done.')

View Code

10、案例:通过gevent实现图片下载器

importgeventfrom gevent importmonkeyimportuuid, hashlib#1、将程序中耗时的代码,换为 gevent中自己实现的代码

monkey.patch_all()#import requests需放在key.patch_all()之后,不然会有警告!

importrequests#不重复名称的文件名

defget_unique_str():

uuid_str=str(uuid.uuid4())

md5=hashlib.md5()

md5.update(uuid_str.encode('utf-8'))returnmd5.hexdigest()#下载图片

defdownload_image(image_url):print('准备请求图片:' +image_url)#图片名称

imagename = get_unique_str() + "." + image_url.split('.')[-1]

res=requests.get(image_url)if res.status_code == 200:

content=res.content#保存图片

with open('./images/' + imagename, 'wb') as f:

f.write(content)print('图片已下载到本地:' +imagename)else:print('图片请求失败:' +image_url)defmain():

imgurls=["https://ss1.bdstatic.com/70cFuXSh_Q1YnxGkpoWK1HF6hhy/it/u=3865738611,3013226268&fm=26&gp=0.jpg","https://ss0.bdstatic.com/70cFvHSh_Q1YnxGkpoWK1HF6hhy/it/u=3057356668,282499874&fm=26&gp=0.jpg"]

tasks=[]for imgur inimgurls:#添加协程

tasks.append(gevent.spawn(download_image, imgur))#执行任务

gevent.joinall(tasks)print('Done.')if __name__ == '__main__':

main()

View Code

11、案例:爬取百度图片

from gevent importmonkey#将程序中耗时的代码,换为 gevent中自己实现的代码

monkey.patch_all()from gevent.queue importQueueimportgeventimportrequestsimportreimportuuid, hashlibimportos#图片列表队列

works =Queue()defget_unique_str():"""不重复名称的文件名"""uuid_str=str(uuid.uuid4())

md5=hashlib.md5()

md5.update(uuid_str.encode('utf-8'))returnmd5.hexdigest()defdownload_image():"""下载图片"""

while notworks.empty():

image_url=works.get_nowait()print('准备请求图片:' +image_url)#图片名称

imagename = get_unique_str() + "." + image_url.split('.')[-1]

res=requests.get(image_url)if res.status_code == 200:

content=res.content

image_folder= "./images/"

if notos.path.isdir(image_folder):

os.makedirs(image_folder)#保存图片

with open('./images/' + imagename, 'ab') as f:

f.write(content)print('图片已下载到本地:' +imagename)else:print('图片请求失败:' +image_url)defmain():#1.获取图片地址

word = "美女"url= "http://image.baidu.com/search/flip?tn=baiduimage&ie=utf-8&word={}&pn=0".format(word)print('正请求图片地址...')

res=requests.get(url)if res.status_code == 200:

html=res.text#利用正则表达式找到图片url

pattern = '"objURL":"([^\s]*?(jpge|jpg|png|PNG|JPG))"'pic_urls=re.findall(pattern, html, re.S)for pic_url inpic_urls:#print(pic_url)

works.put_nowait(pic_url[0])else:print('图片列表访问失败!')#设置协程去下载图片

tasks =list()for i in range(5):

tasks.append(gevent.spawn(download_image))#执行任务

gevent.joinall(tasks)print('Done.')if __name__ == '__main__':

main()

View Code

12、总结

"""协程:在处理等待某些资源的同时,去做其他的任务。

进程、线程、协程对比:

进程是资源分配的单位

线程是操作系统调度的单位

进程切换需要的资源很最大,效率很低

线程切换需要的资源一般,效率一般(当然了在不考虑GIL的情况下)

协程切换任务资源很小,效率高

多进程、多线程根据cpu核数不一样可能是并行的,但是协程是在一个线程中 所以是并发"""

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/243120
推荐阅读
相关标签
  

闽ICP备14008679号