赞
踩
记录一些测试经验。
软件测试工作中经常需要用到收发报文验证一些逻辑,最常见的就是http包,一般的web应用接口测试的时候常用。越是上级协议,工具越多,越方便,越是底层约麻烦。到了需要用到IP协议这一级别的反而经常缺少合适的工具。
工作中遇到需要修改IP.src构造UDP报文,并大量发送(性能测试),特此记录,主要工具是python的scapy库。
scapy是python中用于构造报文的一个库,非常实用。Scapy is a Python program that enables the user to send, sniff and dissect and forge network packets. This capability allows construction of tools that can probe, scan or attack networks.
链接: 文档.
以下示例实现了构造包并发送,但速度不快:
# 引入包
from scapy.all import *
# 最基本的udp构造
test_bytes = b'test'
# 构造一个从ip 192.168.1.1发往192.168.1.100:10000的udp包,其内容为test
udp_packet = IP(src='192.168.1.1', dst='192.168.1.100')/UDP(dport=10000)/Raw(load=test_bytes)
# Send packets at layer 3 不用关心路由网口等
send(udp_packet)
# Send packets at layer 2
# sendp()
# 使用很方便,就是windows马上就可能遇上:scapy.error.Scapy_Exception: Interface is invalid (no pcap match found)
# windows上使用需要安装npcap https://nmap.org/npcap/#download, 可以先安装wiresharck这个抓包工具,我记得有带npcap的
复用socket:
# 直接使用send,可以说每次都在创建socket,再发送数据,很显然到了性能测试的时候,是无法接受的。 # 所以需要手动创建一个socket,复用发送数据 # 理论上完全可以撇开scapy,自己重复造个轮子,然后使用python的socket创建rawsocket发送报文 # linux # 可以直接用socket库创建一个支持发送原始报文的socket,参数有兴趣可以了解下 # raw_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_RAW) # 也可以直接用scapy里的构造的socket from scapy.supersocket import L3RawSocket as linuxL3 raw_socket = linuxL3() # windows 好像是基于安全考虑,是不支持直接用python的socket创建rawSocket并修改IP发送报文(即无法自定义IP报文的来源,会被强制修改为实际IP) # 直接用scapy的,基于npcap from scapy.arch.pcapdnet import L3pcapSocket as windowsL3 raw_socket = windowsL3() raw_socket.send(udp_packet)
说下几个思路,比较土方法:
# 性能测试要点一是要有压力,二是要压力可控,所以还需要模拟并发 # 1. 使用控制apscheduler # 2. 自行使用time计时控制 # 使用控制apscheduler的好处是可以较为精确的控制每个报文前后发送的时间(同一并发),但是并发稍微多点就会无法工作 # 使用time控制,虽然比较蠢,但至少实现了功能。 def use_time_send_packet(iplist, serverip): """使用时间控制发送数据报文""" # 生成获取ip列表 ip_list = iplist # 创建socket rawSocket = create_socket() # 指定发往的地址 dst_addr = (serverip, 10000) # # 根据ip生成udp报文 udp_pack_list = [] for i in ip_list: udp_pack = IP(src=i, dst=dst_addr[0])/UDP(dport=dst_addr[1]) udp_pack_list.append((i, udp_pack)) # 整个队列用于计数 message_count = queue.Queue(maxsize=0) count = 0 # 循环发送报文,尽量保证每个ip的报文间隔大于5S while True: start_time = time.perf_counter() for j in udp_pack_list: # 给udp包添加raw内容 all_pack = udp_pack/Raw(load=b"test") rawSocket.send(all_pack) # 队列计数加1 message_count.put(1) stop_time = time.perf_counter() use_time = stop_time - start_time # print(f"用时{use_time}") if use_time < 5: time.sleep(5 - math.ceil(stop_time - start_time)) count += 1 print(f"第{count}次循环,发送了{message_count.qsize()}个包") print(f"每秒消息数:{message_count.qsize()/(stop_time - start_time)}") message_count.queue.clear() def aps_send_fun(s, ip, dst_addr): all_pack = IP(src=i, dst=dst_addr[0])/UDP(dport=dst_addr[1])/Raw(load=b"test") s.send(all_pack) def use_aps_send_packet(): """使用aps控制发送""" # 生成获取ip列表 ip_list = iplist # 创建socket rawSocket = create_socket() # 指定发往的地址 dst_addr = (serverip, 10000) # scheduler = GeventScheduler(timezone="Asia/Shanghai") scheduler = BackgroundScheduler(timezone="Asia/Shanghai") for i in iplist: # 5s间隔的任务 scheduler.add_job(func=aps_send_fun, kwargs={"s": rawSocket, "ip": i, "dst_addr": dst_addr}, trigger='interval', seconds=5) g = scheduler.start() # g is the greenlet that runs the scheduler loop # print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C')) # Execution will block here until Ctrl+C (Ctrl+Break on Windows) is pressed. try: g.join() except (KeyboardInterrupt, SystemExit): pass
其他:python有个全局锁,导致默认情况下是单核运行的,我个人感觉最简单的解决的方法就是直接多开:
from scapy.all import * import time import socket import math import queue from apscheduler.schedulers.gevent import GeventScheduler from apscheduler.schedulers.background import BackgroundScheduler import threading import platform import multiprocessing def create_socket(): # 判断系统类型,建立socket os_type = platform.platform() if "Windows" in os_type: from scapy.arch.pcapdnet import L3pcapSocket raw_socket = L3pcapSocket() else: from scapy.supersocket import L3RawSocket # raw_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_RAW) raw_socket = L3RawSocket() return raw_socket def get_ip(number=10, start='1.1.1.1'): """创建ip""" # file = open('ip_list.txt', 'w') ip_list = [] starts = start.split('.') A = int(starts[0]) B = int(starts[1]) C = int(starts[2]) D = int(starts[3]) for A in range(A, 256): for B in range(B, 256): for C in range(C, 256): for D in range(D, 256): ip = "%d.%d.%d.%d" % (A, B, C, D) if number > 0: # file.write(ip + '\n') ip_list.append(ip) number -= 1 # elif number == 1: # 解决最后多一行回车问题 # # file.write(ip) # number -= 1 else: return ip_list D = 0 C = 0 B = 0 def use_time_send_packet(iplist): """使用时间控制发送数据报文""" # 生成获取ip列表 ip_list = iplist # 创建socket rawSocket = create_socket() # 指定发往的地址 dst_addr = ("192.168.0.1", 10000) # # 根据ip生成udp报文 udp_pack_list = [] for i in ip_list: udp_pack = IP(src=i, dst=dst_addr[0])/UDP(dport=dst_addr[1]) udp_pack_list.append((i, udp_pack)) # 整个队列用于计数 message_count = queue.Queue(maxsize=0) count = 0 # 循环发送报文,尽量保证每个ip的报文间隔大于5S while True: start_time = time.perf_counter() for j in udp_pack_list: # 给udp包添加raw内容 all_pack = udp_pack/Raw(load=b"test") rawSocket.send(all_pack) # 队列计数加1 message_count.put(1) stop_time = time.perf_counter() use_time = stop_time - start_time # print(f"用时{use_time}") if use_time < 5: time.sleep(5 - math.ceil(stop_time - start_time)) count += 1 print(f"第{count}次循环,发送了{message_count.qsize()}个包") print(f"每秒消息数:{message_count.qsize()/(stop_time - start_time)}") message_count.queue.clear() def aps_send_fun(s, ip, dst_addr): all_pack = IP(src=i, dst=dst_addr[0])/UDP(dport=dst_addr[1])/Raw(load=b"test") s.send(all_pack) def use_aps_send_packet(iplist): """使用aps控制发送""" # 生成获取ip列表 ip_list = iplist # 创建socket rawSocket = create_socket() # 指定发往的地址 dst_addr = ("192.168.0.1", 10000) # scheduler = GeventScheduler(timezone="Asia/Shanghai") scheduler = BackgroundScheduler(timezone="Asia/Shanghai") for i in iplist: # 5s间隔的任务 scheduler.add_job(func=aps_send_fun, kwargs={"s": rawSocket, "ip": i, "dst_addr": dst_addr}, trigger='interval', seconds=5) g = scheduler.start() # g is the greenlet that runs the scheduler loop # print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C')) # Execution will block here until Ctrl+C (Ctrl+Break on Windows) is pressed. try: g.join() except (KeyboardInterrupt, SystemExit): pass def use_arg_start(): """命令行使用参数""" ip_number = None try: opts, args = getopt.getopt(sys.argv[1:],"i",["ip_number="]) except getopt.GetoptError: print(r'*.py -i <ip_number>') sys.exit() for opt, arg in opts: if opt == '-h': print(r'*.py -i <ip_number>') sys.exit() elif opt in ("-i", "--ip_number"): ip_number = arg if not ip_number: print(ip_number) print("参数缺失") print(r'*.py -i <ip_number> ') sys.exit() else: return int(ip_number) if __name__ == "__main__": # 命令行启动 # ip_number = 10 ip_number = use_arg_start() print( f"本次使用ip数量{ip_number}") ip_list = get_ip(number=ip_number, start='192.168.1.1') # 使用time use_time_send_packet(iplist=ip_list) # 使用aps # use_aps_send_packet(iplist=ip_list)
重新整理下,修改了业务相关的代码,也没试过,不知道能不能用,主要是提供下思路,供人参考,也给自己重新回顾,重新学习。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。