Can a where clause in PySpark operate on a column that doesn't exist?

Asked 2025-04-14 17:53

I stumbled on some odd PySpark behavior: it lets me call where on a column that does not exist in the DataFrame:

from pyspark.sql.functions import col
from pyspark.sql.utils import AnalysisException

print(spark.version)

df = spark.read.format("csv").option("header", True).load("abfss://some_abfs_path/df.csv")
print(type(df), len(df.columns), df.count())

c = df.columns[0]  # A column name before renaming
df = df.select(*[col(x).alias(f"{x}_new") for x in df.columns])  # Add suffix to column names

print(c in df.columns)

try:
    df.select(c)
except AnalysisException:
    print("SO THIS DOESN'T WORK, WHICH MAKES SENSE.")

# BUT WHY DOES THIS WORK:
print(df.where(col(c).isNotNull()).count())
# IT'S USING c AS f"{c}_new"
print(df.where(col(f"{c}_new").isNotNull()).count())

Output:

3.1.2
<class 'pyspark.sql.dataframe.DataFrame'> 102 1226791
False
SO THIS DOESN'T WORK, WHICH MAKES SENSE.
1226791
1226791

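As you can see, the odd part is that column c can still be used in where even though it no longer exists in df after the rename.

My guess is that PySpark compiles the where before the select that does the renaming. But if so, that seems like poor design, and it still doesn't explain why both the old and the new column names work.

I'd appreciate any insight. Thanks.

I'm running this on Azure Databricks.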

1 Answer

When in doubt, use df.explain() to see what is happening under the hood. It will help you check your intuition:

Spark context available as 'sc' (master = local[*], app id = local-1709748307134).
SparkSession available as 'spark'.
>>> df = spark.read.option("header", True).option("inferSchema", True).csv("taxi.csv")
>>> c = df.columns[0]
>>> from pyspark.sql.functions import *
>>> df = df.select(*[col(x).alias(f"{x}_new") for x in df.columns]) 
>>> df.explain()
== Physical Plan ==
*(1) Project [VendorID#17 AS VendorID_new#51, tpep_pickup_datetime#18 AS tpep_pickup_datetime_new#52, tpep_dropoff_datetime#19 AS tpep_dropoff_datetime_new#53, passenger_count#20 AS passenger_count_new#54, trip_distance#21 AS trip_distance_new#55, RatecodeID#22 AS RatecodeID_new#56, store_and_fwd_flag#23 AS store_and_fwd_flag_new#57, PULocationID#24 AS PULocationID_new#58, DOLocationID#25 AS DOLocationID_new#59, payment_type#26 AS payment_type_new#60, fare_amount#27 AS fare_amount_new#61, extra#28 AS extra_new#62, mta_tax#29 AS mta_tax_new#63, tip_amount#30 AS tip_amount_new#64, tolls_amount#31 AS tolls_amount_new#65, improvement_surcharge#32 AS improvement_surcharge_new#66, total_amount#33 AS total_amount_new#67]
+- FileScan csv [VendorID#17,tpep_pickup_datetime#18,tpep_dropoff_datetime#19,passenger_count#20,trip_distance#21,RatecodeID#22,store_and_fwd_flag#23,PULocationID#24,DOLocationID#25,payment_type#26,fare_amount#27,extra#28,mta_tax#29,tip_amount#30,tolls_amount#31,improvement_surcharge#32,total_amount#33] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/charlie/taxi.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<VendorID:int,tpep_pickup_datetime:string,tpep_dropoff_datetime:string,passenger_count:int,...


>>> df = df.where(col(c).isNotNull())
>>> df.explain()
== Physical Plan ==
*(1) Project [VendorID#17 AS VendorID_new#51, tpep_pickup_datetime#18 AS tpep_pickup_datetime_new#52, tpep_dropoff_datetime#19 AS tpep_dropoff_datetime_new#53, passenger_count#20 AS passenger_count_new#54, trip_distance#21 AS trip_distance_new#55, RatecodeID#22 AS RatecodeID_new#56, store_and_fwd_flag#23 AS store_and_fwd_flag_new#57, PULocationID#24 AS PULocationID_new#58, DOLocationID#25 AS DOLocationID_new#59, payment_type#26 AS payment_type_new#60, fare_amount#27 AS fare_amount_new#61, extra#28 AS extra_new#62, mta_tax#29 AS mta_tax_new#63, tip_amount#30 AS tip_amount_new#64, tolls_amount#31 AS tolls_amount_new#65, improvement_surcharge#32 AS improvement_surcharge_new#66, total_amount#33 AS total_amount_new#67]
+- *(1) Filter isnotnull(VendorID#17)
   +- FileScan csv [VendorID#17,tpep_pickup_datetime#18,tpep_dropoff_datetime#19,passenger_count#20,trip_distance#21,RatecodeID#22,store_and_fwd_flag#23,PULocationID#24,DOLocationID#25,payment_type#26,fare_amount#27,extra#28,mta_tax#29,tip_amount#30,tolls_amount#31,improvement_surcharge#32,total_amount#33] Batched: false, DataFilters: [isnotnull(VendorID#17)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/charlie/taxi.csv], PartitionFilters: [], PushedFilters: [IsNotNull(VendorID)], ReadSchema: struct<VendorID:int,tpep_pickup_datetime:string,tpep_dropoff_datetime:string,passenger_count:int,...
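
Note that the Filter in the plan above references VendorID#17, the attribute produced by the scan, rather than VendorID_new#51. The toy model below sketches that kind of name lookup in plain Python: a filter column is first looked up in the output of the node it sits on, and if it is missing there, in that node's input. It is only an illustration, not Spark's implementation; Scan, Project and resolve_filter_column are hypothetical names, not Spark APIs.

```python
from dataclasses import dataclass
from typing import Dict, List


# Toy model only: these classes are hypothetical and are NOT Spark APIs.
@dataclass
class Scan:
    """Leaf node: exposes exactly the columns it was created with."""
    columns: List[str]

    def output(self) -> List[str]:
        return list(self.columns)


@dataclass
class Project:
    """Renaming node: maps each output name to a column of its child."""
    aliases: Dict[str, str]  # output name -> input name
    child: Scan

    def output(self) -> List[str]:
        return list(self.aliases)


def resolve_filter_column(name: str, plan) -> str:
    """Return the type name of the node whose output resolved `name`.

    Looks at the plan's own output first, then falls back to the output of
    its child (if it has one). Raises LookupError when neither has the name.
    """
    if name in plan.output():
        return type(plan).__name__
    child = getattr(plan, "child", None)
    if child is not None and name in child.output():
        return type(child).__name__
    raise LookupError(f"cannot resolve column {name!r}")


scan = Scan(["VendorID", "fare_amount"])
renamed = Project({"VendorID_new": "VendorID", "fare_amount_new": "fare_amount"}, scan)

print(resolve_filter_column("VendorID_new", renamed))  # found in the Project's output
print(resolve_filter_column("VendorID", renamed))      # found in the Project's input

# A leaf that only knows the new names has no child to fall back to.
materialized = Scan(renamed.output())
try:
    resolve_filter_column("VendorID", materialized)
except LookupError as err:
    print("error:", err)
```

In this sketch both the old and the new name resolve while the renaming node still has the scan beneath it, and the old name stops resolving once the plan is replaced by a leaf that only carries the new names.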

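Read the plan from the bottom up: first a FileScan reads the data, then a Filter drops the rows you don't want, and finally a Project applies the aliases. Note that the Filter refers to VendorID#17, the attribute from the scan, not to VendorID_new#51: when a filter names a column that the select beneath it no longer outputs, Spark's analyzer can still resolve it against that select's input. Placing the Filter ahead of the Project is a sensible way for Spark to build its DAG (directed acyclic graph): discard unwanted data as early as possible so no time is wasted processing it. As you noticed, though, it can lead to surprising behavior. If you want to avoid it, call df.checkpoint() before df.where() to pin the DataFrame. checkpoint() cuts the plan off from the original columns (and, by default, eagerly writes the data to the checkpoint directory), so referencing the old column name then raises the error you expect: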

>>> from pyspark.sql.functions import *
>>> spark.sparkContext.setCheckpointDir("file:/tmp/")
>>> df = spark.read.option("header", True).option("inferSchema", True).csv("taxi.csv")
>>> c = df.columns[0]
>>> df = df.select(*[col(x).alias(f"{x}_new") for x in df.columns]) 
>>> df = df.checkpoint()
>>> df.explain()
== Physical Plan ==
*(1) Scan ExistingRDD[VendorID_new#51,tpep_pickup_datetime_new#52,tpep_dropoff_datetime_new#53,passenger_count_new#54,trip_distance_new#55,RatecodeID_new#56,store_and_fwd_flag_new#57,PULocationID_new#58,DOLocationID_new#59,payment_type_new#60,fare_amount_new#61,extra_new#62,mta_tax_new#63,tip_amount_new#64,tolls_amount_new#65,improvement_surcharge_new#66,total_amount_new#67]


>>> df = df.where(col(c).isNotNull())
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/homebrew/opt/apache-spark/libexec/python/pyspark/sql/dataframe.py", line 3325, in filter
    jdf = self._jdf.filter(condition._jc)
  File "/opt/homebrew/opt/apache-spark/libexec/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/opt/homebrew/opt/apache-spark/libexec/python/pyspark/errors/exceptions/captured.py", line 185, in deco
    raise converted from None
pyspark.errors.exceptions.captured.AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `VendorID` cannot be resolved. Did you mean one of the following? [`VendorID_new`, `extra_new`, `RatecodeID_new`, `mta_tax_new`, `DOLocationID_new`].;
'Filter isnotnull('VendorID)
+- LogicalRDD [VendorID_new#51, tpep_pickup_datetime_new#52, tpep_dropoff_datetime_new#53, passenger_count_new#54, trip_distance_new#55, RatecodeID_new#56, store_and_fwd_flag_new#57, PULocationID_new#58, DOLocationID_new#59, payment_type_new#60, fare_amount_new#61, extra_new#62, mta_tax_new#63, tip_amount_new#64, tolls_amount_new#65, improvement_surcharge_new#66, total_amount_new#67], false

>>> 
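
If the goal is only to fail fast on a stale column name, a lighter option than checkpointing is to check the name against df.columns yourself before filtering, since df.columns reflects the renamed schema (the question's `c in df.columns` prints False). The helper below is a minimal sketch of that idea; require_columns is a hypothetical name, not part of PySpark, and plain lists stand in for df.columns so the snippet runs without a Spark session.

```python
from typing import Iterable


def require_columns(available: Iterable[str], *names: str) -> None:
    """Raise KeyError if any of `names` is not among the `available` columns.

    Hypothetical helper, not a PySpark API. Pass `df.columns` as `available`.
    """
    available = list(available)
    missing = [n for n in names if n not in available]
    if missing:
        raise KeyError(f"columns not in DataFrame: {missing}; available: {available}")


# Plain lists standing in for df.columns after the rename.
columns_after_rename = ["VendorID_new", "fare_amount_new"]

require_columns(columns_after_rename, "VendorID_new")  # passes silently

try:
    require_columns(columns_after_rename, "VendorID")  # old name: rejected
except KeyError as err:
    print("rejected:", err)
```

With a real DataFrame this would be called as require_columns(df.columns, c) right before df.where(col(c).isNotNull()). Unlike checkpoint(), it does not materialize any data; it only compares names.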
