How to access a flat file from Airflow

Posted 2024-04-19 18:34:18


I am creating a simple DAG with a single PythonOperator. The task fails when it tries to open the file "sample.csv" (Error: No such file or directory: 'sample.csv').

from airflow.operators.python_operator   import PythonOperator
import datetime
import airflow
import csv, sqlite3, smtplib,os

default_args = {
    "depends_on_past" : False,
    "start_date"      : airflow.utils.dates.days_ago( 1 ),
    "retries"         : 1,
    "retry_delay"     : datetime.timedelta( hours= 5 ),
    "email"           : "email@gmail.com",
    "max_active_runs" : 2,
    "concurrency"     : 2,
}

def python_callable_new():
    print("Attempting sqlite connection 3...")
    con = sqlite3.connect('test.db') 
    cur = con.cursor()
    cur.execute("CREATE TABLE IF NOT EXISTS LOCATIONS (city TEXT, state TEXT, country TEXT, postcode TEXT);")
    
    # ERROR occurs here when trying to open the csv file
    with open('sample.csv',encoding='utf8') as fin: 
        dr = csv.DictReader(fin)
        to_db = [(i['city'], i['state'], i['country'], i['postcode']) for i in dr]

    cur.executemany("INSERT INTO LOCATIONS (city, state, country, postcode) VALUES (?,?,?,?);", to_db)
    con.commit()
    con.close()
    return 'End of callable. '

with airflow.DAG( "file_sensor_test_v1", default_args= default_args, schedule_interval= "*/5 * * * *", ) as dag:


    do_python_task = PythonOperator(
        task_id = 'do-py-operation',
        python_callable= python_callable_new,
    )

To start the Airflow environment, I run the following docker command:

docker run -d -p 8080:8080 -v C:\Users\chris\VisualStudioCodeWorkspace\airflow\dag:/usr/local/airflow/dags  puckel/docker-airflow webserver

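Basically, I just want to read the contents of the csv file from inside the Python callable. How should I do this, given that my current approach does not work?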
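One likely cause, offered as a sketch rather than a confirmed diagnosis: open('sample.csv') is resolved against the working directory of the process that runs the task, not against the folder that holds the DAG file. The docker command mounts the host dag folder at /usr/local/airflow/dags, so a file placed in that host folder would appear inside the container next to the DAG file. The example below assumes sample.csv is stored there; the helper names path_beside and load_locations are hypothetical and not part of Airflow.

```python
import csv
import os
import sqlite3


def path_beside(module_file, filename):
    """Return the absolute path of `filename` in the directory that holds `module_file`."""
    return os.path.join(os.path.dirname(os.path.abspath(module_file)), filename)


def load_locations(csv_path, db_path):
    """Insert every row of the CSV at `csv_path` into LOCATIONS in the SQLite file `db_path`.

    Returns the number of rows read from the CSV.
    """
    with open(csv_path, encoding="utf8", newline="") as fin:
        rows = [
            (r["city"], r["state"], r["country"], r["postcode"])
            for r in csv.DictReader(fin)
        ]

    con = sqlite3.connect(db_path)
    try:
        con.execute(
            "CREATE TABLE IF NOT EXISTS LOCATIONS "
            "(city TEXT, state TEXT, country TEXT, postcode TEXT);"
        )
        con.executemany(
            "INSERT INTO LOCATIONS (city, state, country, postcode) VALUES (?,?,?,?);",
            rows,
        )
        con.commit()
    finally:
        con.close()
    return len(rows)


# Inside the DAG file, the callable could then look like this
# (assumption: sample.csv sits in the same mounted folder as the DAG file):
#
# def python_callable_new():
#     return load_locations(path_beside(__file__, "sample.csv"), "test.db")
```

Building the path from __file__ keeps the callable independent of whatever directory the scheduler or worker happens to start in. If the csv lives somewhere else, that location would need its own -v mount and the absolute container path would be passed instead.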

