赞
踩
# -*- coding: utf-8 -*-
"""
Capture live HTTP traffic through pyshark/tshark and dump every
request/response pair to a text file as JSON.

Time: 2022/11/16 21:05
Author: Jyun
Version: V 1.2
File: demo.py
Blog: https://ctrlcv.blog.csdn.net
"""
import datetime
import gzip
import json
import os
import time
import zlib

try:
    import pyshark
except ImportError:
    # Deferred: the pure parsing helpers stay importable without pyshark;
    # GetPacket.__init__ raises a clear error when a capture is requested.
    pyshark = None

# Location of tshark.exe inside the Wireshark installation directory.
DEFAULT_TSHARK_PATH = r"D:\Program Files\Wireshark\tshark.exe"


class GetPacket:
    """Live HTTP sniffer that pairs each response with its request."""

    def __init__(self, network_name=None, item_filter=None,
                 tshark_path=DEFAULT_TSHARK_PATH):
        """
        :param network_name: name of the network interface to capture on
        :param item_filter: display filter, same syntax as in Wireshark
        :param tshark_path: path to the tshark executable
        :raises ImportError: if pyshark is not installed
        """
        if pyshark is None:
            raise ImportError('pyshark is required for live capture (pip install pyshark)')
        self.network_name = network_name
        # Print the interfaces tshark can see, to help pick network_name.
        os.system(f'"{tshark_path}" -D')
        self.cap = pyshark.LiveCapture(
            interface=network_name,
            display_filter=item_filter,
            tshark_path=tshark_path,
            use_json=True,
            include_raw=True,
        )
        # Packets without an http.request_in field, keyed by frame number,
        # waiting for the response that points back at them.
        self.requests_dict = {}

    def packet_callback(self, pkt):
        """Store a packet without ``request_in``; otherwise pair it and parse.

        A packet whose ``http.request_in`` cannot be read is remembered under
        its frame number. A packet that has it is treated as the response to
        the stored frame; if that frame was never stored the packet is
        skipped instead of raising ``KeyError``.
        """
        try:
            request_in = pkt.http.request_in
        except AttributeError:
            self.requests_dict[pkt.frame_info.number] = pkt
            return
        request = self.requests_dict.pop(request_in, None)
        if request is None:
            # The matching request was not captured (e.g. capture started
            # between request and response); nothing to pair with.
            return
        self.parse_http(request, pkt)

    def action(self):
        """Start listening and feed every captured packet to the callback."""
        print(f'\n\n[{time.ctime()}]', 'Network drive:', self.network_name, '\nListening...')
        self.cap.apply_on_packets(self.packet_callback)

    def gzip(self, data, charset):
        """
        pyshark does not gunzip response bodies for us, so do it by hand.

        :param data: raw payload as a hexadecimal string
        :param charset: character encoding used to decode the result
        :return: decoded text, or ``data`` unchanged when it cannot be decoded
        """
        try:
            # Hex string -> bytes -> gunzip -> text.
            return gzip.decompress(bytearray.fromhex(data)).decode(charset)
        except ValueError:
            # Not valid hex, or the decompressed bytes do not match charset
            # (UnicodeDecodeError is a ValueError subclass).
            return data
        except (EOFError, OSError, zlib.error):
            # Truncated stream, missing gzip magic (BadGzipFile is an
            # OSError) or corrupt deflate data.
            print('Parsing failed')
            return data

    def parse_hex16(self, data_hex16, charset):
        """Decode a hexadecimal string to text.

        :param data_hex16: payload as a hexadecimal string
        :param charset: character encoding used to decode the bytes
        :return: decoded text, or ``None`` when the input is not valid hex
                 or does not decode with ``charset``
        """
        try:
            return bytes.fromhex(data_hex16).decode(charset)
        except ValueError:
            # Covers both malformed hex and UnicodeDecodeError.
            return None

    def _parse_headers(self, http_line):
        """Turn ``"Name: value"`` lines into a dict.

        :param http_line: iterable of header lines; a bare string is treated
                          as a single line
        :return: dict mapping header name to stripped value; lines without a
                 colon are skipped
        """
        if isinstance(http_line, str):
            http_line = [http_line]
        headers = {}
        for header_line in http_line:
            name, colon, value = header_line.partition(':')
            if not colon:
                continue
            headers[name] = value.strip()
        return headers

    # noinspection PyProtectedMember
    def parse_http(self, request, response):
        """Build the request/response record and hand it to ``write_file``.

        :param request: stored packet that the response refers to
        :param response: packet carrying ``http.request_in``
        """
        # The request line lives under a dynamically named field; find the
        # one whose value is a dict containing 'http.request.uri'.
        request_line_key = next(
            (key for key, value in request.http._all_fields.items()
             if isinstance(value, dict) and 'http.request.uri' in value),
            None,
        )
        if request_line_key is None:
            # The stored packet is not an HTTP request; nothing to report.
            return
        _http = getattr(request.http, request_line_key)
        print(f'[{time.ctime()}]', _http.uri, _http.method, _http.version, '\n')

        _content_type = response.http.get('content_type', '')
        _charset = 'GBK' if "charset=GBK" in _content_type else 'UTF-8'

        # NOTE: the 'Heasers' spelling is kept so existing dump files and
        # anything reading them stay compatible.
        data: dict = {
            'Request': {
                'BaseInfo': {
                    'method': _http.method,
                    'version': _http.version,
                    'url': request.http.full_uri,
                },
                'Heasers': self._parse_headers(request.http.line),
                'Data': request.http.get('file_data'),
            },
            'Response': {
                'Heasers': self._parse_headers(response.http.line),
                'Data': '原始数据: ' + str(response.http.get('file_data')),
            },
        }

        # Post-process the response body for textual content types.
        if 'application' in _content_type or 'text' in _content_type:
            if response.http.get('content_encoding') == 'gzip' and response.http.get('data'):
                # gzip-compressed: decompress from the raw payload first.
                data['Response']['Data'] = self.gzip(response.http.data, _charset)
            elif response.http.get('file_data_raw'):
                # Re-decode from the raw hex to avoid garbled Chinese text.
                file_data = self.parse_hex16(response.http.get('file_data_raw')[0], _charset)
                if file_data:
                    data['Response']['Data'] = file_data
        self.write_file(data)

    def write_file(self, data):
        """Append ``data`` as indented JSON to the current dump file.

        The file name is ``packet_<YYYYMMDD>_<epoch // 1000>.txt``. The file
        is always written as UTF-8 so that one file never mixes encodings.

        :param data: JSON-serialisable record
        """
        _time = datetime.datetime.now().strftime(f"%Y%m%d_{int(time.time() // 1000)}")
        with open(f'packet_{_time}.txt', 'a', encoding='UTF-8') as f:
            f.write(json.dumps(data, indent=4, ensure_ascii=False))
            f.write('\n')


if __name__ == '__main__':
    # To capture on a specific interface: GetPacket('VPN2 - VPN Client', 'http')
    gp = GetPacket(item_filter='http')
    gp.action()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。