Python Pandas Multiprocessing Apply
I'm wondering whether there is a way to run pandas' DataFrame apply function in parallel. I've looked around and haven't found anything. In theory it seems like it should be fairly simple to implement, but I haven't seen anyone do it; after all, this is pretty much the textbook definition of parallel processing. Has anyone tried this, or does anyone know of a way? If no one has any ideas, I think I might try writing one myself.
The code I'm working with is below. Sorry for the lack of import statements; they're mixed in with a lot of other code.
def apply_extract_entities(row):
    names=[]
    counter=0
    print row
    for sent in nltk.sent_tokenize(open(row['file_name'], "r+b").read()):
        for chunk in nltk.ne_chunk(nltk.pos_tag(nltk.word_tokenize(sent))):
            if hasattr(chunk, 'node'):
                names+= [chunk.node, ' '.join(c[0] for c in chunk.leaves())]
    counter+=1
    print counter
    return names

data9_2['proper_nouns']=data9_2.apply(apply_extract_entities, axis=1)
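For reference, a minimal sketch of the kind of parallel row-wise apply being described, assuming the row function is defined at module level and its arguments are picklable; process_row and the toy DataFrame here are hypothetical stand-ins, not the code above:

import multiprocessing

import pandas as pd

def process_row(row):
    # hypothetical stand-in for the per-row work done above
    return len(str(row['file_name']))

if __name__ == '__main__':
    df = pd.DataFrame({'file_name': ['a.txt', 'b.txt', 'c.txt']})
    pool = multiprocessing.Pool(processes=4)
    # Pool.map needs picklable arguments, so pass each row as a plain dict
    results = pool.map(process_row, df.to_dict('records'))
    pool.close()
    pool.join()
    df['result'] = results
    print(df)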
EDIT:
What I tried was running it on the first five elements of my iterable, and it ended up taking longer than running serially, so I'm guessing it isn't working.
os.chdir(str(home))
data9_2=pd.read_csv('edgarsdc3.csv')
os.chdir(str(home)+str('//defmtest'))

#import stuff
from nltk import pos_tag, ne_chunk
from nltk.tokenize import SpaceTokenizer

#define apply function and apply it
os.chdir(str(home)+str('//defmtest'))

####
#this is our apply function
def apply_extract_entities(row):
    names=[]
    counter=0
    print row
    for sent in nltk.sent_tokenize(open(row['file_name'], "r+b").read()):
        for chunk in nltk.ne_chunk(nltk.pos_tag(nltk.word_tokenize(sent))):
            if hasattr(chunk, 'node'):
                names+= [chunk.node, ' '.join(c[0] for c in chunk.leaves())]
    counter+=1
    print counter
    return names

#need something that populates a list of sections of a dataframe
def dataframe_splitter(df):
    df_list=range(len(df))
    for i in xrange(len(df)):
        sliced=df.ix[i]
        df_list[i]=sliced
    return df_list

df_list=dataframe_splitter(data9_2)
#df_list=range(len(data9_2))
print df_list

#the multiprocessing section
import multiprocessing

def worker(arg):
    print arg
    (arg)['proper_nouns']=arg.apply(apply_extract_entities, axis=1)
    return arg

pool = multiprocessing.Pool(processes=10)

# get list of pieces
res = pool.imap_unordered(worker, df_list[:5])
res2= list(itertools.chain(*res))
pool.close()
pool.join()

# re-assemble pieces into the final output
output = data9_2.head(1).concatenate(res)
print output.head()
1 Answer
With multiprocessing, it's best to generate several large chunks of data first and then re-assemble them to produce the final result.
Source
import multiprocessing

def worker(arg):
    return arg*2

pool = multiprocessing.Pool()

# get list of pieces
res = pool.map(worker, [1,2,3])
pool.close()
pool.join()

# re-assemble pieces into the final output
output = sum(res)
print 'got:',output
Output
got: 12
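Applied to a DataFrame, the same chunk-then-combine pattern would look roughly like the sketch below; numpy.array_split, the trivial extract function, and the toy data are assumptions for illustration, not code from the question:

import multiprocessing

import numpy as np
import pandas as pd

def extract(row):
    # hypothetical stand-in for the real per-row extraction
    return str(row['file_name']).upper()

def worker(chunk):
    # each process runs the ordinary pandas apply on its own chunk
    chunk = chunk.copy()
    chunk['proper_nouns'] = chunk.apply(extract, axis=1)
    return chunk

if __name__ == '__main__':
    data = pd.DataFrame({'file_name': ['a.txt', 'b.txt', 'c.txt', 'd.txt']})
    chunks = np.array_split(data, 2)   # a few large pieces, not one per row
    pool = multiprocessing.Pool(processes=2)
    res = pool.map(worker, chunks)
    pool.close()
    pool.join()
    output = pd.concat(res)            # re-assemble into the final DataFrame
    print(output)

The point is that each worker gets a sizeable chunk, so the per-process overhead is paid only a handful of times rather than once per row, which is likely why splitting the frame into single rows (as in the edit above) ran slower than the serial version.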