Python中并行读取和过滤文件

2024-04-16 20:51:21 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个简单的脚本,首先读取一个CSV表(95MB,672343行),然后从中创建5个列表(chrs、type、name、start、end)。然后它打开另一个文件(37MB,795516行),读取每一行并将其与进行比较,如果一切正常,则将字符串写入输出文件。这需要很多时间。在

for line in inputFile:
    line = line[0:len(line) - 1]
    arr = line.split('\t')
    for i in xrange(len(chrs)):
        if (arr[0]==chrs[i]) and (int(arr[1])>=int(starts[i])) and  (int(arr[1])<=int(ends[i])):
            outputFile.write(chrs[i]+ '\t' + types[i] + '\t' + starts[i] + '\t' + ends[i] + '\t' + names[i] + '\t' + line + '\n');

我想知道是否可以并行运行,我可以访问服务器,在那里我可以同时运行多达10个进程。在


Tags: and文件csvin脚本列表forlen
2条回答

我可以想出一些可以加快进度的方法,从第一个文件重新排列数据开始。在

与其将它变成5个单独的lists,不如将其设为tuples的lists的dict,并将chr值作为键:

import csv
import collections
import bisect

# Use a defaultdict so we don't have to worry about whether a chr already exists
foobars = collections.defaultdict(list)
with open('file1.csv', 'rb') as csvfile:
    rdr = csv.reader(csvfile)
    for (chrs, typ, name, start, end) in rdr:
        foobars[chrs].append((int(start), int(end), typ, name))

然后对foobars中的每个列表进行排序(显然,您应该将其重命名为适合您的任务的名称),这将首先按start值排序,因为我们将其放在元组中的第一个值:

^{pr2}$

现在要处理第二个文件:

for line in inputFile:
    line = line.rstrip('\n')
    arr = line.split('\t')
    arr1int = int(arr[1])
    # Since we rearranged our data, we only have to check one of our sublists
    search = foobars[arr[0]]
    # We use bisect to quickly find the first item where the start value
    # is higher than arr[1]
    highest = bisect.bisect(search, (arr1int + 1,))
    # Now we have a much smaller number of records to check, and we've 
    # already ensured that chr is a match, and arr[1] >= start
    for (start, end, typ, name) in search[:highest]:
        if arr1int <= end:
            outputFile.write('\t'.join((arr[0], typ, str(start), str(end), name, line)) + '\n')

bisect.bisect()行需要一点额外的解释。如果您有一个排序的值序列,bisect可用于查找新值将插入序列的位置。{{cd7>在这里,我们首先要想的是,这些概念是如何用的。奇怪的(arr1int + 1,)值只是确保我们包括start == arr[1]的所有条目,并将其转换为元组,以便我们比较相似的值。在

这几乎可以肯定地提高代码的性能。我实在没资格说。在

如果没有输入数据,我就无法真正测试这段代码,因此几乎可以肯定存在一些小错误。希望它们能很容易修复。

问题是,你重复672343*795516=534'859'613'988次,这太多了。你需要一个更聪明的解决方案。在

所以我们发现问题是我们看了太多的数据,我们需要改变这一点。一个方法就是试着变得聪明。也许创建一个字典,其中的键对应于chr,所以我们只需要检查这些条目。但是我们还没有处理start和{}。也许也有一个聪明的方法

这看起来很像数据库。所以如果它是一个数据库,也许我们应该把它当作一个数据库。Python附带了sqlite3。在

这里有一个解决方案,但还有无数的其他可能性。在

import sqlite3
import csv

# create an in-memory database
conn = sqlite3.connect(":memory:")

# create the tables
c = conn.cursor()
c.execute("""CREATE TABLE t1 (
    chr   TEXT,
    type  TEXT,
    name  TEXT,
    start INTEGER,
    end   INTEGER
);""")

# if you only have a few columns, just name them all,
# if you have a lot, maybe just put everything in one
# column as a string
c.execute("""CREATE TABLE t2 (
    chr TEXT,
    num INTEGER,
    col3,
    col4
);""")

# create indices on the columns we use for selecting
c.execute("""CREATE INDEX i1 ON t1 (chr, start, end);""")
c.execute("""CREATE INDEX i2 ON t2 (chr, num);""")

# fill the tables
with open("comparison_file.csv", 'rb') as f:
    reader = csv.reader(f)
    # sqlite takes care of converting the number-strings to numbers
    c.executemany("INSERT INTO t1 VALUES (?, ?, ?, ?, ?)", reader)

with open("input.csv", 'rb') as f:
    reader = csv.reader(f)
    # sqlite takes care of converting the number-strings to numbers
    c.executemany("INSERT INTO t2 VALUES (?, ?, ?, ?)", reader)

# now let sqlite do its magic and select the correct lines
c.execute("""SELECT t2.*, t1.* FROM t1
             JOIN t2 ON t1.chr == t2.chr
             WHERE t2.num BETWEEN t1.start AND t1.end;""")

# write result to disk
with open("output.csv", "wb") as f:
    writer = csv.writer(f)
    for row in c:
        writer.writerow(row)

Python编码技巧

下面是我如何编写您的原始代码。在

^{pr2}$

备注1

line = line[0:len(line) - 1]

可以写成

^{4}$

备注2

而不是

my_list = [1,2,3]
for i in xrange(len(my_list)):
    # do something with my_list[i]

您应该:

my_list = [1,2,3]
for item in my_list:
    # do something with item

如果需要索引,请将其与enumerate()合并。在

相关问题 更多 >