Find the date when a column value changes

Posted 2024-04-19 22:42:12


I have a DataFrame like this:

+---+---+---+---+----------+
|key| c1| c2| c3|      date|
+---+---+---+---+----------+
| k1| -1|  0| -1|2015-04-28|
| k1|  1| -1|  1|2015-07-28|
| k1|  1|  1|  1|2015-10-28|
| k1|  1|  1| -1|2015-12-28|
| k2| -1|  0|  1|2015-04-28|
| k2| -1|  1| -1|2015-07-28|
| k2|  1| -1|  0|2015-10-28|
| k2|  1| -1|  1|2015-11-28|
+---+---+---+---+----------+

Code to create A:

data = [('k1', '-1', '0', '-1', '2015-04-28'),
        ('k1', '1', '-1', '1', '2015-07-28'),
        ('k1', '1', '1', '1', '2015-10-28'),
        ('k1', '1', '1', '-1', '2015-12-28'),
        ('k2', '-1', '0', '1', '2015-04-28'),
        ('k2', '-1', '1', '-1', '2015-07-28'),
        ('k2', '1', '-1', '0', '2015-10-28'),
        ('k2', '1', '-1', '1', '2015-11-28')]
A = spark.createDataFrame(data, ['key', 'c1', 'c2', 'c3', 'date'])
# Cast the date strings to a real date type so that ordering is chronological
A = A.withColumn('date', A.date.cast('date'))

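For each key, I want the date on which the value of column c3 changes for the first time (with rows ordered by date, ascending). The expected result is: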

+---+---+----------+
|key| c3|      date|
+---+---+----------+
| k1|  1|2015-07-28|
| k2| -1|2015-07-28|
+---+---+----------+

2 Answers

This is clearly a job for window functions:

from pyspark.sql.window import Window
from pyspark.sql.functions import col, lag, sum as sum_  # alias avoids shadowing the built-in sum

# Define a window: one partition per key, rows ordered by date
w = Window.partitionBy("key").orderBy("date")

(A
    # 1 if c3 differs from the previous row, 0 otherwise (null on the first row of each key)
    .withColumn("changed", (lag("c3", 1).over(w) != col("c3")).cast("int"))
    # Cumulative sum of the indicator over the window
    .withColumn("ind", sum_("changed").over(w))
    # c3 changes for the first time on the row that is itself a change
    # and brings the cumulative sum to 1
    .where((col("changed") == 1) & (col("ind") == 1))
    .drop("changed"))

The result is:

+---+---+---+---+----------+---+
|key| c1| c2| c3|      date|ind|
+---+---+---+---+----------+---+
| k2| -1|  1| -1|2015-07-28|  1|
| k1|  1| -1|  1|2015-07-28|  1|
+---+---+---+---+----------+---+
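
To make the window logic easier to check by hand, here is a minimal plain-Python sketch of the same idea: compare c3 with its previous value within each key, keep a running count of changes, and pick the row that is itself a change and brings the count to 1. The running count alone is not enough, because it stays at 1 on every row until the second change. The function name `first_changes` is hypothetical, and c3 is written as integers here for readability (the question stores it as strings).

```python
from itertools import groupby
from operator import itemgetter

# Sample rows from the question as (key, c3, date).
# ISO-formatted dates sort chronologically as plain strings.
rows = [
    ("k1", -1, "2015-04-28"), ("k1", 1, "2015-07-28"),
    ("k1", 1, "2015-10-28"), ("k1", -1, "2015-12-28"),
    ("k2", 1, "2015-04-28"), ("k2", -1, "2015-07-28"),
    ("k2", 0, "2015-10-28"), ("k2", 1, "2015-11-28"),
]

def first_changes(rows):
    """Return {key: (c3, date)} for the first row whose c3 differs from the previous row."""
    result = {}
    # Sorting by (key, date) mimics partitionBy("key").orderBy("date")
    ordered = sorted(rows, key=itemgetter(0, 2))
    for key, group in groupby(ordered, key=itemgetter(0)):
        prev = None   # previous c3 in this key (the lagged value); None on the first row
        running = 0   # running count of changes (the cumulative sum)
        for _, c3, date in group:
            changed = prev is not None and c3 != prev
            running += changed
            if changed and running == 1:
                result[key] = (c3, date)
                break
            prev = c3
    return result

print(first_changes(rows))
# {'k1': (1, '2015-07-28'), 'k2': (-1, '2015-07-28')}
```

A key whose c3 never changes simply does not appear in the result, which matches the filter dropping all of its rows.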

Here is my solution using a UDF.

import pyspark.sql.functions as func
from pyspark.sql.types import ArrayType, StringType

data = [('k1', '-1', '0', '-1','2015-04-28'),
        ('k1', '1', '-1', '1', '2015-07-28'),
        ('k1', '1', '1', '1', '2015-10-28'),
        ('k2', '-1', '0', '1', '2015-04-28'),
        ('k2', '-1', '1', '-1', '2015-07-28'),
        ('k2', '1', '-1', '0', '2015-10-28')]

# note that I didn't cast date type here
A = spark.createDataFrame(data, ['key', 'c1', 'c2','c3','date'])
A_group = A.select('key', 'c3', 'date').groupby('key')
A_agg = A_group.agg(func.collect_list(func.col('c3')).alias('c3'), 
                    func.collect_list(func.col('date')).alias('date_list'))

# UDF to return the first change in a list of c3 values
def find_change(c3_list, date_list):
    """Return [c3, date] at the first change, or None if c3 never changes."""
    for i in range(1, len(c3_list)):
        if c3_list[i] != c3_list[i-1]:
            return [c3_list[i], date_list[i]]
    return None

udf_find_change = func.udf(find_change, returnType=ArrayType(StringType()))

# Find the first change for each key
A_first_change = A_agg.select('key', udf_find_change(func.col('c3'), func.col('date_list')).alias('first_change'))

# Split the [c3, date] array into columns; cast first, then alias, so the column is named 'date'
A_first_change.select('key',
                      func.col('first_change').getItem(0).alias('c3'),
                      func.col('first_change').getItem(1).cast('date').alias('date')).show()

Output:

+---+---+----------+
|key| c3|      date|
+---+---+----------+
| k2| -1|2015-07-28|
| k1|  1|2015-07-28|
+---+---+----------+
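
One caveat with this approach: the Spark documentation describes `collect_list` as non-deterministic, because the order of the collected values depends on row order after a shuffle. The UDF above assumes both lists arrive in date order. A possible way to remove that assumption is to sort inside the function, as in the sketch below. The name `find_change_sorted` is hypothetical and not part of the original answer; it relies on the dates being ISO-formatted strings, which sort chronologically.

```python
def find_change_sorted(c3_list, date_list):
    """Return [c3, date] at the first change of c3 in date order, or None."""
    # Pair each value with its date and sort by date, so the result does not
    # depend on the order in which the two lists were collected.
    pairs = sorted(zip(date_list, c3_list))
    # Walk over consecutive (previous, current) pairs
    for (_, prev_c3), (date, c3) in zip(pairs, pairs[1:]):
        if c3 != prev_c3:
            return [c3, date]
    return None

# Lists deliberately given out of date order
print(find_change_sorted(['1', '-1', '1'],
                         ['2015-10-28', '2015-04-28', '2015-07-28']))
# ['1', '2015-07-28']
```

Assuming the rest of the pipeline stays the same, this function could be wrapped with `func.udf(..., returnType=ArrayType(StringType()))` in place of `find_change`.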
