我正在将Pandas/Numpy代码转换为Dask,以处理更大的数据集。我似乎无法重新创建以下Pandas/Numpy代码:
df['days_to_complete'] = np.busday_count(begindates=df['time_order_date'].values.astype('datetime64[D]'),enddates=df['time_complete_date'],weekmask='1111111',holidays=hols_list)
在考虑工作周和假日列表时,此函数返回时间\订单\日期和时间\完成\日期之间的整数天数。它在我的数据框中创建并填充一个新列,没有问题
映射调用numpy函数的分区:
ddf['days_to_complete'] = ddf.time_order.map_partitions(func=np.busday_count,args= ddf['time_order_date'].values.astype('datetime64[D]'),ddf['time_complete_date']),meta=(None, 'i8'))
还可以使用lambda映射_分区:
ddf['days_to_complete'] = ddf.map_partitions(lambda ddf: ddf.assign(result = np.busday_count(begindates=ddf['time_order_date'].values.astype('datetime64[D]'),enddates=ddf['time_complete_date'],weekmask='1111111',holidays=hols_list)),meta=(None,'i8'))
并在运行ddf.compute()后获得以下错误:
TypeError: busday_count() got multiple values for argument 'begindates'
如何以并行处理/Dask友好的方式最好地使用此numpy函数? 我没有成功地使用Dask文档/示例或其他SO线程。 我还想使用Pandas CustomBusinessHour rollfoward,就像我在这里使用基本Pandas一样:
bis_hour = CustomBusinessHour(n=1,weekmask='Mon Tue Wed Thu Fri Sat Sun',holidays=hols_list,start = bus_hours_start,end = bus_hours_end,offset=0)
df['time_order_bis'] = pd.to_datetime(df['time_order'])
df['time_order_bis'] = df['time_order_bis'].apply(lambda row: bis_hour.rollforward(row))
此“前滚”订单时间在规定的客户营业时间内(周六订单现在是周一上午7点,一个工作日)。谢谢
编辑: 我尝试过编写和调用函数:
def bdays(df):
return np.busday_count(df.time_order_date.values.astype('datetime64[D]'),df.time_complete_date,weekmask='1111111',holidays=hols_list)
ddf['days_to_complete'] = ddf.map_partitions(bdays,df=ddf,meta=('days_to_complete','i8')).compute()
我得到以下错误:TypeError: bdays() got multiple values for argument 'df'
我让它工作了!关键是返回一个Dask数组,不要过早计算,这会破坏类型。我建议做大量的type()检查并一步一步地进行,您需要Dask对象,本质上,pandas对象/numpy数组会破坏分区/并行性
功能:
使用映射分区。请注意,上述函数的第一个参数需要数据帧/分区->;我们不在地图分区中指定此选项!只有附加参数
在分配给数据帧中的新列之前进行计算(compute())会导致错误
调试建议: 测试您的输入并只使用一个分区测试函数。bdays是上面的函数
output: dask.dataframe.core.Series
相关问题 更多 >
编程相关推荐