Existing column not recognized in Delta merge

0 votes
1 answer
39 views
Asked 2025-04-13 13:01

Example of the data I'm working with:

Source data

+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
|store_id |type                |store_status |        name        |    owner           |owner_code    |store_asOfDate    |
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
|  123    |type                |not_active   |name                |xyz                 |    xyz       |        2024-03-20|
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+

Target data

+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
|store_id |type                |store_status |        name        |    owner           |owner_code    |store_asOfDate    |
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
|  123    |type                |active       |name                |xyz                 |    xyz       |        2024-03-15|
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+

Code snippet

target_dt.alias("target") \
    .merge(
        source=df_trusted.alias("source"),
        condition="target.store_id=source.store_id AND target.store_status=source.store_status"
    ) \
    .whenNotMatchedBySourceUpdate(
        set={
            "store_status": F.col("source.store_status"),
            "store_asOfDate": F.col("source.store_asOfDate")
        }
    ) \
    .execute()

Expected result

  • The store_status and store_asOfDate values for this row in the target data should be updated.

Target data (after merge/update)

+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
|store_id |type                |store_status |        name        |    owner           |owner_code    |store_asOfDate    |
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
|  123    |type                |not_active   |name                |xyz                 |    xyz       |        2024-03-20|
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+

Currently, I get this error:

24/03/21 14:06:29 ERROR Error occured in my_method() method: [DELTA_MERGE_UNRESOLVED_EXPRESSION] Cannot resolve source.store_id in UPDATE condition given columns....

Please point me to where I can dig further to find the root cause of the problem. Thanks!

1 Answer

0

I think your goal is to end up with a DeltaTable like this:

+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
|store_id |type                |store_status |        name        |    owner           |owner_code    |store_asOfDate    |
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
|  123    |type                |not_active   |name                |xyz                 |    xyz       |        2024-03-15|
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+

First, note that whenNotMatchedBySource* clauses cannot reference source columns: by definition there is no matching source row for them, which is why the source.* references in your UPDATE set cannot be resolved. Beyond that, let me rewrite your code at a slightly higher level, so that you can drive the columns in the merge statement from lists. The events alias refers to the table that already exists, since it contains the event records; the updates alias refers to the table that should contain the rows to be updated.

merge_keys = ["store_id"]
cols_to_update = ["store_status", "store_asOfDate"]
(
    target_dt.alias("events")
    .merge(
        source=df_trusted.alias("updates"),
        # join condition assembled from the list of key columns
        condition=" AND ".join([f"events.`{x}` = updates.`{x}`" for x in merge_keys])
    )
    # update only the listed columns on rows that match the condition
    .whenMatchedUpdate(set={x: f"updates.`{x}`" for x in cols_to_update})
    .execute()
)
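For reference, here is a minimal plain-Python sketch (no Spark required) of what those list-driven expressions expand to, using the same merge_keys and cols_to_update values as above:

```python
merge_keys = ["store_id"]
cols_to_update = ["store_status", "store_asOfDate"]

# join condition built from the key list, backtick-quoted for safety
condition = " AND ".join([f"events.`{x}` = updates.`{x}`" for x in merge_keys])
print(condition)  # events.`store_id` = updates.`store_id`

# column -> source-expression mapping passed to whenMatchedUpdate(set=...)
set_exprs = {x: f"updates.`{x}`" for x in cols_to_update}
print(set_exprs)
# {'store_status': 'updates.`store_status`', 'store_asOfDate': 'updates.`store_asOfDate`'}
```

With more than one merge key, the join produces the keyed conditions chained with AND, so the same code scales without edits.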

Also, please don't write Python code with backslash (\) line continuations — that style is considered outdated. Wrap the expression in parentheses instead, or run the black formatter before committing.
