我正在尝试为一项任务创建工人,这项任务涉及阅读大量文件并对其进行分析。你知道吗
我想要这样的东西:
list_of_unique_keys_from_csv_file = [] # About 200mb array (10m rows)
# a list of uniquekeys for comparing inside worker processes to a set of flatfiles
我需要更多的线程,因为它是非常慢,做一个进程(每个文件10分钟)比较。你知道吗
我有另一组平面文件,我比较CSV文件,看看是否存在唯一的键。这似乎是一个地图减少类型的问题。你知道吗
你知道吗主.py地址:
def worker_process(directory_glob_of_flat_files, list_of_unique_keys_from_csv_file):
# Do some parallel comparisons "if not in " type stuff.
# generate an array of
# lines of text like : "this item_x was not detected in CSV list (from current_flatfile)"
if current_item not in list_of_unique_keys_from_csv_file:
all_lines_this_worker_generated.append(sometext + current_item)
return all_lines_this_worker_generated
def main():
all_results = []
pool = Pool(processes=6)
partitioned_flat_files = [] # divide files from glob by 6
results = pool.starmap(worker_process, partitioned_flat_files, {{{{i wanna pass in my read-only parameter}}}})
pool.close()
pool.join()
all_results.extend(results )
resulting_file.write(all_results)
我同时使用linux和windows环境,所以可能需要一些跨平台兼容的东西(整个fork()讨论)。你知道吗
主要问题:我需要某种管道或队列吗?我似乎找不到好的例子来说明如何在一个大的只读字符串数组中进行传输,每个工作进程都有一个副本?
只需拆分只读参数,然后将其传入即可。
multiprocessing
模块是跨平台兼容的,所以不用担心。你知道吗实际上,每个进程,甚至子进程,都有自己的资源,这意味着无论您如何将参数传递给它,它都会保留原始进程的一个副本,而不是共享它。在这个简单的例子中,当您将参数从主进程传递到子进程时,
Pool
会自动生成变量的副本。因为子流程只有原始流程的副本,所以修改不能共享。在这种情况下,这并不重要,因为变量是只读的。你知道吗但是要小心你的代码,你需要把你需要的参数包装成一个 iterable集合,例如:
相关问题 更多 >
编程相关推荐