使用Scapy嗅探和发送UDP流量

2024-10-01 13:30:47 发布

您现在位置:Python中文网/ 问答频道 /正文

我按照下面的教程在Python中实现了一个包嗅探器:

http://www.binarytides.com/python-packet-sniffer-code-linux/

在收到每个UDP包时,我想发送一个已经保存的pcap文件(测试.pcap). 以下代码片段显示了我的实现:

# receive a packet
while True:
  packet = s.recvfrom(65565)

  #packet string from tuple
  packet = packet[0]

  #parse ethernet header
  eth_length = 14

  eth_header = packet[:eth_length]
  eth = unpack('!6s6sH' , eth_header)
  eth_protocol = socket.ntohs(eth[2])
  print 'Destination MAC : ' + eth_addr(packet[0:6]) + ' Source MAC : ' + 
    eth_addr(packet[6:12]) + ' Protocol : ' + str(eth_protocol)

  if eth_addr(packet[6:12]) != my_MAC_address:

      #Parse IP packets, IP Protocol number = 8
      if eth_protocol == 8 :
      #Parse IP header
      #take first 20 characters for the ip header
      ip_header = packet[eth_length:20+eth_length]

      #now unpack them :)
      iph = unpack('!BBHHHBBH4s4s' , ip_header)

      version_ihl = iph[0]
      version = version_ihl >> 4
      ihl = version_ihl & 0xF

      iph_length = ihl * 4

      ttl = iph[5]
      protocol = iph[6]
      s_addr = socket.inet_ntoa(iph[8]);
      d_addr = socket.inet_ntoa(iph[9]);

      print 'Version : ' + str(version) + ' IP Header Length : ' + str(ihl) + ' TTL : ' + str(ttl) + ' Protocol : ' + str(protocol) + ' Source Address : ' + str(s_addr) + ' Destination Address : ' + str(d_addr)


      #UDP packets
      if protocol == 17 :
         u = iph_length + eth_length
         udph_length = 8
         udp_header = packet[u:u+8]

         #now unpack them :)
         udph = unpack('!HHHH' , udp_header)

         source_port = udph[0]
         dest_port = udph[1]
         length = udph[2]
         checksum = udph[3]

         print 'Source Port : ' + str(source_port) + ' Dest Port : ' + str(dest_port) + ' Length : ' + str(length) + ' Checksum : ' + str(checksum)

         h_size = eth_length + iph_length + udph_length
         data_size = len(packet) - h_size

         #get data from the packet
         data = packet[h_size:]

         print 'Data : ' + data
         my_pkt = rdpcap("test.pcap")
         sendp(my_pkt)

在测试.pcap包含UDP_src=7777且UDP_dest=9999的UDP数据包。在

使用netcat生成流量,如下所示:

^{pr2}$

嗅探器只能接收第一个netcat消息并发送测试.pcap作为回应。但随后的netcat消息根本没有收到。但是,在netcat中使用UDP端口的任何其他组合,嗅探器工作得很好。例如:将netcat作为:

nc -u -p 8888 ip_dst_addr 9999

没有问题,我可以发送测试.pcap响应每个UDP包/msg。在

任何帮助将不胜感激!在


Tags: packetversionpcapprotocollengthheaderethaddr
1条回答
网友
1楼 · 发布于 2024-10-01 13:30:47

Scapy有几个内置的嗅探器,非常容易使用。在

>>> help(sniff)
Help on function sniff in module scapy.arch.windows.compatibility:

sniff(count=0, store=1, offline=None, prn=None, stop_filter=None, lfilter=None, L2socket=None, timeout=None, *arg, **karg)
    Sniff packets
    sniff([count=0,] [prn=None,] [store=1,] [offline=None,] [lfilter=None,] + L2ListenSocket args) -> list of packets
    Select interface to sniff by setting conf.iface. Use show_interfaces() to see interface names.
      count: number of packets to capture. 0 means infinity
      store: whether to store sniffed packets or discard them
        prn: function to apply to each packet. If something is returned,
             it is displayed. Ex:
             ex: prn = lambda x: x.summary()
     filter: provide a BPF filter
    lfilter: python function applied to each packet to determine
             if further action may be done
             ex: lfilter = lambda x: x.haslayer(Padding)
    offline: pcap file to read packets from, instead of sniffing them
    timeout: stop sniffing after a given time (default: None)
    L2socket: use the provided L2socket
    stop_filter: python function applied to each packet to determine
                 if we have to stop the capture after this packet
                 ex: stop_filter = lambda x: x.haslayer(TCP)

这意味着你可以简单地做:

packets = rdpcap("test.pcap")
sniff(lfilter=lambda x: x.haslayer(UDP) and x[Ether].src==sending_mac and x[UDP].sport==port, prn=lambda x: send(packets))

这将把所有UDP数据包附加到测试.pcap文件

相关问题 更多 >