并行文件匹配,Python

4 投票
4 回答
5824 浏览
提问于 2025-04-17 03:30

我正在尝试改进一个脚本,这个脚本用来扫描文件中的恶意代码。我们有一个包含正则表达式模式的文件,每行一个模式。这些正则表达式是为了配合grep使用,因为我们目前的实现基本上是一个bash脚本结合find和grep。这个bash脚本在我的基准目录上运行需要358秒。我写了一个python脚本,能在72秒内完成这个工作,但我想进一步提高效率。首先,我会发布基础代码,然后是我尝试过的改进:

import os, sys, Queue, threading, re

fileList = []
rootDir = sys.argv[1]

class Recurser(threading.Thread):

    def __init__(self, queue, dir):
    self.queue = queue
    self.dir = dir
    threading.Thread.__init__(self)

    def run(self):
    self.addToQueue(self.dir)

    ## HELPER FUNCTION FOR INTERNAL USE ONLY
    def addToQueue(self,  rootDir):
      for root, subFolders, files in os.walk(rootDir):
    for file in files:
       self.queue.put(os.path.join(root,file))
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)

class Scanner(threading.Thread):

    def __init__(self, queue, patterns):
    self.queue = queue
    self.patterns = patterns
    threading.Thread.__init__(self)

    def run(self):
    nextFile = self.queue.get()
    while nextFile is not -1:
       #print "Trying " + nextFile
       self.scanFile(nextFile)
       nextFile = self.queue.get()


    #HELPER FUNCTION FOR INTERNAL UES ONLY
    def scanFile(self, file):
       fp = open(file)
       contents = fp.read()
       i=0
       #for patt in self.patterns:
       if self.patterns.search(contents):
      print "Match " + str(i) + " found in " + file

############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################


fileQueue = Queue.Queue()

#Get the shell scanner patterns
patterns = []
fPatt = open('/root/patterns')
giantRE = '('
for line in fPatt:
   #patterns.append(re.compile(line.rstrip(), re.IGNORECASE))
   giantRE = giantRE + line.rstrip() + '|'

giantRE = giantRE[:-1] + ')'
giantRE = re.compile(giantRE, re.IGNORECASE)

#start recursing the directories
recurser = Recurser(fileQueue,rootDir)
recurser.start()

print "starting scanner"
#start checking the files
for scanner in xrange(0,8):
   scanner = Scanner(fileQueue, giantRE)
   scanner.start()

这显然是调试用的代码,看起来很糟糕,不用在意那些一堆的queue.put(-1),我稍后会整理的。有些缩进显示得不太对,特别是在scanFile函数里。

总之,我注意到一些事情。使用1、4甚至8个线程(在scanner中使用xrange(0,???))并没有什么区别。无论如何,我的执行时间还是大约72秒。我猜这是因为python的全局解释器锁(GIL)造成的。

为了避免使用一个巨大的正则表达式,我尝试将每一行(模式)作为一个编译过的正则表达式放在一个列表中,然后在我的scanfile函数中遍历这个列表。这导致了更长的执行时间。

为了避免python的GIL,我尝试让每个线程去调用grep,如下所示:

#HELPER FUNCTION FOR INTERNAL UES ONLY
def scanFile(self, file):
      s = subprocess.Popen(("grep", "-El", "--file=/root/patterns", file), stdout = subprocess.PIPE)
      output = s.communicate()[0]
      if output != '':
         print 'Matchfound in ' + file

这导致了更长的执行时间。

有没有什么建议可以提高性能呢?

:::::::::::::编辑::::::::

我还不能回答自己的问题,不过我可以回应几个提到的点:

@David Nehme - 只是想让大家知道我知道我有一堆queue.put(-1)。

@Blender - 这是为了标记队列的底部。我的扫描线程会一直从队列中取出,直到遇到-1,这个-1在队列的底部(while nextFile不是-1:)。处理器核心有8个,但由于GIL的原因,使用1个线程、4个线程或8个线程都没有区别。启动8个子进程导致代码运行明显变慢(142秒对比72秒)。

@ed - 是的,这和find\grep组合一样慢,实际上更慢,因为它会无差别地grep那些不需要的文件。

@Ron - 不能升级,这必须是通用的。你觉得这样会比72秒快吗?bash的grep需要358秒。我的python大正则表达式方法用1到8个线程能做到72秒。使用popen方法和8个线程(8个子进程)运行在142秒。所以到目前为止,python的大正则表达式方法明显是最好的。

@intued

这是我们当前find\grep组合的核心部分(不是我的脚本)。其实很简单。里面有一些额外的东西,比如ls,但没有什么应该导致5倍的减速。即使grep -r稍微高效一点,5倍的减速也是非常大的。

 find "${TARGET}" -type f -size "${SZLIMIT}" -exec grep -Eaq --file="${HOME}/patterns" "{}" \; -and -ls | tee -a "${HOME}/found.txt"

python代码更高效,我不知道为什么,但我通过实验测试过。我更喜欢用python来做。我已经通过python实现了5倍的提速,我希望能进一步加快。

:::::::::::::赢家赢家赢家:::::::::::::::::

看起来我们找到了赢家。

intued的shell脚本以34秒获得第二名,而@steveha的以24秒获得第一名。由于我们很多机器上没有python2.6,我不得不使用cx_freeze来处理。我可以写一个shell脚本包装器来wget一个tar包并解压。不过我还是喜欢intued的简单。

