我使用的是pyspark 2.2,其模式如下
root
|-- col1: string (nullable = true)
|-- col2: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- id: string (nullable = true)
| | |-- metadata: map (nullable = true)
| | | |-- key: string
| | | |-- value: string (valueContainsNull = true)
和数据
+----+----------------------------------------------+
|col1|col2 |
+----+----------------------------------------------+
|A |[[id1, [k -> v1]], [id2, [k2 -> v5, k -> v2]]]|
|B |[[id3, [k -> v3]], [id4, [k3 -> v6, k -> v4]]]|
+----+----------------------------------------------+
col2
是一个复杂的结构。它是一个结构数组,每个结构都有两个元素,一个id
字符串和一个metadata
映射。(这是一个简化的数据集,真正的数据集在struct中有10+个元素,在metadata
字段中有10+个键值对)
我想形成一个查询,返回与我的筛选逻辑匹配的数据帧(比如col1 == 'A'
和col2.id == 'id2'
和col2.metadata.k == 'v2'
)
结果如下所示,过滤逻辑最多可以匹配数组中的一个结构,因此在第二列中,它只是一个结构,而不是一个结构的数组
+----+--------------------------+
|col1|col2_filtered |
+----+--------------------------+
|A |[id2, [k2 -> v5, k -> v2]]|
+----+--------------------------+
我知道如何通过explode
实现这一点,但问题是col2
通常有100多个结构,最多会有一个匹配我的过滤逻辑,因此我认为explode
不是一个可伸缩的解决方案
有人能告诉我怎么做吗,提前谢谢
下面是设置的代码块
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, MapType
schema = StructType([
StructField('col1', StringType(), True),
StructField('col2', ArrayType(
StructType([
StructField('id', StringType(), True),
StructField('metadata', MapType(StringType(), StringType()), True)
])
))
])
data = [
('A', [('id1', {'k': 'v1'}), ('id2', {'k': 'v2', 'k2': 'v5'})]),
('B', [('id3', {'k': 'v3'}), ('id4', {'k': 'v4', 'k3': 'v6'})])
]
df = spark.createDataFrame(data=data, schema=schema)
编辑:您可以尝试自定义项:
您可以将列强制转换为JSON,并检查col2是否包含所需的JSON:
如果只想在col2中保留匹配的结构,可以使用
withColumn
替换它:除了@mck的解决方案外,我在搜索后尝试了另外三种方法,所有这些方法都得到了预期的结果
udf
进行筛选并返回匹配的结构udf
进行筛选并获取匹配结构的索引expr
进行过滤,这是Spark 2.4中的一项功能,因此可以作为未来升级的候选功能相关问题 更多 >
编程相关推荐