当前位置:   article > 正文

python协程池_python 协程池和pool.map用法

pool.map(partial(

一、问题描述

现在有一段代码,需要扫描一个网段内的ip地址,是否可以ping通。

执行起来效率太慢,需要使用协程

#!/usr/bin/env python#-*- coding: utf-8 -*-

importosimporttimeimportsignalimportsubprocessimportgeventimportgevent.poolfrom gevent importmonkey;monkey.patch_all()def custom_print(content,colour='white'):"""写入日志文件

:param content: 内容

:param colour: 颜色

:return: None"""

#颜色代码

colour_dict ={'red': 31, #红色

'green': 32, #绿色

'yellow': 33, #黄色

'blue': 34, #蓝色

'purple_red': 35, #紫红色

'bluish_blue': 36, #浅蓝色

'white': 37, #白色

}

choice= colour_dict.get(colour) #选择颜色

info= "\033[1;{};1m{}\033[0m".format(choice, content)print(info)def execute_linux2(cmd, timeout=10, skip=False):"""执行linux命令,返回list

:param cmd: linux命令

:param timeout: 超时时间,生产环境, 特别卡, 因此要3秒

:param skip: 是否跳过超时限制

:return: list"""p= subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE,shell=True,close_fds=True,preexec_fn=os.setsid)

t_beginning= time.time() #开始时间

whileTrue:if p.poll() is notNone:breakseconds_passed= time.time() -t_beginningif notskip:if seconds_passed >timeout:#p.terminate()

#p.kill()

#raise TimeoutError(cmd, timeout)

custom_print('错误, 命令: {},本地执行超时!'.format(cmd),"red")#当shell=True时,只有os.killpg才能kill子进程

try:#time.sleep(1)

os.killpg(p.pid, signal.SIGUSR1)exceptException as e:pass

returnFalse

result= p.stdout.readlines() #结果输出列表

returnresultclassNetworkTest(object):def __init__(self):

self.flag_list=[]defcheck_ping(self,ip):"""检查ping

:param ip: ip地址

:return: none"""cmd= "ping %s -c 2" %ip#print(cmd)

#本机执行命令

res = execute_linux2(cmd,2)#print(res)

if notres:

custom_print("错误, 执行命令: {} 失败".format(cmd), "red")

self.flag_list.append(False)returnFalse

res.pop()#删除最后一个元素

last_row = res.pop().decode('utf-8').strip() #再次获取最后一行结果

if notlast_row:

custom_print("错误,执行命令: {} 异常","red")

self.flag_list.append(False)returnFalse

res= last_row.split() #切割结果

#print(res,type(res),len(res))

if len(res) <10:

custom_print("错误,切割 ping 结果异常","red")

self.flag_list.append(False)returnFalseif res[5] == "0%": #判断丢包率

custom_print("正常, ip: {} ping正常 丢包率0%".format(ip), "green")else:

self.flag_list.append(False)

custom_print("错误, ip: {} ping异常 丢包率100%".format(ip), "red")defmain(self):"""主程序

:return:"""

for num in range(1, 256):

ip= '192.168.10.{}'.format(num)

self.check_ping(ip)if __name__ == '__main__':

startime= time.time() #开始时间

NetworkTest().main()

endtime=time.time()

take_time= endtime -startimeif take_time < 1: #判断不足1秒时

take_time = 1 #设置为1秒

#计算花费时间

m, s = divmod(take_time, 60)

h, m= divmod(m, 60)

custom_print("本次花费时间 %02d:%02d:%02d" % (h, m, s),"green")

View Code

改造成,协程执行。

#!/usr/bin/env python#-*- coding: utf-8 -*-

importosimporttimeimportsignalimportsubprocessimportgeventimportgevent.poolfrom gevent importmonkey;monkey.patch_all()def custom_print(content,colour='white'):"""写入日志文件

:param content: 内容

:param colour: 颜色

:return: None"""

#颜色代码

colour_dict ={'red': 31, #红色

'green': 32, #绿色

'yellow': 33, #黄色

'blue': 34, #蓝色

'purple_red': 35, #紫红色

'bluish_blue': 36, #浅蓝色

'white': 37, #白色

}

choice= colour_dict.get(colour) #选择颜色

info= "\033[1;{};1m{}\033[0m".format(choice, content)print(info)def execute_linux2(cmd, timeout=10, skip=False):"""执行linux命令,返回list

:param cmd: linux命令

:param timeout: 超时时间,生产环境, 特别卡, 因此要3秒

:param skip: 是否跳过超时限制

:return: list"""p= subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE,shell=True,close_fds=True,preexec_fn=os.setsid)

