我目前正在编写一些定制的scapy层,用于解析/构建协议的数据包,我的大部分工作都在进行中。由于该协议位于TCP之上,我一直在使用StreamSocket发送/接收数据包,如下所示:
from scapy.all import *
s = socket.socket()
s.connect(('127.0.0.1', 1337))
ss = StreamSocket(s, MyLayer)
ans, unans = ss.sr(MyLayer())
我重载了层的answers()方法来定义什么构成“响应”。此协议使用类似于TCP的ACK编号,但它不是确认下一个数据包的序列号,而是确认接收到的最后一个数据包的序列号。除了SEQ和ACK数字之外,该层的数据包还包含一个长度字段和两个校验和字段,因此我重载了该层的post_build()方法来自动计算它们。总之,我的图层如下所示:
from numpy import uint8
from random import randint
from scapy.all import *
class MyLayer(Packet):
name = 'MyLayer'
fields_desc = [
XShortField('sop', 0xFF5A),
XShortField('len', None),
FlagsField('ctl', 0x40, 8, [
'ZERO0',
'ZERO1',
'ZERO2',
'SUS',
'RST',
'EAK',
'ACK',
'SYN'
]),
ByteField('seq', None),
ByteField('ack', None),
ByteField('sess', 0),
XByteField('chk', None)
StrLenField('data', '', length_from=lambda pkt: pkt.len - 10),
XByteField('datachk', None)
]
send_seq = uint8(randint(0, 255))
recv_seq = uint8(0)
def post_build(self, p, pay):
if self.len is None:
len_off = 2
len_size = 2
p = p[:len_off] + struct.pack('!H', len(p)) + p[len_off+len_size:]
if self.seq is None:
seq_off = 5
seq_size = 1
# Increments send_seq before sending by examining the stack trace.
for line in traceback.format_stack():
if '_sndrcv_snd' in line or 'in send' in line:
MyLayer.send_seq = uint8(MyLayer.send_seq + 1)
break
p = p[:seq_off] + MyLayer.send_seq + p[seq_off+seq_size:]
if self.ack is None:
ack_off = 6
ack_size = 1
p = p[:ack_off] + MyLayer.recv_seq + p[ack_off+ack_size:]
if self.chk is None:
chk_off = 8
chk_size = 1
chk = uint8(0x100 - sum(p[:chk_off]))
p = p[:chk_off] + chk + p[chk_off+chk_size:]
if self.datachk is None:
datachk_off = len(p) - 1
datachk = uint8(0x100 - sum(p[:datachk_off]))
p = p[:datachk_off] + datachk
return p + pay
def update_recv_seq(self):
MyLayer.recv_seq = uint8(self.seq)
def answers(self, other):
self.update_recv_seq()
if isinstance(other, MyLayer) and self.ack == other.seq:
return 1
return 0
不幸的是,当我将数据包的seq
设置为None
以便自动计算它时,answers()无法将收到的数据包识别为对我发送的数据包的响应,因为other.seq
仍然设置为None
。将行self.seq = MyLayer.send_seq
添加到post_build()的末尾似乎也不起作用。在重建层后,我求助于重铸原始字节,但这似乎是一个笨拙的解决方案:
pkts = rdpcap('path/to/cap.pcap')
for pkt in pkts:
if pkt.haslayer(MyLayer):
del pkt[MyLayer].seq
del pkt[MyLayer].ack
del pkt[MyLayer].chk
del pkt[MyLayer].datachk
to_send = MyLayer(raw(pkt[MyLayer]))
ans, unans = ss.sr(to_send)
有没有更好的方法来解决这个问题
目前没有回答
相关问题 更多 >
编程相关推荐