from pyspark.sql import types
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType
# Example data
data = [
(0 ,'12.2.25.68'),
(0 ,'12.2.25.68'),
(0 ,'12.2.25.43'),
(1 ,'62.251.0.149'), # This ID has two modes
(1 ,'62.251.0.140'),
]
schema = types.StructType([
types.StructField('id', types.IntegerType()),
types.StructField('ip', types.StringType()),
])
df = spark.createDataFrame(data, schema)
# Count id/ip pairs
df = df.groupBy('id', 'ip').count()
def find_modes(a, b):
"""
Reducing function to find modes (can return multiple).
a and b are lists of Row
"""
if a[0]['count'] > b[0]['count']:
return a
if a[0]['count'] < b[0]['count']:
return b
return a + b
result = (
df.rdd
.map(lambda row: (row['id'], [row]))
.reduceByKey(find_modes)
.collectAsMap()
)
根据我的意见,这里有一个解决方案,它演示了如何从技术上通过两种数据传递实现这一点—一种是计数,另一种是减少并找到(多种)模式。我已经用RDDAPI实现了第二部分—转换为DataFrameAPI的工作留给读者;)(tbh我不知道是否有可能对多个输出行进行自定义聚合(如下所示):
结果:
这种方法的一个小警告:因为我在内存中聚合了重复的模式,所以如果一个ID有许多不同的ip具有相同的计数,那么就存在OOM问题。对于这个特定的应用程序,我想说这是非常不可能的(例如,一个用户可能不会有100万个不同的IP,都有一个事件)
但我倾向于同意@absolutelycrasted,最简单的解决方案可能就是你已经有了的解决方案,即使它有额外的数据传递。但是您可能应该避免执行
sort
/rank
,而是尽可能在窗口中查找最大计数相关问题 更多 >
编程相关推荐