Scapy:如何访问通过post_build()计算的层字段?

2024-10-02 08:29:41 发布

您现在位置:Python中文网/ 问答频道 /正文

我目前正在编写一些定制的scapy层,用于解析/构建协议的数据包,我的大部分工作都在进行中。由于该协议位于TCP之上,我一直在使用StreamSocket发送/接收数据包,如下所示:

from scapy.all import *

s = socket.socket()
s.connect(('127.0.0.1', 1337))
ss = StreamSocket(s, MyLayer)
ans, unans = ss.sr(MyLayer())

我重载了层的answers()方法来定义什么构成“响应”。此协议使用类似于TCP的ACK编号,但它不是确认下一个数据包的序列号,而是确认接收到的最后一个数据包的序列号。除了SEQ和ACK数字之外,该层的数据包还包含一个长度字段和两个校验和字段,因此我重载了该层的post_build()方法来自动计算它们。总之,我的图层如下所示:

from numpy import uint8
from random import randint
from scapy.all import *

class MyLayer(Packet):
    name = 'MyLayer'
    fields_desc = [
        XShortField('sop', 0xFF5A),
        XShortField('len', None),
        FlagsField('ctl', 0x40, 8, [
            'ZERO0',
            'ZERO1',
            'ZERO2',
            'SUS',
            'RST',
            'EAK',
            'ACK',
            'SYN'
        ]),
        ByteField('seq', None),
        ByteField('ack', None),
        ByteField('sess', 0),
        XByteField('chk', None)
        StrLenField('data', '', length_from=lambda pkt: pkt.len - 10),
        XByteField('datachk', None)
    ]
    send_seq = uint8(randint(0, 255))
    recv_seq = uint8(0)

    def post_build(self, p, pay):
        if self.len is None:
            len_off = 2
            len_size = 2
            p = p[:len_off] + struct.pack('!H', len(p)) + p[len_off+len_size:]

        if self.seq is None:
            seq_off = 5
            seq_size = 1

            # Increments send_seq before sending by examining the stack trace.
            for line in traceback.format_stack():
                if '_sndrcv_snd' in line or 'in send' in line:
                    MyLayer.send_seq = uint8(MyLayer.send_seq + 1)
                    break

            p = p[:seq_off] + MyLayer.send_seq + p[seq_off+seq_size:]

        if self.ack is None:
            ack_off = 6
            ack_size = 1
            p = p[:ack_off] + MyLayer.recv_seq + p[ack_off+ack_size:]

        if self.chk is None:
            chk_off = 8
            chk_size = 1
            chk = uint8(0x100 - sum(p[:chk_off]))
            p = p[:chk_off] + chk + p[chk_off+chk_size:]

        if self.datachk is None:
            datachk_off = len(p) - 1
            datachk = uint8(0x100 - sum(p[:datachk_off]))
            p = p[:datachk_off] + datachk

        return p + pay
        
    def update_recv_seq(self):
        MyLayer.recv_seq = uint8(self.seq)

    def answers(self, other):
        self.update_recv_seq()
        if isinstance(other, MyLayer) and self.ack == other.seq:
            return 1
        return 0

不幸的是,当我将数据包的seq设置为None以便自动计算它时,answers()无法将收到的数据包识别为对我发送的数据包的响应,因为other.seq仍然设置为None。将行self.seq = MyLayer.send_seq添加到post_build()的末尾似乎也不起作用。在重建层后,我求助于重铸原始字节,但这似乎是一个笨拙的解决方案:

pkts = rdpcap('path/to/cap.pcap')
for pkt in pkts:
    if pkt.haslayer(MyLayer):
        del pkt[MyLayer].seq
        del pkt[MyLayer].ack
        del pkt[MyLayer].chk
        del pkt[MyLayer].datachk
        to_send = MyLayer(raw(pkt[MyLayer]))
        ans, unans = ss.sr(to_send)

有没有更好的方法来解决这个问题


Tags: selfnonesendsizelenif数据包seq

热门问题