pyspark:通过ArrayType列筛选和提取结构

2024-04-26 07:33:02 发布

您现在位置:Python中文网/ 问答频道 /正文

我使用的是pyspark 2.2,其模式如下

root
 |-- col1: string (nullable = true)
 |-- col2: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)

和数据

+----+----------------------------------------------+
|col1|col2                                          |
+----+----------------------------------------------+
|A   |[[id1, [k -> v1]], [id2, [k2 -> v5, k -> v2]]]|
|B   |[[id3, [k -> v3]], [id4, [k3 -> v6, k -> v4]]]|
+----+----------------------------------------------+

col2是一个复杂的结构。它是一个结构数组,每个结构都有两个元素,一个id字符串和一个metadata映射。(这是一个简化的数据集,真正的数据集在struct中有10+个元素,在metadata字段中有10+个键值对)

我想形成一个查询,返回与我的筛选逻辑匹配的数据帧(比如col1 == 'A'col2.id == 'id2'col2.metadata.k == 'v2'

结果如下所示,过滤逻辑最多可以匹配数组中的一个结构,因此在第二列中,它只是一个结构,而不是一个结构的数组

+----+--------------------------+
|col1|col2_filtered             |
+----+--------------------------+
|A   |[id2, [k2 -> v5, k -> v2]]|
+----+--------------------------+

我知道如何通过explode实现这一点,但问题是col2通常有100多个结构,最多会有一个匹配我的过滤逻辑,因此我认为explode不是一个可伸缩的解决方案

有人能告诉我怎么做吗,提前谢谢

下面是设置的代码块

from pyspark.sql.types import StructType, StructField, StringType, ArrayType, MapType

schema = StructType([
    StructField('col1', StringType(), True),
    StructField('col2', ArrayType(
        StructType([
            StructField('id', StringType(), True),
            StructField('metadata', MapType(StringType(), StringType()), True)
        ])
    ))
])

data = [
    ('A', [('id1', {'k': 'v1'}), ('id2', {'k': 'v2', 'k2': 'v5'})]),
    ('B', [('id3', {'k': 'v3'}), ('id4', {'k': 'v4', 'k3': 'v6'})])
]

df = spark.createDataFrame(data=data, schema=schema)

Tags: 数据idtruestringk2结构v2col2
2条回答

编辑:您可以尝试自定义项:

import pyspark.sql.functions as F

df2 = df.filter(
    F.udf(lambda x: any([y.id == 'id2' and 'k' in y.metadata.keys() for y in x]), 'boolean')('col2')
).withColumn(
    'col2',
    F.udf(lambda x: [y for y in x if y.id == 'id2' and 'k' in y.metadata.keys()][0], 'struct<id:string,metadata:map<string,string>>')('col2')
)

df2.show(truncate=False)
+  +             +
|col1|col2                      |
+  +             +
|A   |[id2, [k2 -> v5, k -> v2]]|
+  +             +

您可以将列强制转换为JSON,并检查col2是否包含所需的JSON:

import pyspark.sql.functions as F

df2 = df.filter(
    (F.col('col1') == 'A') &
    F.to_json('col2').contains(
        F.to_json(
            F.struct(
                F.lit('id2').alias('id'),
                F.create_map(F.lit('k'), F.lit('v2')).alias('metadata')
            )
        )
    )
)

df2.show(truncate=False)
+  +                  +
|col1|col2                                |
+  +                  +
|A   |[[id1, [k -> v1]], [id2, [k -> v2]]]|
+  +                  +

如果只想在col2中保留匹配的结构,可以使用withColumn替换它:

df3 = df2.withColumn(
    'col2', 
    F.struct(
        F.lit('id2').alias('id'),
        F.create_map(F.lit('k'), F.lit('v2')).alias('metadata')
    )
)

df3.show()
+  +        +
|col1|            col2|
+  +        +
|   A|[id2, [k -> v2]]|
+  +        +

除了@mck的解决方案外,我在搜索后尝试了另外三种方法,所有这些方法都得到了预期的结果

  1. 使用udf进行筛选并返回匹配的结构
df.filter(df.col1 == 'A') \
  .select(df.col1, udf(lambda a: [s for s in a if s.id == 'id2' and s.metadata['k'] == 'v2'], df.schema['col2'].dataType)('col2')[0].alias('col2_filtered')) \
  .na.drop('any')
  1. 使用udf进行筛选并获取匹配结构的索引
df.filter(df.col1 == 'A') \
  .select(df.col1, df.col2.getItem(udf(lambda a: [i for i, s in enumerate(a) if s.id == 'id2' and s.metadata['k'] == 'v2'], ArrayType(IntegerType(), True))(df.col2)[0]).alias('col2_filtered')) \
  .na.drop('any')
  1. 使用expr进行过滤,这是Spark 2.4中的一项功能,因此可以作为未来升级的候选功能
df.filter(df.col1 == 'A') \
  .select(df.col1, expr("filter(col2, s -> s.id == 'id2' AND s.metadata['k'] == 'v2')").getItem(0).alias('col2_filtered')) \
  .na.drop('any')

相关问题 更多 >