当前位置:   article > 正文

调用wireshark接口(tshark.exe)抓包,实现深度自定义,使用python处理抓包数据

tshark.exe
# _*_ coding: utf-8 _*_
"""
Time:     2022/11/16 21:05
Author:   Jyun
Version:  V 1.2
File:     demo.py
Blog:     https://ctrlcv.blog.csdn.net
"""

import datetime
import gzip
import json
import os
import time

import pyshark


class GetPacket:
    def __init__(self, network_name=None, item_filter=None):
        """
        :param network_name: 网卡名称
        :param item_filter: 抓包过滤条件/规则同wireshark
        """
        self.network_name = network_name
        tshark_path = r"D:\Program Files\Wireshark\tshark.exe"  # wireshark安装路径下的tshark.exe
        os.system(f'"{tshark_path}" -D')
        self.cap = pyshark.LiveCapture(
            interface=network_name, display_filter=item_filter, tshark_path=tshark_path, use_json=True, include_raw=True
        )

        self.requests_dict = {}
        # self.cap.close()

    def packet_callback(self, pkt):
        try:
            request_in = pkt.http.request_in
        except AttributeError:
            self.requests_dict[pkt.frame_info.number] = pkt
            return
        self.parse_http(self.requests_dict.pop(request_in), pkt)

    def action(self):
        print(f'\n\n[{time.ctime()}]', 'Network drive:', self.network_name, '\nListening...')
        self.cap.apply_on_packets(self.packet_callback)

    def gzip(self, data, charset):
        """ pytshark并没有帮我们进行gzip解压缩,需要手动判断解压一下
        :param data: 16进制原数据
        :param charset: 字符编码
        :return: 文本数据
        """
        try:
            # 先将16进制数据转换为字节,在进行gzip解压操作
            return gzip.decompress(bytearray.fromhex(data)).decode(charset)
        except ValueError:
            return data
        except EOFError:
            print('Parsing failed')
            return data

    def parse_hex16(self, data_hex16, charset):
        try:
            return bytes.fromhex(data_hex16).decode(charset)
        except UnicodeDecodeError:
            pass

    def _parse_headers(self, http_line):
        headers = {}
        for header_line in http_line:
            inx = header_line.index(':')
            headers.update({header_line[:inx]: header_line[inx + 1:].strip()})
        return headers

    # noinspection PyProtectedMember
    def parse_http(self, request, response):
        _inx = [k for k, v in request.http._all_fields.items() if isinstance(v, dict) and 'http.request.uri' in v][0]
        _http = getattr(request.http, _inx)
        print(f'[{time.ctime()}]', _http.uri, _http.method, _http.version, '\n')
        _content_type = response.http.get('content_type', '')
        _charset = 'GBK' if "charset=GBK" in _content_type else 'UTF-8'
        data: dict = {'Request': {
            'BaseInfo': {'method': _http.method, 'version': _http.version, 'url': request.http.full_uri},
            'Heasers': self._parse_headers(request.http.line),
            'Data': request.http.get('file_data')
        }, 'Response': {
            'Heasers': self._parse_headers(response.http.line),
            'Data': '原始数据: ' + str(response.http.get('file_data'))
        }}
        # 处理响应数据
        if 'application' in _content_type or 'text' in _content_type:
            if response.http.get('content_encoding') == 'gzip' and response.http.get('data'):  # 如果经过了gzip压缩,需要先使用原数据解压
                data['Response']['Data'] = self.gzip(response.http.data, _charset)
            elif response.http.get('file_data_raw'):  # 解决中文乱码问题
                file_data = self.parse_hex16(response.http.get('file_data_raw')[0], _charset)
                if file_data:
                    data['Response']['Data'] = file_data

        self.write_file(data)

    def write_file(self, data):
        _time = datetime.datetime.now().strftime(f"%Y%m%d_{int(time.time() // 1000)}")
        try:
            with open(f'packet_{_time}.txt', 'a', encoding='GBK') as f:
                f.write(json.dumps(data, indent=4, ensure_ascii=False))
                f.write('\n')
        except UnicodeEncodeError:
            with open(f'packet_{_time}.txt', 'a', encoding='UTF-8') as f:
                f.write(json.dumps(data, indent=4, ensure_ascii=False))
                f.write('\n')


if __name__ == '__main__':
    # gp = GetPacket('VPN2 - VPN Client', 'http')
    gp = GetPacket(item_filter='http')
    gp.action()

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/326643
推荐阅读
相关标签
  

闽ICP备14008679号