当前位置:   article > 正文

python测试--scapy构造报文_python scapy发送udp报文

python scapy发送udp报文

记录一些测试经验。


前言

软件测试工作中经常需要用到收发报文验证一些逻辑,最常见的就是http包,一般的web应用接口测试的时候常用。越是上级协议,工具越多,越方便,越是底层约麻烦。到了需要用到IP协议这一级别的反而经常缺少合适的工具。
工作中遇到需要修改IP.src构造UDP报文,并大量发送(性能测试),特此记录,主要工具是python的scapy库。


一、scapy是什么?

scapy是python中用于构造报文的一个库,非常实用。Scapy is a Python program that enables the user to send, sniff and dissect and forge network packets. This capability allows construction of tools that can probe, scan or attack networks.
链接: 文档.

二、步骤

1.快速构造发包

以下示例实现了构造包并发送,但速度不快:

# 引入包
from scapy.all import *
# 最基本的udp构造
test_bytes = b'test'
# 构造一个从ip 192.168.1.1发往192.168.1.100:10000的udp包,其内容为test
udp_packet = IP(src='192.168.1.1', dst='192.168.1.100')/UDP(dport=10000)/Raw(load=test_bytes)

# Send packets at layer 3 不用关心路由网口等
send(udp_packet)
# Send packets at layer 2
# sendp()

# 使用很方便,就是windows马上就可能遇上:scapy.error.Scapy_Exception: Interface is invalid (no pcap match found)
# windows上使用需要安装npcap https://nmap.org/npcap/#download, 可以先安装wiresharck这个抓包工具,我记得有带npcap的 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

2.复用socket大量发包

复用socket:

# 直接使用send,可以说每次都在创建socket,再发送数据,很显然到了性能测试的时候,是无法接受的。
# 所以需要手动创建一个socket,复用发送数据
# 理论上完全可以撇开scapy,自己重复造个轮子,然后使用python的socket创建rawsocket发送报文

# linux
# 可以直接用socket库创建一个支持发送原始报文的socket,参数有兴趣可以了解下
# raw_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_RAW)
# 也可以直接用scapy里的构造的socket

from scapy.supersocket import L3RawSocket as linuxL3
raw_socket = linuxL3()

# windows 好像是基于安全考虑,是不支持直接用python的socket创建rawSocket并修改IP发送报文(即无法自定义IP报文的来源,会被强制修改为实际IP)
# 直接用scapy的,基于npcap
from scapy.arch.pcapdnet import L3pcapSocket as windowsL3
raw_socket = windowsL3()


raw_socket.send(udp_packet)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

3.控制发包并并发

说下几个思路,比较土方法:

# 性能测试要点一是要有压力,二是要压力可控,所以还需要模拟并发
# 1. 使用控制apscheduler
# 2. 自行使用time计时控制

# 使用控制apscheduler的好处是可以较为精确的控制每个报文前后发送的时间(同一并发),但是并发稍微多点就会无法工作
# 使用time控制,虽然比较蠢,但至少实现了功能。



def use_time_send_packet(iplist, serverip):
    """使用时间控制发送数据报文"""
    # 生成获取ip列表
    ip_list = iplist
    # 创建socket
    rawSocket = create_socket()
    # 指定发往的地址
    dst_addr = (serverip, 10000)
    # # 根据ip生成udp报文
    udp_pack_list = []
    for i in ip_list:
        udp_pack = IP(src=i, dst=dst_addr[0])/UDP(dport=dst_addr[1])
        udp_pack_list.append((i, udp_pack))
    
    # 整个队列用于计数
    message_count = queue.Queue(maxsize=0)
    count = 0

    # 循环发送报文,尽量保证每个ip的报文间隔大于5S
    while True:
        start_time = time.perf_counter()
        for j in udp_pack_list:
            # 给udp包添加raw内容
            all_pack = udp_pack/Raw(load=b"test")
            rawSocket.send(all_pack)
            # 队列计数加1
            message_count.put(1)
        stop_time = time.perf_counter()
        use_time = stop_time - start_time
        # print(f"用时{use_time}")
        if use_time < 5:
            time.sleep(5 - math.ceil(stop_time - start_time))
        count += 1 
        print(f"第{count}次循环,发送了{message_count.qsize()}个包")
        print(f"每秒消息数:{message_count.qsize()/(stop_time - start_time)}")
        message_count.queue.clear()


def aps_send_fun(s, ip, dst_addr):
    all_pack = IP(src=i, dst=dst_addr[0])/UDP(dport=dst_addr[1])/Raw(load=b"test")
    s.send(all_pack)

def use_aps_send_packet():
    """使用aps控制发送"""
    # 生成获取ip列表
    ip_list = iplist
    # 创建socket
    rawSocket = create_socket()
    # 指定发往的地址
    dst_addr = (serverip, 10000)
    # scheduler = GeventScheduler(timezone="Asia/Shanghai")
    scheduler = BackgroundScheduler(timezone="Asia/Shanghai")
    for i in iplist:
        # 5s间隔的任务
        scheduler.add_job(func=aps_send_fun, kwargs={"s": rawSocket, "ip": i, "dst_addr": dst_addr}, trigger='interval', seconds=5)
    g = scheduler.start()  # g is the greenlet that runs the scheduler loop
    # print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))
    # Execution will block here until Ctrl+C (Ctrl+Break on Windows) is pressed.
    try:
        g.join()
    except (KeyboardInterrupt, SystemExit):
        pass
 


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74

