希望你一切顺利。我正在努力解决一个 Airflow 的小问题,下面是我想要实现的效果:
正如你所看到的,这些函数并没有什么特别之处,只是对某一列应用了一个简单的转换:
def process_file_1(**context):
    """Overwrite the ``type`` column of the shared DataFrame.

    Pulls the DataFrame from XCom, replaces every ``type`` value with the
    placeholder marker, pushes the result back under the same key, and
    returns the modified frame.
    """
    ti = context.get("ti")
    frame = ti.xcom_pull(key="df")
    # Placeholder transformation: every row receives the constant marker.
    frame["type"] = frame["type"].apply(lambda _: 'ok')
    ti.xcom_push(key='df', value=frame)
    return frame
def process_file_2(**context):
    """Overwrite the ``director`` column of the shared DataFrame.

    Pulls the DataFrame from XCom, replaces every ``director`` value with
    the placeholder marker, pushes the result back, and returns it.
    """
    ti = context.get("ti")
    frame = ti.xcom_pull(key="df")
    # Placeholder transformation: every row receives the constant marker.
    frame["director"] = frame["director"].apply(lambda _: 'ok')
    ti.xcom_push(key='df', value=frame)
    return frame
def process_file_3(**context):
    """Overwrite the ``title`` column of the shared DataFrame.

    Pulls the DataFrame from XCom, replaces every ``title`` value with the
    placeholder marker, pushes the result back, and returns it.
    """
    ti = context.get("ti")
    frame = ti.xcom_pull(key="df")
    # Placeholder transformation: every row receives the constant marker.
    frame["title"] = frame["title"].apply(lambda _: 'ok')
    ti.xcom_push(key='df', value=frame)
    return frame
以下是整个DAG代码:
try:
    import os
    import sys
    from datetime import timedelta, datetime

    from airflow import DAG
    # Operators
    from airflow.operators.python_operator import PythonOperator
    from airflow.operators.email_operator import EmailOperator
    from airflow.utils.trigger_rule import TriggerRule

    import pandas as pd

    print("All Dag modules are ok ......")
except Exception as e:
    # Log the failure, then re-raise: swallowing an ImportError here would
    # only postpone it to a confusing NameError when DAG/PythonOperator are
    # used unconditionally further down in this module.
    print("Error {} ".format(e))
    raise
# ===============================================
# Defaults applied to every task created for this DAG (see DAG construction
# further down in the module).
default_args = {
    "owner": "airflow",
    "start_date": datetime(2021, 1, 1),
    "retries": 1,                         # retry each failed task once
    "retry_delay": timedelta(minutes=1),  # wait one minute between attempts
    "email": ['shahsoumil519@gmail.com'],
    "email_on_failure": False,
    "email_on_retry": False,
}
# NOTE(review): this module-level DAG appears redundant — the `with DAG(...)`
# block further down creates another DAG with the same dag_id ("project") and
# rebinds the `dag` name, shadowing this object. Confirm nothing else uses
# this instance, then consider removing this line.
dag = DAG(dag_id="project", schedule_interval="@once", default_args=default_args, catchup=False)
# ================================================
def read_file(**context):
    """Load the Netflix titles CSV and seed the pipeline via XCom."""
    csv_path = os.path.join(os.getcwd(), "dags/common/netflix_titles.csv")
    # Share the raw DataFrame with the downstream processing tasks.
    context['ti'].xcom_push(key='df', value=pd.read_csv(csv_path))
def process_file_1(**context):
    """Overwrite the ``type`` column of the shared DataFrame.

    Pulls the DataFrame from XCom, replaces every ``type`` value with the
    placeholder marker, pushes the result back under the same key, and
    returns the modified frame.
    """
    ti = context.get("ti")
    frame = ti.xcom_pull(key="df")
    # Placeholder transformation: every row receives the constant marker.
    frame["type"] = frame["type"].apply(lambda _: 'ok')
    ti.xcom_push(key='df', value=frame)
    return frame
def process_file_2(**context):
    """Overwrite the ``director`` column of the shared DataFrame.

    Pulls the DataFrame from XCom, replaces every ``director`` value with
    the placeholder marker, pushes the result back, and returns it.
    """
    ti = context.get("ti")
    frame = ti.xcom_pull(key="df")
    # Placeholder transformation: every row receives the constant marker.
    frame["director"] = frame["director"].apply(lambda _: 'ok')
    ti.xcom_push(key='df', value=frame)
    return frame
def process_file_3(**context):
    """Overwrite the ``title`` column of the shared DataFrame.

    Pulls the DataFrame from XCom, replaces every ``title`` value with the
    placeholder marker, pushes the result back, and returns it.
    """
    ti = context.get("ti")
    frame = ti.xcom_pull(key="df")
    # Placeholder transformation: every row receives the constant marker.
    frame["title"] = frame["title"].apply(lambda _: 'ok')
    ti.xcom_push(key='df', value=frame)
    return frame
def complete_task(**context):
    """Persist the processed DataFrame to disk as the final pipeline step."""
    # NOTE(review): three upstream tasks each push to the same XCom key
    # ("df"), so which version is pulled here depends on XCom resolution —
    # verify that the merged result is what you expect.
    frame = context.get("ti").xcom_pull(key="df")
    out_path = os.path.join(os.getcwd(), "dags/common/process.csv")
    frame.to_csv(out_path)
with DAG(dag_id="project", schedule_interval="@once", default_args=default_args, catchup=False) as dag:
    # Use distinct names for the task objects: the original code rebound the
    # callables' own names (e.g. `read_file = PythonOperator(...,
    # python_callable=read_file)`), shadowing the functions at module level —
    # it happens to work because the callable is captured first, but it is
    # fragile and confusing.
    read_file_task = PythonOperator(task_id="read_file", python_callable=read_file, provide_context=True)
    process_file_1_task = PythonOperator(task_id="process_file_1", python_callable=process_file_1, provide_context=True)
    process_file_2_task = PythonOperator(task_id="process_file_2", python_callable=process_file_2, provide_context=True)
    process_file_3_task = PythonOperator(task_id="process_file_3", python_callable=process_file_3, provide_context=True)
    complete_task_op = PythonOperator(task_id="complete_task", python_callable=complete_task, provide_context=True)

    # Fan out from the reader to the three processors, then fan in.
    processors = [process_file_1_task, process_file_2_task, process_file_3_task]
    read_file_task >> processors
    processors >> complete_task_op
基本上,多个并行任务不能共享并修改同一个对象引用;正确的做法是把数据本身切分后分发给各个任务并行处理(数据并行),处理完成后再汇总结果。
相关问题 更多 >
编程相关推荐