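I'm having trouble using templated SQL files in Composer. I believe the problem is related to the fact that I package the DAG as a zip file in order to include additional code.
I started with this (only the relevant parts shown):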
    dag = DAG('my_dag',
              default_args=default_args,
              schedule_interval=schedule_interval)
    task0 = BigQueryOperator(
        task_id='task0',
        use_legacy_sql=False,
        bql='sql/query_file.sql',
        bigquery_conn_id=bigquery_conn_id,
        dag=dag)
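The archive contains the DAG .py files at the root, a my_pkg/ directory, and a sql/ directory holding query_file.sql. I zip it like this and copy it to the Composer dags folder: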
zip -r my_zip_file.zip *.py my_pkg/ sql/
This works locally, but when deployed to Composer I get this error:
TemplateNotFound: sql/query_file.sql
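I am certain the SQL file is included in the zip file. I also tried moving it to the root folder (no sql/ subdirectory), but I got the same result.
I read somewhere that template_searchpath needs to be set when instantiating the DAG object. I have not managed to do that successfully. When I try a relative path (sql), I get more TemplateNotFound errors. When I try an absolute path as shown below, I get "not a directory".
This is what I tried: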
    dag = DAG('my_dag',
              default_args=default_args,
              schedule_interval=schedule_interval,
              template_searchpath=os.path.dirname(__file__) + "/sql"
              )
    task0 = BigQueryOperator(
        task_id='task0',
        use_legacy_sql=False,
        bql='query_file.sql',
        bigquery_conn_id=bigquery_conn_id,
        dag=dag)
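I also tried putting 'sql' in the task's path instead of in template_searchpath, and again tried moving everything to the root level, but got the same "not a directory" error.
As far as I can tell, the problem is related to the fact that the files are inside a zip file. __file__ returns /home/airflow/gcs/dags/my_zip_file.zip/my_dag_file.py, but os.listdir(os.path.dirname(__file__)) throws the same "not a directory" error. So perhaps, because we are executing from inside a zip archive, we cannot use folders and paths in the same way, and maybe Jinja is tripping over this? Or is there something else that needs to be done when packaging the zip file?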
[2018-06-20 15:35:34,837] {base_task_runner.py:98} INFO - Subtask: Traceback (most recent call last):
[2018-06-20 15:35:34,838] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/bin/airflow", line 27, in <module>
[2018-06-20 15:35:34,840] {base_task_runner.py:98} INFO - Subtask: args.func(args)
[2018-06-20 15:35:34,841] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
[2018-06-20 15:35:34,841] {base_task_runner.py:98} INFO - Subtask: pool=args.pool,
[2018-06-20 15:35:34,842] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
[2018-06-20 15:35:34,843] {base_task_runner.py:98} INFO - Subtask: result = func(*args, **kwargs)
[2018-06-20 15:35:34,843] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1477, in _run_raw_task
[2018-06-20 15:35:34,844] {base_task_runner.py:98} INFO - Subtask: self.render_templates()
[2018-06-20 15:35:34,844] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1760, in render_templates
[2018-06-20 15:35:34,845] {base_task_runner.py:98} INFO - Subtask: rendered_content = rt(attr, content, jinja_context)
[2018-06-20 15:35:34,847] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 2481, in render_template
[2018-06-20 15:35:34,848] {base_task_runner.py:98} INFO - Subtask: return jinja_env.get_template(content).render(**context)
[2018-06-20 15:35:34,849] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/jinja2/environment.py", line 812, in get_template
[2018-06-20 15:35:34,849] {base_task_runner.py:98} INFO - Subtask: return self._load_template(name, self.make_globals(globals))
[2018-06-20 15:35:34,850] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/jinja2/environment.py", line 774, in _load_template
[2018-06-20 15:35:34,851] {base_task_runner.py:98} INFO - Subtask: cache_key = self.loader.get_source(self, name)[1]
[2018-06-20 15:35:34,852] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/jinja2/loaders.py", line 171, in get_source
[2018-06-20 15:35:34,854] {base_task_runner.py:98} INFO - Subtask: f = open_if_exists(filename)
[2018-06-20 15:35:34,855] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/jinja2/utils.py", line 151, in open_if_exists
[2018-06-20 15:35:34,856] {base_task_runner.py:98} INFO - Subtask: return open(filename, mode)
[2018-06-20 15:35:34,856] {base_task_runner.py:98} INFO - Subtask: IOError: [Errno 20] Not a directory: '/home/airflow/gcs/dags/my_zip_file.zip/sql/query_file.sql'
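The IOError at the end of this traceback can probably be reproduced outside Airflow. The following is a minimal sketch using only the standard library; the function name and the dictionary keys are made up for illustration. It builds a small archive and checks how the operating system treats a path that points "inside" it.

```python
import os
import zipfile


def inspect_path_inside_zip(base_dir):
    """Create a small zip in base_dir and report how the OS sees paths into it.

    Hypothetical helper for illustration; not part of Airflow or Composer.
    """
    zip_path = os.path.join(base_dir, "my_zip_file.zip")
    with zipfile.ZipFile(zip_path, "w") as zf:
        zf.writestr("sql/query_file.sql", "SELECT 1")

    # This path only looks like a file system path; the .zip is a regular file.
    inner = os.path.join(zip_path, "sql", "query_file.sql")
    return {
        "zip_is_dir": os.path.isdir(zip_path),
        "inner_exists": os.path.exists(inner),
        "is_zipfile": zipfile.is_zipfile(zip_path),
    }
```

The archive is an ordinary file from the file system's point of view, so any loader that calls open() on a path going through it fails, while the zipfile module can still read the members.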
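Airflow currently (as of version 1.10) does not appear to support loading templates from a zipped DAG, because it only uses jinja2.FileSystemLoader to load templates.

First, confirm that the file structure inside the ZIP matches what you expect.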
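One way to check the archive contents is with Python's standard zipfile module. This is only a sketch; the helper name list_zip_members is an assumption made for this example.

```python
import zipfile


def list_zip_members(zip_path):
    """Return the sorted member names stored in the archive at zip_path."""
    with zipfile.ZipFile(zip_path) as zf:
        return sorted(zf.namelist())
```

For the archive in the question, the result should include sql/query_file.sql if the SQL file was packaged where the DAG expects it.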
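Then, try obtaining the path explicitly.
This is how we get the query path in our Airflow deployment.
It is safer to use the absolute path to the file, like this: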
sql = os.path.abspath(os.path.join(os.path.dirname(__file__), "sql/query_file.sql"))
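This is because Airflow operators/tasks may run commands/methods in a newly created temporary directory that dependencies are not copied into. Take a look at an implementation example in the Airflow GitHub repository and you will see.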
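Note that when the DAG is packaged as a zip, an absolute path built from __file__ still points inside the archive, so opening it directly would presumably hit the same Errno 20 as in the question. A possible workaround, sketched below under that assumption, is to read the SQL text out of the archive at DAG parse time. The helper read_packaged_text is hypothetical and is not an Airflow API.

```python
import io
import os
import zipfile


def read_packaged_text(dag_file, relative_path):
    """Read a text file located relative to dag_file, even inside a .zip.

    Hypothetical helper: dag_file is typically __file__ of the DAG module.
    """
    base_dir = os.path.dirname(os.path.abspath(dag_file))
    marker = ".zip"
    idx = base_dir.find(marker)
    if idx != -1:
        archive = base_dir[: idx + len(marker)]
        if zipfile.is_zipfile(archive):
            # Zip member names always use forward slashes.
            inner_dir = base_dir[idx + len(marker):].strip(os.sep)
            inner_dir = inner_dir.replace(os.sep, "/")
            member = "/".join(p for p in (inner_dir, relative_path) if p)
            with zipfile.ZipFile(archive) as zf:
                return zf.read(member).decode("utf-8")
    # Not inside a zip: fall back to a normal file read.
    with io.open(os.path.join(base_dir, relative_path), encoding="utf-8") as fh:
        return fh.read()
```

The returned string could then be passed as bql=read_packaged_text(__file__, "sql/query_file.sql") instead of a file name. As far as I understand, a value that does not end in .sql is rendered by Airflow as an inline Jinja template, so no file loader is involved; verify this against the Airflow version you run.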