通过PickleType内容筛选SQLAlchemy查询
我有一个名为 Meter 的对象模型。
class Meter(db.Model):
"""
Model for repair meter.
"""
__tablename__ = 'meters'
id = db.Column(db.Integer, primary_key=True, nullable=False)
meter_no = db.Column(db.String(50), unique=True, nullable=False)
client = db.Column(db.String(50), unique=True, nullable=False)
problems = db.Column(db.PickleType)
functional = db.Column(db.Boolean(), default=False, server_default="false")
location = db.Column(db.Integer, db.ForeignKey('locations.id'))
date_in = db.Column(db.Date, default=dt.date.today())
date_out = db.Column(db.Date)
其中,PickleType 列存储了一个 Python 列表,这个列表里包含了 Problem 对象的 ID。
然后我有一个函数,它试图查询 meters 数据库表,只获取那些在 PickleType 列表中包含特定问题 ID(prob_id)的 Meter 对象。
def problemrate(prob_id, month):
"""
Return meters with the problem id for a particular month.
args:
prob_id (int): problem id
month (int): month number e.g. april -> 4
returns:
meters (list): list of meter objects
raises: None
"""
meters = Meter.query.filter(and_(extract('month', Meter.date_in) == month,
prob_id in Meter.problems)).all()
return meters
但是当我调用这个函数时,出现了一个错误:
NotImplementedError: Operator 'getitem' is not supported on this expression
怎么才能正确地根据 PickleType 列的内容来过滤 sqlalchemy 查询?这可能吗,还是 sqlalchemy 不支持这个功能?
1 个回答
5
简单来说:PickleType不支持任何与数据库相关的功能,比如查询或过滤。它的主要作用是存储和取出数据。要使用关系功能,应该用sqlalchemy.orm.relationship。
详细解释:错误信息其实是对的。在过滤函数中,所有内容都会被转化成一个SQL查询(可以打印出来看看),所以'in'操作符不能转化成有效的查询,其他类似的操作符也不行。可以用'=='来比较,但你需要传入完全相同的对象,这样才能转化成和存储的PickleType对象一模一样的对象。这个比较是在内部完成的。
解决方案:在普通的SQL中,你会把每个列表项单独存储在不同的表中,并通过ID关联。在SQLAlchemy中,你可以做类似的事情:
from sqlalchemy.orm import relationship
from sqlalchemy.ext.associationproxy import association_proxy
class Meter(db.Model):
"""
Model for repair meter.
"""
__tablename__ = 'meters'
id = db.Column(db.Integer, primary_key=True, nullable=False)
meter_no = db.Column(db.String(50), unique=True, nullable=False)
client = db.Column(db.String(50), unique=True, nullable=False)
meter_problems = relationship('Problem', secondary=lambda: meterproblems_table)
functional = db.Column(db.Boolean(), default=False, server_default="false")
location = db.Column(db.Integer, db.ForeignKey('locations.id'))
date_in = db.Column(db.Date, default=dt.date.today())
date_out = db.Column(db.Date)
problems = association_proxy('meter_problems', 'problem')
class Problem(db.Model):
__tablename__ = 'problem'
id = db.Column(db.Integer, primary_key=True, nullable=False)
problem = db.Column('keyword', db.String(64))
def __init__(self, problem):
self.problem = problem
meterproblems_table = db.Table(
'meterproblems',
db.metadata,
db.Column(
'meter_id',
db.Integer,
db.ForeignKey("meters.id"),
primary_key=True
),
db.Column(
'problem_id',
db.Integer,
db.ForeignKey("problem.id"),
primary_key=True
)
)
这样你的查询就变成:
meters = Meter.query.filter(
extract('month', Meter.date_in) == month,
Meter.meter_problems.has(keyword=prob_id)
).all()
assocation_proxy用于方便地将项目添加到meter_problems列中:
obj.problems.append('something')
其中obj是一个meter对象。
文档链接 在这里