并行文件匹配,Python
我正在尝试改进一个脚本,这个脚本用来扫描文件中的恶意代码。我们有一个包含正则表达式模式的文件,每行一个模式。这些正则表达式是为了配合grep使用,因为我们目前的实现基本上是一个bash脚本结合find和grep。这个bash脚本在我的基准目录上运行需要358秒。我写了一个python脚本,能在72秒内完成这个工作,但我想进一步提高效率。首先,我会发布基础代码,然后是我尝试过的改进:
import os, sys, Queue, threading, re
fileList = []
rootDir = sys.argv[1]
class Recurser(threading.Thread):
def __init__(self, queue, dir):
self.queue = queue
self.dir = dir
threading.Thread.__init__(self)
def run(self):
self.addToQueue(self.dir)
## HELPER FUNCTION FOR INTERNAL USE ONLY
def addToQueue(self, rootDir):
for root, subFolders, files in os.walk(rootDir):
for file in files:
self.queue.put(os.path.join(root,file))
self.queue.put(-1)
self.queue.put(-1)
self.queue.put(-1)
self.queue.put(-1)
self.queue.put(-1)
self.queue.put(-1)
self.queue.put(-1)
self.queue.put(-1)
self.queue.put(-1)
self.queue.put(-1)
self.queue.put(-1)
self.queue.put(-1)
self.queue.put(-1)
self.queue.put(-1)
self.queue.put(-1)
self.queue.put(-1)
self.queue.put(-1)
self.queue.put(-1)
self.queue.put(-1)
self.queue.put(-1)
class Scanner(threading.Thread):
def __init__(self, queue, patterns):
self.queue = queue
self.patterns = patterns
threading.Thread.__init__(self)
def run(self):
nextFile = self.queue.get()
while nextFile is not -1:
#print "Trying " + nextFile
self.scanFile(nextFile)
nextFile = self.queue.get()
#HELPER FUNCTION FOR INTERNAL UES ONLY
def scanFile(self, file):
fp = open(file)
contents = fp.read()
i=0
#for patt in self.patterns:
if self.patterns.search(contents):
print "Match " + str(i) + " found in " + file
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
fileQueue = Queue.Queue()
#Get the shell scanner patterns
patterns = []
fPatt = open('/root/patterns')
giantRE = '('
for line in fPatt:
#patterns.append(re.compile(line.rstrip(), re.IGNORECASE))
giantRE = giantRE + line.rstrip() + '|'
giantRE = giantRE[:-1] + ')'
giantRE = re.compile(giantRE, re.IGNORECASE)
#start recursing the directories
recurser = Recurser(fileQueue,rootDir)
recurser.start()
print "starting scanner"
#start checking the files
for scanner in xrange(0,8):
scanner = Scanner(fileQueue, giantRE)
scanner.start()
这显然是调试用的代码,看起来很糟糕,不用在意那些一堆的queue.put(-1),我稍后会整理的。有些缩进显示得不太对,特别是在scanFile函数里。
总之,我注意到一些事情。使用1、4甚至8个线程(在scanner中使用xrange(0,???))并没有什么区别。无论如何,我的执行时间还是大约72秒。我猜这是因为python的全局解释器锁(GIL)造成的。
为了避免使用一个巨大的正则表达式,我尝试将每一行(模式)作为一个编译过的正则表达式放在一个列表中,然后在我的scanfile函数中遍历这个列表。这导致了更长的执行时间。
为了避免python的GIL,我尝试让每个线程去调用grep,如下所示:
#HELPER FUNCTION FOR INTERNAL UES ONLY
def scanFile(self, file):
s = subprocess.Popen(("grep", "-El", "--file=/root/patterns", file), stdout = subprocess.PIPE)
output = s.communicate()[0]
if output != '':
print 'Matchfound in ' + file
这导致了更长的执行时间。
有没有什么建议可以提高性能呢?
:::::::::::::编辑::::::::
我还不能回答自己的问题,不过我可以回应几个提到的点:
@David Nehme - 只是想让大家知道我知道我有一堆queue.put(-1)。
@Blender - 这是为了标记队列的底部。我的扫描线程会一直从队列中取出,直到遇到-1,这个-1在队列的底部(while nextFile不是-1:)。处理器核心有8个,但由于GIL的原因,使用1个线程、4个线程或8个线程都没有区别。启动8个子进程导致代码运行明显变慢(142秒对比72秒)。
@ed - 是的,这和find\grep组合一样慢,实际上更慢,因为它会无差别地grep那些不需要的文件。
@Ron - 不能升级,这必须是通用的。你觉得这样会比72秒快吗?bash的grep需要358秒。我的python大正则表达式方法用1到8个线程能做到72秒。使用popen方法和8个线程(8个子进程)运行在142秒。所以到目前为止,python的大正则表达式方法明显是最好的。
@intued
这是我们当前find\grep组合的核心部分(不是我的脚本)。其实很简单。里面有一些额外的东西,比如ls,但没有什么应该导致5倍的减速。即使grep -r稍微高效一点,5倍的减速也是非常大的。
find "${TARGET}" -type f -size "${SZLIMIT}" -exec grep -Eaq --file="${HOME}/patterns" "{}" \; -and -ls | tee -a "${HOME}/found.txt"
python代码更高效,我不知道为什么,但我通过实验测试过。我更喜欢用python来做。我已经通过python实现了5倍的提速,我希望能进一步加快。
:::::::::::::赢家赢家赢家:::::::::::::::::
看起来我们找到了赢家。
intued的shell脚本以34秒获得第二名,而@steveha的以24秒获得第一名。由于我们很多机器上没有python2.6,我不得不使用cx_freeze来处理。我可以写一个shell脚本包装器来wget一个tar包并解压。不过我还是喜欢intued的简单。
谢谢大家的帮助,我现在有了一个高效的系统管理工具。
4 个回答
如果你愿意升级到3.2版本或更高版本,你可以使用一个叫做concurrent.futures.ProcessPoolExecutor的东西。我觉得这个方法比你之前用的popen方法要快,因为它会提前创建一组进程,而你的popen方法每次都要新建一个进程。如果你因为某些原因不能升级到3.2版本,你也可以自己写代码来实现类似的功能。
我有点困惑,为什么你的Python脚本比你用的find/grep组合还要快。如果你想用grep
,可以参考Ron Smith的建议,做一些类似下面的操作:
find -type f | xargs -d \\n -P 8 -n 100 grep --file=/root/patterns
这样可以启动grep
进程,每个进程会处理100个文件,然后再退出,同时最多保持8个这样的进程在运行。让每个进程处理100个文件,可以让每个进程启动时的时间消耗变得很小。
注意:xargs
的-d \\n
选项是GNU的扩展,不是所有的POSIX系统都能用。这个选项是用来指定文件名之间的分隔符是换行符。虽然技术上文件名可以包含换行符,但实际上没人这么做,因为这样会影响工作。为了兼容非GNU的xargs
,你需要在find
中加上-print0
选项,并在xargs
中用-0
替代-d \\n
。这样就会让find
和xargs
都用空字节\0
(十六进制0x00
)作为分隔符。
你也可以先统计一下要grep的文件数量:
NUMFILES="$(find -type f | wc -l)";
然后用这个数量来平均分配给8个进程(假设用bash
作为命令行)。
find -type f | xargs -d \\n -P 8 -n $(($NUMFILES / 8 + 1)) grep --file=/root/patterns
我觉得这样可能效果更好,因为find
的磁盘输入输出不会干扰到各个grep
的磁盘输入输出。我想这也部分取决于文件的大小,以及它们是否是连续存储的——如果是小文件,磁盘本来就会频繁寻址,所以影响不大。还要注意的是,特别是如果你的内存足够,后续运行这样的命令会更快,因为一些文件会被保存在你的内存缓存中。
当然,你可以把8
这个数字参数化,这样可以更方便地尝试不同数量的并发进程。
正如ed.在评论中提到的,这种方法的性能可能还是比单进程的grep -r
要差。我想这取决于你的磁盘[阵列]的速度、系统中的处理器数量等等。
我觉得,与其使用 threading
模块,不如用 multiprocessing
模块来解决你的 Python 问题。因为 Python 的线程可能会受到 GIL 的影响;但如果你使用多个 Python 进程,就不会有这个问题。
我认为对于你要做的事情,使用一组工作进程正合适。默认情况下,这个进程池会为你系统中的每个处理器核心分配一个进程。只需要调用 .map()
方法,传入要检查的文件名列表和执行检查的函数就可以了。
http://docs.python.org/library/multiprocessing.html
如果这个方法的速度没有比你的 threading
实现快,那我觉得 GIL 可能不是你的问题所在。
编辑:好的,我来添加一个可以运行的 Python 程序。这个程序使用工作进程池来打开每个文件,并在每个文件中搜索特定的模式。当某个工作进程找到匹配的文件名时,它会直接打印出来(输出到标准输出),这样你就可以把这个脚本的输出重定向到一个文件中,得到你的文件列表。
编辑:我觉得这个版本稍微容易读一些,更容易理解。
我测试过这个程序,在我的电脑上搜索 /usr/include 目录下的文件。搜索大约在半秒内完成。使用 find
命令配合 xargs
来尽量减少 grep
进程的数量,速度大约是 0.05 秒,快了大约 10 倍。不过我不喜欢使用复杂的命令来让 find
正常工作,我更喜欢 Python 的版本。而且在非常大的目录中,Python 的半秒可能会受到启动时间的影响,速度差距可能会小一些。也许半秒对于大多数用途来说已经足够快了!
import multiprocessing as mp
import os
import re
import sys
from stat import S_ISREG
# uncomment these if you really want a hard-coded $HOME/patterns file
#home = os.environ.get('HOME')
#patterns_file = os.path.join(home, 'patterns')
target = sys.argv[1]
size_limit = int(sys.argv[2])
assert size_limit >= 0
patterns_file = sys.argv[3]
# build s_pat as string like: (?:foo|bar|baz)
# This will match any of the sub-patterns foo, bar, or baz
# but the '?:' means Python won't bother to build a "match group".
with open(patterns_file) as f:
s_pat = r'(?:{})'.format('|'.join(line.strip() for line in f))
# pre-compile pattern for speed
pat = re.compile(s_pat)
def walk_files(topdir):
"""yield up full pathname for each file in tree under topdir"""
for dirpath, dirnames, filenames in os.walk(topdir):
for fname in filenames:
pathname = os.path.join(dirpath, fname)
yield pathname
def files_to_search(topdir):
"""yield up full pathname for only files we want to search"""
for fname in walk_files(topdir):
try:
# if it is a regular file and big enough, we want to search it
sr = os.stat(fname)
if S_ISREG(sr.st_mode) and sr.st_size >= size_limit:
yield fname
except OSError:
pass
def worker_search_fn(fname):
with open(fname, 'rt') as f:
# read one line at a time from file
for line in f:
if re.search(pat, line):
# found a match! print filename to stdout
print(fname)
# stop reading file; just return
return
mp.Pool().map(worker_search_fn, files_to_search(target))