有没有办法在Airflow中并行处理我的pandas数据框函数?
我现在在做一个学术项目,需要用到airflow。现在我只在处理一个数据库表,所以我需要一些帮助,想知道我是不是在正确的方向上。
我有一个叫做data_transformation.py的.py文件:
def get_df_from_db():
conn = pymysql.connect(host='HOST',
user='USER',
password='PW',
database='DB')
query = "SELECT * FROM table"
df = pd.read_sql(query, conn)
df.reset_index(inplace=True)
df.drop('index', axis=1, inplace=True)
connection.close()
return df
def clean_colorTypes(df):
df['colorType'] = df['colorType'].apply(lambda x: x if x in ['Red', 'Blue', 'Orange'] else 'Others')
df = df[df['numOfButtons'] <= 2]
return df
def add_yearsSinceManufactured(df):
df['yearsSinceManu'] = df['yearListed'] - df['yearManufactured']
return df
def add_to_db(df):
#create connection to mysql using create_engine
df.to_sql(name = 'cleaned', con = conn, if_exists = 'append', index = False)
return df
我对我的airflow dag文件的想法是:
- 执行get_df_from_db
- 同时执行clean_colorTypes和add_yearsSinceManufactured,因为它们之间没有依赖关系
- 一旦第二步完成,就执行add_to_db
不过,我不太确定这种方法是否可行,因为我在每个函数中都传入了df。
1 个回答
0
我会按照已经提到的方法来做,具体有两种选择:
让数据库来处理所有复杂的工作,每个查询作为一个单独的任务,中间可以用临时表来过渡,或者;
如果你坚持的话,可以使用Pandas,每次处理完的数据保存为一个CSV文件。
Airflow的数据库(XCOM)并不适合存放大量数据,它只是一个用来在任务之间传递数据的功能。
如果选择数据库的方式,实际上还是在做一个ETL流程:
- 你先提取需要的数据
- 把数据加载到你的数据库里
- 然后按照你想要的方式进行转换