t_beginning= time.time() #开始时间

whileTrue:if p.poll() is notNone:breakseconds_passed= time.time() -t_beginningif notskip:if seconds_passed >timeout:#p.terminate()

#p.kill()

#raise TimeoutError(cmd, timeout)

custom_print('错误, 命令: {},本地执行超时!'.format(cmd),"red")#当shell=True时,只有os.killpg才能kill子进程

try:#time.sleep(1)

os.killpg(p.pid, signal.SIGUSR1)exceptException as e:pass

returnFalse

result= p.stdout.readlines() #结果输出列表

returnresultclassNetworkTest(object):def __init__(self):

self.flag_list=[]defcheck_ping(self,ip):"""检查ping

:param ip: ip地址

:return: none"""cmd= "ping %s -c 2" %ip#print(cmd)

#本机执行命令

res = execute_linux2(cmd,2)#print(res)

if notres:

custom_print("错误, 执行命令: {} 失败".format(cmd), "red")

self.flag_list.append(False)returnFalse

res.pop()#删除最后一个元素

last_row = res.pop().decode('utf-8').strip() #再次获取最后一行结果

if notlast_row:

custom_print("错误,执行命令: {} 异常","red")

self.flag_list.append(False)returnFalse

res= last_row.split() #切割结果

#print(res,type(res),len(res))

if len(res) <10:

custom_print("错误,切割 ping 结果异常","red")

self.flag_list.append(False)returnFalseif res[5] == "0%": #判断丢包率

custom_print("正常, ip: {} ping正常 丢包率0%".format(ip), "green")else:

self.flag_list.append(False)

custom_print("错误, ip: {} ping异常 丢包率100%".format(ip), "red")defmain(self):"""主程序

:return:"""process_list=[]for num in range(1, 256):

ip= '192.168.10.{}'.format(num)#self.check_ping(ip)

#将任务加到列表中

process_list.append(gevent.spawn(self.check_ping, ip))

gevent.joinall(process_list)#等待所有协程结束

if __name__ == '__main__':

startime= time.time() #开始时间

NetworkTest().main()

endtime=time.time()

take_time= endtime -startimeif take_time < 1: #判断不足1秒时

take_time = 1 #设置为1秒

#计算花费时间

m, s = divmod(take_time, 60)

h, m= divmod(m, 60)

custom_print("本次花费时间 %02d:%02d:%02d" % (h, m, s),"green")

View Code

执行输出:

...

错误, 命令: ping192.168.10.250 -c 2,本地执行超时!

错误, 执行命令: ping192.168.10.250 -c 2失败

错误, 命令: ping192.168.10.255 -c 2,本地执行超时!

错误, 执行命令: ping192.168.10.255 -c 2失败

本次花费时间00:00:07

注意:切勿在windows系统中运行,否则会报错

AttributeError: module 'os' has no attribute 'setsid'

二、使用协程池

上面直接将所有任务加到列表中,然后一次性,全部异步执行。那么同一时刻,最多有多少任务执行呢?

不知道,可能有256个吧?

注意:如果这个一个很耗CPU的程序,可能会导致服务器,直接卡死。

那么,我们应该要限制它的并发数。这个时候,需要使用协程池,固定并发数。

比如:固定为100个

#!/usr/bin/env python#-*- coding: utf-8 -*-

