How can I implement a per-unique-ID row selection criterion with grouped calculations?



Alternative to looping over DataFrame groups

I have a dataset of 13 million rows with 1,214 stations (unique IDs):

# copy the data to the clipboard, and read in with
df = pd.read_clipboard(sep=',', index_col=[0])

,tmc_code,measurement_tstamp,travel_time_minutes
0,133-04199,2019-01-01 18:15:00,2.01
1,133-04199,2019-01-01 18:20:00,2.01
2,133-04198,2019-01-01 18:25:00,9.23
3,133-04191,2019-01-01 20:35:00,2.88
4,133-04191,2019-01-01 20:40:00,2.62
5,133-04190,2019-01-01 20:40:00,1.3
6,133-04193,2019-01-01 20:20:00,4.96
7,133-04193,2019-01-01 20:25:00,4.96
8,133-04192,2019-01-01 20:30:00,5.05
9,133-04192,2019-01-01 20:35:00,5.14
10,133-04195,2019-01-01 19:45:00,9.52
11,133-04195,2019-01-01 19:50:00,10.69
12,133-04195,2019-01-01 19:55:00,9.37
13,133-04194,2019-01-01 20:10:00,5.96
14,133-04194,2019-01-01 20:15:00,5.96
15,133-04194,2019-01-01 20:20:00,5.96
16,133P04359,2019-01-01 22:25:00,0.66
17,133P04359,2019-01-01 22:30:00,0.78
18,133P04359,2019-01-01 23:25:00,0.8
19,133P04126,2019-01-01 23:10:00,0.01
20,133P04125,2019-01-01 23:10:00,0.71
21,133+04361,2019-01-01 05:00:00,2.56
22,133+04361,2019-01-01 22:30:00,2.07
23,133+04361,2019-01-01 23:25:00,2.0
24,133+04126,2019-01-01 23:10:00,0.59
25,133+04127,2019-01-01 23:10:00,0.61
26,133+04128,2019-01-01 23:10:00,0.58
27,133+04129,2019-01-01 23:10:00,0.2
28,133+04360,2019-01-01 04:55:00,1.15
29,133+04360,2019-01-01 05:00:00,2.31

Some of the extreme maximum values are physically impossible, so to weed them out I tried building a threshold from the 95th percentile plus the mode and filtering out the extremes.

The stations produce different travel-time values (because of segment length / traffic patterns), so the percentile and mode must be determined per station.
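
For concreteness, the intended per-station rule can be sketched as below (illustration only, reusing the df read in above and the column names from the sample; groupby().transform() is just one way to broadcast each station's threshold back to its rows):

# Per-station 95th percentile and mode, broadcast back to every row;
# keep rows whose travel time is below (q95 + mode) for their station.
grp = df.groupby('tmc_code')['travel_time_minutes']
q95 = grp.transform(lambda s: s.quantile(0.95))
mode = grp.transform(lambda s: s.mode().iloc[0])
df_clean = df[df['travel_time_minutes'] < (q95 + mode)]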

What I have tried:

def feature_cleaner(df, feature, column):
    
    df_feature_group = pd.DataFrame()
    feature_list = list(df[feature].unique())
    for item in feature_list:
        feature_df = df.loc[df[feature]==item]

        feature_df_clean_mode = feature_df.loc[feature_df[column] 
        < (feature_df[column].quantile(.95)
        + feature_df[column].mode().iloc[0])]

        df_feature_group = df_feature_group.append(feature_df_clean_mode)
        
    return df_feature_group
        
df_features = feature_cleaner(df, 'tmc_code', 'travel_time_minutes')

This works, but it is very slow.

df_clean_tmc = df.groupby(['tmc_code'], as_index=False)['travel_time_minutes'].apply(lambda x: x[x['travel_time_minutes']
< (x['travel_time_minutes'].quantile(.95)
+ x['travel_time_minutes'].apply(lambda x: stats.mode(x)[0]))])

I tried this as well, but it is also slow, and the result shows that no calculation was actually carried out; it is the same size as the original DataFrame.

I suspect the second apply is wrong, but the groupby object has no 'mode' method, and stats.mode works fine in a separate groupby test.
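
(A standalone test of the per-group mode might look like the following; a minimal sketch that uses pandas' own Series.mode so it does not depend on the return shape of scipy's stats.mode, and that assumes the df and column names from the sample above.)

# Per-station mode computed on the grouped column; Series.mode() can
# return several values, so take the first one.
station_modes = df.groupby('tmc_code')['travel_time_minutes'].agg(lambda s: s.mode().iloc[0])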

I also tried:

df_clean_tmc = df.groupby(['tmc_code'], as_index=False)
np.where(df_clean_tmc['travel_time_minutes']
< (df_clean_tmc['travel_time_minutes'].quantile(.95)
+ df_clean_tmc['travel_time_minutes'].apply(lambda x: stats.mode(x)[0]),df['travel_time_minutes']))

but this raised a TypeError:

TypeError: '<' not supported between instances of 'DataFrameGroupBy' and 'tuple'

What would be a more efficient and more appropriate way to accomplish this?


Tags: clean, df, index, time, mode, group, code
1 Answer

Based on the test results, an improvement of several orders of magnitude is not achievable (without dropping down to lower-level tools such as numba or even Cython). This can be seen from the time needed just to run the aggregation.

However, two key optimizations are still possible:

  • Reduce the number of explicit passes over the data, mainly the df[df['col'] == val] filtering. In my implementation the for loop is replaced by (1) aggregating everything at once with .groupby().agg() and (2) checking the thresholds against a lookup table (dict). I am not sure whether a more efficient approach exists, but it would still involve one pass over the data and would save at most a few more seconds.
  • Access df["col"].values instead of df["col"] wherever possible. (Note that this does not copy the data, which is easy to verify by turning on the tracemalloc module; see the short check right after this list.)
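
A quick way to convince yourself that .values does not copy the column (a sketch only, valid for pandas 1.x as used in the benchmark below; under pandas' newer copy-on-write mode the in-place mutation shown here may behave differently):

import numpy as np
import pandas as pd

demo = pd.DataFrame({"travel_time_minutes": np.arange(5, dtype=np.float64)})
arr = demo["travel_time_minutes"].values    # NumPy view of the column's buffer
arr[0] = 99.0                               # mutate through the view
print(demo["travel_time_minutes"].iloc[0])  # prints 99.0, so no copy was made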

Benchmark code

15 million records were generated from your sample.

import pandas as pd
import numpy as np
from datetime import datetime
# check memory footprint
# import tracemalloc
# tracemalloc.start()

# data
df = pd.read_csv("/mnt/ramdisk/in.csv", index_col="idx")
del df['measurement_tstamp']
df.reset_index(drop=True, inplace=True)
df["travel_time_minutes"] = df["travel_time_minutes"].astype(np.float64)
# repeat
cols = df.columns
df = pd.DataFrame(np.repeat(df.values, 500000, axis=0))
df.columns = cols

# Aggregation starts
t0 = datetime.now()
print(f"Program begins....")

# 1. aggregate everything at once
df_agg = df.groupby("tmc_code").agg(
    mode=("travel_time_minutes", pd.Series.mode),
    q95=("travel_time_minutes", lambda x: np.quantile(x, .95))
)

t1 = datetime.now()
print(f"  Aggregation: {(t1 - t0).total_seconds():.2f}s")

# 2. construct a lookup table for the thresholds
threshold = {}
for tmc_code, row in df_agg.iterrows():  # slow but only 1.2k rows
    threshold[tmc_code] = np.max(row["mode"]) + row["q95"]

t2 = datetime.now()  # doesn't matter
print(f"  Computing Threshold: {(t2 - t1).total_seconds():.2f}s")

# 3. filtering
def f(tmc_code, travel_time_minutes):
    return travel_time_minutes <= threshold[tmc_code]

df = df[list(map(f, df["tmc_code"].values, df["travel_time_minutes"].values))]

t3 = datetime.now()
print(f"  Filter: {(t3 - t2).total_seconds():.2f}s...")
print(f"Program ends in {(datetime.now() - t0).total_seconds():.2f}s")

# memory footprint
# current, peak = tracemalloc.get_traced_memory()
# print(f"Current memory usage is {current / 10**6}MB; Peak was {peak / 10**6}MB")
# tracemalloc.stop()

print()
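
As a side note on the filtering step, the same threshold lookup can also be expressed with Series.map instead of a Python-level map over .values; this variant is only a sketch and is not included in the timings reported below:

# Map each tmc_code to its threshold, then compare column-wise.
thr = df["tmc_code"].map(threshold)
df_alt = df[df["travel_time_minutes"].values <= thr.values]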

Results (3 runs, all times in seconds):

| No. | old   | new   | new (aggr) | new (filter) |
|-----|-------|-------|------------|--------------|
| 1   | 24.55 | 14.04 | 9.87       | 4.16         |
| 2   | 23.84 | 13.58 | 9.66       | 3.92         |
| 3   | 24.81 | 14.37 | 10.02      | 4.34         |
| avg | 24.40 | 14.00 |            |              |

=> ~1.74x faster overall (24.40 s vs 14.00 s on average)

Tested with Python 3.7 and pandas 1.1.2.
