I'm trying to determine whether a particular processing job had any failures ("abends") in its next five runs. Here's my data setup:
from pyspark.sql import SQLContext, functions as func
from pyspark.sql.window import Window
import datetime
job_history_df = sqlContext.createDataFrame(
    [
        ('A', 'X', datetime.datetime.strptime('2018-01-02 19:00:13.0', '%Y-%m-%d %H:%M:%S.0')),
        ('A', 'X', datetime.datetime.strptime('2018-01-03 19:00:09.0', '%Y-%m-%d %H:%M:%S.0')),
        ('S', 'X', datetime.datetime.strptime('2018-01-04 19:00:24.0', '%Y-%m-%d %H:%M:%S.0')),
        ('S', 'X', datetime.datetime.strptime('2018-01-05 19:00:21.0', '%Y-%m-%d %H:%M:%S.0')),
        ('S', 'X', datetime.datetime.strptime('2018-01-06 19:00:33.0', '%Y-%m-%d %H:%M:%S.0')),
        ('S', 'Y', datetime.datetime.strptime('2018-01-08 19:00:12.0', '%Y-%m-%d %H:%M:%S.0')),
        ('S', 'Y', datetime.datetime.strptime('2018-01-09 19:00:22.0', '%Y-%m-%d %H:%M:%S.0')),
        ('A', 'Y', datetime.datetime.strptime('2018-01-10 19:00:21.0', '%Y-%m-%d %H:%M:%S.0')),
        ('S', 'Y', datetime.datetime.strptime('2018-01-10 19:00:23.0', '%Y-%m-%d %H:%M:%S.0')),
    ],
    ['jhr_status', 'ajb_name', 'jhr_run_date']
)
def dt_to_timestamp():
    # Returns a UDF that converts a datetime into epoch milliseconds
    def _dt_to_timestamp(dt):
        return int(dt.timestamp() * 1000)
    return func.udf(_dt_to_timestamp)
job_history_df = job_history_df.withColumn('jhr_run_date_ts', dt_to_timestamp()(func.col('jhr_run_date')).cast('long'))
job_history_df = job_history_df.withColumn('was_abend', func.when(job_history_df['jhr_status'] == 'A', 1).otherwise(0))
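(As an aside, the same epoch-millisecond conversion can presumably be done without a Python UDF by casting the timestamp column directly; a minimal sketch, untested on this Spark build, with the hypothetical name job_history_df_alt used just for illustration:)

# Sketch of a UDF-free alternative: casting a timestamp to long is assumed to
# yield whole seconds since the epoch, so * 1000 gives milliseconds (sub-second
# precision dropped). Untested here; shown only for comparison with the UDF above.
job_history_df_alt = job_history_df.withColumn(
    'jhr_run_date_ts', func.col('jhr_run_date').cast('long') * 1000
)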
Here's what job_history_df looks like:
Next, I create my Window:
base_job_window = Window().partitionBy('ajb_name').orderBy('jhr_run_date_ts')
Next, we specify the range to sum over:
n_next_runs = 5
next_n_runs_window = base_job_window.rangeBetween(1, n_next_runs)
job_history_df = job_history_df.withColumn('n_abends_next_n_runs', func.sum('was_abend').over(next_n_runs_window))
Let's see what we got:
>>> job_history_df.show(20, False)
+----------+--------+---------------------+---------------+---------+--------------------+
|jhr_status|ajb_name|jhr_run_date |jhr_run_date_ts|was_abend|n_abends_next_n_runs|
+----------+--------+---------------------+---------------+---------+--------------------+
|S |Y |2018-01-08 19:00:12.0|1515459612000 |0 |null |
|S |Y |2018-01-09 19:00:22.0|1515546022000 |0 |null |
|A |Y |2018-01-10 19:00:21.0|1515632421000 |1 |null |
|S |Y |2018-01-10 19:00:23.0|1515632423000 |0 |null |
|A |X |2018-01-02 19:00:13.0|1514941213000 |1 |null |
|A |X |2018-01-03 19:00:09.0|1515027609000 |1 |null |
|S |X |2018-01-04 19:00:24.0|1515114024000 |0 |null |
|S |X |2018-01-05 19:00:21.0|1515200421000 |0 |null |
|S |X |2018-01-06 19:00:33.0|1515286833000 |0 |null |
+----------+--------+---------------------+---------------+---------+--------------------+
That's odd. I believe the output of n_abends_next_n_runs should contain some 1s rather than all nulls. What if we add up the previous failures instead?
all_previous_window = base_job_window.rangeBetween(Window.unboundedPreceding, -1)
job_history_df = job_history_df.withColumn('n_abends_to_pt', func.sum('was_abend').over(all_previous_window))
That gives the correct results:
+----------+--------+---------------------+---------------+---------+--------------------+--------------+
|jhr_status|ajb_name|jhr_run_date |jhr_run_date_ts|was_abend|n_abends_next_n_runs|n_abends_to_pt|
+----------+--------+---------------------+---------------+---------+--------------------+--------------+
|S |Y |2018-01-08 19:00:12.0|1515459612000 |0 |null |null |
|S |Y |2018-01-09 19:00:22.0|1515546022000 |0 |null |0 |
|A |Y |2018-01-10 19:00:21.0|1515632421000 |1 |null |0 |
|S |Y |2018-01-10 19:00:23.0|1515632423000 |0 |null |1 |
|A |X |2018-01-02 19:00:13.0|1514941213000 |1 |null |null |
|A |X |2018-01-03 19:00:09.0|1515027609000 |1 |null |1 |
|S |X |2018-01-04 19:00:24.0|1515114024000 |0 |null |2 |
|S |X |2018-01-05 19:00:21.0|1515200421000 |0 |null |2 |
|S |X |2018-01-06 19:00:33.0|1515286833000 |0 |null |2 |
+----------+--------+---------------------+---------------+---------+--------------------+--------------+
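(For reference, I'd expect the rows-based counterpart of that "all previous runs" frame to look like the following, a minimal sketch using rowsBetween that I haven't actually run:)

# Untested sketch: same frame expressed over physical rows instead of ordering values
all_previous_rows_window = base_job_window.rowsBetween(Window.unboundedPreceding, -1)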
What is the problem with specifying integer bounds rather than using Window.unboundedPreceding or Window.unboundedFollowing?
For reference, I'm running Spark version 2.1.1.2.6.2.14-5 on a RHEL 6 VM.
After digging a bit further, I wanted to see whether "plain old" SQL would work:
job_history_df.registerTempTable('table')
job_history_df = sqlContext.sql(
'''
SELECT
*,
SUM(was_abend) OVER (PARTITION BY ajb_name ORDER BY jhr_run_date_ts ROWS BETWEEN 5 PRECEDING AND 1 PRECEDING) AS abends_last_5_runs
FROM table
'''
)
And indeed, it does!
>>> job_history_df.show(20, False)
+----------+--------+---------------------+---------------+---------+--------------------+------------------+
|jhr_status|ajb_name|jhr_run_date |jhr_run_date_ts|was_abend|n_abends_next_n_runs|abends_last_5_runs|
+----------+--------+---------------------+---------------+---------+--------------------+------------------+
|S |Y |2018-01-08 19:00:12.0|1515459612000 |0 |null |null |
|S |Y |2018-01-09 19:00:22.0|1515546022000 |0 |null |0 |
|A |Y |2018-01-10 19:00:21.0|1515632421000 |1 |null |0 |
|S |Y |2018-01-10 19:00:23.0|1515632423000 |0 |null |1 |
|A |X |2018-01-02 19:00:13.0|1514941213000 |1 |null |null |
|A |X |2018-01-03 19:00:09.0|1515027609000 |1 |null |1 |
|S |X |2018-01-04 19:00:24.0|1515114024000 |0 |null |2 |
|S |X |2018-01-05 19:00:21.0|1515200421000 |0 |null |2 |
|S |X |2018-01-06 19:00:33.0|1515286833000 |0 |null |2 |
+----------+--------+---------------------+---------------+---------+--------------------+------------------+
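(Based on that, I assume the DataFrame-API equivalent would use rowsBetween rather than rangeBetween; a minimal sketch, untested on this build, with the column name n_abends_next_n_runs_rows chosen only for illustration:)

# Sketch: count abends over the NEXT five runs using a row-based frame.
# rowsBetween counts physical rows, not offsets on the ordering column's value.
next_n_rows_window = base_job_window.rowsBetween(1, n_next_runs)
job_history_df = job_history_df.withColumn(
    'n_abends_next_n_runs_rows',
    func.sum('was_abend').over(next_n_rows_window)
)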
While this still doesn't explain what was wrong with the pure pyspark rangeBetween attempt, it does make tomorrow's work a lot easier :)