4.完整代码

其他:python有个全局锁,导致默认情况下是单核运行的,我个人感觉最简单的解决的方法就是直接多开:



from scapy.all import *
import time
import socket
import math
import queue
from apscheduler.schedulers.gevent import GeventScheduler
from apscheduler.schedulers.background import BackgroundScheduler

import threading
import platform
import multiprocessing

def create_socket():
    # 判断系统类型,建立socket
    os_type = platform.platform() 
    if "Windows" in os_type:
        from scapy.arch.pcapdnet import L3pcapSocket
        raw_socket = L3pcapSocket()
    else: 
        from scapy.supersocket import L3RawSocket
        # raw_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_RAW)
        raw_socket = L3RawSocket()
    return raw_socket

def get_ip(number=10, start='1.1.1.1'):
    """创建ip"""
    # file = open('ip_list.txt', 'w')
    ip_list = []
    starts = start.split('.')
    A = int(starts[0])
    B = int(starts[1])
    C = int(starts[2])
    D = int(starts[3])
    for A in range(A, 256):
        for B in range(B, 256):
            for C in range(C, 256):
                for D in range(D, 256):
                    ip = "%d.%d.%d.%d" % (A, B, C, D)

                    if number > 0:
                        # file.write(ip + '\n')
                        ip_list.append(ip)
                        number -= 1
                    # elif number == 1:  # 解决最后多一行回车问题
                    #     # file.write(ip)
                    #     number -= 1
                    else:
                        return ip_list
                D = 0
            C = 0
        B = 0


def use_time_send_packet(iplist):
    """使用时间控制发送数据报文"""
    # 生成获取ip列表
    ip_list = iplist
    # 创建socket
    rawSocket = create_socket()
    # 指定发往的地址
    dst_addr = ("192.168.0.1", 10000)
    # # 根据ip生成udp报文
    udp_pack_list = []
    for i in ip_list:
        udp_pack = IP(src=i, dst=dst_addr[0])/UDP(dport=dst_addr[1])
        udp_pack_list.append((i, udp_pack))
    
    # 整个队列用于计数
    message_count = queue.Queue(maxsize=0)
    count = 0

    # 循环发送报文,尽量保证每个ip的报文间隔大于5S
    while True:
        start_time = time.perf_counter()
        for j in udp_pack_list:
            # 给udp包添加raw内容
            all_pack = udp_pack/Raw(load=b"test")
            rawSocket.send(all_pack)
            # 队列计数加1
            message_count.put(1)
        stop_time = time.perf_counter()
        use_time = stop_time - start_time
        # print(f"用时{use_time}")
        if use_time < 5:
            time.sleep(5 - math.ceil(stop_time - start_time))
        count += 1 
        print(f"第{count}次循环,发送了{message_count.qsize()}个包")
        print(f"每秒消息数:{message_count.qsize()/(stop_time - start_time)}")
        message_count.queue.clear()


def aps_send_fun(s, ip, dst_addr):
    all_pack = IP(src=i, dst=dst_addr[0])/UDP(dport=dst_addr[1])/Raw(load=b"test")
    s.send(all_pack)

def use_aps_send_packet(iplist):
    """使用aps控制发送"""
    # 生成获取ip列表
    ip_list = iplist
    # 创建socket
    rawSocket = create_socket()
    # 指定发往的地址
    dst_addr = ("192.168.0.1", 10000)
    # scheduler = GeventScheduler(timezone="Asia/Shanghai")
    scheduler = BackgroundScheduler(timezone="Asia/Shanghai")
    for i in iplist:
        # 5s间隔的任务
        scheduler.add_job(func=aps_send_fun, kwargs={"s": rawSocket, "ip": i, "dst_addr": dst_addr}, trigger='interval', seconds=5)
    g = scheduler.start()  # g is the greenlet that runs the scheduler loop
    # print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))
    # Execution will block here until Ctrl+C (Ctrl+Break on Windows) is pressed.
    try:
        g.join()
    except (KeyboardInterrupt, SystemExit):
        pass
 

def use_arg_start():
    """命令行使用参数"""
    ip_number = None
    
    try:
        opts, args = getopt.getopt(sys.argv[1:],"i",["ip_number="])
    except getopt.GetoptError:
        print(r'*.py -i <ip_number>')
        sys.exit()
    for opt, arg in opts:
        if opt == '-h':
            print(r'*.py -i <ip_number>')
            sys.exit()
        elif opt in ("-i", "--ip_number"):
            ip_number = arg

    if not ip_number:
        print(ip_number)
        print("参数缺失")
        print(r'*.py -i <ip_number> ')
        sys.exit()
    else:
        return int(ip_number)
        
if __name__ == "__main__":
    # 命令行启动
    # ip_number = 10
    ip_number = use_arg_start()
    print( f"本次使用ip数量{ip_number}")

    ip_list = get_ip(number=ip_number, start='192.168.1.1')
    # 使用time
    use_time_send_packet(iplist=ip_list)
    # 使用aps
    # use_aps_send_packet(iplist=ip_list)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154

总结

重新整理下,修改了业务相关的代码,也没试过,不知道能不能用,主要是提供下思路,供人参考,也给自己重新回顾,重新学习。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/156983
推荐阅读
相关标签
  

闽ICP备14008679号