pyspark中的where子句可以作用于不存在的列吗
我偶然发现了pyspark一个奇怪的行为。基本上,它可以在一个在数据框中不存在的列上执行where
函数:
print(spark.version)
df = spark.read.format("csv").option("header", True).load("abfss://some_abfs_path/df.csv")
print(type(df), df.columns.__len__(), df.count())
c = df.columns[0] # A column name before renaming
df = df.select(*[col(x).alias(f"{x}_new") for x in df.columns]) # Add suffix to column names
print(c in df.columns)
try:
df.select(c)
except:
print("SO THIS DOESN'T WORK, WHICH MAKES SENSE.")
# BUT WHY DOES THIS WORK:
print(df.where(col(c).isNotNull()).count())
# IT'S USING c AS f"{c}_new"
print(df.where(col(f"{c}_new").isNotNull()).count())
输出结果:
3.1.2
<class 'pyspark.sql.dataframe.DataFrame'> 102 1226791
False
SO THIS DOESN'T WORK, WHICH MAKES SENSE.
1226791
1226791
如你所见,奇怪的地方在于,当列c
在重命名列后不再存在于df
中时,它仍然可以用于where
函数。
我的直觉是,pyspark在后台会在select
重命名之前就编译了where
。但如果真是这样,那就设计得很糟糕,也无法解释为什么旧的和新的列名都能正常工作。
如果有人能提供一些见解,我会很感激,谢谢。
我是在Azure Databricks上运行这些代码的。
1 个回答
3
当你有疑问的时候,可以使用 df.explain()
来了解背后发生了什么。这可以帮助你确认自己的直觉:
Spark context available as 'sc' (master = local[*], app id = local-1709748307134).
SparkSession available as 'spark'.
>>> df = spark.read.option("header", True).option("inferSchema", True).csv("taxi.csv")
>>> c = df.columns[0]
>>> from pyspark.sql.functions import *
>>> df = df.select(*[col(x).alias(f"{x}_new") for x in df.columns])
>>> df.explain()
== Physical Plan ==
*(1) Project [VendorID#17 AS VendorID_new#51, tpep_pickup_datetime#18 AS tpep_pickup_datetime_new#52, tpep_dropoff_datetime#19 AS tpep_dropoff_datetime_new#53, passenger_count#20 AS passenger_count_new#54, trip_distance#21 AS trip_distance_new#55, RatecodeID#22 AS RatecodeID_new#56, store_and_fwd_flag#23 AS store_and_fwd_flag_new#57, PULocationID#24 AS PULocationID_new#58, DOLocationID#25 AS DOLocationID_new#59, payment_type#26 AS payment_type_new#60, fare_amount#27 AS fare_amount_new#61, extra#28 AS extra_new#62, mta_tax#29 AS mta_tax_new#63, tip_amount#30 AS tip_amount_new#64, tolls_amount#31 AS tolls_amount_new#65, improvement_surcharge#32 AS improvement_surcharge_new#66, total_amount#33 AS total_amount_new#67]
+- FileScan csv [VendorID#17,tpep_pickup_datetime#18,tpep_dropoff_datetime#19,passenger_count#20,trip_distance#21,RatecodeID#22,store_and_fwd_flag#23,PULocationID#24,DOLocationID#25,payment_type#26,fare_amount#27,extra#28,mta_tax#29,tip_amount#30,tolls_amount#31,improvement_surcharge#32,total_amount#33] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/charlie/taxi.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<VendorID:int,tpep_pickup_datetime:string,tpep_dropoff_datetime:string,passenger_count:int,...
>>> df = df.where(col(c).isNotNull())
>>> df.explain()
== Physical Plan ==
*(1) Project [VendorID#17 AS VendorID_new#51, tpep_pickup_datetime#18 AS tpep_pickup_datetime_new#52, tpep_dropoff_datetime#19 AS tpep_dropoff_datetime_new#53, passenger_count#20 AS passenger_count_new#54, trip_distance#21 AS trip_distance_new#55, RatecodeID#22 AS RatecodeID_new#56, store_and_fwd_flag#23 AS store_and_fwd_flag_new#57, PULocationID#24 AS PULocationID_new#58, DOLocationID#25 AS DOLocationID_new#59, payment_type#26 AS payment_type_new#60, fare_amount#27 AS fare_amount_new#61, extra#28 AS extra_new#62, mta_tax#29 AS mta_tax_new#63, tip_amount#30 AS tip_amount_new#64, tolls_amount#31 AS tolls_amount_new#65, improvement_surcharge#32 AS improvement_surcharge_new#66, total_amount#33 AS total_amount_new#67]
+- *(1) Filter isnotnull(VendorID#17)
+- FileScan csv [VendorID#17,tpep_pickup_datetime#18,tpep_dropoff_datetime#19,passenger_count#20,trip_distance#21,RatecodeID#22,store_and_fwd_flag#23,PULocationID#24,DOLocationID#25,payment_type#26,fare_amount#27,extra#28,mta_tax#29,tip_amount#30,tolls_amount#31,improvement_surcharge#32,total_amount#33] Batched: false, DataFilters: [isnotnull(VendorID#17)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/charlie/taxi.csv], PartitionFilters: [], PushedFilters: [IsNotNull(VendorID)], ReadSchema: struct<VendorID:int,tpep_pickup_datetime:string,tpep_dropoff_datetime:string,passenger_count:int,...
从下往上看:首先是 FileScan
来读取数据,然后是 Filter
来丢掉不需要的数据,最后是 Project
来应用别名。这是Spark构建其DAG(有向无环图)的一种合理方式——尽可能早地丢掉不需要的数据,这样就不会浪费时间去处理它们。不过,正如你所注意到的,这可能会导致一些意想不到的行为。如果你想避免这种情况,可以在你的 df.where()
语句之前使用 df.checkpoint()
来固定DataFrame——这样当你尝试引用旧的列名时,就会得到你预期的错误:
>>> from pyspark.sql.functions import *
>>> spark.sparkContext.setCheckpointDir("file:/tmp/")
>>> df = spark.read.option("header", True).option("inferSchema", True).csv("taxi.csv")
>>> c = df.columns[0]
>>> df = df.select(*[col(x).alias(f"{x}_new") for x in df.columns])
>>> df = df.checkpoint()
>>> df.explain()
== Physical Plan ==
*(1) Scan ExistingRDD[VendorID_new#51,tpep_pickup_datetime_new#52,tpep_dropoff_datetime_new#53,passenger_count_new#54,trip_distance_new#55,RatecodeID_new#56,store_and_fwd_flag_new#57,PULocationID_new#58,DOLocationID_new#59,payment_type_new#60,fare_amount_new#61,extra_new#62,mta_tax_new#63,tip_amount_new#64,tolls_amount_new#65,improvement_surcharge_new#66,total_amount_new#67]
>>> df = df.where(col(c).isNotNull())
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/opt/homebrew/opt/apache-spark/libexec/python/pyspark/sql/dataframe.py", line 3325, in filter
jdf = self._jdf.filter(condition._jc)
File "/opt/homebrew/opt/apache-spark/libexec/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
File "/opt/homebrew/opt/apache-spark/libexec/python/pyspark/errors/exceptions/captured.py", line 185, in deco
raise converted from None
pyspark.errors.exceptions.captured.AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `VendorID` cannot be resolved. Did you mean one of the following? [`VendorID_new`, `extra_new`, `RatecodeID_new`, `mta_tax_new`, `DOLocationID_new`].;
'Filter isnotnull('VendorID)
+- LogicalRDD [VendorID_new#51, tpep_pickup_datetime_new#52, tpep_dropoff_datetime_new#53, passenger_count_new#54, trip_distance_new#55, RatecodeID_new#56, store_and_fwd_flag_new#57, PULocationID_new#58, DOLocationID_new#59, payment_type_new#60, fare_amount_new#61, extra_new#62, mta_tax_new#63, tip_amount_new#64, tolls_amount_new#65, improvement_surcharge_new#66, total_amount_new#67], false
>>>