importosimporttimeimportsignalimportsubprocessimportgeventimportgevent.poolfrom gevent importmonkey;monkey.patch_all()def custom_print(content,colour='white'):"""写入日志文件

:param content: 内容

:param colour: 颜色

:return: None"""

#颜色代码

colour_dict ={'red': 31, #红色

'green': 32, #绿色

'yellow': 33, #黄色

'blue': 34, #蓝色

'purple_red': 35, #紫红色

'bluish_blue': 36, #浅蓝色

'white': 37, #白色

}

choice= colour_dict.get(colour) #选择颜色

info= "\033[1;{};1m{}\033[0m".format(choice, content)print(info)def execute_linux2(cmd, timeout=10, skip=False):"""执行linux命令,返回list

:param cmd: linux命令

:param timeout: 超时时间,生产环境, 特别卡, 因此要3秒

:param skip: 是否跳过超时限制

:return: list"""p= subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE,shell=True,close_fds=True,preexec_fn=os.setsid)

t_beginning= time.time() #开始时间

whileTrue:if p.poll() is notNone:breakseconds_passed= time.time() -t_beginningif notskip:if seconds_passed >timeout:#p.terminate()

#p.kill()

#raise TimeoutError(cmd, timeout)

custom_print('错误, 命令: {},本地执行超时!'.format(cmd),"red")#当shell=True时,只有os.killpg才能kill子进程

try:#time.sleep(1)

os.killpg(p.pid, signal.SIGUSR1)exceptException as e:pass

returnFalse

result= p.stdout.readlines() #结果输出列表

returnresultclassNetworkTest(object):def __init__(self):

self.flag_list=[]defcheck_ping(self,ip):"""检查ping

:param ip: ip地址

:return: none"""cmd= "ping %s -c 2" %ip#print(cmd)

#本机执行命令

res = execute_linux2(cmd,2)#print(res)

if notres:

custom_print("错误, 执行命令: {} 失败".format(cmd), "red")

self.flag_list.append(False)returnFalse

res.pop()#删除最后一个元素

last_row = res.pop().decode('utf-8').strip() #再次获取最后一行结果

if notlast_row:

custom_print("错误,执行命令: {} 异常","red")

self.flag_list.append(False)returnFalse

res= last_row.split() #切割结果

#print(res,type(res),len(res))

if len(res) <10:

custom_print("错误,切割 ping 结果异常","red")

self.flag_list.append(False)returnFalseif res[5] == "0%": #判断丢包率

custom_print("正常, ip: {} ping正常 丢包率0%".format(ip), "green")else:

self.flag_list.append(False)

custom_print("错误, ip: {} ping异常 丢包率100%".format(ip), "red")defmain(self):"""主程序

:return:"""process_list=[]

pool= gevent.pool.Pool(100) #协程池固定为100个

for num in range(1, 256):

ip= '192.168.10.{}'.format(num)#self.check_ping(ip)

#将任务加到列表中

process_list.append(pool.spawn(self.check_ping, ip))

gevent.joinall(process_list)#等待所有协程结束

if __name__ == '__main__':

startime= time.time() #开始时间

NetworkTest().main()

endtime=time.time()

take_time= endtime -startimeif take_time < 1: #判断不足1秒时

take_time = 1 #设置为1秒

#计算花费时间

m, s = divmod(take_time, 60)

h, m= divmod(m, 60)

custom_print("本次花费时间 %02d:%02d:%02d" % (h, m, s),"green")

View Code

再次执行,效果如下:

...

错误, 执行命令: ping192.168.10.254 -c 2失败

错误, 命令: ping192.168.10.255 -c 2,本地执行超时!

错误, 执行命令: ping192.168.10.255 -c 2失败

本次花费时间00:00:15

可以,发现花费的时间,明显要比上面慢了!

pool.map 单个参数

其实,还有一种写法,使用pool.map,语法如下:

pool.map(func,iterator)

比如:

pool.map(self.get_kernel, NODE_LIST)

