PySpark Window not working with specified integer bounds

I'm trying to determine whether a particular processing job has a bad result in any of its next five runs. Here's my data setup:

from pyspark.sql import SQLContext, functions as func
from pyspark.sql.window import Window
import datetime

job_history_df = sqlContext.createDataFrame(
    [
        ('A', 'X', datetime.datetime.strptime('2018-01-02 19:00:13.0', '%Y-%m-%d %H:%M:%S.0')),
        ('A', 'X', datetime.datetime.strptime('2018-01-03 19:00:09.0', '%Y-%m-%d %H:%M:%S.0')),
        ('S', 'X', datetime.datetime.strptime('2018-01-04 19:00:24.0', '%Y-%m-%d %H:%M:%S.0')),
        ('S', 'X', datetime.datetime.strptime('2018-01-05 19:00:21.0', '%Y-%m-%d %H:%M:%S.0')),
        ('S', 'X', datetime.datetime.strptime('2018-01-06 19:00:33.0', '%Y-%m-%d %H:%M:%S.0')),
        ('S', 'Y', datetime.datetime.strptime('2018-01-08 19:00:12.0', '%Y-%m-%d %H:%M:%S.0')),
        ('S', 'Y', datetime.datetime.strptime('2018-01-09 19:00:22.0', '%Y-%m-%d %H:%M:%S.0')),
        ('A', 'Y', datetime.datetime.strptime('2018-01-10 19:00:21.0', '%Y-%m-%d %H:%M:%S.0')),
        ('S', 'Y', datetime.datetime.strptime('2018-01-10 19:00:23.0', '%Y-%m-%d %H:%M:%S.0')),

    ],
    ['jhr_status', 'ajb_name', 'jhr_run_date']
)

def dt_to_timestamp():
    def _dt_to_timestamp(dt):
        return int(dt.timestamp() * 1000)
    return func.udf(_dt_to_timestamp)

job_history_df = job_history_df.withColumn('jhr_run_date_ts', dt_to_timestamp()(func.col('jhr_run_date')).cast('long'))
job_history_df = job_history_df.withColumn('was_abend', func.when(job_history_df['jhr_status'] == 'A', 1).otherwise(0))
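
As an aside, I believe the same epoch-millisecond column could be derived without a Python UDF by using the built-in unix_timestamp function; this is only a sketch I haven't run, and the column name jhr_run_date_ts_alt is made up:

# Sketch only (not run): epoch milliseconds via a built-in function instead of a UDF.
# unix_timestamp() returns whole seconds, which is enough precision for this data.
job_history_df = job_history_df.withColumn(
    'jhr_run_date_ts_alt',
    (func.unix_timestamp(func.col('jhr_run_date')) * 1000).cast('long')
)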

Here's what job_history_df looks like:

>>> job_history_df.show(20, False)
+----------+--------+---------------------+---------------+---------+
|jhr_status|ajb_name|jhr_run_date         |jhr_run_date_ts|was_abend|
+----------+--------+---------------------+---------------+---------+
|A         |X       |2018-01-02 19:00:13.0|1514941213000  |1        |
|A         |X       |2018-01-03 19:00:09.0|1515027609000  |1        |
|S         |X       |2018-01-04 19:00:24.0|1515114024000  |0        |
|S         |X       |2018-01-05 19:00:21.0|1515200421000  |0        |
|S         |X       |2018-01-06 19:00:33.0|1515286833000  |0        |
|S         |Y       |2018-01-08 19:00:12.0|1515459612000  |0        |
|S         |Y       |2018-01-09 19:00:22.0|1515546022000  |0        |
|A         |Y       |2018-01-10 19:00:21.0|1515632421000  |1        |
|S         |Y       |2018-01-10 19:00:23.0|1515632423000  |0        |
+----------+--------+---------------------+---------------+---------+

Next, I create my Window:

base_job_window = Window().partitionBy('ajb_name').orderBy('jhr_run_date_ts')

Next, we specify the range we want to sum over:

n_next_runs = 5

next_n_runs_window = base_job_window.rangeBetween(1, n_next_runs)
job_history_df = job_history_df.withColumn('n_abends_next_n_runs', func.sum('was_abend').over(next_n_runs_window))

Let's see what we get:

>>> job_history_df.show(20, False)
+----------+--------+---------------------+---------------+---------+--------------------+
|jhr_status|ajb_name|jhr_run_date         |jhr_run_date_ts|was_abend|n_abends_next_n_runs|
+----------+--------+---------------------+---------------+---------+--------------------+
|S         |Y       |2018-01-08 19:00:12.0|1515459612000  |0        |null                |
|S         |Y       |2018-01-09 19:00:22.0|1515546022000  |0        |null                |
|A         |Y       |2018-01-10 19:00:21.0|1515632421000  |1        |null                |
|S         |Y       |2018-01-10 19:00:23.0|1515632423000  |0        |null                |
|A         |X       |2018-01-02 19:00:13.0|1514941213000  |1        |null                |
|A         |X       |2018-01-03 19:00:09.0|1515027609000  |1        |null                |
|S         |X       |2018-01-04 19:00:24.0|1515114024000  |0        |null                |
|S         |X       |2018-01-05 19:00:21.0|1515200421000  |0        |null                |
|S         |X       |2018-01-06 19:00:33.0|1515286833000  |0        |null                |
+----------+--------+---------------------+---------------+---------+--------------------+

Very odd. I believe the output of n_abends_next_n_runs should contain 1s rather than all nulls. What if we sum up the previous failures instead?

all_previous_window = base_job_window.rangeBetween(Window.unboundedPreceding, -1)
job_history_df = job_history_df.withColumn('n_abends_to_pt', func.sum('was_abend').over(all_previous_window))

This gives the correct results:

+----------+--------+---------------------+---------------+---------+--------------------+--------------+
|jhr_status|ajb_name|jhr_run_date         |jhr_run_date_ts|was_abend|n_abends_next_n_runs|n_abends_to_pt|
+----------+--------+---------------------+---------------+---------+--------------------+--------------+
|S         |Y       |2018-01-08 19:00:12.0|1515459612000  |0        |null                |null          |
|S         |Y       |2018-01-09 19:00:22.0|1515546022000  |0        |null                |0             |
|A         |Y       |2018-01-10 19:00:21.0|1515632421000  |1        |null                |0             |
|S         |Y       |2018-01-10 19:00:23.0|1515632423000  |0        |null                |1             |
|A         |X       |2018-01-02 19:00:13.0|1514941213000  |1        |null                |null          |
|A         |X       |2018-01-03 19:00:09.0|1515027609000  |1        |null                |1             |
|S         |X       |2018-01-04 19:00:24.0|1515114024000  |0        |null                |2             |
|S         |X       |2018-01-05 19:00:21.0|1515200421000  |0        |null                |2             |
|S         |X       |2018-01-06 19:00:33.0|1515286833000  |0        |null                |2             |
+----------+--------+---------------------+---------------+---------+--------------------+--------------+

What could be going wrong with specifying integer bounds instead of using Window.unboundedPreceding and Window.unboundedFollowing?

For reference, I'm running Spark version 2.1.1.2.6.2.14-5 on a RHEL 6 VM.
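
One thing I still plan to try on the DataFrame side is a row-based frame instead of a range-based one. This is only a sketch I haven't run yet, and the column name n_abends_next_n_rows is made up:

# Sketch (untested): count abends over the next five rows
# instead of a rangeBetween(1, 5) frame on jhr_run_date_ts.
next_n_rows_window = base_job_window.rowsBetween(1, n_next_runs)
job_history_df = job_history_df.withColumn(
    'n_abends_next_n_rows',
    func.sum('was_abend').over(next_n_rows_window)
)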

Further analysis

After digging into this further, I wanted to see whether "plain old" SQL would work:

job_history_df.registerTempTable('table')

job_history_df = sqlContext.sql(
    '''
    SELECT
        *,
        SUM(was_abend) OVER (PARTITION BY ajb_name ORDER BY jhr_run_date_ts ROWS BETWEEN 5 PRECEDING AND 1 PRECEDING) AS abends_last_5_runs
    FROM table
    '''
)

And indeed, it does!

>>> job_history_df.show(20, False)
+----------+--------+---------------------+---------------+---------+--------------------+------------------+
|jhr_status|ajb_name|jhr_run_date         |jhr_run_date_ts|was_abend|n_abends_next_n_runs|abends_last_5_runs|
+----------+--------+---------------------+---------------+---------+--------------------+------------------+
|S         |Y       |2018-01-08 19:00:12.0|1515459612000  |0        |null                |null              |
|S         |Y       |2018-01-09 19:00:22.0|1515546022000  |0        |null                |0                 |
|A         |Y       |2018-01-10 19:00:21.0|1515632421000  |1        |null                |0                 |
|S         |Y       |2018-01-10 19:00:23.0|1515632423000  |0        |null                |1                 |
|A         |X       |2018-01-02 19:00:13.0|1514941213000  |1        |null                |null              |
|A         |X       |2018-01-03 19:00:09.0|1515027609000  |1        |null                |1                 |
|S         |X       |2018-01-04 19:00:24.0|1515114024000  |0        |null                |2                 |
|S         |X       |2018-01-05 19:00:21.0|1515200421000  |0        |null                |2                 |
|S         |X       |2018-01-06 19:00:33.0|1515286833000  |0        |null                |2                 |
+----------+--------+---------------------+---------------+---------+--------------------+------------------+
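
For reference, I believe the DataFrame-API equivalent of that SQL frame would use rowsBetween; this is just a sketch I haven't verified, and the column name abends_last_5_runs_df is made up so it doesn't clash with the SQL column:

# Sketch (unverified): the frame I expect to match
# ROWS BETWEEN 5 PRECEDING AND 1 PRECEDING.
last_5_runs_window = base_job_window.rowsBetween(-5, -1)
job_history_df = job_history_df.withColumn(
    'abends_last_5_runs_df',
    func.sum('was_abend').over(last_5_runs_window)
)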

While this still doesn't solve the problem with the pure Spark (non-SQL) attempt, it does make my work tomorrow much easier :)