当前位置:   article > 正文

【Python】测试dpkt解析pcap

dpkt

1、前言

本想借助dpkt解析mail、dns、http来辅助分析pcap包进行分析,查阅资料学习却发现并不如使用scapy那么方便。

dpkt是一个python模块,可以对简单的数据包创建/解析,以及基本TCP / IP协议的解析,速度很快。

dpkt 手册

https://dpkt.readthedocs.io/en/latest/
dpkt 下载

https://pypi.org/project/dpkt/

看官方手册发现DPKT是读取每个pcap包里的内容,用isinstance判断是不是有IP的包,再判断是属于哪个协议,对应的协议已经封装好API如果发现可以匹配某个协议API就输出来相关值。

想要扩展这个源码还需要去学习一下协议相关的字段含义。

API调用:

https://dpkt.readthedocs.io/en/latest/api/api_auto.html#module-dpkt.qq

在手册中找到了在Github中部分API的示例代码,具备参考价值。

https://github.com/jeffsilverm/dpkt_doc

2、手册例子

以下代码是手册中的例子,通过查询发现inet_pton无法直接使用,按照网络上的解决方法修改了一下。

打印数据包

使用DPKT读取pcap文件并打印出数据包的内容。打印出以太网帧和IP数据包中的字段。

python2测试代码:

  1. #!/usr/bin/env python
  2. """
  3. Use DPKT to read in a pcap file and print out the contents of the packets
  4. This example is focused on the fields in the Ethernet Frame and IP packet
  5. """
  6. import dpkt
  7. import datetime
  8. import socket
  9. from dpkt.compat import compat_ord
  10. import ctypes
  11. import os
  12. def mac_addr(address):
  13. """Convert a MAC address to a readable/printable string
  14. Args:
  15. address (str): a MAC address in hex form (e.g. '\x01\x02\x03\x04\x05\x06')
  16. Returns:
  17. str: Printable/readable MAC address
  18. """
  19. return ':'.join('%02x' % compat_ord(b) for b in address)
  20. class sockaddr(ctypes.Structure):
  21. _fields_ = [("sa_family", ctypes.c_short),
  22. ("__pad1", ctypes.c_ushort),
  23. ("ipv4_addr", ctypes.c_byte * 4),
  24. ("ipv6_addr", ctypes.c_byte * 16),
  25. ("__pad2", ctypes.c_ulong)]
  26. if hasattr(ctypes, 'windll'):
  27. WSAStringToAddressA = ctypes.windll.ws2_32.WSAStringToAddressA
  28. WSAAddressToStringA = ctypes.windll.ws2_32.WSAAddressToStringA
  29. else:
  30. def not_windows():
  31. raise SystemError(
  32. "Invalid platform. ctypes.windll must be available."
  33. )
  34. WSAStringToAddressA = not_windows
  35. WSAAddressToStringA = not_windows
  36. def inet_pton(address_family, ip_string):
  37. addr = sockaddr()
  38. addr.sa_family = address_family
  39. addr_size = ctypes.c_int(ctypes.sizeof(addr))
  40. if WSAStringToAddressA(
  41. ip_string,
  42. address_family,
  43. None,
  44. ctypes.byref(addr),
  45. ctypes.byref(addr_size)
  46. ) != 0:
  47. raise socket.error(ctypes.FormatError())
  48. if address_family == socket.AF_INET:
  49. return ctypes.string_at(addr.ipv4_addr, 4)
  50. if address_family == socket.AF_INET6:
  51. return ctypes.string_at(addr.ipv6_addr, 16)
  52. raise socket.error('unknown address family')
  53. def inet_ntop(address_family, packed_ip):
  54. addr = sockaddr()
  55. addr.sa_family = address_family
  56. addr_size = ctypes.c_int(ctypes.sizeof(addr))
  57. ip_string = ctypes.create_string_buffer(128)
  58. ip_string_size = ctypes.c_int(ctypes.sizeof(ip_string))
  59. if address_family == socket.AF_INET:
  60. if len(packed_ip) != ctypes.sizeof(addr.ipv4_addr):
  61. raise socket.error('packed IP wrong length for inet_ntoa')
  62. ctypes.memmove(addr.ipv4_addr, packed_ip, 4)
  63. elif address_family == socket.AF_INET6:
  64. if len(packed_ip) != ctypes.sizeof(addr.ipv6_addr):
  65. raise socket.error('packed IP wrong length for inet_ntoa')
  66. ctypes.memmove(addr.ipv6_addr, packed_ip, 16)
  67. else:
  68. raise socket.error('unknown address family')
  69. if WSAAddressToStringA(
  70. ctypes.byref(addr),
  71. addr_size,
  72. None,
  73. ip_string,
  74. ctypes.byref(ip_string_size)
  75. ) != 0:
  76. raise socket.error(ctypes.FormatError())
  77. return ip_string[:ip_string_size.value - 1]
  78. # Adding our two functions to the socket library
  79. if os.name == 'nt':
  80. socket.inet_pton = inet_pton
  81. socket.inet_ntop = inet_ntop
  82. def inet_to_str(inet):
  83. return socket.inet_ntop(socket.AF_INET, inet)
  84. def print_packets(pcap):
  85. """Print out information about each packet in a pcap
  86. Args:
  87. pcap: dpkt pcap reader object (dpkt.pcap.Reader)
  88. """
  89. # packet num count
  90. r_num = 0
  91. # For each packet in the pcap process the contents
  92. for timestamp, buf in pcap:
  93. r_num=r_num+1
  94. print ('packet num count :' , r_num )
  95. # Print out the timestamp in UTC
  96. print('Timestamp: ', str(datetime.datetime.utcfromtimestamp(timestamp)))
  97. # Unpack the Ethernet frame (mac src/dst, ethertype)
  98. eth = dpkt.ethernet.Ethernet(buf)
  99. print('Ethernet Frame: ', mac_addr(eth.src), mac_addr(eth.dst), eth.type)
  100. # Ma
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/200831
推荐阅读
相关标签
  

闽ICP备14008679号