从Spark GroupedData对象中选择随机项

2024-04-27 22:51:16 发布

您现在位置:Python中文网/ 问答频道 /正文

我刚开始在Python中使用Spark,但一直无法解决这个问题:在一个pyspark.sql.dataframe.DataFrame上运行groupBy之后

df = sqlsc.read.json("data.json")
df.groupBy('teamId')

如何在不替换的情况下从每个结果组(按teamId分组)中选择N随机样本?

我基本上是想从每个团队中随机选择N用户,也许使用groupBy开始是错误的?


Tags: 用户jsondataframedfreadsqldata情况
2条回答

我发现这一个数据框架,而不是进入rdd的方式。

您可以使用window函数在组内创建排名,其中排名可以是随机的,以适合您的情况。然后,可以根据每组的样本数进行筛选(N)

window_1 = Window.partitionBy(data['teamId']).orderBy(F.rand())
data_1 = data.select('*', F.rank().over(window_1).alias('rank')).filter(F.col('rank') <= N).drop('rank')

嗯,这有点不对。GroupedData不是真正为数据访问而设计的。它只描述分组条件并提供聚合方法。有关详细信息,请参阅我对Using groupBy in Spark and getting back to a DataFrame的答复。

这个想法的另一个问题是选择N random samples。如果没有精神上的数据分组,这是一项很难并行完成的任务,而且当你在一个DataFramecallgroupBy时不会发生这种情况:

至少有两种处理方法:

  • 转换为RDD,groupBy并执行本地采样

    import random
    
    n = 3
    
    def sample(iter, n): 
        rs = random.Random()  # We should probably use os.urandom as a seed
        return rs.sample(list(iter), n)    
    
    df = sqlContext.createDataFrame(
        [(x, y, random.random()) for x in (1, 2, 3) for y in "abcdefghi"], 
        ("teamId", "x1", "x2"))
    
    grouped = df.rdd.map(lambda row: (row.teamId, row)).groupByKey()
    
    sampled = sqlContext.createDataFrame(
        grouped.flatMap(lambda kv: sample(kv[1], n)))
    
    sampled.show()
    
    ## +------+---+-------------------+
    ## |teamId| x1|                 x2|
    ## +------+---+-------------------+
    ## |     1|  g|   0.81921738561455|
    ## |     1|  f| 0.8563875814036598|
    ## |     1|  a| 0.9010425238735935|
    ## |     2|  c| 0.3864428179837973|
    ## |     2|  g|0.06233470405822805|
    ## |     2|  d|0.37620872770129155|
    ## |     3|  f| 0.7518901502732027|
    ## |     3|  e| 0.5142305439671874|
    ## |     3|  d| 0.6250620479303716|
    ## +------+---+-------------------+
    
  • 使用窗口函数

    from pyspark.sql import Window
    from pyspark.sql.functions import col, rand, rowNumber
    
    w = Window.partitionBy(col("teamId")).orderBy(col("rnd_"))
    
    sampled = (df
        .withColumn("rnd_", rand())  # Add random numbers column
        .withColumn("rn_", rowNumber().over(w))  # Add rowNumber over windw
        .where(col("rn_") <= n)  # Take n observations
        .drop("rn_")  # drop helper columns
        .drop("rnd_"))
    
    sampled.show()
    
    ## +------+---+--------------------+
    ## |teamId| x1|                  x2|
    ## +------+---+--------------------+
    ## |     1|  f|  0.8563875814036598|
    ## |     1|  g|    0.81921738561455|
    ## |     1|  i|  0.8173912535268248|
    ## |     2|  h| 0.10862995810038856|
    ## |     2|  c|  0.3864428179837973|
    ## |     2|  a|  0.6695356657072442|
    ## |     3|  b|0.012329360826023095|
    ## |     3|  a|  0.6450777858109182|
    ## |     3|  e|  0.5142305439671874|
    ## +------+---+--------------------+
    

但恐怕这两样东西都很贵。如果单个组的大小是平衡的,并且相对较大,我只需使用DataFrame.randomSplit

如果组数相对较少,则可以尝试其他方法:

from pyspark.sql.functions import count, udf
from pyspark.sql.types import BooleanType
from operator import truediv

counts = (df
    .groupBy(col("teamId"))
    .agg(count("*").alias("n"))
    .rdd.map(lambda r: (r.teamId, r.n))
    .collectAsMap()) 

# This defines fraction of observations from a group which should
# be taken to get n values 
counts_bd = sc.broadcast({k: truediv(n, v) for (k, v) in counts.items()})

to_take = udf(lambda k, rnd: rnd <= counts_bd.value.get(k), BooleanType())

sampled = (df
    .withColumn("rnd_", rand())
    .where(to_take(col("teamId"), col("rnd_")))
    .drop("rnd_"))

sampled.show()

## +------+---+--------------------+
## |teamId| x1|                  x2|
## +------+---+--------------------+
## |     1|  d| 0.14815204548854788|
## |     1|  f|  0.8563875814036598|
## |     1|  g|    0.81921738561455|
## |     2|  a|  0.6695356657072442|
## |     2|  d| 0.37620872770129155|
## |     2|  g| 0.06233470405822805|
## |     3|  b|0.012329360826023095|
## |     3|  h|  0.9022527556458557|
## +------+---+--------------------+

在Spark 1.5+中,可以用调用sampleBy方法替换udf

df.sampleBy("teamId", counts_bd.value)

它不会给你确切的观察次数,但只要每组的观察次数足够大,能够得到合适的样本,它在大多数情况下都应该足够好。您还可以用类似的方式在RDD上使用sampleByKey

相关问题 更多 >