使用Pandas和PyMongo将MongoDB数据加载到DataFrame的更好方法?
我有一个大小为0.7GB的MongoDB数据库,里面存储着推文,我想把它加载到一个数据框中。但是,我遇到了一个错误。
MemoryError:
我的代码是这样的:
cursor = tweets.find() #Where tweets is my collection
tweet_fields = ['id']
result = DataFrame(list(cursor), columns = tweet_fields)
我尝试过一些答案中的方法,这些方法在加载数据之前,会先创建一个包含数据库中所有元素的列表。
不过,在另一个答案中提到的list()方法,作者说这个方法适合小数据集,因为它会把所有数据都加载到内存里。
在我的情况下,我觉得这就是错误的原因。要加载的数据太多,内存装不下。那我可以用什么其他方法呢?
4 个回答
2
一个优雅的做法如下:
import pandas as pd
def my_transform_logic(x):
if x :
do_something
return result
def process(cursor):
df = pd.DataFrame(list(cursor))
df['result_col'] = df['col_to_be_processed'].apply(lambda value: my_transform_logic(value))
#making list off dictionaries
db.collection_name.insert_many(final_df.to_dict('records'))
# or update
db.collection_name.update_many(final_df.to_dict('records'),upsert=True)
#make a list of cursors.. you can read the parallel_scan api of pymongo
cursors = mongo_collection.parallel_scan(6)
for cursor in cursors:
process(cursor)
我在一个有260万条记录的mongoDB集合上尝试了上述过程,使用了Joblib来运行上面的代码。我的代码没有出现任何内存错误,处理在2小时内完成。
3
from_records
这个类方法可能是实现这个功能的最佳方式:
from pandas import pd
import pymongo
client = pymongo.MongoClient()
data = db.mydb.mycollection.find() # or db.mydb.mycollection.aggregate(pipeline)
df = pd.DataFrame.from_records(data)
11
我把我的代码改成了这样:
cursor = tweets.find(fields=['id'])
tweet_fields = ['id']
result = DataFrame(list(cursor), columns = tweet_fields)
通过在find()函数中添加fields参数,我限制了输出的内容。这意味着我不是把所有的字段都加载进来,而只是加载我选定的字段到数据框(DataFrame)中。现在一切都正常了。