Fetching a large amount of data from an API causes long run times

0 votes
1 answer
34 views
Asked 2025-04-14 15:55

I have a function that fetches data from an API, but the API returns about 505,000 rows, which makes the run time very long.

Here is my function. Is there a better way to optimize it? It is making my Airflow task run very slowly.

import requests
import pandas as pd
from datetime import datetime

def extract_people():
    all_data = []
    offset = 0
    limit = 500  # rows requested per page

    # Continue fetching data until there are no more results left
    while True:
        url = f"https://api.apilayer.com/unogs/search/people?person_type=Director&offset={offset}&limit={limit}"
        headers = {"apikey": '******'}

        response = requests.get(url, headers=headers)
        data = response.json()
        data = response.json()

        # Check if there are no more results left
        if not data["results"]:
            break

        # Extract relevant fields from each result and append to all_data
        for result in data["results"]:
            all_data.append({
                "netflix_id": result["netflix_id"],
                "full_name": result["full_name"],
                "person_type": result["person_type"],
                "title": result["title"]
            })
        
        # Increment the offset for the next request
        offset += limit
        print(f'loaded {len(all_data)} rows so far')

    # Create a DataFrame from all_data
    df = pd.DataFrame(all_data)

    # Save the DataFrame as a new CSV file with timestamp
    timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
    csv_file_name = f'Netflix_people_{timestamp}.csv'
    df.to_csv(csv_file_name, index=False)
    print(f"CSV file {csv_file_name} saved successfully.")

    return df

This function takes a long time to run, and I don't know how to optimize the code any further.

1 Answer

0

I made some improvements to your code, but since I don't have access to the API, I couldn't test it and there may be errors. Please use it only as a reference.

Approach:

  1. Pagination

  2. Concurrent requests

  3. Throttling

Code:

import time
import requests
import pandas as pd
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor

def fetch_data(offset, limit):
    url = f"https://api.apilayer.com/unogs/search/people?person_type=Director&offset={offset}&limit={limit}"
    headers = {"apikey": '******'}
    response = requests.get(url, headers=headers)
    if response.status_code == 200:
        # A missing "results" key (e.g. past the last page) is treated as empty
        return response.json().get("results", [])
    print(f"Error fetching data at offset {offset}: {response.status_code}")
    return []

def extract_people():
    all_data = []
    offset = 0
    limit = 500
    batch_size = 10  # pages fetched concurrently per batch
    done = False

    with ThreadPoolExecutor(max_workers=batch_size) as executor:
        while not done:
            # Submit one batch of page requests in parallel
            offsets = [offset + i * limit for i in range(batch_size)]
            batch = executor.map(fetch_data, offsets, [limit] * batch_size)

            # executor.map yields results in offset order, so an empty
            # page reliably marks the end of the data
            for results in batch:
                if not results:
                    done = True
                    break

                for result in results:
                    all_data.append({
                        "netflix_id": result["netflix_id"],
                        "full_name": result["full_name"],
                        "person_type": result["person_type"],
                        "title": result["title"]
                    })

            offset += batch_size * limit
            time.sleep(1)  # simple throttle between batches
            print(f'loaded {len(all_data)} rows so far')

    df = pd.DataFrame(all_data)
    timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
    csv_file_name = f'Netflix_people_{timestamp}.csv'
    df.to_csv(csv_file_name, index=False)
    print(f"CSV file {csv_file_name} saved successfully.")

    return df
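
If the API rate-limits clients, the throttling step can be made more robust than a fixed sleep between batches. Below is a minimal sketch of a retry-with-backoff variant of fetch_data; it assumes the API signals rate limiting with HTTP 429, which I haven't verified for this endpoint.

import time
import requests

def fetch_data_with_retry(offset, limit, max_retries=5):
    # Assumes the API returns HTTP 429 when rate-limited (unverified)
    url = (f"https://api.apilayer.com/unogs/search/people"
           f"?person_type=Director&offset={offset}&limit={limit}")
    headers = {"apikey": '******'}

    for attempt in range(max_retries):
        response = requests.get(url, headers=headers)
        if response.status_code == 200:
            return response.json().get("results", [])
        if response.status_code == 429:
            # Back off exponentially: 1s, 2s, 4s, ...
            time.sleep(2 ** attempt)
            continue
        print(f"Error fetching data at offset {offset}: {response.status_code}")
        return []
    print(f"Giving up at offset {offset} after {max_retries} attempts")
    return []

Swapping this in for fetch_data leaves the batching logic in extract_people unchanged.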
