Pyspark如果列位于另一个Spark数据帧中,则创建新列

2024-04-26 00:33:51 发布

您现在位置:Python中文网/ 问答频道 /正文

我试图在Spark数据框中创建一列,如果列的行位于单独的数据框中,则使用一个标志

这是我的主要Spark数据帧(df_main

+--------+
|main    |
+--------+
|28asA017|
|03G12331|
|1567L044|
|02TGasd8|
|1asd3436|
|A1234567|
|B1234567|
+--------+

这是我的引用(df_ref),这个引用中有数百行,所以我显然不能像这样硬编码它们solutionthis one

+--------+
|mask_vl |
+--------+
|A1234567|
|B1234567|
...
+--------+

通常,我会在熊猫的数据框中做以下操作:

df_main['is_inref'] = np.where(df_main['main'].isin(df_ref.mask_vl.values), "YES", "NO")

这样我就能得到这个

+--------+--------+
|main |is_inref|
+--------+--------+
|28asA017|NO      |
|03G12331|NO      |
|1567L044|NO      |
|02TGasd8|NO      |
|1asd3436|NO      |
|A1234567|YES     |
|B1234567|YES     |
+--------+--------+

我尝试了下面的代码,但我不明白图片中的错误是什么意思

df_main = df_main.withColumn('is_inref', "YES" if F.col('main').isin(df_ref) else "NO")
df_main.show(20, False)

Error of the mentioned code


Tags: 数据norefdfismain标志mask
3条回答

你很接近。我认为您需要的额外步骤是显式创建包含来自df_ref的值的列表

请参见下图:

# Create your DataFrames
df = spark.createDataFrame(["28asA017","03G12331","1567L044",'02TGasd8','1asd3436','A1234567','B1234567'], "string").toDF("main")
df_ref =  spark.createDataFrame(["A1234567","B1234567"], "string").toDF("mask_vl")

然后,您可以创建一个list并使用isin,就像您拥有的一样:

# Imports
from pyspark.sql.functions import col, when

# Create a list with the values of your reference DF
mask_vl_list = df_ref.select("mask_vl").rdd.flatMap(lambda x: x).collect()

# Use isin to check whether the values in your column exist in the list
df_main = df_main.withColumn('is_inref', when(col('main').isin(mask_vl_list), 'YES').otherwise('NO'))

这将为您提供:

>>> df_main.show()

+    +    +
|    main|is_inref|
+    +    +
|28asA017|      NO|
|03G12331|      NO|
|1567L044|      NO|
|02TGasd8|      NO|
|1asd3436|      NO|
|A1234567|     YES|
|B1234567|     YES|
+    +    +

我想这个问题已经回答了,你可以在这里查看 spark detecting the unchanged rows

如果您想避免对方付费,我建议您采取下一步措施:

df_ref= df_ref
          .withColumnRenamed("mask_v1", "main")
          .withColumn("isPreset", lit("yes"))
      
 main_df= main_df.join(df_ref, Seq("main"), "left_outer")
          .withColumn("is_inref", when(col("isPresent").isNull,
          lit("NO")).otherwise(lit("YES")))

相关问题 更多 >