赞
踩
一、问题描述
现在有一段代码,需要扫描一个网段内的ip地址,是否可以ping通。
执行起来效率太慢,需要使用协程。
#!/usr/bin/env python#-*- coding: utf-8 -*-
importosimporttimeimportsignalimportsubprocessimportgeventimportgevent.poolfrom gevent importmonkey;monkey.patch_all()def custom_print(content,colour='white'):"""写入日志文件
:param content: 内容
:param colour: 颜色
:return: None"""
#颜色代码
colour_dict ={'red': 31, #红色
'green': 32, #绿色
'yellow': 33, #黄色
'blue': 34, #蓝色
'purple_red': 35, #紫红色
'bluish_blue': 36, #浅蓝色
'white': 37, #白色
}
choice= colour_dict.get(colour) #选择颜色
info= "\033[1;{};1m{}\033[0m".format(choice, content)print(info)def execute_linux2(cmd, timeout=10, skip=False):"""执行linux命令,返回list
:param cmd: linux命令
:param timeout: 超时时间,生产环境, 特别卡, 因此要3秒
:param skip: 是否跳过超时限制
:return: list"""p= subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE,shell=True,close_fds=True,preexec_fn=os.setsid)
t_beginning= time.time() #开始时间
whileTrue:if p.poll() is notNone:breakseconds_passed= time.time() -t_beginningif notskip:if seconds_passed >timeout:#p.terminate()
#p.kill()
#raise TimeoutError(cmd, timeout)
custom_print('错误, 命令: {},本地执行超时!'.format(cmd),"red")#当shell=True时,只有os.killpg才能kill子进程
try:#time.sleep(1)
os.killpg(p.pid, signal.SIGUSR1)exceptException as e:pass
returnFalse
result= p.stdout.readlines() #结果输出列表
returnresultclassNetworkTest(object):def __init__(self):
self.flag_list=[]defcheck_ping(self,ip):"""检查ping
:param ip: ip地址
:return: none"""cmd= "ping %s -c 2" %ip#print(cmd)
#本机执行命令
res = execute_linux2(cmd,2)#print(res)
if notres:
custom_print("错误, 执行命令: {} 失败".format(cmd), "red")
self.flag_list.append(False)returnFalse
res.pop()#删除最后一个元素
last_row = res.pop().decode('utf-8').strip() #再次获取最后一行结果
if notlast_row:
custom_print("错误,执行命令: {} 异常","red")
self.flag_list.append(False)returnFalse
res= last_row.split() #切割结果
#print(res,type(res),len(res))
if len(res) <10:
custom_print("错误,切割 ping 结果异常","red")
self.flag_list.append(False)returnFalseif res[5] == "0%": #判断丢包率
custom_print("正常, ip: {} ping正常 丢包率0%".format(ip), "green")else:
self.flag_list.append(False)
custom_print("错误, ip: {} ping异常 丢包率100%".format(ip), "red")defmain(self):"""主程序
:return:"""
for num in range(1, 256):
ip= '192.168.10.{}'.format(num)
self.check_ping(ip)if __name__ == '__main__':
startime= time.time() #开始时间
NetworkTest().main()
endtime=time.time()
take_time= endtime -startimeif take_time < 1: #判断不足1秒时
take_time = 1 #设置为1秒
#计算花费时间
m, s = divmod(take_time, 60)
h, m= divmod(m, 60)
custom_print("本次花费时间 %02d:%02d:%02d" % (h, m, s),"green")
View Code
改造成,协程执行。
#!/usr/bin/env python#-*- coding: utf-8 -*-
importosimporttimeimportsignalimportsubprocessimportgeventimportgevent.poolfrom gevent importmonkey;monkey.patch_all()def custom_print(content,colour='white'):"""写入日志文件
:param content: 内容
:param colour: 颜色
:return: None"""
#颜色代码
colour_dict ={'red': 31, #红色
'green': 32, #绿色
'yellow': 33, #黄色
'blue': 34, #蓝色
'purple_red': 35, #紫红色
'bluish_blue': 36, #浅蓝色
'white': 37, #白色
}
choice= colour_dict.get(colour) #选择颜色
info= "\033[1;{};1m{}\033[0m".format(choice, content)print(info)def execute_linux2(cmd, timeout=10, skip=False):"""执行linux命令,返回list
:param cmd: linux命令
:param timeout: 超时时间,生产环境, 特别卡, 因此要3秒
:param skip: 是否跳过超时限制
:return: list"""p= subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE,shell=True,close_fds=True,preexec_fn=os.setsid)
t_beginning= time.time() #开始时间
whileTrue:if p.poll() is notNone:breakseconds_passed= time.time() -t_beginningif notskip:if seconds_passed >timeout:#p.terminate()
#p.kill()
#raise TimeoutError(cmd, timeout)
custom_print('错误, 命令: {},本地执行超时!'.format(cmd),"red")#当shell=True时,只有os.killpg才能kill子进程
try:#time.sleep(1)
os.killpg(p.pid, signal.SIGUSR1)exceptException as e:pass
returnFalse
result= p.stdout.readlines() #结果输出列表
returnresultclassNetworkTest(object):def __init__(self):
self.flag_list=[]defcheck_ping(self,ip):"""检查ping
:param ip: ip地址
:return: none"""cmd= "ping %s -c 2" %ip#print(cmd)
#本机执行命令
res = execute_linux2(cmd,2)#print(res)
if notres:
custom_print("错误, 执行命令: {} 失败".format(cmd), "red")
self.flag_list.append(False)returnFalse
res.pop()#删除最后一个元素
last_row = res.pop().decode('utf-8').strip() #再次获取最后一行结果
if notlast_row:
custom_print("错误,执行命令: {} 异常","red")
self.flag_list.append(False)returnFalse
res= last_row.split() #切割结果
#print(res,type(res),len(res))
if len(res) <10:
custom_print("错误,切割 ping 结果异常","red")
self.flag_list.append(False)returnFalseif res[5] == "0%": #判断丢包率
custom_print("正常, ip: {} ping正常 丢包率0%".format(ip), "green")else:
self.flag_list.append(False)
custom_print("错误, ip: {} ping异常 丢包率100%".format(ip), "red")defmain(self):"""主程序
:return:"""process_list=[]for num in range(1, 256):
ip= '192.168.10.{}'.format(num)#self.check_ping(ip)
#将任务加到列表中
process_list.append(gevent.spawn(self.check_ping, ip))
gevent.joinall(process_list)#等待所有协程结束
if __name__ == '__main__':
startime= time.time() #开始时间
NetworkTest().main()
endtime=time.time()
take_time= endtime -startimeif take_time < 1: #判断不足1秒时
take_time = 1 #设置为1秒
#计算花费时间
m, s = divmod(take_time, 60)
h, m= divmod(m, 60)
custom_print("本次花费时间 %02d:%02d:%02d" % (h, m, s),"green")
View Code
执行输出:
...
错误, 命令: ping192.168.10.250 -c 2,本地执行超时!
错误, 执行命令: ping192.168.10.250 -c 2失败
错误, 命令: ping192.168.10.255 -c 2,本地执行超时!
错误, 执行命令: ping192.168.10.255 -c 2失败
本次花费时间00:00:07
注意:切勿在windows系统中运行,否则会报错
AttributeError: module 'os' has no attribute 'setsid'
二、使用协程池
上面直接将所有任务加到列表中,然后一次性,全部异步执行。那么同一时刻,最多有多少任务执行呢?
不知道,可能有256个吧?
注意:如果这个一个很耗CPU的程序,可能会导致服务器,直接卡死。
那么,我们应该要限制它的并发数。这个时候,需要使用协程池,固定并发数。
比如:固定为100个
#!/usr/bin/env python#-*- coding: utf-8 -*-
importosimporttimeimportsignalimportsubprocessimportgeventimportgevent.poolfrom gevent importmonkey;monkey.patch_all()def custom_print(content,colour='white'):"""写入日志文件
:param content: 内容
:param colour: 颜色
:return: None"""
#颜色代码
colour_dict ={'red': 31, #红色
'green': 32, #绿色
'yellow': 33, #黄色
'blue': 34, #蓝色
'purple_red': 35, #紫红色
'bluish_blue': 36, #浅蓝色
'white': 37, #白色
}
choice= colour_dict.get(colour) #选择颜色
info= "\033[1;{};1m{}\033[0m".format(choice, content)print(info)def execute_linux2(cmd, timeout=10, skip=False):"""执行linux命令,返回list
:param cmd: linux命令
:param timeout: 超时时间,生产环境, 特别卡, 因此要3秒
:param skip: 是否跳过超时限制
:return: list"""p= subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE,shell=True,close_fds=True,preexec_fn=os.setsid)
t_beginning= time.time() #开始时间
whileTrue:if p.poll() is notNone:breakseconds_passed= time.time() -t_beginningif notskip:if seconds_passed >timeout:#p.terminate()
#p.kill()
#raise TimeoutError(cmd, timeout)
custom_print('错误, 命令: {},本地执行超时!'.format(cmd),"red")#当shell=True时,只有os.killpg才能kill子进程
try:#time.sleep(1)
os.killpg(p.pid, signal.SIGUSR1)exceptException as e:pass
returnFalse
result= p.stdout.readlines() #结果输出列表
returnresultclassNetworkTest(object):def __init__(self):
self.flag_list=[]defcheck_ping(self,ip):"""检查ping
:param ip: ip地址
:return: none"""cmd= "ping %s -c 2" %ip#print(cmd)
#本机执行命令
res = execute_linux2(cmd,2)#print(res)
if notres:
custom_print("错误, 执行命令: {} 失败".format(cmd), "red")
self.flag_list.append(False)returnFalse
res.pop()#删除最后一个元素
last_row = res.pop().decode('utf-8').strip() #再次获取最后一行结果
if notlast_row:
custom_print("错误,执行命令: {} 异常","red")
self.flag_list.append(False)returnFalse
res= last_row.split() #切割结果
#print(res,type(res),len(res))
if len(res) <10:
custom_print("错误,切割 ping 结果异常","red")
self.flag_list.append(False)returnFalseif res[5] == "0%": #判断丢包率
custom_print("正常, ip: {} ping正常 丢包率0%".format(ip), "green")else:
self.flag_list.append(False)
custom_print("错误, ip: {} ping异常 丢包率100%".format(ip), "red")defmain(self):"""主程序
:return:"""process_list=[]
pool= gevent.pool.Pool(100) #协程池固定为100个
for num in range(1, 256):
ip= '192.168.10.{}'.format(num)#self.check_ping(ip)
#将任务加到列表中
process_list.append(pool.spawn(self.check_ping, ip))
gevent.joinall(process_list)#等待所有协程结束
if __name__ == '__main__':
startime= time.time() #开始时间
NetworkTest().main()
endtime=time.time()
take_time= endtime -startimeif take_time < 1: #判断不足1秒时
take_time = 1 #设置为1秒
#计算花费时间
m, s = divmod(take_time, 60)
h, m= divmod(m, 60)
custom_print("本次花费时间 %02d:%02d:%02d" % (h, m, s),"green")
View Code
再次执行,效果如下:
...
错误, 执行命令: ping192.168.10.254 -c 2失败
错误, 命令: ping192.168.10.255 -c 2,本地执行超时!
错误, 执行命令: ping192.168.10.255 -c 2失败
本次花费时间00:00:15
可以,发现花费的时间,明显要比上面慢了!
pool.map 单个参数
其实,还有一种写法,使用pool.map,语法如下:
pool.map(func,iterator)
比如:
pool.map(self.get_kernel, NODE_LIST)
注意:func是一个方法,iterator是一个迭代器。比如:list就是一个迭代器
使用map时,func只能接收一个参数。这个参数就是,遍历迭代器的每一个值。
使用map,完整代码如下:
#!/usr/bin/env python#-*- coding: utf-8 -*-
importosimporttimeimportsignalimportsubprocessimportgeventimportgevent.poolfrom gevent importmonkey;monkey.patch_all()def custom_print(content,colour='white'):"""写入日志文件
:param content: 内容
:param colour: 颜色
:return: None"""
#颜色代码
colour_dict ={'red': 31, #红色
'green': 32, #绿色
'yellow': 33, #黄色
'blue': 34, #蓝色
'purple_red': 35, #紫红色
'bluish_blue': 36, #浅蓝色
'white': 37, #白色
}
choice= colour_dict.get(colour) #选择颜色
info= "\033[1;{};1m{}\033[0m".format(choice, content)print(info)def execute_linux2(cmd, timeout=10, skip=False):"""执行linux命令,返回list
:param cmd: linux命令
:param timeout: 超时时间,生产环境, 特别卡, 因此要3秒
:param skip: 是否跳过超时限制
:return: list"""p= subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE,shell=True,close_fds=True,preexec_fn=os.setsid)
t_beginning= time.time() #开始时间
whileTrue:if p.poll() is notNone:breakseconds_passed= time.time() -t_beginningif notskip:if seconds_passed >timeout:#p.terminate()
#p.kill()
#raise TimeoutError(cmd, timeout)
custom_print('错误, 命令: {},本地执行超时!'.format(cmd),"red")#当shell=True时,只有os.killpg才能kill子进程
try:#time.sleep(1)
os.killpg(p.pid, signal.SIGUSR1)exceptException as e:pass
returnFalse
result= p.stdout.readlines() #结果输出列表
returnresultclassNetworkTest(object):def __init__(self):
self.flag_list=[]defcheck_ping(self,ip):"""检查ping
:param ip: ip地址
:return: none"""cmd= "ping %s -c 2" %ip#print(cmd)
#本机执行命令
res = execute_linux2(cmd,2)#print(res)
if notres:
custom_print("错误, 执行命令: {} 失败".format(cmd), "red")
self.flag_list.append(False)returnFalse
res.pop()#删除最后一个元素
last_row = res.pop().decode('utf-8').strip() #再次获取最后一行结果
if notlast_row:
custom_print("错误,执行命令: {} 异常","red")
self.flag_list.append(False)returnFalse
res= last_row.split() #切割结果
#print(res,type(res),len(res))
if len(res) <10:
custom_print("错误,切割 ping 结果异常","red")
self.flag_list.append(False)returnFalseif res[5] == "0%": #判断丢包率
custom_print("正常, ip: {} ping正常 丢包率0%".format(ip), "green")else:
self.flag_list.append(False)
custom_print("错误, ip: {} ping异常 丢包率100%".format(ip), "red")defmain(self):"""主程序
:return:"""pool= gevent.pool.Pool(100) #协程池固定为100个
ip_list = ["192.168.10.{}".format(i) for i in range(1, 256)]#使用pool.map,语法:pool.map(func,iterator)
pool.map(self.check_ping, ip_list)if __name__ == '__main__':
startime= time.time() #开始时间
NetworkTest().main()
endtime=time.time()
take_time= endtime -startimeif take_time < 1: #判断不足1秒时
take_time = 1 #设置为1秒
#计算花费时间
m, s = divmod(take_time, 60)
h, m= divmod(m, 60)
custom_print("本次花费时间 %02d:%02d:%02d" % (h, m, s),"green")
View Code
注意:方法只有一个参数的情况下,使用pool.map,一行就可以搞定。这样看起来,比较精简!
pool.map 多参数
如果方法,有多个参数,需要借用偏函数实现。
完整代码如下:
#!/usr/bin/env python3#coding: utf-8
#!/usr/bin/env python#-*- coding: utf-8 -*-
importosimporttimeimportsignalimportsubprocessimportgeventimportgevent.poolfrom gevent importmonkey;monkey.patch_all()from functools importpartialdef custom_print(content,colour='white'):"""写入日志文件
:param content: 内容
:param colour: 颜色
:return: None"""
#颜色代码
colour_dict ={'red': 31, #红色
'green': 32, #绿色
'yellow': 33, #黄色
'blue': 34, #蓝色
'purple_red': 35, #紫红色
'bluish_blue': 36, #浅蓝色
'white': 37, #白色
}
choice= colour_dict.get(colour) #选择颜色
info= "\033[1;{};1m{}\033[0m".format(choice, content)print(info)def execute_linux2(cmd, timeout=10, skip=False):"""执行linux命令,返回list
:param cmd: linux命令
:param timeout: 超时时间,生产环境, 特别卡, 因此要3秒
:param skip: 是否跳过超时限制
:return: list"""p= subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE,shell=True,close_fds=True,preexec_fn=os.setsid)
t_beginning= time.time() #开始时间
whileTrue:if p.poll() is notNone:breakseconds_passed= time.time() -t_beginningif notskip:if seconds_passed >timeout:#p.terminate()
#p.kill()
#raise TimeoutError(cmd, timeout)
custom_print('错误, 命令: {},本地执行超时!'.format(cmd),"red")#当shell=True时,只有os.killpg才能kill子进程
try:#time.sleep(1)
os.killpg(p.pid, signal.SIGUSR1)exceptException as e:pass
returnFalse
result= p.stdout.readlines() #结果输出列表
returnresultclassNetworkTest(object):def __init__(self):
self.flag_list=[]defcheck_ping(self,ip,timeout):"""检查ping
:param ip: ip地址
:param ip: 超时时间
:return: none"""cmd= "ping %s -c 2 -W %s" %(ip,timeout)#print(cmd)
#本机执行命令
res = execute_linux2(cmd,2)#print("res",res,"ip",ip,"len",len(res))
if notres:
custom_print("错误, 执行命令: {} 失败".format(cmd), "red")
self.flag_list.append(False)returnFalseif len(res) != 7:
custom_print("错误,执行命令: {} 异常".format(cmd), "red")
self.flag_list.append(False)returnFalse
res.pop()#删除最后一个元素
last_row = res.pop().decode('utf-8').strip() #再次获取最后一行结果
if notlast_row:
custom_print("错误,执行命令: {} 获取结果异常","red")
self.flag_list.append(False)returnFalse
res= last_row.split() #切割结果
#print(res,type(res),len(res))
if len(res) <10:
custom_print("错误,切割 ping 结果异常","red")
self.flag_list.append(False)returnFalseif res[5] == "0%": #判断丢包率
custom_print("正常, ip: {} ping正常 丢包率0%".format(ip), "green")else:
self.flag_list.append(False)
custom_print("错误, ip: {} ping异常 丢包率100%".format(ip), "red")defmain(self):"""主程序
:return:"""pool= gevent.pool.Pool(100) #协程池固定为100个
ip_list = ["192.168.0.{}".format(i) for i in range(1, 256)]#使用协程池,执行任务。语法: pool.map(func,iterator)
#partial使用偏函数传递参数
#注意:func第一个参数,必须是迭代器遍历的值。后面的参数,必须使用有命名传参
pool.map(partial(self.check_ping, timeout=1), ip_list)if __name__ == '__main__':
startime= time.time() #开始时间
NetworkTest().main()
endtime=time.time()
take_time= endtime -startimeif take_time < 1: #判断不足1秒时
take_time = 1 #设置为1秒
#计算花费时间
m, s = divmod(take_time, 60)
h, m= divmod(m, 60)
custom_print("本次花费时间 %02d:%02d:%02d" % (h, m, s),"green")
View Code
执行脚本,效果同上
本文参考链接:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。