Memory usage keeps growing when training an ML model at a fixed interval

Posted on 2024-04-16 20:00:20


I am trying to run an ML training script every hour, but memory usage grows by roughly 20% per hour; after 3-4 hours it reaches 90% and the script raises a MemoryError. I would like to know why the memory is not released when the train function finishes.

This behavior does not occur if I run the train function manually (without any kind of scheduler, just calling it two or three times in a row).

Any suggestions for training the model at a fixed interval would be appreciated.

Here is the code:

import pickle
import pandas as pd
from pymongo import MongoClient
import datetime
from apscheduler.schedulers.blocking import BlockingScheduler

def train():

    client = MongoClient(databaseURI)
    db = client['mydb']

    movie_data = []
    for obj in db.movies.find({}):
        movie_obj = {}

        movie_obj['_id'] = obj['_id']
        movie_obj['title'] = obj['title']
        movie_obj['rating'] = obj['rating']
        movie_data.append(movie_obj)


    user_data = []
    for obj in db.users.find({}):
        user_obj = {}

        user_obj['_id'] = obj['_id']
        user_obj['username'] = obj['username']
        user_obj['movie_id'] = obj['movie_id']
        user_obj['rating'] = obj['rating']
        user_data.append(user_obj)


    movie_data_df = pd.DataFrame(movie_data)
    user_data_df = pd.DataFrame(user_data)

    # some ML training ALGO
    trainedModel = algo.train(user_data_df, movie_data_df)

    # `trained` was undefined here (NameError); pickle the model object instead
    with open('files/trained.pkl', 'wb') as f:
        pickle.dump(trainedModel, f)


scheduler = BlockingScheduler()
scheduler.add_job(train, 'interval', hours=1, next_run_time=datetime.datetime.now())
scheduler.start()

1 Answer

#1 · Posted on 2024-04-16 20:00:20

From the APScheduler documentation on job stores:

Job stores house the scheduled jobs. The default job store simply keeps the jobs in memory, but others store them in various kinds of databases. A job’s data is serialized when it is saved to a persistent job store, and deserialized when it’s loaded back from it. Job stores (other than the default one) don’t keep the job data in memory, but act as middlemen for saving, loading, updating and searching jobs in the backend.

I suggest trying one of the following solutions:

  1. Change the job store from the default (in-memory) store to a persistent one (Example).

  2. Or try setting the parameter replace_existing to True (the default is False):

    scheduler.add_job(train, 'interval', hours=1, 
                      next_run_time=datetime.datetime.now(), replace_existing=True)
    

Side note:

I think there is another way to handle this (I haven't tried it!): add a Listener that listens for crashes and restarts the whole process. (If you give it a try, please adapt it in a more Pythonic way!)

import gc
import datetime

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR

scheduler = BlockingScheduler()
scheduler.add_job(train, 'interval', hours=1, next_run_time=datetime.datetime.now())

def my_listener(event):
    if event.exception:       
        global scheduler
        scheduler.shutdown()
        gc.collect()
        scheduler = BlockingScheduler()
        scheduler.add_job(train, 'interval', hours=1, next_run_time=datetime.datetime.now())
        scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
        scheduler.start()

scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
scheduler.start()
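Independent of the scheduler, it can also help to confirm where the growth actually comes from. A sketch (my own suggestion, using the standard library's tracemalloc) that logs how much memory a run of the job leaves behind; `train_with_logging` is a hypothetical stand-in for the real train():

```python
import tracemalloc

tracemalloc.start()


def train_with_logging():
    # Stand-in for the real train(): allocates a large structure, then drops it.
    data = [{'rating': i % 5} for i in range(100_000)]
    del data


train_with_logging()

# current = bytes still allocated now; peak = high-water mark during the run.
current, peak = tracemalloc.get_traced_memory()
print(f'still allocated: {current} bytes, peak during run: {peak} bytes')
tracemalloc.stop()
```

If `current` keeps climbing from one scheduled run to the next, something (for example the MongoClient connection, or references held by the scheduler) is keeping objects from previous runs alive.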
