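1. Preface
I originally wanted to use dpkt to parse mail, DNS, and HTTP traffic as an aid to analyzing pcap files, but after reading up on it I found it less convenient than scapy.
dpkt is a Python module for fast, simple packet creation and parsing, including parsing of the basic TCP/IP protocols.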
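dpkt manual
According to the official manual, dpkt reads each packet in a pcap file, uses isinstance to check whether the packet carries IP, and then determines which protocol it belongs to. Each supported protocol is already wrapped in an API, so when a packet matches one of them, the relevant field values can be printed.
Extending this code also requires learning what the fields of each protocol mean.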
API reference:
https://dpkt.readthedocs.io/en/latest/api/api_auto.html#module-dpkt.qq
The manual also points to example code on GitHub for some of the APIs, which is a useful reference.
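2. Examples from the Manual
The code below is an example from the manual. I found that inet_pton could not be used directly, so I modified the example following a fix found online. The workaround relies on ctypes.windll, so it targets Windows.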
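If only the conversion from a packed address to a string is needed, a lighter alternative to patching the socket module might look like the sketch below. `packed_to_str` is a hypothetical helper, not part of dpkt. It relies on `socket.inet_ntoa`, which handles IPv4 on every platform, and falls back to `socket.inet_ntop` for IPv6 only where that function exists.

```python
import socket


def packed_to_str(packed):
    """Convert a packed IPv4 (4-byte) or IPv6 (16-byte) address to text."""
    if len(packed) == 4:
        # inet_ntoa handles IPv4 only, but it is available on all platforms.
        return socket.inet_ntoa(packed)
    if len(packed) == 16 and hasattr(socket, 'inet_ntop'):
        # inet_ntop is missing on some platforms (e.g. Python 2 on Windows).
        return socket.inet_ntop(socket.AF_INET6, packed)
    raise ValueError('unsupported address: %r' % (packed,))
```

This keeps the standard library untouched, at the cost of not supporting IPv6 on platforms that lack `inet_ntop`.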
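Printing packets
Use dpkt to read a pcap file and print the contents of its packets, namely the fields of the Ethernet frame and the IP packet.
Python 2 test code: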
    #!/usr/bin/env python
    """
    Use DPKT to read in a pcap file and print out the contents of the packets
    This example is focused on the fields in the Ethernet Frame and IP packet
    """
    # Make print() a function on Python 2 so its arguments are not shown as a tuple
    from __future__ import print_function

    import ctypes
    import datetime
    import os
    import socket

    import dpkt
    from dpkt.compat import compat_ord


    def mac_addr(address):
        """Convert a MAC address to a readable/printable string
        Args:
            address (str): a MAC address in hex form (e.g. '\x01\x02\x03\x04\x05\x06')
        Returns:
            str: Printable/readable MAC address
        """
        return ':'.join('%02x' % compat_ord(b) for b in address)


    class sockaddr(ctypes.Structure):
        _fields_ = [("sa_family", ctypes.c_short),
                    ("__pad1", ctypes.c_ushort),
                    ("ipv4_addr", ctypes.c_byte * 4),
                    ("ipv6_addr", ctypes.c_byte * 16),
                    ("__pad2", ctypes.c_ulong)]


    if hasattr(ctypes, 'windll'):
        WSAStringToAddressA = ctypes.windll.ws2_32.WSAStringToAddressA
        WSAAddressToStringA = ctypes.windll.ws2_32.WSAAddressToStringA
    else:
        # Accept any arguments so a call raises SystemError, not TypeError
        def not_windows(*args):
            raise SystemError(
                "Invalid platform. ctypes.windll must be available."
            )
        WSAStringToAddressA = not_windows
        WSAAddressToStringA = not_windows


    def inet_pton(address_family, ip_string):
        addr = sockaddr()
        addr.sa_family = address_family
        addr_size = ctypes.c_int(ctypes.sizeof(addr))

        if WSAStringToAddressA(
                ip_string,
                address_family,
                None,
                ctypes.byref(addr),
                ctypes.byref(addr_size)
        ) != 0:
            raise socket.error(ctypes.FormatError())

        if address_family == socket.AF_INET:
            return ctypes.string_at(addr.ipv4_addr, 4)
        if address_family == socket.AF_INET6:
            return ctypes.string_at(addr.ipv6_addr, 16)

        raise socket.error('unknown address family')


    def inet_ntop(address_family, packed_ip):
        addr = sockaddr()
        addr.sa_family = address_family
        addr_size = ctypes.c_int(ctypes.sizeof(addr))
        ip_string = ctypes.create_string_buffer(128)
        ip_string_size = ctypes.c_int(ctypes.sizeof(ip_string))

        if address_family == socket.AF_INET:
            if len(packed_ip) != ctypes.sizeof(addr.ipv4_addr):
                raise socket.error('packed IP wrong length for inet_ntoa')
            ctypes.memmove(addr.ipv4_addr, packed_ip, 4)
        elif address_family == socket.AF_INET6:
            if len(packed_ip) != ctypes.sizeof(addr.ipv6_addr):
                raise socket.error('packed IP wrong length for inet_ntoa')
            ctypes.memmove(addr.ipv6_addr, packed_ip, 16)
        else:
            raise socket.error('unknown address family')

        if WSAAddressToStringA(
                ctypes.byref(addr),
                addr_size,
                None,
                ip_string,
                ctypes.byref(ip_string_size)
        ) != 0:
            raise socket.error(ctypes.FormatError())

        return ip_string[:ip_string_size.value - 1]


    # Adding our two functions to the socket library
    if os.name == 'nt':
        socket.inet_pton = inet_pton
        socket.inet_ntop = inet_ntop


    def inet_to_str(inet):
        return socket.inet_ntop(socket.AF_INET, inet)


    def print_packets(pcap):
        """Print out information about each packet in a pcap
        Args:
            pcap: dpkt pcap reader object (dpkt.pcap.Reader)
        """
        # packet num count
        r_num = 0
        # For each packet in the pcap process the contents
        for timestamp, buf in pcap:
            r_num = r_num + 1
            print('packet num count :', r_num)
            # Print out the timestamp in UTC
            print('Timestamp: ', str(datetime.datetime.utcfromtimestamp(timestamp)))

            # Unpack the Ethernet frame (mac src/dst, ethertype)
            eth = dpkt.ethernet.Ethernet(buf)
            print('Ethernet Frame: ', mac_addr(eth.src), mac_addr(eth.dst), eth.type)

            # Ma