并行文件匹配,Python

2024-09-29 22:35:35 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在尝试改进一个脚本扫描文件恶意代码。我们在一个文件中有一个regex模式的列表,每行一个模式。这些regex用于grep,因为我们当前的实现基本上是bash脚本find\grep组合框。bash脚本在我的基准目录上花费了358秒。我能够在72秒内编写一个python脚本,但还想改进更多。首先,我将发布基本代码,然后我尝试了一些调整:

import os, sys, Queue, threading, re

fileList = []
rootDir = sys.argv[1]

class Recurser(threading.Thread):

    def __init__(self, queue, dir):
    self.queue = queue
    self.dir = dir
    threading.Thread.__init__(self)

    def run(self):
    self.addToQueue(self.dir)

    ## HELPER FUNCTION FOR INTERNAL USE ONLY
    def addToQueue(self,  rootDir):
      for root, subFolders, files in os.walk(rootDir):
    for file in files:
       self.queue.put(os.path.join(root,file))
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)

class Scanner(threading.Thread):

    def __init__(self, queue, patterns):
    self.queue = queue
    self.patterns = patterns
    threading.Thread.__init__(self)

    def run(self):
    nextFile = self.queue.get()
    while nextFile is not -1:
       #print "Trying " + nextFile
       self.scanFile(nextFile)
       nextFile = self.queue.get()


    #HELPER FUNCTION FOR INTERNAL UES ONLY
    def scanFile(self, file):
       fp = open(file)
       contents = fp.read()
       i=0
       #for patt in self.patterns:
       if self.patterns.search(contents):
      print "Match " + str(i) + " found in " + file

############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################


fileQueue = Queue.Queue()

#Get the shell scanner patterns
patterns = []
fPatt = open('/root/patterns')
giantRE = '('
for line in fPatt:
   #patterns.append(re.compile(line.rstrip(), re.IGNORECASE))
   giantRE = giantRE + line.rstrip() + '|'

giantRE = giantRE[:-1] + ')'
giantRE = re.compile(giantRE, re.IGNORECASE)

#start recursing the directories
recurser = Recurser(fileQueue,rootDir)
recurser.start()

print "starting scanner"
#start checking the files
for scanner in xrange(0,8):
   scanner = Scanner(fileQueue, giantRE)
   scanner.start()

这显然是调试\难看的代码,千万别介意队列.put(-1),我稍后会清理这个。有些压痕没有正确显示,尤其是在scanFile中。在

不管怎样,我注意到了一些事情。使用1,4,甚至8个线程(对于X范围内的扫描仪(0,???))没什么区别。不管怎样,我还有72秒的时间。我想这是由于python的GIL。在

与生成一个巨大的regex相反,我尝试将每一行(模式)作为compilex RE放在一个列表中,并在scanfile函数中遍历这个列表。这导致执行时间更长。在

为了避免python的GIL,我尝试让每个线程fork都指向grep,如下所示:

^{pr2}$

这导致执行时间更长。在

对提高绩效有什么建议吗。在

::::::::::编辑::::::::

我还不能回答我自己的问题,但是这里有几个问题的答案:

@David Nehme-只是想让人们知道我有一百万队列.put(-1)的

@Blender-标记队列的底部。我的扫描器线程一直在下降,直到到达底部的-1(而nextFile不是-1:)。处理器核心是8,但是由于GIL使用1个线程,4个线程,或8个线程没有区别。生成8个子进程导致代码明显变慢(142秒vs 72秒)

@ed-是的,而且它和find\grep组合框一样慢,实际上更慢,因为它不分青红皂白地greps不需要的文件

@Ron-不能升级,这一定是万能的。你认为这会加快72秒吗?贝什·格雷珀做了358秒。我的python giant RE方法使用1-8个线程执行72秒。popen方法w\8 thrads(8个子进程)运行时间为142秒。到目前为止,这个只重python的方法是一个明显的赢家

@插管

这是我们当前find\grep组合的主要部分(不是我的脚本)。很简单。还有一些其他的东西,比如ls,但是没有什么会导致5倍的减速。即使grep-r的效率稍微高一点,5倍的增长速度也是一个巨大的放缓。在

 find "${TARGET}" -type f -size "${SZLIMIT}" -exec grep -Eaq --file="${HOME}/patterns" "{}" \; -and -ls | tee -a "${HOME}/found.txt"

python代码更高效,我不知道为什么,但我通过实验测试了它。我更喜欢用python来做这个。我已经用python实现了5倍的加速,我想让它更快一些。在

:::::::::::优胜劣汰:::::::::::::::::

看来我们赢了。在

Intuied的shell脚本以34秒排在第二位,而@steveha的则以24秒排在第一位。由于我们很多盒子里没有Python2.6,我不得不把它冷冻起来。我可以编写一个shell脚本包装器来wget a tar并解包它。不过,为了简单起见,我确实喜欢Intuid。在

谢谢你们所有的帮助,我现在有了一个高效的系统管理工具


Tags: inselfre脚本queueputmaindef
3条回答

