TimeDeltaSensor失败,操作数类型不受支持
我正在“Google Cloud Composer”上使用Airflow,我需要从不同帐户的“FaceBook广告”中获取数据,所以我需要在两次通话之间设置一些延迟秒。 我试过使用TimeDeltaSensor,但是在dag链中插入这个操作符的调用,会发送错误的代码。你知道吗
这是我的狗:
dag_start_date = get_yesterday(at_midnight=True)
dag_args = get_dag_args(FLOW, dag_name, dag_start_date=dag_start_date)
with DAG(dag_id=dag_name, default_args=dag_args, start_date=dag_start_date, schedule_interval='@once') as dag:
worker = FacebookAdsToBigQuery(FLOW, dag)
end_date = dag_start_date
start_date = end_date - timedelta(days=3)
worker.run_dag(start_date, end_date)
这是关于时间传感器的代码:
....
for level in FacebookAdsToBigQuery.LEVELS:
table_schema_fname = self.level_table_schema_file[level]
schema_fields = bb_utils.load_json_file(table_schema_fname)
for j, account_id in enumerate(self.account_ids):
tsk_prefix = f"{self.dag.dag_id}_{account_id}_{level}"
delta = j * 10
if delta > 0:
wait_x_nextact_tsk = TimeDeltaSensor(task_id=f'{tsk_prefix}_wait_{delta}',
delta=timedelta(seconds=delta), dag=self.dag)
else:
wait_x_nextact_tsk = DummyOperator(task_id=f'{tsk_prefix}_nowait', dag=self.dag)
....
这是错误:
[2019-10-08 11:18:25,547] {base_task_runner.py:101} INFO - Job 199692: Subtask facebook_ads_233070883986464_ad_wait_40 [2019-10-08 11:18:25,546] {cli.py:520} INFO - Running on host airflow-worker-5fd4678587-l6nlf
[2019-10-08 11:18:25,642] {models.py:1796} ERROR - unsupported operand type(s) for +=: 'NoneType' and 'datetime.timedelta'
原因可能是操作员无法理解dag参数调度间隔='@once'?你知道吗
谢谢
目前没有回答
相关问题 更多 >
编程相关推荐