Existing column not recognized in Delta merge
Sample of the data I am working with:
Source data
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
|store_id |type |store_status | name | owner |owner_code |store_asOfDate |
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
| 123 |type |not_active |name |xyz | xyz | 2024-03-20|
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
Target data
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
|store_id |type |store_status | name | owner |owner_code |store_asOfDate |
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
| 123 |type |active |name |xyz | xyz | 2024-03-15|
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
Code snippet
target_dt.alias("target") \
    .merge(
        source=df_trusted.alias("source"),
        condition="target.store_id=source.store_id AND target.store_status=source.store_status"
    ) \
    .whenNotMatchedBySourceUpdate(
        set={
            "store_status": F.col("source.store_status"),
            "store_asOfDate": F.col("source.store_asOfDate")
        }
    ) \
    .execute()
Expected result: the store_status and store_asOfDate columns of the target row should be updated.
Target data (after merge/update)
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
|store_id |type |store_status | name | owner |owner_code |store_asOfDate |
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
| 123 |type |not_active |name |xyz | xyz | 2024-03-20|
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
Currently, I am getting this error:
24/03/21 14:06:29 ERROR Error occured in my_method() method: [DELTA_MERGE_UNRESOLVED_EXPRESSION] Cannot resolve source.store_id in UPDATE condition given columns....
Please point me to where I can debug further to find the root cause. Thanks!
1 Answer
I think your goal is to end up with a DeltaTable like this:
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
|store_id |type |store_status | name | owner |owner_code |store_asOfDate |
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
| 123 |type |not_active |name |xyz | xyz | 2024-03-20|
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
First, the error: `whenNotMatchedBySourceUpdate` operates on target rows that have no matching row in the source, so expressions like `source.store_id` cannot be resolved in that clause — that is exactly what `DELTA_MERGE_UNRESOLVED_EXPRESSION` is telling you. For rows that do match your condition, you want `whenMatchedUpdate` instead.
Let me also rewrite your code a bit more generically, so you can drive the columns from lists in the merge statement. The alias `events` refers to the existing table, since it holds the event records, while the alias `updates` refers to the table containing the rows that should be applied as updates.
merge_keys = ["store_id"]
cols_to_update = ["store_status", "store_asOfDate"]

(
    target_dt.alias("events")
    .merge(
        source=df_trusted.alias("updates"),
        # Build "events.`k` = updates.`k`" for every merge key
        condition=" AND ".join([f"events.`{x}` = updates.`{x}`" for x in merge_keys]),
    )
    # Update only the listed columns on rows that match the condition
    .whenMatchedUpdate(set={x: f"updates.`{x}`" for x in cols_to_update})
    .execute()
)
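As a quick sanity check (plain Python, no Spark needed), you can inspect the strings that the comprehensions above generate before handing them to `merge`; the values below mirror the example and are easy to extend with more keys or columns:

```python
# Same lists as in the merge call above
merge_keys = ["store_id"]
cols_to_update = ["store_status", "store_asOfDate"]

# Join condition exactly as built in the merge call
condition = " AND ".join([f"events.`{x}` = updates.`{x}`" for x in merge_keys])
print(condition)  # events.`store_id` = updates.`store_id`

# Column-assignment mapping passed to whenMatchedUpdate(set=...)
assignments = {x: f"updates.`{x}`" for x in cols_to_update}
print(assignments)
```

Printing these before running the merge makes unresolved-column problems much easier to spot, since you see the exact SQL fragments Delta will try to resolve.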
Also, please don't use backslash (`\`) line continuations in Python code; that style is considered outdated. Use parentheses instead, or run a formatter such as Black before committing.