如果您愿意升级到3.2版或更高版本,可以利用concurrent.futures.ProcessPoolExecutor. 我认为它将比您尝试的popen方法提高性能,因为它将预先创建一个进程池,其中popen方法每次都会创建一个新进程。如果由于某种原因不能迁移到3.2版本,您可以编写自己的代码来为早期版本执行相同的操作。在

我认为,与其使用threading模块,不如将multiprocessing模块用于Python解决方案。Python线程可能会与GIL相冲突;如果只需要多个Python进程,GIL就不是问题。在

我认为对于你正在做的事情,一个工人进程池正是你想要的。默认情况下,对于系统处理器中的每个内核,池将默认为一个进程。只需使用要检查的文件名列表和执行检查的函数调用.map()方法。在

http://docs.python.org/library/multiprocessing.html

如果这不比您的threading实现快,那么我不认为GIL是您的问题。在

编辑:好的,我正在添加一个正在工作的Python程序。这使用一个工作进程池来打开每个文件并在每个文件中搜索模式。当worker找到匹配的文件名时,它只需将其打印(到标准输出),这样您就可以将此脚本的输出重定向到一个文件中,您就可以得到文件列表了。在

编辑:我认为这是一个稍微容易阅读的版本,更容易理解。在

我在电脑上搜索/usr/include中的文件,以此来计时。它在半秒钟内完成搜索。使用find管道通过xargs来运行尽可能少的grep进程,大约需要0.05秒,大约10倍的加速。但是我讨厌你必须使用巴洛克风格的怪异语言来使find正常工作,我喜欢Python版本。也许在真正大的目录上,差距会更小,因为Python的半秒时间一定是启动时间。也许半秒钟对大多数目的来说已经足够快了!在

import multiprocessing as mp
import os
import re
import sys

from stat import S_ISREG


# uncomment these if you really want a hard-coded $HOME/patterns file
#home = os.environ.get('HOME')
#patterns_file = os.path.join(home, 'patterns')

target = sys.argv[1]
size_limit = int(sys.argv[2])
assert size_limit >= 0
patterns_file = sys.argv[3]


# build s_pat as string like:  (?:foo|bar|baz)
# This will match any of the sub-patterns foo, bar, or baz
# but the '?:' means Python won't bother to build a "match group".
with open(patterns_file) as f:
    s_pat = r'(?:{})'.format('|'.join(line.strip() for line in f))

# pre-compile pattern for speed
pat = re.compile(s_pat)


def walk_files(topdir):
    """yield up full pathname for each file in tree under topdir"""
    for dirpath, dirnames, filenames in os.walk(topdir):
        for fname in filenames:
            pathname = os.path.join(dirpath, fname)
            yield pathname

def files_to_search(topdir):
    """yield up full pathname for only files we want to search"""
    for fname in walk_files(topdir):
        try:
            # if it is a regular file and big enough, we want to search it
            sr = os.stat(fname)
            if S_ISREG(sr.st_mode) and sr.st_size >= size_limit:
                yield fname
        except OSError:
            pass

def worker_search_fn(fname):
    with open(fname, 'rt') as f:
        # read one line at a time from file
        for line in f:
            if re.search(pat, line):
                # found a match! print filename to stdout
                print(fname)
                # stop reading file; just return
                return

mp.Pool().map(worker_search_fn, files_to_search(target))

我有点搞不懂Python脚本是如何比find/grep组合更快的。如果你想用一种类似于罗恩·史密斯在他的回答中建议的方式使用grep,你可以做如下的事情

find -type f | xargs -d \\n -P 8 -n 100 grep --file=/root/patterns

启动grep进程,该进程将在退出前处理100个文件,同时保持8个这样的进程处于活动状态。让它们处理100个文件应该可以使每个文件的进程启动开销时间可以忽略不计。在

注意:对xargs-d \\n选项是一个GNU扩展,它不能在所有POSIX-ish系统上工作。它指定文件名之间的*d*elimiter是一个换行符。虽然从技术上讲,文件名可以包含新行,但实际上没有人这样做并保留他们的工作。为了与非GNU xargs兼容,您需要将-print0选项添加到find中,并使用-0,而不是{}与{}。这将安排空字节\0(十六进制0x00)作为分隔符,find和{}。在

您也可以采取的方法,首先计算要重新映射的文件数

^{pr2}$

然后使用这个数字在8个进程中得到偶数拆分(假设bash为shell)

find -type f | xargs -d \\n -P 8 -n $(($NUMFILES / 8 + 1)) grep --file=/root/patterns

我认为这样做可能会更好,因为find的磁盘I/O不会干扰各种grep的磁盘I/O。我想这部分取决于文件有多大,以及它们是否连续存储-对于小文件,磁盘无论如何都会寻找很多,所以这无关紧要。还请注意,特别是如果您有相当数量的RAM,这样一个命令的后续运行将更快,因为一些文件将保存在您的内存缓存中。在

当然,您可以参数化8,以便更容易地试验不同数量的并发进程。在

正如ed.在评论中提到的,这种方法的性能很可能仍然不如单个进程grep -r那样令人印象深刻。我想这取决于磁盘[阵列]的相对速度、系统中处理器的数量等

相关问题 更多 >

    热门问题