如何将文件分块以进行多进程处理

0 投票
2 回答
1250 浏览
提问于 2025-04-17 20:35

我有一个大约1.5GB的文件,我想把这个文件分成几个小块,这样我就可以用多进程来处理每个小块,使用的是Python中的pp(并行Python)模块。到目前为止,我一直在用f.seek这个方法,但这个方法速度很慢,因为它是一个字节一个字节地移动。那么有没有其他的方法呢?我能不能通过Python的mrjob(一个Map-Reduce包)来做到这一点?

示例代码:我正在做的事情是这样的

def multi(i,slots,,file_name,date):
f1=open(date+'/'+file_name,"rb")
f1.seek(i*slots*69)
data=f1.read(69)
counter=0
print 'process',i
while counter<slots:
    ##do some processing
    counter+=1
    data=f1.read(69)

我的每一行包含69个字节的元组数据,并且多功能函数会并行调用n次(这里的n等于插槽的数量)来完成工作。

2 个回答

1

为什么不对文件打开多个句柄呢?这样的话,你每个句柄只需要“查找”一次。

f1 = open('file')

f2 = open('file')
f2.seek(100) # in practice the number would be <file size>/<no of threads>

f3 = open('file')
f3.seek(200)
1

最简单的方法是创建一个公共的函数,用来读取记录并返回它。不过,这个函数是有锁保护的。就像下面这样。请注意,我不是Python程序员,所以你需要理解我的伪代码。

f = open file
l = new lock

function read
    acquire lock
        read record
    release lock
    return record

接下来,启动几个线程,但不要超过你的处理器核心数,每个线程都执行这个操作:

while not end of file
    record = read();
    process record

所以,不是为每条记录都启动一个新线程,而是有几个持久的线程在工作。

另一种方法是专门用一个线程来读取记录。这个线程负责读取记录,并把它们放进一个线程安全的队列里。这个队列的大小是有限制的(比如100条记录,10,000条记录,随你定)。处理的线程从这个队列中读取记录。这样做的好处是,读取线程可以在其他线程处理的时候继续填充队列。这样,处理线程就能很快获取到下一条记录。

撰写回答