当前位置:   article > 正文

Python 抓取并解码原始数据包_python抓包解析数据

python抓包解析数据

本文介绍了如何使用Python中的混杂模式来捕获流经网卡的数据包,并对其中的IP和ICMP数据包进行解析和打印所需字段信息的操作。文章提供了获取本机MAC地址和IP地址的方法,以便确定网络接口和IP地址的绑定。接着通过使用原始套接字创建一个原始数据包捕获器,并将其绑定到本机的IP地址上。在Windows平台上,需要启用IOCTL混杂模式以捕获所有流经网卡的数据包。最后通过读取单个数据包并打印其内容,实现了捕获数据包的功能。

  • 针对IP数据包,文章提供了解析IP包头的方法。通过定义一个IP头部的结构体,并根据特定的包头格式解析数据包,获取其中的字段信息,如协议类型、源地址、目标地址和生存周期等,并将其打印出来。

  • 对于ICMP数据包,文章提供了解析ICMP包头的方法。与解析IP数据包类似,先判断数据包是否为ICMP协议,然后根据ICMP包头的格式解析数据包,获取字段信息,并将其打印出来。

抓取原始数据包

Python中默认的Socket模块就可以实现对原始数据包的解包操作,如下代码所示,需要注意这段代码只能在Windows平台使用,因为我们需要开启网卡的IOCTL混杂模式,这是Win平台特有的。

import socket
import uuid

# 获取本机MAC地址
def GetHostMAC():
    mac=uuid.UUID(int = uuid.getnode()).hex[-12:]
    this_mac = ":".join([mac[e:e+2] for e in range(0,11,2)])
    this_name = socket.getfqdn(socket.gethostname())
    print("本机名: {} --> 本机MAC: {}".format(this_name,this_mac))

# 获取本机IP地址
def GetHostAddress():
    try:
        sock=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
        sock.connect(('8.8.8.8',80))
        address =sock.getsockname()[0]
    finally:
        return address
        address.close()

# 开始跟踪原始数据包
def SnifferIOSock(address):
    # 创建原始套接字,然后绑定在公开接口上
    socket_protocol = socket.IPPROTO_IP

    # 开启原始数据包模式
    sniffer = socket.socket(socket.AF_INET,socket.SOCK_RAW,socket_protocol)
    sniffer.bind((address,0))

    # 设置在捕获的数据包中包含IP头
    sniffer.setsockopt(socket.IPPROTO_IP,socket.IP_HDRINCL,1)

    # Windows平台需要设置IOCTL以启动混杂模式
    sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_ON)

    # 读取单个数据包
    while True:
        print(sniffer.recvfrom(65565))

    # 退出时,关闭混杂模式
    sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_OFF)

if __name__ == "__main__":
    GetHostMAC()
    address = GetHostAddress()
    print("本机IP: {}".format(address))
    SnifferIOSock(address)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

解码IP数据包头

解码方法同样运用的是上方的方法,只不过这里我们需要找到完整的IP地址的包头封装格式,然后根据特定的包头格式对数据包进行解包操作即可.

import socket
import os
import struct
from ctypes import *

# 定义IP头部结构体
class IP(Structure):
    _fields_ = [
        ("ihl",            c_ubyte, 4),
        ("version",        c_ubyte, 4),
        ("tos",            c_ubyte),
        ("len",            c_ushort),
        ("id",            c_ushort),
        ("offset",        c_ushort),
        ("ttl",            c_ubyte),
        ("protocol_num",    c_ubyte),
        ("sum",            c_ushort),
        ("src",            c_ulong),      # linux 需要变为 c_uint32
        ("dst",            c_ulong)       # linux 需要变为 c_uint32
    ]
    def __new__(self,socket_buffer=None):
        return self.from_buffer_copy(socket_buffer)

    def __init__(self, socket_buffer=None):
        # 定义协议序号与名称对应关系
        self.protocol_map = {1:"ICMP", 6:"TCP", 17:"UDP"}

        # 将数据包解包为地址
        self.src_address = socket.inet_ntoa(struct.pack("<L",self.src))
        self.dst_address = socket.inet_ntoa(struct.pack("<L",self.dst))
        self.this_ttl = self.ttl

        # self.src_address = socket.inet_ntoa(struct.pack("@I",self.src))
        # self.dst_address = socket.inet_ntoa(struct.pack("@I",self.dst))

        # 协议类型
        try:
            self.protocol = self.protocol_map[self.protocol_num]
        except:
            self.protocol = str(self.protocol_num)

# 获取本机IP地址
def GetHostAddress():
    try:
        sock=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
        sock.connect(('8.8.8.8',80))
        address =sock.getsockname()[0]
    finally:
        return address
        address.close()

