如何加速数组/矩阵的迭代?尝试了Pandas和纽比阵列

2024-06-10 20:49:21 发布

您现在位置:Python中文网/ 问答频道 /正文

我想通过一个大的二维数组(15100米)来进行一些功能扩展。在

对一个有10万条记录的样本集的研究表明我需要更快地得到这个结果。在

编辑(数据模型信息)

为了简化,我们假设只有两个相关的列:

  • IP(标识符)
  • Unix(自1970年以来的时间戳,以秒为单位)

我想添加第三列,计算这个IP在过去12小时内出现了多少次。在

结束编辑

我的第一次尝试是使用pandas,因为在命名维度上工作很舒服,但速度太慢:

for index,row in tqdm_notebook(myData.iterrows(),desc='iterrows'):
# how many times was the IP address (and specific device) around in the prior 5h?
    hours = 12
    seen = myData[(myData['ip']==row['ip'])
                 &(myData['device']==row['device'])
                 &(myData['os']==row['os'])
                 &(myData['unix']<row['unix'])
                 &(myData['unix']>(row['unix']-(60*60*hours)))].shape[0]
    ip_seen = myData[(myData['ip']==row['ip'])
                 &(myData['unix']<row['unix'])
                 &(myData['unix']>(row['unix']-(60*60*hours)))].shape[0]
    myData.loc[index,'seen'] = seen
    myData.loc[index,'ip_seen'] = ip_seen

然后我切换到numpy数组并希望得到更好的结果,但是对于完整的数据集运行它仍然太慢:

^{pr2}$

我的下一个想法是只迭代一次,并维护一个不断增长的当前计数字典,而不是每次迭代都向后看。在

但这也有一些其他的缺点(例如,如何跟踪何时减少12小时窗口外的观测值)。在

你将如何处理这个问题?在

它甚至可以选择使用低级别的Tensorflow函数来涉及GPU吗?在

谢谢


Tags: theinip编辑indexdeviceunix数组
2条回答

加速的唯一方法是循环。在您的例子中,您可以尝试将^{}与所需时间范围的窗口一起使用,使用Unix时间戳作为日期时间索引(假设记录是按时间戳排序的,否则您需要先排序)。这对于ip_seen应该可以正常工作:

ip = myData['ip']
ip.index = pd.to_datetime(myData['unix'], unit='s')
myData['ip_seen'] = ip.rolling('5h')
    .agg(lambda w: np.count_nonzero(w[:-1] == w[-1]))
    .values.astype(np.int32)

但是,当聚合涉及多个列时,比如seen列,它会变得更加复杂。目前(参见Pandas issue #15095)滚动函数不支持跨越两个维度的聚合。解决方法可以是将感兴趣的列合并到一个新的序列中,例如元组(如果值是数字,这可能会更好)或字符串(如果值已经是字符串,则可能更好)。例如:

^{pr2}$

编辑

显然^{}只适用于数字类型,这就留下了两个选项:

  1. 操作数据以使用数字类型。对于IP来说,这很简单,因为它实际上表示一个32位的数字(如果是IPv6,则为64位)。对于设备和操作系统,假设它们现在是字符串,情况变得更复杂了,你必须将每个可能的值映射到一个整数,然后将其与IP合并成一个长值,例如,将这些值放入更高的位或类似的位置(在IPv6中甚至不可能,因为NumPy现在支持的最大整数是64位)。在
  2. 滚动myData的索引(现在应该是而不是datetime,因为rolling也不能使用它),然后使用index窗口获取必要的数据并进行操作:

    # Use sequential integer index
    idx_orig = myData.index
    myData.reset_index(drop=True, inplace=True)
    # Index to roll
    idx = pd.Series(myData.index)
    idx.index = pd.to_datetime(myData['unix'], unit='s')
    # Roll aggregation function
    def agg_seen(w, data, fields):
        # Use slice for faster data frame slicing
        slc = slice(int(w[0]), int(w[-2])) if len(w) > 1 else []
        match = data.loc[slc, fields] == data.loc[int(w[-1]), fields]
        return np.count_nonzero(np.all(match, axis=1))
    # Do rolling
    myData['ip_seen'] = idx.rolling('5h') \
        .agg(lambda w: agg_seen(w, myData, ['ip'])) \
        .values.astype(np.int32)
    myData['ip'] = idx.rolling('5h') \
        .agg(lambda w: agg_seen(w, myData, ['ip', 'device', 'os'])) \
        .values.astype(np.int32)
    # Put index back
    myData.index = idx_orig
    

    但是,这并不是rolling的使用方式,我不确定这是否比仅仅循环提供了更好的性能。

正如在对@jdehesa的评论中提到的,我采用了另一种方法,它允许我只迭代一次整个数据集,并从索引中提取(衰减的)权重。在

decay_window = 60*60*12 # every 12
decay = 0.5 # fall by 50% every window
ip_idx = pd.DataFrame(myData.ip.unique())
ip_idx['ts_seen'] = 0
ip_idx['ip_seen'] = 0
ip_idx.columns = ['ip','ts_seen','ip_seen']
ip_idx.set_index('ip',inplace=True)

for index, row in myData.iterrows(): # all
    # How often was this IP seen?
    prior_ip_seen = ip_idx.loc[(row['ip'],'ip_seen')]
    prior_ts_seen = ip_idx.loc[(row['ip'],'ts_seen')]
    delay_since_count = row['unix']-ip_idx.loc[(row['ip'],'ts_seen')]
    new_ip_seen = prior_ip_seen*decay**(delay_since_count/decay_window)+1
    ip_idx.loc[(row['ip'],'ip_seen')] = new_ip_seen
    ip_idx.loc[(row['ip'],'ts_seen')] = row['unix']
    myData.iloc[index,14] = new_ip_seen-1

这样一来,结果就不是最初要求的固定时间窗口,而是先前的观察随着时间的推移“淡出”,给最近频繁的观测增加了权重。在

与最初计划的简化方法(结果证明成本更高)相比,此功能包含更多信息。在

感谢您的意见!在

编辑

同时,我切换到numpy数组来执行相同的操作,现在只需要一小部分时间(在<;2h内进行200m更新的循环)。在

以防万一有人找个起点:

^{pr2}$

相关问题 更多 >