我正在用一个Python操作员创建一个简单的DAG。尝试打开文件“sample.csv”时出错 (错误:没有这样的文件或目录:“sample.csv”)
from airflow.operators.python_operator import PythonOperator
import datetime
import airflow
import csv, sqlite3, smtplib,os
default_args = {
"depends_on_past" : False,
"start_date" : airflow.utils.dates.days_ago( 1 ),
"retries" : 1,
"retry_delay" : datetime.timedelta( hours= 5 ),
"email" : "email@gmail.com",
"max_active_runs" : 2,
"concurrency" : 2,
}
def python_callable_new():
print("Attempting sqlite connection 3...")
con = sqlite3.connect('test.db')
cur = con.cursor()
cur.execute("CREATE TABLE IF NOT EXISTS LOCATIONS (city TEXT, state TEXT, country TEXT, postcode TEXT);")
# ERROR occurs here when trying to open the csv file
with open('sample.csv',encoding='utf8') as fin:
dr = csv.DictReader(fin)
to_db = [(i['city'], i['state'], i['country'], i['postcode']) for i in dr]
cur.executemany("INSERT INTO LOCATIONS (city, state, country, postcode) VALUES (?,?,?,?);", to_db)
con.commit()
con.close()
return 'End of callable. '
with airflow.DAG( "file_sensor_test_v1", default_args= default_args, schedule_interval= "*/5 * * * *", ) as dag:
do_python_task = PythonOperator(
task_id = 'do-py-operation',
python_callable= python_callable_new,
)
要启动气流环境,我运行以下docker命令
docker run -d -p 8080:8080 -v C:\Users\chris\VisualStudioCodeWorkspace\airflow\dag:/usr/local/airflow/dags puckel/docker-airflow webserver
基本上,我只想访问python可调用文件中csv文件的内容。我应该怎么做,因为我目前的方法不起作用
目前没有回答
相关问题 更多 >
编程相关推荐