pyramid:多线程数据库操作

1 投票
2 回答
988 浏览
提问于 2025-04-18 12:59

我的应用程序会接收用户输入的一个或多个网址(通常是3到4个网址),然后从这些网址中提取一些数据,并把这些数据写入数据库。不过,因为提取数据需要一点时间,所以我在考虑把每个网址的数据提取放在一个单独的线程里,这样提取数据和写入数据库的过程就可以在后台继续进行,用户就不需要一直等待了。

为了实现这个功能,我做了以下相关的部分:

@view_config(route_name="add_movie", renderer="templates/add_movie.jinja2")
def add_movie(request):
    post_data = request.POST

    if "movies" in post_data:
        movies = post_data["movies"].split(os.linesep)

        for movie_id in movies:        
            movie_thread = Thread(target=store_movie_details, args=(movie_id,))
            movie_thread.start()

    return {}

def store_movie_details(movie_id):

    movie_details = scrape_data(movie_id)
    new_movie = Movie(**movie_details) # Movie is my model.

    print new_movie  # Works fine.

    print DBSession.add(movies(**movie_details))  # Returns None.

虽然 new_movie 这一行确实能打印出正确提取的数据,但 DBSession.add() 却不起作用。实际上,它只是返回了 None

如果我去掉线程,直接调用 store_movie_details() 方法,那就能正常工作。

这是怎么回事呢?

2 个回答

0

当响应返回时,事务管理器会关闭事务。在其他线程中,当响应返回后,DBSession是没有事务的。而且,在不同的线程之间共享事务可能也不是个好主意。

这正是使用工作者的典型场景。可以看看 CeleryRQ

2

首先,SA文档中关于 Session.add() 方法的说明没有提到它的返回值,所以我猜这个方法应该是返回 None

其次,我觉得你是想把 new_movie 加入到会话中,而不是 movies(**movie_details),不管那是什么 :)

第三,标准的Pyramid会话(也就是用ZopeTransactionExtension配置的那个)是和Pyramid的请求-响应周期绑定在一起的,这可能会在你的情况下产生意想不到的行为。你需要配置一个单独的会话,并在 store_movie_details 中手动提交这个会话。这个会话需要使用 scoped_session,这样会话对象就是线程本地的,不会在不同的线程之间共享。

from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker

session_factory = sessionmaker(bind=some_engine)
AsyncSession = scoped_session(session_factory)

def store_movie_details(movie_id):

    session = AsyncSession()
    movie_details = scrape_data(movie_id)
    new_movie = Movie(**movie_details) # Movie is my model.

    session.add(new_movie)
    session.commit()

当然,这种方法只适合非常轻量的任务,如果你不介意偶尔丢失一个任务(比如当web服务器重启时)。对于更严肃的任务,可以看看Celery等工具,正如Antoine Leclair所建议的。

撰写回答