使用Python加速并行读取大文件

2024-06-08 00:04:18 发布

您现在位置:Python中文网/ 问答频道 /正文

我需要处理两个大文件(>;10亿行),并根据一个文件中特定行中的信息将每个文件拆分为小文件。 这些文件在blocks(我们称之为测序reads)中记录高通量测序数据,而每个read包含4行,(namesequencenquality)。read记录在两个文件中的顺序相同。你知道吗

待办事项

基于file2.fq中的id字段拆分file1.fq

这两个文件如下所示:

$ head -n 4 file1.fq
@name1_1
ACTGAAGCGCTACGTCAT
+
A#AAFJJJJJJJJFJFFF

$ head -n 4 file2.fq
@name1_2
TCTCCACCAACAACAGTG
+
FJJFJJJJJJJJJJJAJJ

为此,我编写了以下python函数:

def p7_bc_demx_pe(fn1, fn2, id_dict):
    """Demultiplex PE reads, by p7 index and barcode"""
    # prepare writers for each small files
    fn_writer = {}
    for i in id_dict:
        fn_writer[i] = [open(id_dict[i] + '.1.fq', 'wt'),
            open(id_dict[i] + '.2.fq', 'wt')]

    # go through each record in two files
    with open(fn1, 'rt') as f1, open(fn2, 'rt') as f2:
        while True:
            try:
                s1 = [next(f1), next(f1), next(f1), next(f1)]
                s2 = [next(f2), next(f2), next(f2), next(f2)]
                tag = func(s2) # a function to classify the record
                fn_writer[tag][0].write(''.join(s1))
                fn_writer[tag][1].write(''.join(s2))
            except StopIteration:
                break
    # close writers
    for tag in p7_bc_writer: 
        fn_writer[tag][0].close() # close writers
        fn_writer[tag][1].close() # close writers

问题

有没有办法加快这一进程?(以上功能太慢)

如何将大文件分割成具有特定lines(如f.seek())的块,并与多个内核并行运行该进程?你知道吗

编辑-1

每个文件总共读取5亿次(大小约为180 GB)。瓶颈是reading and writing文件。下面是我目前的解决方案(它很有效,但肯定不是最好的)

我首先使用shell命令将大文件拆分为小文件:split -l(需要3个小时)。你知道吗

然后,将这些函数并行应用于8个小文件(需要~1小时)

最后,合并结果(大约需要2小时)

not trying PySpark yet, thanks @John H


Tags: 文件idforclosetagopendictnext
1条回答
网友
1楼 · 发布于 2024-06-08 00:04:18

看看火花。您可以将文件分布在集群中,以获得更快的处理速度。有一个pythonapi:pyspark。你知道吗

https://spark.apache.org/docs/0.9.0/python-programming-guide.html

这也给了您实际执行Java代码的优势,它不会受到GIL的影响,并且允许真正的多线程。你知道吗

相关问题 更多 >

    热门问题