PySpark: split intervals into sub-intervals

Posted 2024-03-28 22:20:08


I have a DataFrame with three columns, "from", "to" and "country", for example:

from to country
1    105 abc
500  1000 def

I want to create a new DataFrame by splitting each from/to range into sub-intervals of size 10, so the result should look like this:

from to country
1    10 abc
11   20 abc
21   30 abc
31   40 abc
...
91   105 abc (leftover values go into the last bucket for that range)
500  509 def

and so on. Any idea how to do this?
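The splitting rule being asked for can be pinned down with a plain-Python sketch (no Spark required). It assumes both bounds are inclusive, each bucket holds exactly 10 values (as in the 1-10, 11-20 rows above), and any leftover values that cannot fill a whole bucket are merged into the last one. `split_interval` is a hypothetical helper name, not part of PySpark.

```python
def split_interval(start, end, size=10):
    """Split the inclusive range [start, end] into buckets of `size` values.

    Leftover values that cannot fill a whole bucket are merged into the
    last bucket instead of forming a short bucket of their own.
    """
    # Number of complete buckets that fit, but always at least one bucket
    n_full = max((end - start + 1) // size, 1)
    buckets = [(start + k * size, start + (k + 1) * size - 1) for k in range(n_full)]
    # Stretch the last bucket so it ends exactly at `end`
    buckets[-1] = (buckets[-1][0], end)
    return buckets


print(split_interval(1, 105))
print(split_interval(500, 1000)[-1])
```

For example, `split_interval(1, 105)` yields ten tuples from `(1, 10)` to `(91, 105)`, and a range shorter than one bucket, such as `split_interval(1, 5)`, comes back as the single bucket `(1, 5)`.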


1 answer
#1 · anonymous user · posted 2024-03-28 22:20:08
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, explode
from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType

spark = SparkSession.builder.getOrCreate()

#Creating the DataFrame
values = [(1, 105, 'abc'), (500, 1000, 'def')]
df = spark.createDataFrame(values, ['from', 'to', 'country'])

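step_size = 10

#Creating UDFs below
# Start points of the sub-intervals; a trailing chunk with fewer than
# step_size values gets no start of its own and merges into the last bucket
def make_list_from(start, end):
    starts = [i for i in range(start, end, step_size) if (end - i) >= (step_size - 1)]
    # A range shorter than one bucket still yields a single sub-interval
    return starts or [start]
make_list_from_udf = udf(make_list_from, ArrayType(IntegerType()))

# End points of the sub-intervals; the last one is stretched to `end`
def make_list_to(start, end):
    right_list = [i + step_size - 1 for i in range(start, end, step_size) if (end - i) >= (step_size - 1)]
    if not right_list:
        return [end]  # avoids an IndexError on short ranges
    right_list[-1] = end
    return right_list
make_list_to_udf = udf(make_list_to, ArrayType(IntegerType()))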

#Creating Lists of sub-intervals
df = df.withColumn('my_list_from',make_list_from_udf(col('from'),col('to')))\
       .withColumn('my_list_to',make_list_to_udf(col('from'),col('to')))\
       .drop('from','to')
df.show()
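+-------+--------------------+--------------------+
|country|        my_list_from|          my_list_to|
+-------+--------------------+--------------------+
|    abc|[1, 11, 21, 31, 4...|[10, 20, 30, 40, ...|
|    def|[500, 510, 520, 5...|[509, 519, 529, 5...|
+-------+--------------------+--------------------+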

#Exploding the Lists
zip_ = udf(
  lambda x, y: list(zip(x, y)),
  ArrayType(StructType([
      # Adjust types to reflect data types
      StructField("first", IntegerType()),
      StructField("second", IntegerType())
  ]))
)
df = (df
    .withColumn("tmp", zip_("my_list_from", "my_list_to"))
    # Explode the zipped array so each (first, second) pair becomes its own row
    .withColumn("tmp", explode("tmp"))
    .select(col("tmp.first").alias("from"), col("tmp.second").alias("to"), "country"))
df.show(100)
+----+----+-------+
|from|  to|country|
+----+----+-------+
|   1|  10|    abc|
|  11|  20|    abc|
|  21|  30|    abc|
|  31|  40|    abc|
|  41|  50|    abc|
|  51|  60|    abc|
|  61|  70|    abc|
|  71|  80|    abc|
|  81|  90|    abc|
|  91| 105|    abc|
| 500| 509|    def|
| 510| 519|    def|
| 520| 529|    def|
.
.
.
| 960| 969|    def|
| 970| 979|    def|
| 980| 989|    def|
| 990|1000|    def|
+----+----+-------+
