可变嗅探停止

2024-10-01 13:34:08 发布

您现在位置:Python中文网/ 问答频道 /正文

我发现了一个类似的问题: (Instance variables not being updated Python when using Multiprocessing), 但仍然不知道我的任务的解决方案。在

任务是在测试脚本完成后停止scapy sniff函数。单个测试脚本的运行时间可能会有很大的变化(从几秒钟到几小时)。我的嗅探功能在一个单独的威胁中运行。testscript在开始时调用init函数,该函数从另一个模块调用sniff函数。在

@classmethod
    def SaveFullTrafficPcap(self, TestCase, Termination):
        try:
            Full_Traffic = []
            PktList = []
            FullPcapName = Settings['GeneralSettings']['ResultsPath']+TestCase.TestCaseName +"Full_Traffic_PCAP.pcap"
            #while Term.Termination < 1:             
            Full_Traffic = sniff(lfilter=None, iface=str(Settings['GeneralSettings']['EthInterface']), store=True, prn = lambda x: Full_Traffic.append(x), count=0, timeout=Term.Termination)
            print(Full_Traffic)   
            wrpcap(FullPcapName, Full_Traffic)
        except(Exception):
            SYS.ABS_print("No full traffic PCAP file wirtten!\n")

在testscript的末尾调用一个exit函数。在我设置的退出函数中期限终止参数设置为1并等待5秒,但它不起作用。sniff函数被系统停止,我没有得到文件“FullPCAPName” 如果count或timeout得到一个值,那么代码就可以正常工作,并且我可以在我的接口上获得完整的通信量的FullPCAPName文件。在

有谁知道我在完成测试脚本后怎么能经常停止嗅探功能?在


Tags: 函数功能脚本settingscountpcaptestcasefull
2条回答

现在我解决了全局变量的问题。不好看,但效果很好。在

不过,我对变量嗅探停止的更好的解决方案感兴趣。在

对我来说,使用指定的stop{a1}命令很有效。为了方便起见,我复制了下面的HenningCash's代码:

import time, threading
from scapy.all import sniff
e = threading.Event()
def _sniff(e):
    a = sniff(filter="tcp port 80", stop_filter=lambda p: e.is_set())
    print("Stopped after %i packets" % len(a))

print("Start capturing thread")
t = threading.Thread(target=_sniff, args=(e,))
t.start()

time.sleep(3)
print("Try to shutdown capturing...")
e.set()

# This will run until you send a HTTP request somewhere
# There is no way to exit clean if no package is received
while True:
    t.join(2)
    if t.is_alive():
        print("Thread is still running...")
    else:
        break

print("Shutdown complete!")

但是,您仍然需要等待最后一个包被嗅探,这在您的场景中可能并不理想。在

相关问题 更多 >