我有不同程序产生的实验数据。一种是记录试验的开始和结束时间以及试验的类型(一个类别)。你知道吗
start trial type end
0 6.002987 2 c 7.574240
1 7.967054 3 b 19.084946
2 21.864419 5 b 23.298480
3 23.656995 7 c 24.087210
4 24.194764 9 c 27.960752
另一个记录一个连续的数据流并记录每次观测的时间。你知道吗
X Y Z
0.0000 0.324963 -0.642636 -2.305040
0.0333 0.025089 -0.480412 -0.637273
0.0666 0.364149 0.966594 0.789467
0.0999 -0.087334 -0.761769 0.399813
0.1332 0.841872 2.306711 -1.059608
我将这两个表作为pandas DataFrames,只想检索在trials DataFrames行中从开始到结束范围之间的连续数据部分。我通过使用遍历行的for循环实现了这一点,但我认为必须有更多的“熊猫方式”来实现这一点。因此,我研究了应用,但到目前为止,我得出的结果甚至比循环慢得多。 由于我正在处理大量的大型数据集,我正在寻找执行时间方面最有效的方法来解决这个问题。你知道吗
这是连续数据帧预期结果的一部分:
X Y Z trial type
13.6863 0.265358 0.116529 1.196689 NaN NaN
13.7196 -0.715096 -0.413416 0.696454 NaN NaN
13.7529 0.714897 -0.158183 1.735958 4.0 b
13.7862 -0.259513 0.194762 -0.531482 4.0 b
13.8195 -0.929080 -1.200593 -1.233834 4.0 b
[编辑:这里我测试不同方法的性能。我找到了一种使用apply()的方法,但它并不比使用iterrows快多少。你知道吗
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
def create_trials_df(num_trials=360, max_start=1400.0):
# First df holds start and end times (as seconds) of a trial as well as type of trial.
d = {'trial': pd.Series(np.sort(np.random.choice(np.arange(1, 400), replace=False, size=(360,)))),
'type': pd.Series(np.random.choice(('a', 'b', 'c', 'd'),size=num_trials)),
'start': pd.Series(np.sort(np.random.random_sample((num_trials,))) * max_start)}
trials_df = pd.DataFrame(d)
# Create column for when the trial ended.
trials_df['end'] = trials_df['start'].shift(-1)
trials_df.loc[num_trials-1, 'end'] = trials_df['start'].iloc[-1] + 2.0
trials_df['diff'] = trials_df['end'] - trials_df['start']
trials_df['end'] = trials_df['end'] - trials_df['diff'] * 0.2
del trials_df['diff']
return trials_df
def create_continuous_df(num_trials=360, max_start=1400.0):
# Second df has continuously recorded data with time as index.
time_delta = 1.0/30.0
rows = int((max_start+2) * 1/time_delta)
idx_time = pd.Index(np.arange(rows) * time_delta)
continuous_df = pd.DataFrame(np.random.randn(rows, 3), index=idx_time, columns=list('XYZ'))
print("continuous rows:", continuous_df.index.size)
print("continuous last time:", continuous_df.last_valid_index())
return continuous_df
# I want to group the continuous data by trial and type later on.
def iterrows_test(trials_df, continuous_df):
for index, row in trials_df.iterrows():
continuous_df.loc[row['start']:row['end'], 'trial'] = row['trial']
continuous_df.loc[row['start']:row['end'], 'type'] = row['type']
def itertuples_test(trials_df, continuous_df):
continuous_df['trial'] = np.NaN
continuous_df['type'] = np.NaN
for row in trials_df.itertuples():
continuous_df.loc[slice(row[1],row[4]), ['trial','type']] = [row[2],row[3]]
def apply_test(trials_df, continuous_df):
trial_series = pd.Series([x[0] for x in zip(trials_df.values)])
continuous_df['trial'] = np.NaN
continuous_df['type'] = np.NaN
def insert_trial_data_to_continuous(vals, con_df):
con_df.loc[slice(vals[0], vals[3]), ['trial','type']] = [vals[1],vals[2]]
trial_series.apply(insert_trial_data_to_continuous, args=(continuous_df,))
def real_slow_index_map(trials_df, continuous_df):
# Transform trial_data to new df: merge start and end ordered, make it float index.
trials_df['pre-start'] = trials_df['start'] - 0.0001
trials_df['post-end'] = trials_df['end'] + 0.0001
start_df = pd.DataFrame(data={'type': trials_df['type'].values, 'trial': trials_df['trial'].values},
index=trials_df['start'])
end_df = pd.DataFrame(data={'type': trials_df['type'].values, 'trial': trials_df['trial'].values},
index=trials_df['end'])
# Fill inbetween trials with NaN.
pre_start_df = pd.DataFrame({'trial': np.NaN, 'type': np.NaN}, index=trials_df['pre-start'])
post_end_df = pd.DataFrame({'trial': np.NaN, 'type': np.NaN}, index=trials_df['post-end'])
new_df = start_df.append([end_df, pre_start_df, post_end_df])
new_df.sort_index(inplace=True)
# Each start/end index in new_df has corresponding value in type and trial column.
def get_tuple(idx):
res = new_df.iloc[new_df.index.get_loc(idx, method='nearest')]
# return trial and type column values.
return tuple(res.values)
# Apply this to all indices.
idx_series = continuous_df.index.to_series()
continuous_df['trial'] = idx_series.apply(get_tuple).values
continuous_df[['trial', 'type']] = continuous_df['trial'].apply(pd.Series)
def jp_data_analysis_answer(trials_df, continuous_df):
ranges = trials_df[['trial', 'type', 'start', 'end']].values
def return_trial(n):
for i, r in enumerate(ranges):
if r[2] <= n <= r[3]:
return tuple((i, r[1]))
else:
return np.nan, np.nan
continuous_df['trial'], continuous_df['type'] = list(zip(*continuous_df.index.map(return_trial)))
def performance_test(func, trials_df, continuous_df):
return_df = continuous_df.copy()
time_ref = time.perf_counter()
func(trials_df, return_df)
time_delta = time.perf_counter() - time_ref
print("time delta for {}:".format(func.__name__), time_delta)
return return_df
# Just to illustrate where this is going:
def plot_trial(continuous_df):
continuous_df['type'] = continuous_df['type'].astype('category')
continuous_df = continuous_df.groupby('type').filter(lambda x: x is not np.NaN)
# Without the NaNs in column, let's set the trial column to dtype integer.
continuous_df['trial'] = continuous_df['trial'].astype('int64')
# Plot the data by trial.
for key, group in continuous_df.groupby('trial'):
group.drop(['trial', 'type'], axis=1).plot()
plt.title('Trial {}, Type: {}'.format(key, group['type'].iloc[0]))
plt.show()
break
if __name__ == '__main__':
import time
num_trials = 360
max_start_time = 1400
trials_df = create_trials_df(max_start=max_start_time)
data_df = create_continuous_df(max_start=max_start_time)
# My original approach with a for-loop over iterrows.
iterrows_df = performance_test(iterrows_test,trials_df, data_df)
# itertuples test
itertuples_df = performance_test(itertuples_test,trials_df, data_df)
# apply() on trial data, continuous data is manipulated therein
apply_df = performance_test(apply_test,trials_df, data_df)
# Mapping on index of continuous data. SLOW!
map_idx_df = performance_test(real_slow_index_map,trials_df, data_df)
# method by jp_data_analysis' answer. Works well with small continuous_df, but doesn't scale well.
jp_df = performance_test(jp_data_analysis_answer,trials_df, data_df)
plot_trial(apply_df)
我看到一个因素~7倍的改善与下面的逻辑。诀窍是在
continuous_df
上使用index.map(custom_function)
并将结果与(在我看来)未充分使用的for..else..
构造一起解包。这仍然是次优的,但对于您的目的来说可能已经足够了,而且肯定比迭代行要好。你知道吗相关问题 更多 >
编程相关推荐