所以我试图对下面的代码做的是读取一个列表列表,并将它们放入名为checker
的函数中,然后让log_result
处理函数checker
的结果。我尝试使用多线程来实现这一点,因为变量名rows_to_parse
实际上有数百万行,所以使用多个内核应该会大大加快这个过程。
目前的代码不起作用,导致Python崩溃。
我的关注点和问题:
df
中的现有df保持
在整个过程中索引,否则log_result
将获得
不知道哪一行需要更新。apply_async
不是合适的
执行此任务的多处理函数,因为我相信
计算机读写df的顺序可能会损坏它???df
但我不确定我会怎么做。谢谢你的帮助。
import pandas as pd
import multiprocessing
from functools import partial
def checker(a,b,c,d,e):
match = df[(df['a'] == a) & (df['b'] == b) & (df['c'] == c) & (df['d'] == d) & (df['e'] == e)]
index_of_match = match.index.tolist()
if len(index_of_match) == 1: #one match in df
return index_of_match
elif len(index_of_match) > 1: #not likely because duplicates will be removed prior to: if "__name__" == __main__:
return [index_of_match[0]]
else: #no match, returns a result which then gets processed by the else statement in log_result. this means that [a,b,c,d,e] get written to the df
return [a,b,c,d,e]
def log_result(result, dataf):
if len(result) == 1: #
dataf.loc[result[0]]['e'] += 1
else: #append new row to exisiting df
new_row = pd.DataFrame([result],columns=cols)
dataf = dataf.append(new_row,ignore_index=True)
def apply_async_with_callback(parsing_material, dfr):
pool = multiprocessing.Pool()
for var_a, var_b, var_c, var_d, var_e in parsing_material:
pool.apply_async(checker, args = (var_a, var_b, var_c, var_d, var_e), callback = partial(log_result,dataf=dfr))
pool.close()
pool.join()
if __name__ == '__main__':
#setting up main dataframe
cols = ['a','b','c','d','e']
existing_data = [["YES","A","16052011","13031999",3],
["NO","Q","11022003","15081999",3],
["YES","A","22082010","03012001",9]]
#main dataframe
df = pd.DataFrame(existing_data,columns=cols)
#new data
rows_to_parse = [['NO', 'A', '09061997', '06122003', 5],
['YES', 'W', '17061992', '26032012', 6],
['YES', 'G', '01122006', '07082014', 2],
['YES', 'N', '06081992', '21052008', 9],
['YES', 'Y', '18051995', '24011996', 6],
['NO', 'Q', '11022003', '15081999', 3],
['NO', 'O', '20112004', '28062008', 0],
['YES', 'R', '10071994', '03091996', 8],
['NO', 'C', '09091998', '22051992', 1],
['YES', 'Q', '01051995', '02012000', 3],
['YES', 'Q', '26022015', '26092007', 5],
['NO', 'F', '15072002', '17062001', 8],
['YES', 'I', '24092006', '03112003', 2],
['YES', 'A', '22082010', '03012001', 9],
['YES', 'I', '15072016', '30092005', 7],
['YES', 'Y', '08111999', '02022006', 3],
['NO', 'V', '04012016', '10061996', 1],
['NO', 'I', '21012003', '11022001', 6],
['NO', 'P', '06041992', '30111993', 6],
['NO', 'W', '30081992', '02012016', 6]]
apply_async_with_callback(rows_to_parse, df)
在多处理中更新这样的数据帧是行不通的:
一方面,这是非常低效的(O(n)为每个附加so O(n^2)的总和。首选的方法是一次将一些对象连接在一起。
另一方面,更重要的是,dataf并没有为每次更新锁定,所以不能保证两个操作不会冲突(我猜这是崩溃的python)。
最后,append不起作用,因此一旦回调完成,变量
dataf
就会被丢弃!!并且不会对父级dataf
进行任何更改。我们可以使用MultiProcessing list或dict。如果您不关心order或dict,请列出(例如枚举),因为您必须注意,从async返回的值不是以定义良好的顺序返回的。
(或者我们可以创建一个实现锁的对象,请参见Eli Bendersky)
因此进行了以下更改:
现在,一旦异步完成,就有了一个数据帧的MultiProcessing.list。您可以浓缩这些(和
ignore_index
)以获得所需的结果:应该会成功的。
注意:在每个阶段创建newrow数据帧的效率也比让pandas一次性将列表列表直接解析为数据帧的效率低。希望这是一个玩具的例子,真的你希望你的块相当大,以获得胜利与多处理(我听到了50kb作为经验法则…),一次一排永远不会是一个胜利在这里。
旁白:您应该避免在代码中使用globals(比如df),在函数中传递globals(在本例中,作为checker的参数)要干净得多。
相关问题 更多 >
编程相关推荐