有没有办法在Airflow中并行处理我的pandas数据框函数?

0 投票
1 回答
34 浏览
提问于 2025-04-12 15:15

我现在在做一个学术项目,需要用到airflow。现在我只在处理一个数据库表,所以我需要一些帮助,想知道我是不是在正确的方向上。

我有一个叫做data_transformation.py的.py文件:

def get_df_from_db():
    conn = pymysql.connect(host='HOST',
                                 user='USER',
                                 password='PW',
                                 database='DB')
    query = "SELECT * FROM table"
    df = pd.read_sql(query, conn)
    df.reset_index(inplace=True)
    df.drop('index', axis=1, inplace=True)
    connection.close()
    return df

def clean_colorTypes(df):
    df['colorType'] = df['colorType'].apply(lambda x: x if x in ['Red', 'Blue', 'Orange'] else 'Others')
    df = df[df['numOfButtons'] <= 2]
    return df

def add_yearsSinceManufactured(df):
    df['yearsSinceManu'] = df['yearListed'] - df['yearManufactured']
    return df

def add_to_db(df):
    #create connection to mysql using create_engine
    df.to_sql(name = 'cleaned', con = conn, if_exists = 'append', index = False)

    return df

我对我的airflow dag文件的想法是:

  1. 执行get_df_from_db
  2. 同时执行clean_colorTypes和add_yearsSinceManufactured,因为它们之间没有依赖关系
  3. 一旦第二步完成,就执行add_to_db

不过,我不太确定这种方法是否可行,因为我在每个函数中都传入了df。

1 个回答

0

我会按照已经提到的方法来做,具体有两种选择:

  • 让数据库来处理所有复杂的工作,每个查询作为一个单独的任务,中间可以用临时表来过渡,或者;

  • 如果你坚持的话,可以使用Pandas,每次处理完的数据保存为一个CSV文件。

Airflow的数据库(XCOM)并不适合存放大量数据,它只是一个用来在任务之间传递数据的功能。

如果选择数据库的方式,实际上还是在做一个ETL流程:

  1. 你先提取需要的数据
  2. 把数据加载到你的数据库里
  3. 然后按照你想要的方式进行转换

撰写回答