我遇到了 python 多进程(multiprocessing)的问题。
我有一个 .csv 文件(约 10GB)。我想从中读取数据,转换数据,然后将其保存到 python shelve 文件中。数据转换很慢,所以我考虑用多进程。
我创建了两个多进程队列,第一个用于保存从 csv 读取的数据,第二个用于保存转换后的数据。这两个队列工作正常;但是,问题是从第二个队列向 shelve 写入数据的进程在几个小时后停止了。
有人遇到过同样的问题吗?谢谢你的建议!
我的代码在这里:
def read_from_csv(csv_path, csv_queue):
    """Read 'text|ID' lines from ``csv_path`` and feed ``(ID, tokens)`` into ``csv_queue``.

    Parameters
    ----------
    csv_path : str
        Path to the pipe-delimited input file (one ``text|ID`` record per line).
    csv_queue : queue-like
        Destination queue; receives ``(stripped_id, token_list)`` tuples.
    """
    count = 0
    # Context manager guarantees the file handle is closed even if an
    # exception escapes (the original left the handle open forever).
    with open(csv_path, 'r') as fh:
        for line in fh:
            try:
                text, ID = line.split('|')
            except ValueError:
                # Malformed line (wrong number of '|' fields): log and skip.
                # The original bare `except: pass` silently swallowed *every*
                # error here, including queue failures, hiding real bugs.
                logger.warning('Skipping malformed line: %r', line)
                continue
            data = (ID.strip(), text.split())
            csv_queue.put(data, True)
            count += 1
            n = csv_queue.qsize()
            if n > 0 and n % 10000 == 0:
                logger.info('Queue size is {}'.format(n))
            # Back off when the queue grows too long so memory stays bounded.
            if n >= 100000:
                logger.info('CSV queue sleeps for 100 sec')
                time.sleep(100)
            if count % 10000 == 0:
                logger.info('Read {} lines'.format(count))
    logger.info('Done feeding rows into the Queue')
def entities_extraction(csv_queue, shelve_queue):
    """Consume rows from ``csv_queue``, transform them, and push results to ``shelve_queue``.

    Terminates when no item arrives within the 300 s ``get`` timeout.
    """
    from queue import Empty  # Manager queues raise queue.Empty on timeout
    count = 0
    # NOTE(review): the original looped on `while not csv_queue.empty()`,
    # which is racy with several consumers (a worker can see a transiently
    # empty queue and exit early).  The get() timeout below is the reliable
    # termination condition, so loop unconditionally.
    while True:
        try:
            item = csv_queue.get(True, 300)
        except Empty:
            # Catch only the timeout; a bare except would also hide real bugs.
            logger.info('Not able to fetch data from queue. Quitting')
            break
        # some function here -- placeholder transform; replace with the real
        # one.  The original referenced an undefined `result` (NameError).
        result = item
        shelve_queue.put(result, True)
        count += 1
        n = shelve_queue.qsize()
        if count % 10000 == 0:
            logger.info('Process {} lines'.format(count))
        # Throttle when the downstream queue backs up.
        if n > 100000:
            logger.info('Shelve queue sleeps for 100 sec')
            time.sleep(100)
def write_to_shelve(shelve_file, csv_queue, shelve_queue):
    """Drain ``shelve_queue`` and persist ``(id, tokens)`` records into a shelve file.

    ``writeback=True`` in the original is the likely cause of the writer
    stalling after a few hours: writeback keeps an in-memory cache of *every*
    entry ever written, so on a ~10 GB input the process grows without bound.
    Plain ``sh[key] = value`` assignment needs no writeback cache.

    ``csv_queue`` is accepted but unused; kept for caller compatibility.
    """
    from queue import Empty  # Manager queues raise queue.Empty on timeout
    # writeback defaults to False -> bounded memory, each write hits disk.
    sh = shelve.open(shelve_file)
    count = 0
    try:
        while True:
            if count % 10000 == 0:
                logger.info("Written {} records".format(count))
            n = shelve_queue.qsize()
            if n > 0 and n % 10000 == 0:
                logger.info('Shelve queue size is {}'.format(n))
            try:
                record_id, token = shelve_queue.get(True, 600)
            except Empty:
                # Only the timeout means "done".  The original bare except
                # also caught errors from the shelf write itself and
                # mis-reported them as an empty queue.
                logger.info("Not able to fetch data from queue. Quitting")
                break
            # `record_id` instead of `id`: don't shadow the id() builtin.
            sh[record_id] = token
            count += 1
    finally:
        # Always flush and close the shelf, even if a write raises.
        sh.close()
if __name__ == "__main__":
    t0 = time.time()
    # TODO: fill in the real paths (the original snippet left them blank,
    # which is a syntax error).
    csv_path = 'input.csv'
    shelve_path = 'output.shelve'
    csv_queue = mp.Manager().Queue()
    shelve_queue = mp.Manager().Queue()
    processes_to_launch = 7
    pool = mp.Pool(processes_to_launch)
    # BUG FIX: pool.apply() is *synchronous* -- the original ran the reader,
    # then each worker, then the writer strictly one after another, so there
    # was no pipelining at all (and the reader would sleep forever once the
    # queue reached 100000 with nobody draining it).  apply_async() launches
    # all stages concurrently.
    reader = pool.apply_async(read_from_csv, (csv_path, csv_queue))
    workers = [pool.apply_async(entities_extraction, (csv_queue, shelve_queue))
               for _ in range(5)]
    writer = pool.apply_async(write_to_shelve,
                              (shelve_path, csv_queue, shelve_queue))
    pool.close()
    pool.join()
    print(time.time() - t0)
部分日志信息如下:
(原始日志片段在页面抓取时丢失,此处占位符 ^{pr2}$ 无法还原)
目前没有回答
相关问题 更多 >
编程相关推荐