Fetching a large amount of data from an API leads to long run times
I have a function that fetches data from an API, but the API returns roughly 505,000 rows, which makes the run time very long.
Here is my function. Is there a better way to optimize it? It is making my Airflow task run very slowly.
import requests
import pandas as pd
from datetime import datetime

def extract_people():
    all_data = []
    offset = 0
    limit = 500
    # Continue fetching data until there are no more results left
    while True:
        url = f"https://api.apilayer.com/unogs/search/people?person_type=Director&offset={offset}&limit={limit}"
        payload = {}
        headers = {"apikey": '******'}
        response = requests.request("GET", url, headers=headers, data=payload)
        data = response.json()
        # Check if there are no more results left
        if not data["results"]:
            break
        # Extract relevant fields from each result and append to all_data
        for result in data["results"]:
            all_data.append({
                "netflix_id": result["netflix_id"],
                "full_name": result["full_name"],
                "person_type": result["person_type"],
                "title": result["title"]
            })
        # Increment the offset for the next request
        offset += limit
        print(f'loaded {offset} rows')
    # Create a DataFrame from all_data
    df = pd.DataFrame(all_data)
    # Save the DataFrame as a new CSV file with a timestamp
    timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
    csv_file_name = f'Netflix_people_{timestamp}.csv'
    df.to_csv(csv_file_name, index=False)
    print(f"CSV file {csv_file_name} saved successfully.")
    return df
This function takes a long time to run, and I don't know how to optimize the code any further.
1 Answer
I made some improvements to your code, but since I don't have access to the API I couldn't test them, so there may be mistakes. Please treat this only as a reference.
Approach:
Pagination
Concurrent requests
Throttling (see the sketch after the code below)
Code:
import requests
import pandas as pd
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
def fetch_data(offset, limit):
    url = f"https://api.apilayer.com/unogs/search/people?person_type=Director&offset={offset}&limit={limit}"
    headers = {"apikey": '******'}
    response = requests.get(url, headers=headers)
    if response.status_code == 200:
        return response.json()['results']
    else:
        print(f"Error fetching data: {response.status_code}")
        return []

def extract_people():
    all_data = []
    offset = 0
    limit = 500
    batch_size = 10  # number of pages requested concurrently per wave
    done = False
    with ThreadPoolExecutor(max_workers=batch_size) as executor:
        while not done:
            # Submit one wave of page requests in parallel
            futures = [executor.submit(fetch_data, offset + i * limit, limit)
                       for i in range(batch_size)]
            offset += batch_size * limit
            for task in as_completed(futures):
                results = task.result()
                if not results:
                    # An empty page means we have passed the end of the data,
                    # so stop submitting further waves
                    done = True
                    continue
                for result in results:
                    all_data.append({
                        "netflix_id": result["netflix_id"],
                        "full_name": result["full_name"],
                        "person_type": result["person_type"],
                        "title": result["title"]
                    })
    df = pd.DataFrame(all_data)
    timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
    csv_file_name = f'Netflix_people_{timestamp}.csv'
    df.to_csv(csv_file_name, index=False)
    print(f"CSV file {csv_file_name} saved successfully.")
    return df
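The code above covers the pagination and concurrency points, but not the throttling one. Here is a minimal sketch of request throttling, assuming the API enforces a requests-per-second quota; the question doesn't state the actual limit, so MIN_INTERVAL and the throttled_fetch wrapper are illustrative, not part of the apilayer API:

import threading
import time

MIN_INTERVAL = 1 / 5  # hypothetical quota: at most 5 request starts per second
_last_request = 0.0
_rate_lock = threading.Lock()

def throttled_fetch(offset, limit):
    global _last_request
    with _rate_lock:
        # Sleep just long enough that request starts are at least
        # MIN_INTERVAL apart across all worker threads
        wait = MIN_INTERVAL - (time.monotonic() - _last_request)
        if wait > 0:
            time.sleep(wait)
        _last_request = time.monotonic()
    # Delegate to the unmodified fetch function once a slot is free
    return fetch_data(offset, limit)

Submitting throttled_fetch to the executor in place of fetch_data keeps the concurrency benefit while spacing out request starts, which helps avoid HTTP 429 responses if the API rate-limits aggressive clients.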