我已将程序(如下)写给:
pandas dataframe
的形式读取一个巨大的文本文件groupby
使用特定的列值分割数据并存储为数据帧列表。multiprocess Pool.map()
以并行处理每个数据帧。一切都很好,程序在我的小测试数据集上运行良好。但是,当我输入大数据(大约14gb)时,内存消耗会呈指数级增长,然后冻结计算机或被杀死(在HPC集群中)。
一旦数据/变量不起作用,我就添加了清除内存的代码。游泳池一完工,我就把它关了。仍然有14GB的输入,我只期望有2*14GB的内存负担,但似乎很多事情正在进行中。我也尝试过使用chunkSize and maxTaskPerChild, etc
来调整,但是我没有看到在测试和大文件中优化的任何区别。
当我开始multiprocessing
时,我认为在这个代码位置需要对这个代码进行改进。
p = Pool(3) # number of pool to run at once; default at 1
result = p.map(matrix_to_vcf, list(gen_matrix_df_list.values()))
但是,我要把整个密码都发出去。
测试示例:我创建了一个高达250 mb的测试文件(“genome_matrix_final-chr1234-1mb.txt”)并运行了该程序。当我检查系统监视器时,我可以看到内存消耗增加了大约6gb。我不太清楚为什么250MB的文件加上一些输出会占用这么多内存空间。我已经通过drop box共享了这个文件,如果它有助于看到真正的问题的话。https://www.dropbox.com/sh/coihujii38t5prd/AABDXv8ACGIYczeMtzKBo0eea?dl=0
有人能建议我怎样才能解决这个问题吗?
我的python脚本:
#!/home/bin/python3
import pandas as pd
import collections
from multiprocessing import Pool
import io
import time
import resource
print()
print('Checking required modules')
print()
''' change this input file name and/or path as need be '''
genome_matrix_file = "genome_matrix_final-chr1n2-2mb.txt" # test file 01
genome_matrix_file = "genome_matrix_final-chr1234-1mb.txt" # test file 02
#genome_matrix_file = "genome_matrix_final.txt" # large file
def main():
with open("genome_matrix_header.txt") as header:
header = header.read().rstrip('\n').split('\t')
print()
time01 = time.time()
print('starting time: ', time01)
'''load the genome matrix file onto pandas as dataframe.
This makes is more easy for multiprocessing'''
gen_matrix_df = pd.read_csv(genome_matrix_file, sep='\t', names=header)
# now, group the dataframe by chromosome/contig - so it can be multiprocessed
gen_matrix_df = gen_matrix_df.groupby('CHROM')
# store the splitted dataframes as list of key, values(pandas dataframe) pairs
# this list of dataframe will be used while multiprocessing
gen_matrix_df_list = collections.OrderedDict()
for chr_, data in gen_matrix_df:
gen_matrix_df_list[chr_] = data
# clear memory
del gen_matrix_df
'''Now, pipe each dataframe from the list using map.Pool() '''
p = Pool(3) # number of pool to run at once; default at 1
result = p.map(matrix_to_vcf, list(gen_matrix_df_list.values()))
del gen_matrix_df_list # clear memory
p.close()
p.join()
# concat the results from pool.map() and write it to a file
result_merged = pd.concat(result)
del result # clear memory
pd.DataFrame.to_csv(result_merged, "matrix_to_haplotype-chr1n2.txt", sep='\t', header=True, index=False)
print()
print('completed all process in "%s" sec. ' % (time.time() - time01))
print('Global maximum memory usage: %.2f (mb)' % current_mem_usage())
print()
'''function to convert the dataframe from genome matrix to desired output '''
def matrix_to_vcf(matrix_df):
print()
time02 = time.time()
# index position of the samples in genome matrix file
sample_idx = [{'10a': 33, '10b': 18}, {'13a': 3, '13b': 19},
{'14a': 20, '14b': 4}, {'16a': 5, '16b': 21},
{'17a': 6, '17b': 22}, {'23a': 7, '23b': 23},
{'24a': 8, '24b': 24}, {'25a': 25, '25b': 9},
{'26a': 10, '26b': 26}, {'34a': 11, '34b': 27},
{'35a': 12, '35b': 28}, {'37a': 13, '37b': 29},
{'38a': 14, '38b': 30}, {'3a': 31, '3b': 15},
{'8a': 32, '8b': 17}]
# sample index stored as ordered dictionary
sample_idx_ord_list = []
for ids in sample_idx:
ids = collections.OrderedDict(sorted(ids.items()))
sample_idx_ord_list.append(ids)
# for haplotype file
header = ['contig', 'pos', 'ref', 'alt']
# adding some suffixes "PI" to available sample names
for item in sample_idx_ord_list:
ks_update = ''
for ks in item.keys():
ks_update += ks
header.append(ks_update+'_PI')
header.append(ks_update+'_PG_al')
#final variable store the haplotype data
# write the header lines first
haplotype_output = '\t'.join(header) + '\n'
# to store the value of parsed the line and update the "PI", "PG" value for each sample
updated_line = ''
# read the piped in data back to text like file
matrix_df = pd.DataFrame.to_csv(matrix_df, sep='\t', index=False)
matrix_df = matrix_df.rstrip('\n').split('\n')
for line in matrix_df:
if line.startswith('CHROM'):
continue
line_split = line.split('\t')
chr_ = line_split[0]
ref = line_split[2]
alt = list(set(line_split[3:]))
# remove the alleles "N" missing and "ref" from the alt-alleles
alt_up = list(filter(lambda x: x!='N' and x!=ref, alt))
# if no alt alleles are found, just continue
# - i.e : don't write that line in output file
if len(alt_up) == 0:
continue
#print('\nMining data for chromosome/contig "%s" ' %(chr_ ))
#so, we have data for CHR, POS, REF, ALT so far
# now, we mine phased genotype for each sample pair (as "PG_al", and also add "PI" tag)
sample_data_for_vcf = []
for ids in sample_idx_ord_list:
sample_data = []
for key, val in ids.items():
sample_value = line_split[val]
sample_data.append(sample_value)
# now, update the phased state for each sample
# also replacing the missing allele i.e "N" and "-" with ref-allele
sample_data = ('|'.join(sample_data)).replace('N', ref).replace('-', ref)
sample_data_for_vcf.append(str(chr_))
sample_data_for_vcf.append(sample_data)
# add data for all the samples in that line, append it with former columns (chrom, pos ..) ..
# and .. write it to final haplotype file
sample_data_for_vcf = '\t'.join(sample_data_for_vcf)
updated_line = '\t'.join(line_split[0:3]) + '\t' + ','.join(alt_up) + \
'\t' + sample_data_for_vcf + '\n'
haplotype_output += updated_line
del matrix_df # clear memory
print('completed haplotype preparation for chromosome/contig "%s" '
'in "%s" sec. ' %(chr_, time.time()-time02))
print('\tWorker maximum memory usage: %.2f (mb)' %(current_mem_usage()))
# return the data back to the pool
return pd.read_csv(io.StringIO(haplotype_output), sep='\t')
''' to monitor memory '''
def current_mem_usage():
return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024.
if __name__ == '__main__':
main()
赏金猎人更新:
我已经使用Pool.map()
实现了多处理,但是代码造成了很大的内存负担(输入测试文件~300MB,但内存负担大约为6GB)。我只期望3*300MB的最大内存负载
使用
multiprocessing.Pool
时,将使用fork()
系统调用创建许多子进程。这些进程中的每一个都是从当时父进程的内存的精确副本开始的。因为在创建大小为3的Pool
之前要加载csv,所以池中这3个进程中的每一个都将不必要地拥有数据帧的副本。(gen_matrix_df
和gen_matrix_df_list
将存在于当前进程以及3个子进程中的每个进程中,因此这些结构的每个副本都将存在于内存中)在加载应该减少内存使用的文件(实际上是在文件的最开始)之前,尝试创建
Pool
。如果仍然太高,您可以:
将gen_matrix_dfu_list转储到文件中,每行1个项目,例如:
在迭代器上对转储到此文件的行使用
Pool.imap()
,例如:(注意,
matrix_to_vcf
在上面的示例中接受一个(key, value)
元组,而不仅仅是一个值)我希望这会有帮助。
注:我还没有测试过上面的代码。只是为了证明这个想法。
先决条件
在Python中(在下面我使用Python 3.6.5的64位构建),一切都是一个对象。这有其开销,使用^{} 我们可以准确地看到对象的大小(字节):
multiprocessing.get_start_method()
)创建子进程时,不会复制父进程的物理内存,而是使用copy-on-write技术。数据帧
不让我们单独看你的} 会帮助我们的。
DataFrame
。^{仅Pd.py
现在让我们使用探查器:
我们可以看到情节:
逐行追踪:
我们可以看到,数据帧在构建时需要~2 GiB,峰值为~3 GiB。更有趣的是^{} 的输出。
但是
info(memory_usage='deep')
(“deep”是指通过询问object
dtype
s对数据进行深入的自省,见下文)给出:啊?!从这个过程的外部来看,我们可以确保} 。
memory_profiler
的数字是正确的。sys.getsizeof
还显示帧的相同值(很可能是由于自定义的__sizeof__
),使用它来估计分配的gc.get_objects()
的其他工具也会显示相同的值,例如^{给出:
那么这7.93吉布是从哪里来的呢?我们来解释一下。我们有4M行和34列,这给了我们134M的值。它们要么是
int64
,要么是object
(这是一个64位指针;有关详细说明,请参见using pandas with large data)。因此,我们只有134 * 10 ** 6 * 8 / 2 ** 20
~1022 MiB用于数据帧中的值。剩下的6.93吉布呢?串接
为了理解这种行为,有必要知道Python确实在进行string interning。关于Python 2中的字符串实习,有两篇很好的文章(one,two)。除了Python 3中的Unicode更改和python3.3中的PEP 393之外,C结构也发生了变化,但想法是一样的。基本上,看起来像标识符的每个短字符串都将由Python缓存在内部字典中,并且引用将指向相同的Python对象。换言之,我们可以说它的行为就像一个单身汉。上面我提到的文章解释了它提供了什么重要的内存配置文件和性能改进。我们可以使用} 字段检查字符串是否已被暂存:
PyASCIIObject
的^{然后:
使用两个字符串,我们还可以进行标识比较(在CPython的情况下,在内存中进行寻址比较)。
因此,对于
object
dtype
,数据帧最多分配20个字符串(每个氨基酸一个)。不过,值得注意的是,Pandas推荐使用categorical types进行枚举。熊猫记忆
因此,我们可以解释7.93吉布的天真估计:
注意
str_size
是58字节,而不是我们在上面看到的1字符文本的50字节。这是因为PEP 393定义了压缩字符串和非压缩字符串。你可以用sys.getsizeof(gen_matrix_df.REF[0])
检查。实际内存消耗应该是
gen_matrix_df.info()
报告的~1gib,是它的两倍。我们可以假设这与熊猫或小熊猫的记忆(预)分配有关。下面的实验表明这并非没有原因(多次运行显示保存图片):我想以熊猫的原著作者fresh article about design issues and future Pandas2的一句话来结束这一部分。
过程树
最后,让我们来到池中,看看是否可以在写时使用copy。我们将使用^{} (可从Ubuntu存储库获得)来估计进程组内存共享,并使用^{} 来记下系统范围的可用内存。两者都可以编写JSON。
我们将运行原始脚本
Pool(2)
。我们需要3个终端窗口。smemstat -l -m -p "python3.6 script.py" -o smemstat.json 1
glances -t 1 --export-json glances.json
mprof run -M script.py
然后
mprof plot
产生:总和图表(
mprof run --nopython --include-children ./script.py
)看起来像:注意上面的两个图表显示RSS。假设是,由于写时拷贝,它并不能反映实际的内存使用情况。现在我们有两个来自
smemstat
和glances
的JSON文件。我将使用以下脚本将JSON文件转换为CSV。首先让我们看看
free
内存。第一个和最小值之间的差异约为4.15 GiB。PSS数据如下:
和总数:
因此我们可以看到,由于写时拷贝,实际内存消耗约为4.15 GiB。但我们仍在序列化数据,以便通过
Pool.map
将其发送到工作进程。我们能不能也利用这里的抄写功能?共享数据
要使用写时拷贝,我们需要让
list(gen_matrix_df_list.values())
可以全局访问,这样fork之后的工作线程仍然可以读取它。让我们在
main
中的del gen_matrix_df
之后修改代码,如下所示:del gen_matrix_df_list
。修改
matrix_to_vcf
的第一行如下:现在让我们重新运行它。可用内存:
进程树:
和它的总和:
因此,我们的实际内存使用量最多约为2.9gib(在构建数据帧时,主进程的峰值是2.9gib),而写时拷贝有帮助!
顺便说一下,这里有所谓的读时复制,即Python的引用循环垃圾收集器described in Instagram Engineering(这导致issue31558中的
gc.freeze
)的行为。但是gc.disable()
在这种特殊情况下没有影响。更新
另一种替代的方法是使用^{} 从一开始就将数据共享委托给内核。以下是Python中高性能数据处理的谈话。然后tricky part将使熊猫使用mmaped Numpy数组。
我也有同样的问题。我需要处理一个庞大的文本语料库,同时在内存中保存数百万行的少量数据帧的知识库。我认为这个问题很常见,所以我会把我的答案放在一般的目的上。
设置的组合为我解决了问题(1&3&5可能只为您解决):
使用
Pool.imap
(或imap_unordered
)而不是Pool.map
。这将在数据上缓慢地迭代,而不是在开始处理之前将所有数据加载到内存中。将值设置为
chunksize
参数。这也会使imap
更快。将值设置为
maxtasksperchild
参数。将输出追加到磁盘而不是内存中。当它达到一定尺寸时立即或每隔一段时间。
分批运行代码。如果有迭代器,可以使用itertools.islice。这样做的目的是将
list(gen_matrix_df_list.values())
分成三个或更多的列表,然后只将第一个第三个列表传递给map
或imap
,然后在另一次运行中传递第二个第三个列表,等等。因为您有一个列表,所以只需在同一行代码中对其进行切片即可。相关问题 更多 >
编程相关推荐