长时间运行过程中的SQLAlchemy会话管理

4 投票
2 回答
4794 浏览
提问于 2025-04-15 14:19

场景:

  • 一个基于.NET的应用服务器(Wonderware IAS/System Platform)托管着一些自动化对象,这些对象可以和工厂里的各种设备进行沟通。
  • CPython被嵌入到这个应用服务器中(使用Python for .NET)。
  • 这些自动化对象内置了脚本功能(使用一种自定义的基于.NET的语言)。这些脚本可以调用Python函数。

这些Python函数是用来跟踪工厂车间的在制品的。这个系统的目的是跟踪生产的产品在整个过程中是否按照正确的顺序进行,并检查在这个过程中是否满足某些条件。产品的生产历史和状态存储在一个关系型数据库中,这就是SQLAlchemy发挥作用的地方。

举个例子,当一个产品经过扫描仪时,自动化软件会触发以下脚本(用应用服务器的自定义脚本语言编写):

' wiget_id and scanner_id provided by automation object
' ExecFunction() takes care of calling a CPython function
retval = ExecFunction("WidgetScanned", widget_id, scanner_id);
' if the python function raises an Exception, ErrorOccured will be true
' in this case, any errors should cause the production line to stop.
if (retval.ErrorOccured) then
    ProductionLine.Running = False;
    InformationBoard.DisplayText = "ERROR: " + retval.Exception.Message;
    InformationBoard.SoundAlarm = True
end if;

这个脚本会调用WidgetScanned的Python函数:

# pywip/functions.py
from pywip.database import session
from pywip.model import Widget, WidgetHistoryItem
from pywip import validation, StatusMessage
from datetime import datetime

def WidgetScanned(widget_id, scanner_id):
    widget = session.query(Widget).get(widget_id)
    validation.validate_widget_passed_scanner(widget, scanner) # raises exception on error

    widget.history.append(WidgetHistoryItem(timestamp=datetime.now(), action=u"SCANNED", scanner_id=scanner_id))
    widget.last_scanner = scanner_id
    widget.last_update = datetime.now()

    return StatusMessage("OK")

# ... there are a dozen similar functions

我想问的是:在这种情况下,如何最好地管理SQLAlchemy的会话? 应用服务器是一个长时间运行的进程,通常在重启之间会运行几个月。这个应用服务器是单线程的。

目前,我是这样做的:

我给我提供给应用服务器的函数加上一个装饰器:

# pywip/iasfunctions.py
from pywip import functions

def ias_session_handling(func):
    def _ias_session_handling(*args, **kwargs):
        try:
            retval = func(*args, **kwargs)
            session.commit()
            return retval
        except:
            session.rollback()
            raise
    return _ias_session_handling

# ... actually I populate this module with decorated versions of all the functions in pywip.functions dynamically
WidgetScanned = ias_session_handling(functions.WidgetScanned)

问题:上面的装饰器适合在长时间运行的进程中处理会话吗? 我是否应该调用session.remove()?

SQLAlchemy的会话对象是一个作用域会话:

# pywip/database.py
from sqlalchemy.orm import scoped_session, sessionmaker

session = scoped_session(sessionmaker())

我想把会话管理与基本函数分开,原因有两个:

  1. 还有另一类函数,叫做序列函数。序列函数会调用多个基本函数。一个序列函数应该对应一个数据库事务。
  2. 我需要能够在其他环境中使用这个库。a) 从TurboGears网页应用中。在这种情况下,会话管理由TurboGears处理。b) 从IPython命令行中。在这种情况下,提交/回滚将是显式的。

(我真的很抱歉问题这么长。但我觉得有必要解释一下场景。也许并不是必要的?)

2 个回答

1

请让你的WonderWare管理员给你权限,使用WonderWare Historian。这样你就可以通过MSSQL调用,利用sqlalchemy轻松跟踪标签的值,并且可以定期查询这些数据。

另外一个选择是使用archestra工具包,监听内部标签的更新。你可以在银河中部署一个服务器作为平台,从中获取这些更新信息。

4

这个装饰器适合长时间运行的应用程序,但如果你不小心在请求之间共享了对象,就可能会遇到麻烦。为了让错误更早出现,并避免损坏任何东西,最好使用session.remove()来丢弃会话。

try:
    try:
        retval = func(*args, **kwargs)
        session.commit()
        return retval
    except:
        session.rollback()
        raise
finally:
    session.remove()

或者,如果你能使用with上下文管理器:

try:
    with session.registry().transaction:
        return func(*args, **kwargs)
finally:
    session.remove()

顺便提一下,你可能想在查询中使用.with_lockmode('update'),这样你的验证就不会在过期的数据上运行。

撰写回答