在Pysp中查找每个id的模态值

2024-04-20 00:41:42 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个约17亿行的pyspark数据帧,其模式为:

INPUT SCHEMA
id  
ip  
datetime

我正试图找到每个id的模式ip

我现在有一个函数,在这个函数中我创建了一个单独的

INT TABLE
id
ip
number_of_records

然后过滤模态ip

这似乎是难以置信的缓慢和笨重,什么是一个更有效的方式来获得模式ip的每一个ip

Proposed Output Schema
id
modal_ip

谢谢大家


Tags: of数据函数ipidnumberinputdatetime
1条回答
网友
1楼 · 发布于 2024-04-20 00:41:42

根据我的意见,这里有一个解决方案,它演示了如何从技术上通过两种数据传递实现这一点—一种是计数,另一种是减少并找到(多种)模式。我已经用RDDAPI实现了第二部分—转换为DataFrameAPI的工作留给读者;)(tbh我不知道是否有可能对多个输出行进行自定义聚合(如下所示):

from pyspark.sql import types

import pandas as pd

from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType

# Example data
data = [
    (0 ,'12.2.25.68'),
    (0 ,'12.2.25.68'),
    (0 ,'12.2.25.43'),
    (1 ,'62.251.0.149'),  # This ID has two modes
    (1 ,'62.251.0.140'),
]

schema = types.StructType([
    types.StructField('id', types.IntegerType()),
    types.StructField('ip', types.StringType()),
])

df = spark.createDataFrame(data, schema)

# Count id/ip pairs
df = df.groupBy('id', 'ip').count()

def find_modes(a, b):
    """
    Reducing function to find modes (can return multiple). 

    a and b are lists of Row
    """
    if a[0]['count'] > b[0]['count']:
        return a
    if a[0]['count'] < b[0]['count']:
        return b
    return a + b

result = (
    df.rdd
    .map(lambda row: (row['id'], [row]))
    .reduceByKey(find_modes)
    .collectAsMap()
)

结果:

{0: [Row(id=0, ip='12.2.25.68', count=2)],
 1: [Row(id=1, ip='62.251.0.149', count=1),
 Row(id=1, ip='62.251.0.140', count=1)]}

这种方法的一个小警告:因为我在内存中聚合了重复的模式,所以如果一个ID有许多不同的ip具有相同的计数,那么就存在OOM问题。对于这个特定的应用程序,我想说这是非常不可能的(例如,一个用户可能不会有100万个不同的IP,都有一个事件)

但我倾向于同意@absolutelycrasted,最简单的解决方案可能就是你已经有了的解决方案,即使它有额外的数据传递。但是您可能应该避免执行sort/rank,而是尽可能在窗口中查找最大计数

相关问题 更多 >