如何使用分组数据的后继行的值来决定当前行的值

2024-04-20 07:37:26 发布

您现在位置:Python中文网/ 问答频道 /正文

对于下面的数据集,我想将won_offer列的值更改为10。问题是我需要客户代码组合的后续行来决定该列的值

如果在当前行日期的30天内的下一行包含order,并且价格低于当前行的价格,则该行won_offer列的0可以成为1

示例数据集:

analysis = sqlContext.createDataFrame(
    [
        ('customer1', 'code1', 'date', 'order', 1.7, 0, 1),
        ('customer1', 'code2', 'date', 'offer', 1.5, 0, 2),
        ('customer1', 'code2', 'date', 'offer', 2.0, 0, 2),
        ('customer2', 'code1', 'date', 'offer', 1.2, 0,4),
        ('customer2', 'code1', 'date', 'order', 1.1, 0,4),
        ('customer2', 'code1', 'date', 'order', 2.0, 0,4),
        ('customer2', 'code1', 'date', 'offer', 1.2, 0,4)
    ],
    ('customer', 'code', 'order_date', 'type', 'price', 'final_offer', 'counter')
)

我尝试过这样的方法,但没有效果,因为我不知道如何将多行传递给我的udf:

w = \
    Window.partitionBy('customer','code').orderBy('orderoffer_date')

@F.udf(returnType=IntegerType())
def logic_udf(counter, curr_date, next_dates, current_type, next_types, curr_price, next_prices) :
    for i in range(len(counter)):
        if (next_dates[i] < curr_date+30):
            if (next_types[i] == 'order') & (next_prices[i] < curr_price ):
                return 1
            else:
                return 0
        else:
            return 0

analysis = analysis.withColumn('won_offer', 
               logic(analysis.counter, analysis.order_date,lead(analysis.order_date, 
               analysis.n).over(w), analysis.type,lead(analysis.type, 
               analysis.n).over(w), analysis.price, lead(analysis.price, 
               analysis.n).over(w)))

期望输出:

desired_result = sqlCtx.createDataFrame(
    [
        ('customer1', 'code1', 'date', 'order', 1.7, 0, 1),
        ('customer1', 'code2', 'date', 'offer', 1.5, 0, 2),
        ('customer1', 'code2', 'date', 'offer', 2.0, 0, 2),
        ('customer2', 'code1', 'date', 'offer', 1.2, 1, 4),
        ('customer2', 'code1', 'date', 'order', 1.1, 1, 4),
        ('customer2', 'code1', 'date', 'order', 1.0, 0, 4),
        ('customer2', 'code1', 'date', 'offer', 1.2, 0, 4)
    ],
    ('customer', 'code', 'order_date', 'type', 'price', 'final_offer', 'counter')
)

我知道我的问题很复杂如果有人能告诉我如何将多行分组数据传递给自定义项,我已经得到了很大的帮助

简言之:主要目标是通过查看下一行中的多个列(仍在其特定组中)来确定行中某列的值

提前谢谢! 查尔斯


Tags: 数据datetypecounterorderanalysispricenext
1条回答
网友
1楼 · 发布于 2024-04-20 07:37:26

您可以使用window和sql函数来替换逻辑\ udf。因为您只使用当前行之后的第一行,所以可以将当前行之后的第一行添加到当前行

from pyspark.sql import functions as F

analysis \
    .withColumn('next_order_date', F.first('order_date').over(w)) \
    .withColumn('next_type', F.first('type').over(w)) \
    .withColumn('next_price', F.first('price').over(w)) \
    .withColumn('won_offer', F.when(condition, 1).otherwise(0))

相关问题 更多 >