# 执行解包过程,并输出
def SnifferIPAddress(address):

    # 平台选择,nt代表Windows
    if os.name == "nt":
        socket_protocol = socket.IPPROTO_IP
    else:
        socket_protocol = socket.IPPROTO_ICMP

    # 开启原始套接字模式
    sniffer = socket.socket(socket.AF_INET,socket.SOCK_RAW,socket_protocol)
    sniffer.bind((address,0))
    sniffer.setsockopt(socket.IPPROTO_IP,socket.IP_HDRINCL,1)

    if os.name == "nt":
        sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_ON)

    # 循环接受数据包并解包
    try:
        while True:
            # 读取数据包
            raw_buffer = sniffer.recvfrom(65565)[0]

            # 将缓冲区的前20个字节按IP头进行解析
            ip_header = IP(raw_buffer[0:20])

            # 输出协议和通信双方IP地址
            print("传输协议: {} --> 原地址: {} --> 传输到: {} --> 生存周期: {}".format(ip_header.protocol,ip_header.src_address,ip_header.dst_address,ip_header.this_ttl))

    # 如果按下Ctrl+C则退出
    except KeyboardInterrupt:
        # 关闭混杂模式
        if os.name == "nt":
            sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_OFF)

if __name__ == "__main__":
    address = GetHostAddress()
    SnifferIPAddress(address)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89

解码ICMP数据包头

原理与解包IP头相同,但需要注意,由于ICMP头在IP头的下方,所以我们需要先解析出IP头数据包,然后根据IP头中的protocol_num判断如果是ICMP则将其传入ICMP结构做进一步解包即可.

import socket
import os
import struct
from ctypes import *

# 定义IP头部结构体
class IP(Structure):
    _fields_ = [
        ("ihl",            c_ubyte, 4),
        ("version",        c_ubyte, 4),
        ("tos",            c_ubyte),
        ("len",            c_ushort),
        ("id",            c_ushort),
        ("offset",        c_ushort),
        ("ttl",            c_ubyte),
        ("protocol_num",    c_ubyte),
        ("sum",            c_ushort),
        ("src",            c_ulong),      # linux 需要变为 c_uint32
        ("dst",            c_ulong)       # linux 需要变为 c_uint32
    ]
    def __new__(self,socket_buffer=None):
        return self.from_buffer_copy(socket_buffer)

    def __init__(self, socket_buffer=None):
        # 定义协议序号与名称对应关系
        self.protocol_map = {1:"ICMP", 6:"TCP", 17:"UDP"}

        # 将数据包解包为地址
        self.src_address = socket.inet_ntoa(struct.pack("<L",self.src))
        self.dst_address = socket.inet_ntoa(struct.pack("<L",self.dst))
        self.this_ttl = self.ttl

        # self.src_address = socket.inet_ntoa(struct.pack("@I",self.src))
        # self.dst_address = socket.inet_ntoa(struct.pack("@I",self.dst))

        # 协议类型
        try:
            self.protocol = self.protocol_map[self.protocol_num]
        except:
            self.protocol = str(self.protocol_num)

# 定义ICMP结构包头
class ICMP(Structure):
    _fields_ = [
        ("type",            c_ubyte),
        ("code",            c_ubyte),
        ("checksum",        c_ushort),
        ("unused",          c_ushort),
        ("next_hop_mtu",    c_ushort)
    ]

    def __new__(self,socket_buffer=None):
        return self.from_buffer_copy(socket_buffer)

    def __init__(self, socket_buffer=None):
        self.icmp_type = self.type
        self.icmp_code = self.code
        self.icmp_checksum = self.checksum


# 获取本机IP地址
def GetHostAddress():
    try:
        sock=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
        sock.connect(('8.8.8.8',80))
        address =sock.getsockname()[0]
    finally:
        return address
        address.close()

# 执行解包过程,并输出
def SnifferIPAddress(address):

    # 平台选择,nt代表Windows
    if os.name == "nt":
        socket_protocol = socket.IPPROTO_IP
    else:
        socket_protocol = socket.IPPROTO_ICMP

    # 开启原始套接字模式
    sniffer = socket.socket(socket.AF_INET,socket.SOCK_RAW,socket_protocol)
    sniffer.bind((address,0))
    sniffer.setsockopt(socket.IPPROTO_IP,socket.IP_HDRINCL,1)

    if os.name == "nt":
        sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_ON)

    # 循环接受数据包并解包
    try:
        while True:
            # 读取数据包
            raw_buffer = sniffer.recvfrom(65565)[0]

            # 将缓冲区的前20个字节按IP头进行解析
            ip_header = IP(raw_buffer[0:20])

            # 判断协议类型是否为ICMP
            if ip_header.protocol == "ICMP":

                # 计算ICMP包的起始位置
                offset = ip_header.ihl * 4
                buf = raw_buffer[offset:offset + sizeof(ICMP)]

                #解析ICMP数据
                icmp_header = ICMP(buf)
                print("原地址: {} --> 发送到: {} --> 解包协议: {} --> 解包代码: {} --> 校验和: {}".format(ip_header.src_address, ip_header.dst_address, icmp_header.icmp_type,icmp_header.icmp_code,icmp_header.icmp_checksum))

    # 如果按下Ctrl+C则退出
    except KeyboardInterrupt:
        # 关闭混杂模式
        if os.name == "nt":
            sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_OFF)

if __name__ == "__main__":
    address = GetHostAddress()
    SnifferIPAddress(address)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116

我们运行上方的代码,然后在本机Ping测试一下www.lyshark.com此时即可看到,本机与对端的来往数据包.

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/不正经/article/detail/601559
推荐阅读
相关标签
  

闽ICP备14008679号