SQLAlchemy - WHERE子句中的子查询
我最近刚开始使用SQLAlchemy,还是对一些概念有点搞不清楚。
简单来说,我有两个表格,像这样(这是通过Flask-SQLAlchemy实现的):
class User(db.Model):
__tablename__ = 'users'
user_id = db.Column(db.Integer, primary_key=True)
class Posts(db.Model):
__tablename__ = 'posts'
post_id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey('users.user_id'))
post_time = db.Column(db.DateTime)
user = db.relationship('User', backref='posts')
我想查询用户和他们最新的帖子(不包括没有帖子的用户),该怎么做呢?如果我用SQL的话,我会这样写:
SELECT [whatever]
FROM posts AS p
LEFT JOIN users AS u ON u.user_id = p.user_id
WHERE p.post_time = (SELECT MAX(post_time) FROM posts WHERE user_id = u.user_id)
所以我知道我想要的SQL语句是什么,但不知道怎么在SQLAlchemy中“正确”地表达出来。
补充一下,如果这很重要,我用的是SQLAlchemy 0.6.6。
3 个回答
3
通常,这种情况的表达方式和实际的SQL很相似——你会创建一个子查询,返回一个单一的结果,然后用这个结果进行比较。不过,有时候如果你在子查询中需要使用一个你已经在查询或连接的表,那就会变得非常麻烦。
解决这个问题的方法是创建一个别名版本的模型,以便在子查询中引用。
假设你已经在一个连接中,有一个现成的Posts
model
和一些基本的query
,现在你想查询每个用户最新的(单个)帖子列表,你可以这样过滤查询:
from sqlalchemy.orm import aliased
posts2 = aliased(Posts) # create aliased version
query = query.filter(
model.post_id
==
Posts.query # create query directly from model, NOT from the aliased version!
.with_entities(posts2.post_id) # only select column "post_id"
.filter(
posts2.user_id == model.user_id
)
.order_by(posts2.post_id.desc()) # assume higher id == newer post
.limit(1) # we must limit to a single row so we only get 1 value
)
我故意没有使用func.max
,因为我觉得那是一个更简单的版本,而且在其他答案中已经提到过了。我认为这个例子对那些因为寻找如何对子查询同一个表而找到这个问题的人会很有帮助。
68
之前的回答是有效的,但你问的那个具体的SQL语句也可以写成实际的语句:
print s.query(User, Posts).\
outerjoin(Posts.user).\
filter(Posts.post_time==\
s.query(
func.max(Posts.post_time)
).
filter(Posts.user_id==User.user_id).
correlate(User).
as_scalar()
)
我想“概念”这个东西可能不是很明显,就是现在需要用到as_scalar()来把一个子查询当作“标量”来处理(它应该可以根据上下文自动判断这个)。
补充:确认了,这确实是个bug,已经完成了第2190号工单。在当前的版本或者0.7.2版本中,as_scalar()会自动调用,所以上面的查询可以简化为:
print s.query(User, Posts).\
outerjoin(Posts.user).\
filter(Posts.post_time==\
s.query(
func.max(Posts.post_time)
).
filter(Posts.user_id==User.user_id).
correlate(User)
)
80
这个应该可以正常工作(虽然SQL语句不同,但结果是一样的):
t = Session.query(
Posts.user_id,
func.max(Posts.post_time).label('max_post_time'),
).group_by(Posts.user_id).subquery('t')
query = Session.query(User, Posts).filter(and_(
User.user_id == Posts.user_id,
User.user_id == t.c.user_id,
Posts.post_time == t.c.max_post_time,
))
for user, post in query:
print user.user_id, post.post_id
这里的c代表“列”