耗费少量内存的工作排队
我正在尝试设置一个程序,其中一个线程负责写工作列表,另一个线程则读取这个列表并进行处理。这个列表可能会非常大,所以为了避免占用太多内存,我想把它写入一个文件中(或者用其他方法来节省内存,比如生成器)。
我做了一个简单的可运行示例,在写入线程中加了一个睡眠时间,这样读取线程就可以跟上。我在想,如何让读取线程在“超过”写入线程时不停止。我试过使用 .seek
和 .tell
,但遇到了一些奇怪的情况,不太确定这是不是正确的方法。
另外,我想问,这样做是否合理?也许还有更优雅的方法可以在不占用大量内存的情况下排队处理一系列字符串。
import threading,time
class Writer(threading.Thread):
lock= threading.Lock()
def __init__(self,file_path,size):
threading.Thread.__init__(self)
self.file_path= file_path
self.size= size
self.i=0
def how_many(self):
with self.lock:
print "Reader starting, writer is on",self.i
def run(self):
f=open(self.file_path,"w")
for i in xrange(self.size):
with self.lock:
self.i=i
if i%1000==0:
time.sleep(0.1)
f.write("%s\n"%i)
f.close()
class Reader(threading.Thread):
def __init__(self,file_path):
threading.Thread.__init__(self)
self.file_path= file_path
def run(self):
f=open(self.file_path,"r")
line=0
for line in f:
pass
print "Reader got to: %s"%line.strip()
if __name__ == "__main__":
a= Writer("testfile",2000000)
b= Reader("testfile")
a.start()
time.sleep(1)
a.how_many()
b.start()
3 个回答
多进程中的 JoinableQueue
类是为了帮助控制在等待子线程或子进程处理任务时,可能会积累的任务数量。假设你是从一个文件中读取数据,而这个文件太大,无法一次性全部放进内存。
下面是我尝试的一个解决方案,目的是限制内存的使用。在这个例子中,我处理的是以换行符结束的一系列日期,将它们转换成标准格式,然后再写回到一个新文件中。
我并不是多进程模块的专家,所以如果有人发现了错误或者更好的方法,我非常乐意听取建议。
from multiprocessing import Process, Queue, JoinableQueue
import time
date_formats = [
"%Y%m",
"%Y-%m-%d",
"%y-%m-%d",
"%y%m%d",
"%Y%m%d",
"%m/%d/%Y",
"%m/%d/%y",
"%m/%d/%Y %H:%M",
"%m%d%y",
"%m%d%Y",
"%B, %d %Y",
"%B, %d %y",
"%d %B, %Y",
"%d %B, %y",
"%B %d %Y",
"%B %d %y",
"%B %d, %Y",
"%B %d, %y",
"%B %d %Y",
"%B %d %y",
"%b %d %Y",
"%b %d, %Y",
"%b %d %y",
"%b %d, %y",
"%d-%b-%y",
"%Y-%m-%d %H:%M:%S"
]
def convert_date(date):
date = date.strip()
for dateformat in date_formats:
try:
converted = time.strptime(date, dateformat)
converted = time.strftime("%Y-%m-%d", converted)
return converted
except ValueError:
continue
def writer(result_queue):
f = open("iso_dates.out", "wb")
while True:
try:
date = result_queue.get(timeout=1)
f.write(date + '\n')
except:
break
f.close()
def worker(work_queue, result_queue):
while True:
date = work_queue.get()
if not date:
break
result_queue.put(convert_date(date))
work_queue.task_done()
dates = open("dates.out", "rb")
work_queue = JoinableQueue(512) #allow no more than 512 items on queue
result_queue = Queue()
writer_proc = Process(target=writer, args=(result_queue,))
worker_procs = 2
for i in range(worker_procs):
p = Process(target=worker, args=(work_queue, result_queue))
p.daemon = True
p.start()
writer_proc.start()
for date in dates:
work_queue.put(date) #will block until tasks are consumed if maxsize is encountered
work_queue.join()
dates.close()
在不同的线程之间发送消息时,Queue
类非常好用。你可以用from Queue import Queue
来导入它,然后创建一个队列对象,并把这个队列对象传给每个线程。这个队列支持多个生产者和消费者,你可以把几乎任何Python对象放进队列里,比如列表、对象、迭代器等等。
如果你想通过这个队列传输大量数据,只需一次写一个对象到队列中,然后在消费者中使用一个生成器函数,从队列中逐个提取数据。队列还支持深度限制,以防生产者的速度比消费者快。
我解决了这个问题,使用了一种叫做“缓冲文件队列”的方法,这个队列的内容分布在内存和文件之间。简单来说,就是把东西放进一个队列里,但如果队列里的东西超过了设定的大小,多出来的部分就会存到文件里,这样可以节省内存。取出这些东西的时候,还是可以像平常一样从队列里拿出来。
如果有人想做类似的事情,我把这个项目放在了github上,链接在这里。