我在Airflow上构建了一个自定义操作符,它调用API获取数据,然后将数据写入BigQuery。但是,问题是我必须将execution_date宏作为API参数传递,以便能够调用该日期的数据。遗憾的是,当我尝试这样做时,我的操作员无法解析我传递的jinja模板。当我检查我为此做的日志记录时,它只显示如下图所示的模板。 我希望你们能帮忙
这是自定义运算符和dag的代码。谢谢
...
class MyOperator(BaseOperator):
def __init__(self,date):
super(MyOperator,self).__init__(*arg,**kwargs)
self.date = date
def __pull_from_api(self):
api_link = "somelink.com/api/date={}".format(self.date)
data = request.get(api_link).json()
return data
def execute(self,context):
data = self.__pull_from_api()
...
dag = DAG('My Pipeline', default_args=default_args)
t1 = MyOperator(date='{{ execution_date}}', task_id='my_pipeline_1', dag=dag)
t1
我鼓励大家看看PythonOperator和context dictionary。在Composer中使用变量时,这两种方法都很有用
首先,由于您已经有了自定义操作符,我强烈建议您查看一下here。有一些常用的宏和模板使用气流。因此,您可以找出您可能有错误的地方
最好的方法是从上下文中获取执行日期,该日期已经传递给execute(self,context)
例如,我在这里设置执行日期的字符串表示形式:
Example in LatestOnlyOperator
相关问题 更多 >
编程相关推荐