注意:func是一个方法,iterator是一个迭代器。比如:list就是一个迭代器

使用map时,func只能接收一个参数。这个参数就是,遍历迭代器的每一个值。

使用map,完整代码如下:

#!/usr/bin/env python#-*- coding: utf-8 -*-

importosimporttimeimportsignalimportsubprocessimportgeventimportgevent.poolfrom gevent importmonkey;monkey.patch_all()def custom_print(content,colour='white'):"""写入日志文件

:param content: 内容

:param colour: 颜色

:return: None"""

#颜色代码

colour_dict ={'red': 31, #红色

'green': 32, #绿色

'yellow': 33, #黄色

'blue': 34, #蓝色

'purple_red': 35, #紫红色

'bluish_blue': 36, #浅蓝色

'white': 37, #白色

}

choice= colour_dict.get(colour) #选择颜色

info= "\033[1;{};1m{}\033[0m".format(choice, content)print(info)def execute_linux2(cmd, timeout=10, skip=False):"""执行linux命令,返回list

:param cmd: linux命令

:param timeout: 超时时间,生产环境, 特别卡, 因此要3秒

:param skip: 是否跳过超时限制

:return: list"""p= subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE,shell=True,close_fds=True,preexec_fn=os.setsid)

t_beginning= time.time() #开始时间

whileTrue:if p.poll() is notNone:breakseconds_passed= time.time() -t_beginningif notskip:if seconds_passed >timeout:#p.terminate()

#p.kill()

#raise TimeoutError(cmd, timeout)

custom_print('错误, 命令: {},本地执行超时!'.format(cmd),"red")#当shell=True时,只有os.killpg才能kill子进程

try:#time.sleep(1)

os.killpg(p.pid, signal.SIGUSR1)exceptException as e:pass

returnFalse

result= p.stdout.readlines() #结果输出列表

returnresultclassNetworkTest(object):def __init__(self):

self.flag_list=[]defcheck_ping(self,ip):"""检查ping

:param ip: ip地址

:return: none"""cmd= "ping %s -c 2" %ip#print(cmd)

#本机执行命令

res = execute_linux2(cmd,2)#print(res)

if notres:

custom_print("错误, 执行命令: {} 失败".format(cmd), "red")

self.flag_list.append(False)returnFalse

res.pop()#删除最后一个元素

last_row = res.pop().decode('utf-8').strip() #再次获取最后一行结果

if notlast_row:

custom_print("错误,执行命令: {} 异常","red")

self.flag_list.append(False)returnFalse

res= last_row.split() #切割结果

#print(res,type(res),len(res))

if len(res) <10:

custom_print("错误,切割 ping 结果异常","red")

self.flag_list.append(False)returnFalseif res[5] == "0%": #判断丢包率

custom_print("正常, ip: {} ping正常 丢包率0%".format(ip), "green")else:

self.flag_list.append(False)

custom_print("错误, ip: {} ping异常 丢包率100%".format(ip), "red")defmain(self):"""主程序

:return:"""pool= gevent.pool.Pool(100) #协程池固定为100个

ip_list = ["192.168.10.{}".format(i) for i in range(1, 256)]#使用pool.map,语法:pool.map(func,iterator)

pool.map(self.check_ping, ip_list)if __name__ == '__main__':

startime= time.time() #开始时间

NetworkTest().main()

endtime=time.time()

take_time= endtime -startimeif take_time < 1: #判断不足1秒时

take_time = 1 #设置为1秒

#计算花费时间

m, s = divmod(take_time, 60)

h, m= divmod(m, 60)

custom_print("本次花费时间 %02d:%02d:%02d" % (h, m, s),"green")

View Code

注意:方法只有一个参数的情况下,使用pool.map,一行就可以搞定。这样看起来,比较精简!

pool.map 多参数

如果方法,有多个参数,需要借用偏函数实现。

完整代码如下:

#!/usr/bin/env python3#coding: utf-8

#!/usr/bin/env python#-*- coding: utf-8 -*-

