Extracting tags from a dataframe column

Posted 2024-04-26 02:22:57


I have a dataframe containing data from Azure Consumption in a Databricks Python notebook. I'm only showing a subset of the columns/rows here.

[Row(ResourceRate='0.029995920244854', PreTaxCost='0.719902085876484',  
ResourceType='Microsoft.Compute/virtualMachines',  Tags=None, ),
 Row(ResourceRate='1.10999258782982',  PreTaxCost='26.6398221079157',  
ResourceType='Microsoft.Compute/virtualMachines',  
Tags='"{  ""project"": ""70023"",  ""service"": ""10043""}"')
 ]

I need to extract the tags from the Tags column and expose them as (table) columns.
By the way, I don't know where the doubled double quotes come from; probably from the source being a .csv file. But that should be easy enough to fix at the end.
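As a side note (my illustration, not part of the original pipeline): doubling embedded quotes is exactly how RFC-4180 CSV escapes quote characters, so a .csv source is a plausible origin. Python's csv module reproduces the pattern:

import csv, io

buf = io.StringIO()
# A field containing quotes gets wrapped in quotes, and every embedded
# quote is doubled, giving the same ""key"": ""value"" pattern seen above.
csv.writer(buf).writerow(['{ "project": "70023", "service": "10043"}'])
print(buf.getvalue())
# "{ ""project"": ""70023"", ""service"": ""10043""}"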

I'm using PySpark. I tried to do something like this: Split Spark Dataframe string column into multiple columns

from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import split, posexplode, concat, expr, lit, col, first

# Add a row id so the exploded tag rows can be regrouped later
df2 = df.withColumn("num", monotonically_increasing_id())
df3 = df2.select(
        "num",
        split("Tags", ", ").alias("Tags"),
        posexplode(split("Tags", ",")).alias("pos", "val")
    )
#display(df3)
# Build generic column names (Tag0, Tag1, ...) from the position
df4 = df3.drop("val")\
    .select(
        "num",
        concat(lit("Tag"), col("pos").cast("string")).alias("name"),
        expr("Tags[pos]").alias("val")
    )
# display(df4)
# Pivot the generated names back into one column per tag
df5 = df4.groupBy("num").pivot("name").agg(first("val"))
display(df5)

That's not what I want:

num     Tag0
964     
1677    """project"": ""70023"", """service"": ""10024""
2040    """project"": ""70025"", """service"": ""10034""
2214    
...

What I'd prefer is the tags as columns:

num     project        service       ResourceRate       PreTaxCost
964                                  0.029995920244854  0.719902085876484
677     70023          10024         1.10999258782982   26.6398221079157
2040    70025          10034         0.029995920244854  0.719902085876484
2214                                 0.029995920244854  0.719902085876484
...

2 Answers

Below is sample code that attempts to split the tags into multiple columns:

from pyspark.sql import SparkSession
import pyspark.sql.functions as f


# Derive the new column names from the tag keys of the first row
def columnList(r):
  val = str(r[0].tags)
  i = int(val.index("{") + 1)
  j = int(val.index("}"))
  val = val[i:j]
  vals = val.split(",")
  collist = []
  collist.append('id')
  for val in vals:
    keyval = val.split(":")
    key = keyval[0]
    collist.append(key.replace('"',""))
  return collist

# Extract the tag values of one row, keeping the id as the first element
def valueList(r):
  val = r[1]
  i = int(val.index("{")+1)
  j = int(val.index("}"))
  val = val[i:j]
  vals = val.split(",")
  valList = []
  valList.append(r[0])
  for val in vals:
      keyval = val.split(":")
      value = keyval[1]
      valList.append(value.replace('"',""))
  return valList

sc = SparkSession.builder.appName("example").\
config("spark.driver.memory","1g").\
config("spark.executor.cores",2).\
config("spark.cores.max",4).getOrCreate()

df = sc.read.format("csv").option("header","true").option("delimiter","|").load("columns.csv")

tagsdf = df.select("id","tags")


colList = columnList(tagsdf.rdd.take(1))
tagsdfrdd = tagsdf.rdd.map(lambda r : valueList(r))

dfwithnewcolumns = tagsdfrdd.toDF(colList)

newdf = df.drop("tags").join(dfwithnewcolumns,on=["id"])

newdf.show()

Sample test file:

id|ResourceRate|PreTaxCost|ResourceType|Tags
1|'1.10999258782982'|'26.6398221079157'|'Microsoft.Compute/virtualMachines'|"{ ""project"": ""70023"", ""service"": ""10043""}"

If you don't have an id column, then you might need to merge the RDDs; a sketch of that is shown below.
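A minimal sketch of that merge (my own, assuming the question's df and no pre-existing id column): zipWithIndex() attaches a consecutive row index that can then serve as the join key.

# Attach a synthetic row index to act as the id column
indexed = df.rdd.zipWithIndex().map(lambda pair: list(pair[0]) + [pair[1]])
df_with_id = indexed.toDF(df.columns + ["id"])
df_with_id.show()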

IIUC, you can convert Tags into a column of JSON strings (trim the leading and trailing " and use regexp_replace to convert the doubled "" into a single "), and then use json_tuple() to retrieve the desired fields. See the code below:

from pyspark.sql.functions import expr, json_tuple

df.withColumn('Tags', expr("""regexp_replace(trim(BOTH '"' FROM Tags), '""', '"')""")) \
  .select('*', json_tuple('Tags', 'project', 'service').alias('project','service'))\
  .show()                                                  
#+-----------------+-----------------+--------------------+--------------------+-------+-------+
#|       PreTaxCost|     ResourceRate|        ResourceType|                Tags|project|service|
#+-----------------+-----------------+--------------------+--------------------+-------+-------+
#|0.719902085876484|0.029995920244854|Microsoft.Compute...|                null|   null|   null|
#| 26.6398221079157| 1.10999258782982|Microsoft.Compute...|{ "project": "700...|  70023|  10043|
#+-----------------+-----------------+--------------------+--------------------+-------+-------+
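If the tag keys are not known in advance, a possible variation (my sketch, not part of the answer above, reusing the same quote clean-up) is to parse the cleaned string into a MapType with from_json(), so no key has to be listed by hand:

from pyspark.sql.functions import expr, from_json
from pyspark.sql.types import MapType, StringType

# Same quote clean-up as above, then parse the JSON into a key->value map
cleaned = df.withColumn('Tags', expr("""regexp_replace(trim(BOTH '"' FROM Tags), '""', '"')"""))
mapped = cleaned.withColumn('TagMap', from_json('Tags', MapType(StringType(), StringType())))
mapped.select('*', mapped['TagMap']['project'].alias('project'),
              mapped['TagMap']['service'].alias('service')).show()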
