2024-04-27 22:51:16 发布
网友
我刚开始在Python中使用Spark,但一直无法解决这个问题:在一个pyspark.sql.dataframe.DataFrame上运行groupBy之后
pyspark.sql.dataframe.DataFrame
groupBy
df = sqlsc.read.json("data.json") df.groupBy('teamId')
如何在不替换的情况下从每个结果组(按teamId分组)中选择N随机样本?
N
我基本上是想从每个团队中随机选择N用户,也许使用groupBy开始是错误的?
我发现这一个数据框架,而不是进入rdd的方式。
您可以使用window函数在组内创建排名,其中排名可以是随机的,以适合您的情况。然后,可以根据每组的样本数进行筛选(N)
window
(N)
window_1 = Window.partitionBy(data['teamId']).orderBy(F.rand()) data_1 = data.select('*', F.rank().over(window_1).alias('rank')).filter(F.col('rank') <= N).drop('rank')
嗯,这有点不对。GroupedData不是真正为数据访问而设计的。它只描述分组条件并提供聚合方法。有关详细信息,请参阅我对Using groupBy in Spark and getting back to a DataFrame的答复。
GroupedData
这个想法的另一个问题是选择N random samples。如果没有精神上的数据分组,这是一项很难并行完成的任务,而且当你在一个DataFrame上callgroupBy时不会发生这种情况:
N random samples
DataFrame
call
至少有两种处理方法:
转换为RDD,groupBy并执行本地采样
import random n = 3 def sample(iter, n): rs = random.Random() # We should probably use os.urandom as a seed return rs.sample(list(iter), n) df = sqlContext.createDataFrame( [(x, y, random.random()) for x in (1, 2, 3) for y in "abcdefghi"], ("teamId", "x1", "x2")) grouped = df.rdd.map(lambda row: (row.teamId, row)).groupByKey() sampled = sqlContext.createDataFrame( grouped.flatMap(lambda kv: sample(kv[1], n))) sampled.show() ## +------+---+-------------------+ ## |teamId| x1| x2| ## +------+---+-------------------+ ## | 1| g| 0.81921738561455| ## | 1| f| 0.8563875814036598| ## | 1| a| 0.9010425238735935| ## | 2| c| 0.3864428179837973| ## | 2| g|0.06233470405822805| ## | 2| d|0.37620872770129155| ## | 3| f| 0.7518901502732027| ## | 3| e| 0.5142305439671874| ## | 3| d| 0.6250620479303716| ## +------+---+-------------------+
使用窗口函数
from pyspark.sql import Window from pyspark.sql.functions import col, rand, rowNumber w = Window.partitionBy(col("teamId")).orderBy(col("rnd_")) sampled = (df .withColumn("rnd_", rand()) # Add random numbers column .withColumn("rn_", rowNumber().over(w)) # Add rowNumber over windw .where(col("rn_") <= n) # Take n observations .drop("rn_") # drop helper columns .drop("rnd_")) sampled.show() ## +------+---+--------------------+ ## |teamId| x1| x2| ## +------+---+--------------------+ ## | 1| f| 0.8563875814036598| ## | 1| g| 0.81921738561455| ## | 1| i| 0.8173912535268248| ## | 2| h| 0.10862995810038856| ## | 2| c| 0.3864428179837973| ## | 2| a| 0.6695356657072442| ## | 3| b|0.012329360826023095| ## | 3| a| 0.6450777858109182| ## | 3| e| 0.5142305439671874| ## +------+---+--------------------+
但恐怕这两样东西都很贵。如果单个组的大小是平衡的,并且相对较大,我只需使用DataFrame.randomSplit。
DataFrame.randomSplit
如果组数相对较少,则可以尝试其他方法:
from pyspark.sql.functions import count, udf from pyspark.sql.types import BooleanType from operator import truediv counts = (df .groupBy(col("teamId")) .agg(count("*").alias("n")) .rdd.map(lambda r: (r.teamId, r.n)) .collectAsMap()) # This defines fraction of observations from a group which should # be taken to get n values counts_bd = sc.broadcast({k: truediv(n, v) for (k, v) in counts.items()}) to_take = udf(lambda k, rnd: rnd <= counts_bd.value.get(k), BooleanType()) sampled = (df .withColumn("rnd_", rand()) .where(to_take(col("teamId"), col("rnd_"))) .drop("rnd_")) sampled.show() ## +------+---+--------------------+ ## |teamId| x1| x2| ## +------+---+--------------------+ ## | 1| d| 0.14815204548854788| ## | 1| f| 0.8563875814036598| ## | 1| g| 0.81921738561455| ## | 2| a| 0.6695356657072442| ## | 2| d| 0.37620872770129155| ## | 2| g| 0.06233470405822805| ## | 3| b|0.012329360826023095| ## | 3| h| 0.9022527556458557| ## +------+---+--------------------+
在Spark 1.5+中,可以用调用sampleBy方法替换udf:
sampleBy
udf
df.sampleBy("teamId", counts_bd.value)
它不会给你确切的观察次数,但只要每组的观察次数足够大,能够得到合适的样本,它在大多数情况下都应该足够好。您还可以用类似的方式在RDD上使用sampleByKey。
sampleByKey
我发现这一个数据框架,而不是进入rdd的方式。
您可以使用
window
函数在组内创建排名,其中排名可以是随机的,以适合您的情况。然后,可以根据每组的样本数进行筛选(N)
嗯,这有点不对。
GroupedData
不是真正为数据访问而设计的。它只描述分组条件并提供聚合方法。有关详细信息,请参阅我对Using groupBy in Spark and getting back to a DataFrame的答复。这个想法的另一个问题是选择
N random samples
。如果没有精神上的数据分组,这是一项很难并行完成的任务,而且当你在一个DataFrame
上call
groupBy时不会发生这种情况:至少有两种处理方法:
转换为RDD,
groupBy
并执行本地采样使用窗口函数
但恐怕这两样东西都很贵。如果单个组的大小是平衡的,并且相对较大,我只需使用
DataFrame.randomSplit
。如果组数相对较少,则可以尝试其他方法:
在Spark 1.5+中,可以用调用
sampleBy
方法替换udf
:它不会给你确切的观察次数,但只要每组的观察次数足够大,能够得到合适的样本,它在大多数情况下都应该足够好。您还可以用类似的方式在RDD上使用
sampleByKey
。相关问题 更多 >
编程相关推荐