谢谢大家的帮助,我现在有了一个高效的系统管理工具。

4 个回答

1

如果你愿意升级到3.2版本或更高版本,你可以使用一个叫做concurrent.futures.ProcessPoolExecutor的东西。我觉得这个方法比你之前用的popen方法要快,因为它会提前创建一组进程,而你的popen方法每次都要新建一个进程。如果你因为某些原因不能升级到3.2版本,你也可以自己写代码来实现类似的功能。

5

我有点困惑,为什么你的Python脚本比你用的find/grep组合还要快。如果你想用grep,可以参考Ron Smith的建议,做一些类似下面的操作:

find -type f | xargs -d \\n -P 8 -n 100 grep --file=/root/patterns

这样可以启动grep进程,每个进程会处理100个文件,然后再退出,同时最多保持8个这样的进程在运行。让每个进程处理100个文件,可以让每个进程启动时的时间消耗变得很小。

注意xargs-d \\n选项是GNU的扩展,不是所有的POSIX系统都能用。这个选项是用来指定文件名之间的分隔符是换行符。虽然技术上文件名可以包含换行符,但实际上没人这么做,因为这样会影响工作。为了兼容非GNU的xargs,你需要在find中加上-print0选项,并在xargs中用-0替代-d \\n。这样就会让findxargs都用空字节\0(十六进制0x00)作为分隔符。

你也可以先统计一下要grep的文件数量:

NUMFILES="$(find -type f | wc -l)";

然后用这个数量来平均分配给8个进程(假设用bash作为命令行)。

find -type f | xargs -d \\n -P 8 -n $(($NUMFILES / 8 + 1)) grep --file=/root/patterns

我觉得这样可能效果更好,因为find的磁盘输入输出不会干扰到各个grep的磁盘输入输出。我想这也部分取决于文件的大小,以及它们是否是连续存储的——如果是小文件,磁盘本来就会频繁寻址,所以影响不大。还要注意的是,特别是如果你的内存足够,后续运行这样的命令会更快,因为一些文件会被保存在你的内存缓存中。

当然,你可以把8这个数字参数化,这样可以更方便地尝试不同数量的并发进程。

正如ed.在评论中提到的,这种方法的性能可能还是比单进程的grep -r要差。我想这取决于你的磁盘[阵列]的速度、系统中的处理器数量等等。

5

我觉得,与其使用 threading 模块,不如用 multiprocessing 模块来解决你的 Python 问题。因为 Python 的线程可能会受到 GIL 的影响;但如果你使用多个 Python 进程,就不会有这个问题。

我认为对于你要做的事情,使用一组工作进程正合适。默认情况下,这个进程池会为你系统中的每个处理器核心分配一个进程。只需要调用 .map() 方法,传入要检查的文件名列表和执行检查的函数就可以了。

http://docs.python.org/library/multiprocessing.html

如果这个方法的速度没有比你的 threading 实现快,那我觉得 GIL 可能不是你的问题所在。

编辑:好的,我来添加一个可以运行的 Python 程序。这个程序使用工作进程池来打开每个文件,并在每个文件中搜索特定的模式。当某个工作进程找到匹配的文件名时,它会直接打印出来(输出到标准输出),这样你就可以把这个脚本的输出重定向到一个文件中,得到你的文件列表。

编辑:我觉得这个版本稍微容易读一些,更容易理解。

我测试过这个程序,在我的电脑上搜索 /usr/include 目录下的文件。搜索大约在半秒内完成。使用 find 命令配合 xargs 来尽量减少 grep 进程的数量,速度大约是 0.05 秒,快了大约 10 倍。不过我不喜欢使用复杂的命令来让 find 正常工作,我更喜欢 Python 的版本。而且在非常大的目录中,Python 的半秒可能会受到启动时间的影响,速度差距可能会小一些。也许半秒对于大多数用途来说已经足够快了!

import multiprocessing as mp
import os
import re
import sys

from stat import S_ISREG


# uncomment these if you really want a hard-coded $HOME/patterns file
#home = os.environ.get('HOME')
#patterns_file = os.path.join(home, 'patterns')

target = sys.argv[1]
size_limit = int(sys.argv[2])
assert size_limit >= 0
patterns_file = sys.argv[3]


# build s_pat as string like:  (?:foo|bar|baz)
# This will match any of the sub-patterns foo, bar, or baz
# but the '?:' means Python won't bother to build a "match group".
with open(patterns_file) as f:
    s_pat = r'(?:{})'.format('|'.join(line.strip() for line in f))

# pre-compile pattern for speed
pat = re.compile(s_pat)


def walk_files(topdir):
    """yield up full pathname for each file in tree under topdir"""
    for dirpath, dirnames, filenames in os.walk(topdir):
        for fname in filenames:
            pathname = os.path.join(dirpath, fname)
            yield pathname

def files_to_search(topdir):
    """yield up full pathname for only files we want to search"""
    for fname in walk_files(topdir):
        try:
            # if it is a regular file and big enough, we want to search it
            sr = os.stat(fname)
            if S_ISREG(sr.st_mode) and sr.st_size >= size_limit:
                yield fname
        except OSError:
            pass

def worker_search_fn(fname):
    with open(fname, 'rt') as f:
        # read one line at a time from file
        for line in f:
            if re.search(pat, line):
                # found a match! print filename to stdout
                print(fname)
                # stop reading file; just return
                return

mp.Pool().map(worker_search_fn, files_to_search(target))

撰写回答