Pandas parallel tasks in Airflow

Posted 2024-03-28 11:00:52


Hope you're doing well. I'm stuck on a small Airflow problem. Here's what I'm trying to do:

  • I read a file, then apply a transformation to each of its columns in parallel tasks, wait for all the tasks to finish, and create a new file. Here is what my pipeline looks like: (screenshot of the DAG graph view)

As you can see, there is nothing special going on, just some transformations applied to the columns:

def process_file_1(**context):
    df = context.get("ti").xcom_pull(key="df")
    df["type"] = df["type"].apply(lambda x: "ok")
    context["ti"].xcom_push(key="df", value=df)
    return df


def process_file_2(**context):
    df = context.get("ti").xcom_pull(key="df")
    df["director"] = df["director"].apply(lambda x: "ok")
    context["ti"].xcom_push(key="df", value=df)
    return df


def process_file_3(**context):
    df = context.get("ti").xcom_pull(key="df")
    df["title"] = df["title"].apply(lambda x: "ok")
    context["ti"].xcom_push(key="df", value=df)
    return df
  • The problem is that when the flow finishes everything looks fine, but in process.csv only one column has been changed.

(screenshot of process.csv with only one column changed)

Here is the entire DAG code:


try:
    import os
    import sys

    from datetime import timedelta, datetime
    from airflow import DAG

    # Operators
    from airflow.operators.python_operator import PythonOperator
    from airflow.operators.email_operator import EmailOperator
    from airflow.utils.trigger_rule import TriggerRule

    import pandas as pd

    print("All Dag modules are ok ......")

except Exception as e:
    print("Error  {} ".format(e))


# ===============================================
default_args = {
    "owner": "airflow",
    "start_date": datetime(2021, 1, 1),
    "retries": 1,
    "retry_delay": timedelta(minutes=1),
    'email': ['shahsoumil519@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
}
# ================================================


def read_file(**context):
    path = os.path.join(os.getcwd(), "dags/common/netflix_titles.csv")
    df = pd.read_csv(path)
    context['ti'].xcom_push(key='df', value=df)


def process_file_1(**context):
    df = context.get("ti").xcom_pull(key="df")
    df["type"] = df["type"].apply(lambda x: "ok")
    context["ti"].xcom_push(key="df", value=df)
    return df


def process_file_2(**context):
    df = context.get("ti").xcom_pull(key="df")
    df["director"] = df["director"].apply(lambda x: "ok")
    context["ti"].xcom_push(key="df", value=df)
    return df


def process_file_3(**context):
    df = context.get("ti").xcom_pull(key="df")
    df["title"] = df["title"].apply(lambda x: "ok")
    context["ti"].xcom_push(key="df", value=df)
    return df

def complete_task(**context):
    df = context.get("ti").xcom_pull(key="df")
    path = os.path.join(os.getcwd(), "dags/common/process.csv")
    df.to_csv(path)



with DAG(dag_id="project", schedule_interval="@once", default_args=default_args, catchup=False) as dag:

    read_file = PythonOperator(task_id="read_file", python_callable=read_file, provide_context=True)

    process_file_1 = PythonOperator(task_id="process_file_1", python_callable=process_file_1, provide_context=True)
    process_file_2 = PythonOperator(task_id="process_file_2", python_callable=process_file_2, provide_context=True)
    process_file_3 = PythonOperator(task_id="process_file_3", python_callable=process_file_3, provide_context=True)

    complete_task = PythonOperator(task_id="complete_task", python_callable=complete_task, provide_context=True)


read_file >> process_file_1
read_file >> process_file_2
read_file >> process_file_3

process_file_1 >> complete_task
process_file_2 >> complete_task
process_file_3 >> complete_task

1 Answer
Forum user · answered 2024-03-28 11:00:52

Basically, you can't parallelize operations on a shared object reference: each task pulls its own deserialized copy of the DataFrame from XCom, and the three parallel pushes to the same key overwrite one another, so complete_task sees only one task's change. Parallelize the data instead:

  • Split the file into 3 parts
  • Push all of them to XCom under 3 unique keys
  • Pull them in 3 parallel tasks, perform the transformations on all columns, and push the outputs back under the same keys
  • In the final step, pull all the parts back and concat them to get the final output, as in the sketch below
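
A minimal sketch of this approach, keeping the Airflow 1.x style and CSV columns from the question; the dag_id project_split, the process_chunk_* task ids, the df_0/df_1/df_2 XCom keys, and the make_processor helper are illustrative names, not part of the original code:

import os
from datetime import datetime

import numpy as np
import pandas as pd

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

default_args = {"owner": "airflow", "start_date": datetime(2021, 1, 1)}


def read_file(**context):
    path = os.path.join(os.getcwd(), "dags/common/netflix_titles.csv")
    df = pd.read_csv(path)
    # Split the rows into 3 chunks and push each under a unique key.
    for i, chunk in enumerate(np.array_split(df, 3)):
        context["ti"].xcom_push(key="df_{}".format(i), value=chunk)


def make_processor(i):
    # Factory that builds the callable for chunk i (binds i per task).
    def process_chunk(**context):
        df = context["ti"].xcom_pull(task_ids="read_file", key="df_{}".format(i))
        # This task owns its chunk, so it can safely transform every column.
        for col in ("type", "director", "title"):
            df[col] = df[col].apply(lambda x: "ok")
        context["ti"].xcom_push(key="df_{}".format(i), value=df)
    return process_chunk


def complete_task(**context):
    # Pull every processed chunk and concat into the final output.
    chunks = [
        context["ti"].xcom_pull(task_ids="process_chunk_{}".format(i),
                                key="df_{}".format(i))
        for i in range(3)
    ]
    out = os.path.join(os.getcwd(), "dags/common/process.csv")
    pd.concat(chunks).to_csv(out)


with DAG(dag_id="project_split", schedule_interval="@once",
         default_args=default_args, catchup=False) as dag:
    read = PythonOperator(task_id="read_file", python_callable=read_file,
                          provide_context=True)
    procs = [PythonOperator(task_id="process_chunk_{}".format(i),
                            python_callable=make_processor(i),
                            provide_context=True)
             for i in range(3)]
    complete = PythonOperator(task_id="complete_task",
                              python_callable=complete_task,
                              provide_context=True)
    read >> procs
    procs >> complete

Since XCom values are serialized into Airflow's metadata database, this pattern only suits small files; for larger data the usual approach is to write each chunk to disk (or object storage) and pass file paths through XCom instead.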
