pyramid:多线程数据库操作
我的应用程序会接收用户输入的一个或多个网址(通常是3到4个网址),然后从这些网址中提取一些数据,并把这些数据写入数据库。不过,因为提取数据需要一点时间,所以我在考虑把每个网址的数据提取放在一个单独的线程里,这样提取数据和写入数据库的过程就可以在后台继续进行,用户就不需要一直等待了。
为了实现这个功能,我做了以下相关的部分:
@view_config(route_name="add_movie", renderer="templates/add_movie.jinja2")
def add_movie(request):
post_data = request.POST
if "movies" in post_data:
movies = post_data["movies"].split(os.linesep)
for movie_id in movies:
movie_thread = Thread(target=store_movie_details, args=(movie_id,))
movie_thread.start()
return {}
def store_movie_details(movie_id):
movie_details = scrape_data(movie_id)
new_movie = Movie(**movie_details) # Movie is my model.
print new_movie # Works fine.
print DBSession.add(movies(**movie_details)) # Returns None.
虽然 new_movie
这一行确实能打印出正确提取的数据,但 DBSession.add()
却不起作用。实际上,它只是返回了 None
。
如果我去掉线程,直接调用 store_movie_details()
方法,那就能正常工作。
这是怎么回事呢?
2 个回答
2
首先,SA文档中关于 Session.add() 方法的说明没有提到它的返回值,所以我猜这个方法应该是返回 None
。
其次,我觉得你是想把 new_movie
加入到会话中,而不是 movies(**movie_details)
,不管那是什么 :)
第三,标准的Pyramid会话(也就是用ZopeTransactionExtension配置的那个)是和Pyramid的请求-响应周期绑定在一起的,这可能会在你的情况下产生意想不到的行为。你需要配置一个单独的会话,并在 store_movie_details
中手动提交这个会话。这个会话需要使用 scoped_session,这样会话对象就是线程本地的,不会在不同的线程之间共享。
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker
session_factory = sessionmaker(bind=some_engine)
AsyncSession = scoped_session(session_factory)
def store_movie_details(movie_id):
session = AsyncSession()
movie_details = scrape_data(movie_id)
new_movie = Movie(**movie_details) # Movie is my model.
session.add(new_movie)
session.commit()
当然,这种方法只适合非常轻量的任务,如果你不介意偶尔丢失一个任务(比如当web服务器重启时)。对于更严肃的任务,可以看看Celery等工具,正如Antoine Leclair所建议的。