通过PickleType内容筛选SQLAlchemy查询

3 投票
1 回答
4931 浏览
提问于 2025-04-18 04:43

我有一个名为 Meter 的对象模型。

class Meter(db.Model):
    """
    Model for repair meter.

    """
    __tablename__ = 'meters'

    id = db.Column(db.Integer, primary_key=True, nullable=False)
    meter_no = db.Column(db.String(50), unique=True, nullable=False)
    client = db.Column(db.String(50), unique=True, nullable=False)
    problems = db.Column(db.PickleType)
    functional = db.Column(db.Boolean(), default=False, server_default="false")
    location = db.Column(db.Integer, db.ForeignKey('locations.id'))
    date_in = db.Column(db.Date, default=dt.date.today())
    date_out = db.Column(db.Date)

其中,PickleType 列存储了一个 Python 列表,这个列表里包含了 Problem 对象的 ID。

然后我有一个函数,它试图查询 meters 数据库表,只获取那些在 PickleType 列表中包含特定问题 ID(prob_id)的 Meter 对象。

def problemrate(prob_id, month):
    """
    Return meters with the problem id for a particular month.

    args:
        prob_id (int): problem id
        month (int): month number e.g. april -> 4

    returns:
        meters (list): list of meter objects

    raises: None

    """
    meters = Meter.query.filter(and_(extract('month', Meter.date_in) == month,
                    prob_id in Meter.problems)).all()

    return meters

但是当我调用这个函数时,出现了一个错误:

NotImplementedError: Operator 'getitem' is not supported on this expression

怎么才能正确地根据 PickleType 列的内容来过滤 sqlalchemy 查询?这可能吗,还是 sqlalchemy 不支持这个功能?

1 个回答

5

简单来说:PickleType不支持任何与数据库相关的功能,比如查询或过滤。它的主要作用是存储和取出数据。要使用关系功能,应该用sqlalchemy.orm.relationship。

详细解释:错误信息其实是对的。在过滤函数中,所有内容都会被转化成一个SQL查询(可以打印出来看看),所以'in'操作符不能转化成有效的查询,其他类似的操作符也不行。可以用'=='来比较,但你需要传入完全相同的对象,这样才能转化成和存储的PickleType对象一模一样的对象。这个比较是在内部完成的。

解决方案:在普通的SQL中,你会把每个列表项单独存储在不同的表中,并通过ID关联。在SQLAlchemy中,你可以做类似的事情:

from sqlalchemy.orm import relationship
from sqlalchemy.ext.associationproxy import association_proxy

class Meter(db.Model):
    """
    Model for repair meter.

    """
    __tablename__ = 'meters'

    id = db.Column(db.Integer, primary_key=True, nullable=False)
    meter_no = db.Column(db.String(50), unique=True, nullable=False)
    client = db.Column(db.String(50), unique=True, nullable=False)
    meter_problems = relationship('Problem', secondary=lambda: meterproblems_table)
    functional = db.Column(db.Boolean(), default=False, server_default="false")
    location = db.Column(db.Integer, db.ForeignKey('locations.id'))
    date_in = db.Column(db.Date, default=dt.date.today())
    date_out = db.Column(db.Date)

    problems = association_proxy('meter_problems', 'problem')

class Problem(db.Model):
    __tablename__ = 'problem'
    id = db.Column(db.Integer, primary_key=True, nullable=False)
    problem = db.Column('keyword', db.String(64))

    def __init__(self, problem):
        self.problem = problem

meterproblems_table = db.Table(
    'meterproblems',
    db.metadata,
    db.Column(
        'meter_id',
        db.Integer,
        db.ForeignKey("meters.id"),
        primary_key=True
    ),
    db.Column(
        'problem_id',
        db.Integer,
        db.ForeignKey("problem.id"),
        primary_key=True
    )
)

这样你的查询就变成:

meters = Meter.query.filter(
    extract('month', Meter.date_in) == month,
    Meter.meter_problems.has(keyword=prob_id)
).all()

assocation_proxy用于方便地将项目添加到meter_problems列中:

obj.problems.append('something')

其中obj是一个meter对象。

文档链接 在这里

撰写回答