Efficiently (quickly) grouping continuous data in one DataFrame by ranges taken from another DataFrame in Python?

Posted 2024-04-24 14:27:33

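I have experimental data produced by different programs. One of them records the start and end time of each trial, along with the trial type (a category).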

       start  trial type        end
0   6.002987      2    c   7.574240
1   7.967054      3    b  19.084946
2  21.864419      5    b  23.298480
3  23.656995      7    c  24.087210
4  24.194764      9    c  27.960752

The other records a continuous data stream and logs the time of each observation.

               X         Y         Z
0.0000  0.324963 -0.642636 -2.305040
0.0333  0.025089 -0.480412 -0.637273
0.0666  0.364149  0.966594  0.789467
0.0999 -0.087334 -0.761769  0.399813
0.1332  0.841872  2.306711 -1.059608

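I have both tables as pandas DataFrames and want to retrieve only the portions of the continuous data that fall between the start and end of a row in the trials DataFrame. I got this working with a for loop that iterates over the rows, but I suspect there must be a more "pandas way" of doing it. So I looked into apply, but what I have come up with so far is even much slower than the loop. Since I work with many large datasets, I am looking for the approach that is most efficient in terms of execution time.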

This is part of the expected result for the continuous DataFrame:

                X         Y         Z  trial type
13.6863  0.265358  0.116529  1.196689    NaN  NaN
13.7196 -0.715096 -0.413416  0.696454    NaN  NaN
13.7529  0.714897 -0.158183  1.735958    4.0    b
13.7862 -0.259513  0.194762 -0.531482    4.0    b
13.8195 -0.929080 -1.200593 -1.233834    4.0    b
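
To make the expected result concrete, here is a minimal sketch on a handful of made-up timestamps, using the first three rows of the trials table above. It relies on `pd.IntervalIndex`, a technique swapped in purely for illustration (it is not the approach used in this question), and it assumes the trials never overlap. The names `trials`, `continuous`, `intervals`, `pos` and `inside` are hypothetical.

```python
import numpy as np
import pandas as pd

# First three rows of the trials table shown above.
trials = pd.DataFrame({
    "start": [6.002987, 7.967054, 21.864419],
    "trial": [2, 3, 5],
    "type": ["c", "b", "b"],
    "end": [7.574240, 19.084946, 23.298480],
})

# Made-up timestamps: some fall inside a trial, some between trials.
times = np.array([5.9, 6.5, 7.7, 10.0, 19.5, 22.0])
continuous = pd.DataFrame({"X": np.zeros(len(times))}, index=times)

# One closed interval [start, end] per trial (assumed non-overlapping).
intervals = pd.IntervalIndex.from_arrays(trials["start"], trials["end"], closed="both")

# Position of the trial containing each timestamp, or -1 if there is none.
pos = intervals.get_indexer(continuous.index)
inside = pos >= 0

# pos == -1 picks the last trial here, but those rows are masked to NaN.
continuous["trial"] = np.where(inside, trials["trial"].to_numpy()[pos], np.nan)
continuous["type"] = pd.Series(
    trials["type"].to_numpy()[pos], index=continuous.index
).where(inside)

print(continuous)
```

Samples outside every trial keep NaN in both new columns, which matches the layout of the expected result above.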

[Edit: here I test the performance of the different approaches. I found a way to use apply(), but it is not much faster than using iterrows.]

import time

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

def create_trials_df(num_trials=360, max_start=1400.0):
    # First df holds start and end times (as seconds) of a trial as well as type of trial.
    d = {'trial': pd.Series(np.sort(np.random.choice(np.arange(1, 400), replace=False, size=num_trials))),
         'type': pd.Series(np.random.choice(('a', 'b', 'c', 'd'),size=num_trials)),
         'start': pd.Series(np.sort(np.random.random_sample((num_trials,))) * max_start)}
    trials_df = pd.DataFrame(d)
    # Create column for when the trial ended.
    trials_df['end'] = trials_df['start'].shift(-1)
    trials_df.loc[num_trials-1, 'end'] = trials_df['start'].iloc[-1] + 2.0
    trials_df['diff'] = trials_df['end'] - trials_df['start']
    trials_df['end'] = trials_df['end'] - trials_df['diff'] * 0.2
    del trials_df['diff']
    return trials_df


def create_continuous_df(num_trials=360, max_start=1400.0):
    # Second df has continuously recorded data with time as index.
    time_delta = 1.0/30.0
    rows = int((max_start+2) * 1/time_delta)
    idx_time = pd.Index(np.arange(rows) * time_delta)
    continuous_df = pd.DataFrame(np.random.randn(rows, 3), index=idx_time, columns=list('XYZ'))
    print("continuous rows:", continuous_df.index.size)
    print("continuous last time:", continuous_df.last_valid_index())
    return  continuous_df


# I want to group the continuous data by trial and type later on.
def iterrows_test(trials_df, continuous_df):
    for index, row in trials_df.iterrows():
        continuous_df.loc[row['start']:row['end'], 'trial'] = row['trial']
        continuous_df.loc[row['start']:row['end'], 'type'] = row['type']


def itertuples_test(trials_df, continuous_df):
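    continuous_df['trial'] = np.nan
    continuous_df['type'] = None  # object dtype, so the string labels can be assigned
    for row in trials_df.itertuples():
        # Use field names instead of positions so the column order of trials_df does not matter.
        continuous_df.loc[slice(row.start, row.end), ['trial','type']] = [row.trial, row.type]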


def apply_test(trials_df, continuous_df):
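    # Select the columns explicitly so the positions used below are well defined.
    trial_series = pd.Series([x[0] for x in zip(trials_df[['start', 'trial', 'type', 'end']].values)])
    continuous_df['trial'] = np.nan
    continuous_df['type'] = None  # object dtype, so the string labels can be assigned
    def insert_trial_data_to_continuous(vals, con_df):
        con_df.loc[slice(vals[0], vals[3]), ['trial','type']] = [vals[1],vals[2]]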

    trial_series.apply(insert_trial_data_to_continuous, args=(continuous_df,))


def real_slow_index_map(trials_df, continuous_df):
    # Transform trial_data to new df: merge start and end ordered, make it float index.
    trials_df['pre-start'] = trials_df['start'] - 0.0001
    trials_df['post-end'] = trials_df['end'] + 0.0001

    start_df = pd.DataFrame(data={'type': trials_df['type'].values, 'trial': trials_df['trial'].values},
                            index=trials_df['start'])
    end_df = pd.DataFrame(data={'type': trials_df['type'].values, 'trial': trials_df['trial'].values},
                          index=trials_df['end'])
    # Fill inbetween trials with NaN.
    pre_start_df = pd.DataFrame({'trial': np.nan, 'type': np.nan}, index=trials_df['pre-start'])
    post_end_df = pd.DataFrame({'trial': np.nan, 'type': np.nan}, index=trials_df['post-end'])
    # DataFrame.append was removed in pandas 2.0; pd.concat is the replacement.
    new_df = pd.concat([start_df, end_df, pre_start_df, post_end_df])
    new_df.sort_index(inplace=True)
    # Each start/end index in new_df has corresponding value in type and trial column.
    def get_tuple(idx):
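        # Index.get_loc(..., method='nearest') was removed in pandas 2.0; get_indexer covers it.
        res = new_df.iloc[new_df.index.get_indexer([idx], method='nearest')[0]]
        # return trial and type column values, in that order.
        return tuple(res[['trial', 'type']].values)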
    # Apply this to all indices.
    idx_series = continuous_df.index.to_series()
    continuous_df['trial'] = idx_series.apply(get_tuple).values
    continuous_df[['trial', 'type']] = continuous_df['trial'].apply(pd.Series)


def jp_data_analysis_answer(trials_df, continuous_df):
    ranges = trials_df[['trial', 'type', 'start', 'end']].values
    def return_trial(n):
        for r in ranges:
            if r[2] <= n <= r[3]:
                # Return the trial number and type, not the row position.
                return (r[0], r[1])
        else:
            return np.nan, np.nan

    continuous_df['trial'], continuous_df['type'] = list(zip(*continuous_df.index.map(return_trial)))


def performance_test(func, trials_df, continuous_df):
    return_df = continuous_df.copy()
    time_ref = time.perf_counter()
    func(trials_df, return_df)
    time_delta = time.perf_counter() - time_ref
    print("time delta for {}:".format(func.__name__), time_delta)
    return  return_df


# Just to illustrate where this is going:
def plot_trial(continuous_df):
    continuous_df['type'] = continuous_df['type'].astype('category')
    # Drop the samples that fall between trials (no type assigned).
    continuous_df = continuous_df.dropna(subset=['type']).copy()
    # Without the NaNs in column, let's set the trial column to dtype integer.
    continuous_df['trial'] = continuous_df['trial'].astype('int64')
    # Plot the data by trial.
    for key, group in continuous_df.groupby('trial'):
        group.drop(['trial', 'type'], axis=1).plot()
        plt.title('Trial {}, Type: {}'.format(key, group['type'].iloc[0]))
        plt.show()
        break


if __name__ == '__main__':
    import time
    num_trials = 360
    max_start_time = 1400
    trials_df = create_trials_df(num_trials=num_trials, max_start=max_start_time)
    data_df = create_continuous_df(max_start=max_start_time)

    # My original approach with a for-loop over iterrows.
    iterrows_df = performance_test(iterrows_test,trials_df, data_df)

    # itertuples test
    itertuples_df = performance_test(itertuples_test,trials_df, data_df)

    # apply() on trial data, continuous data is manipulated therein
    apply_df = performance_test(apply_test,trials_df, data_df)

    # Mapping on index of continuous data. SLOW!
    map_idx_df = performance_test(real_slow_index_map,trials_df, data_df)

    # method by jp_data_analysis' answer. Works well with small continuous_df, but doesn't scale well.
    jp_df = performance_test(jp_data_analysis_answer,trials_df, data_df)

    plot_trial(apply_df)

1 answer
User
#1 · Posted 2024-04-24 14:27:33

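I see roughly a 7x improvement with the logic below. The trick is to use index.map(custom_function) on continuous_df and unpack the results, together with the (in my opinion) underused for..else.. construct. This is still suboptimal, but it may be good enough for your purposes and is certainly better than iterating over rows.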
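
As a side note on the for..else.. construct mentioned above: the else clause runs only when the loop finishes without hitting a break (or, inside a function, a return). A minimal sketch, where `first_match` and `ranges` are hypothetical names used only for illustration:

```python
def first_match(ranges, n):
    """Return the label of the first (start, end, label) range that contains n."""
    for start, end, label in ranges:
        if start <= n <= end:
            # Leaving the loop early skips the else clause.
            return label
    else:
        # Reached only when no range matched.
        return None


ranges = [(0.0, 1.0, "a"), (2.0, 3.0, "b")]
print(first_match(ranges, 2.5))
print(first_match(ranges, 1.5))
```

Note that this scans the ranges for every lookup, so the total cost grows with the number of samples times the number of trials.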

import numpy as np
import pandas as pd

def test2():
    # First df holds start and end times (as seconds) of a trial as well as type of trial.
    num_trials = 360
    max_start = 1400.0
    d = {'trial': pd.Series(np.sort(np.random.choice(np.arange(1, 400), replace=False, size=num_trials))),
         'type': pd.Series(np.random.choice(('a', 'b', 'c', 'd'),size=num_trials)),
         'start': pd.Series(np.sort(np.random.random_sample((num_trials,))) * max_start)}
    trials_df = pd.DataFrame(d)
    # Create column for when the trial ended.
    trials_df['end'] = trials_df['start'].shift(-1)
    trials_df.loc[num_trials-1, 'end'] = trials_df['start'].iloc[-1] + 2.0
    trials_df['diff'] = trials_df['end'] - trials_df['start']
    trials_df['end'] = trials_df['end'] - trials_df['diff'] * 0.2
    del trials_df['diff']

    # Second df has continuously recorded data with time as index.
    time_delta = 0.0333
    # Parenthesize so the samples cover the whole recording (max_start + 2 seconds).
    rows = int((max_start + 2) / time_delta)
    idx_time = pd.Index(np.arange(rows) * time_delta)
    continuous_df = pd.DataFrame(np.random.randn(rows,3), index=idx_time, columns=list('XYZ'))

    ranges = trials_df[['trial', 'type', 'start', 'end']].values
    def return_trial(n):
        for r in ranges:
            if r[2] <= n <= r[3]:
                return tuple(r[:2])
        else:
            return (np.nan, '')

    continuous_df['trial'], continuous_df['type'] = list(zip(*continuous_df.index.map(return_trial)))

    return trials_df, continuous_df
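
Since the approach above is described as still suboptimal, here is a hedged sketch of a fully vectorized variant based on `np.searchsorted`. It is a swapped-in technique, not part of the answer above, and it assumes that the trials are sorted by start, do not overlap, and that trials_df is not empty. The function name `label_by_searchsorted` and the toy data are hypothetical.

```python
import numpy as np
import pandas as pd


def label_by_searchsorted(trials_df, continuous_df):
    """Label each sample with trial/type; assumes sorted, non-overlapping trials."""
    starts = trials_df["start"].to_numpy()
    ends = trials_df["end"].to_numpy()
    times = continuous_df.index.to_numpy()

    # Position of the last trial whose start is <= each timestamp (-1 if none).
    pos = np.searchsorted(starts, times, side="right") - 1
    safe = np.clip(pos, 0, None)  # avoid negative positions when indexing
    inside = (pos >= 0) & (times <= ends[safe])

    out = continuous_df.copy()
    out["trial"] = np.where(inside, trials_df["trial"].to_numpy()[safe], np.nan)
    out["type"] = pd.Series(
        trials_df["type"].to_numpy()[safe], index=out.index
    ).where(inside)
    return out


# Toy data for illustration only.
trials = pd.DataFrame({"trial": [1, 2], "type": ["a", "b"],
                       "start": [1.0, 3.0], "end": [2.0, 4.0]})
cont = pd.DataFrame({"X": np.zeros(6)}, index=[0.5, 1.0, 1.5, 2.5, 4.0, 4.5])
labeled = label_by_searchsorted(trials, cont)
print(labeled)
```

The binary search replaces the per-sample loop over all ranges, so no Python-level iteration over rows is needed. I have not benchmarked it against the functions above.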
