How do I convert a string-delimited column to MapType in PySpark?

Posted 2024-06-02 07:19:56


Sample data:

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|customtargeting                                                                                                                                                        |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|nocid=no;store=2007;tppid=45c566dd-00d7-4193-b5c7-17843c2764e9                                                                                                         |
|nocid=no;store=3084;tppid=4cd36fde-c59a-41d2-a2b4-b731b6cfbe05                                                                                                         |
|nocid=no;tppid=c688c1be-a9c5-47a2-8c09-aef175a19847                                                                                                                    |
|nocid=yes;search=washing liquid;store=3060                                                                                                                             |
|pos=top;tppid=278bab7b-d40b-4783-8f89-bef94a9f5150                                                                                                                     |
|pos=top;tppid=00bb87fa-f3f5-4b0e-bbf8-16079a1a5efe                                                                                                                     |
|nocid=no;shelf=cleanser-toner-and-face-mask;store=2019;tppid=84006d41-eb63-4ae1-8c3c-3ac9436d446c                                                                      |
|pos=top;tppid=ed02b037-066b-46bd-99e6-d183160644a2                                                                                                                     |
|nocid=yes;search=salad;store=3060                                                                                                                                      |
|pos=top;nocid=no;store=2882;tppid=164563e4-8e5c-4366-a5a8-438ffb10da9d                                                                                                 |
|nocid=yes;search=beer;store=3060                                                                                                                                       |
|nocid=no;search=washing capsules;store=5528;tppid=4f9b99eb-65ff-4fbc-b11c-b0552b7f158d                                                                                 |
|pos=right;tppid=ddb54247-a5c9-40a0-9f99-8412d8542b4c                                                                                                                   |
|nocid=yes;search=bedding;store=3060                                                                                                                                    |
|pos=top                                                                                                                                                                |
|pos=mpu1;keywords=helium canisters;keywords=tesco.com;keywords=helium canisters reviews;keywords=tesco;keywords=helium canisters uk;keywords=balloons;pagetype=category|
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+

I want to convert a PySpark dataframe column to MapType. The column is of type string and can contain an arbitrary number of key=value pairs; some keys appear multiple times, and for those I want the values collected into an array as the key's value.


2 Answers

If you want to split the column out into separate columns of a new dataframe, you can use pandas. My solution is below:

>>> import pandas as pd
>>> 
>>> rdd = sc.textFile('/home/ali/text1.txt')
>>> rdd.first()
'nocid=no;store=2007;tppid=45c566dd-00d7-4193-b5c7-17843c2764e9'
>>> rddMap = rdd.map(lambda x: x.split(';'))
>>> rddMap.first()
['nocid=no', 'store=2007', 'tppid=45c566dd-00d7-4193-b5c7-17843c2764e9']
>>> 
>>> df1 = pd.DataFrame()
>>> for rdd in rddMap.collect():
...     a = {i.split('=')[0]:i.split('=')[1] for i in rdd}
...     df2 = pd.DataFrame([a], columns=a.keys())
...     df1 = pd.concat([df1, df2])
... 
>>> df = spark.createDataFrame(df1.astype(str)).replace('nan',None)
>>> df.show()
+--------+-----+--------+-----+----------------+--------------------+-----+--------------------+
|keywords|nocid|pagetype|  pos|          search|               shelf|store|               tppid|
+--------+-----+--------+-----+----------------+--------------------+-----+--------------------+
|    null|   no|    null| null|            null|                null| 2007|45c566dd-00d7-419...|
|    null|   no|    null| null|            null|                null| 3084|4cd36fde-c59a-41d...|
|    null|   no|    null| null|            null|                null| null|c688c1be-a9c5-47a...|
|    null|  yes|    null| null|  washing liquid|                null| 3060|                null|
|    null| null|    null|  top|            null|                null| null|278bab7b-d40b-478...|
|    null| null|    null|  top|            null|                null| null|00bb87fa-f3f5-4b0...|
|    null|   no|    null| null|            null|cleanser-toner-an...| 2019|84006d41-eb63-4ae...|
|    null| null|    null|  top|            null|                null| null|ed02b037-066b-46b...|
|    null|  yes|    null| null|           salad|                null| 3060|                null|
|    null|   no|    null|  top|            null|                null| 2882|164563e4-8e5c-436...|
|    null|  yes|    null| null|            beer|                null| 3060|                null|
|    null|   no|    null| null|washing capsules|                null| 5528|4f9b99eb-65ff-4fb...|
|    null| null|    null|right|            null|                null| null|ddb54247-a5c9-40a...|
|    null|  yes|    null| null|         bedding|                null| 3060|                null|
|    null| null|    null|  top|            null|                null| null|                null|
|balloons| null|category| mpu1|            null|                null| null|                null|
+--------+-----+--------+-----+----------------+--------------------+-----+--------------------+
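
The per-row parsing done inside the loop above can be checked without a Spark cluster. This is a minimal pandas-only sketch of the same split logic, using a few sample strings taken from the question (the file path and RDD from the answer are not needed here):

```python
import pandas as pd

# A few rows from the sample data in the question
lines = [
    'nocid=no;store=2007;tppid=45c566dd-00d7-4193-b5c7-17843c2764e9',
    'nocid=yes;search=washing liquid;store=3060',
    'pos=top',
]

# One dict per line: split on ';' into pairs, then on the first '=' into key/value
rows = [dict(pair.split('=', 1) for pair in line.split(';')) for line in lines]

# Keys missing from a row become NaN, matching the null cells in the table above
df = pd.DataFrame(rows)
print(sorted(df.columns))
# ['nocid', 'pos', 'search', 'store', 'tppid']
```

Building all the dicts first and calling `pd.DataFrame` once also avoids the repeated `pd.concat` in the loop, which copies the frame on every iteration.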

Try this:

import pyspark.sql.functions as F
from pyspark.sql.types import *

def convert_to_json(_str):
    # Keep only well-formed 'key=value' pairs
    _split_str = [tuple(x.split('=')) for x in _str.split(';') if len(tuple(x.split('='))) == 2]
    _json = {}
    for k, v in _split_str:
        # Collect values for repeated keys into one list
        if k in _json:
            _json[k].append(v)
        else:
            _json[k] = [v]

    return _json

convert_udf = F.udf(convert_to_json, MapType(StringType(),ArrayType(StringType())))
df = df.withColumn('customtargeting', convert_udf('customtargeting'))

print(df.schema)
print(df.limit(5).collect())

This will give you the schema and the output.
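
The UDF body is plain Python, so it can be checked without a Spark session. This sketch reuses the same parsing logic on one of the sample rows to show how a repeated key ('keywords') ends up with all its values in one array:

```python
# Same parsing logic as the UDF above, runnable without Spark
def convert_to_json(_str):
    pairs = [tuple(x.split('=')) for x in _str.split(';')]
    _json = {}
    for p in pairs:
        if len(p) == 2:  # skip malformed fragments
            k, v = p
            # setdefault creates the list on first sight of a key,
            # then repeated keys append to it
            _json.setdefault(k, []).append(v)
    return _json

# A row with a repeated key, shortened from the sample data
sample = 'pos=mpu1;keywords=helium canisters;keywords=tesco'
print(convert_to_json(sample))
# {'pos': ['mpu1'], 'keywords': ['helium canisters', 'tesco']}
```

Because every value becomes a list (even singletons like `pos`), the declared return type `MapType(StringType(), ArrayType(StringType()))` in the UDF registration matches what the function actually produces.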