importosimporttimeimportsignalimportsubprocessimportgeventimportgevent.poolfrom gevent importmonkey;monkey.patch_all()from functools importpartialdef custom_print(content,colour='white'):"""写入日志文件

:param content: 内容

:param colour: 颜色

:return: None"""

#颜色代码

colour_dict ={'red': 31, #红色

'green': 32, #绿色

'yellow': 33, #黄色

'blue': 34, #蓝色

'purple_red': 35, #紫红色

'bluish_blue': 36, #浅蓝色

'white': 37, #白色

}

choice= colour_dict.get(colour) #选择颜色

info= "\033[1;{};1m{}\033[0m".format(choice, content)print(info)def execute_linux2(cmd, timeout=10, skip=False):"""执行linux命令,返回list

:param cmd: linux命令

:param timeout: 超时时间,生产环境, 特别卡, 因此要3秒

:param skip: 是否跳过超时限制

:return: list"""p= subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE,shell=True,close_fds=True,preexec_fn=os.setsid)

t_beginning= time.time() #开始时间

whileTrue:if p.poll() is notNone:breakseconds_passed= time.time() -t_beginningif notskip:if seconds_passed >timeout:#p.terminate()

#p.kill()

#raise TimeoutError(cmd, timeout)

custom_print('错误, 命令: {},本地执行超时!'.format(cmd),"red")#当shell=True时,只有os.killpg才能kill子进程

try:#time.sleep(1)

os.killpg(p.pid, signal.SIGUSR1)exceptException as e:pass

returnFalse

result= p.stdout.readlines() #结果输出列表

returnresultclassNetworkTest(object):def __init__(self):

self.flag_list=[]defcheck_ping(self,ip,timeout):"""检查ping

:param ip: ip地址

:param ip: 超时时间

:return: none"""cmd= "ping %s -c 2 -W %s" %(ip,timeout)#print(cmd)

#本机执行命令

res = execute_linux2(cmd,2)#print("res",res,"ip",ip,"len",len(res))

if notres:

custom_print("错误, 执行命令: {} 失败".format(cmd), "red")

self.flag_list.append(False)returnFalseif len(res) != 7:

custom_print("错误,执行命令: {} 异常".format(cmd), "red")

self.flag_list.append(False)returnFalse

res.pop()#删除最后一个元素

last_row = res.pop().decode('utf-8').strip() #再次获取最后一行结果

if notlast_row:

custom_print("错误,执行命令: {} 获取结果异常","red")

self.flag_list.append(False)returnFalse

res= last_row.split() #切割结果

#print(res,type(res),len(res))

if len(res) <10:

custom_print("错误,切割 ping 结果异常","red")

self.flag_list.append(False)returnFalseif res[5] == "0%": #判断丢包率

custom_print("正常, ip: {} ping正常 丢包率0%".format(ip), "green")else:

self.flag_list.append(False)

custom_print("错误, ip: {} ping异常 丢包率100%".format(ip), "red")defmain(self):"""主程序

:return:"""pool= gevent.pool.Pool(100) #协程池固定为100个

ip_list = ["192.168.0.{}".format(i) for i in range(1, 256)]#使用协程池,执行任务。语法: pool.map(func,iterator)

#partial使用偏函数传递参数

#注意:func第一个参数,必须是迭代器遍历的值。后面的参数,必须使用有命名传参

pool.map(partial(self.check_ping, timeout=1), ip_list)if __name__ == '__main__':

startime= time.time() #开始时间

NetworkTest().main()

endtime=time.time()

take_time= endtime -startimeif take_time < 1: #判断不足1秒时

take_time = 1 #设置为1秒

#计算花费时间

m, s = divmod(take_time, 60)

h, m= divmod(m, 60)

custom_print("本次花费时间 %02d:%02d:%02d" % (h, m, s),"green")

View Code

执行脚本,效果同上

本文参考链接:

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/181333?site
推荐阅读
相关标签
  

闽ICP备14008679号