如何使用SQLalchemy连接三张表并保留其中一张表的所有列?

9 投票
3 回答
25636 浏览
提问于 2025-04-15 20:53

我有三个表:

这些是类的定义:

engine = create_engine('sqlite://test.db', echo=False)
SQLSession = sessionmaker(bind=engine)
Base = declarative_base()

class Channel(Base):
    __tablename__ = 'channel'

    id = Column(Integer, primary_key = True)
    title = Column(String)
    description = Column(String)
    link = Column(String)
    pubDate = Column(DateTime)

class User(Base):
    __tablename__ = 'user'

    id = Column(Integer, primary_key = True)
    username = Column(String)
    password = Column(String)
    sessionId = Column(String)

class Subscription(Base):
    __tablename__ = 'subscription'

    userId = Column(Integer, ForeignKey('user.id'), primary_key=True)
    channelId = Column(Integer, ForeignKey('channel.id'), primary_key=True)

注意:我知道user.username应该是唯一的,需要修复这个问题,而且我不太明白为什么SQLalchemy会创建一些带双引号的行名。

我想找个办法来获取所有频道,以及某个特定用户(通过user.sessionId和user.id来识别)订阅了哪些频道的信息。

举个例子,假设我们有四个频道:channel1、channel2、channel3、channel4;还有一个用户:user1;他订阅了channel1和channel4。对于user1的查询结果应该是这样的:

channel.id | channel.title | subscribed
---------------------------------------
1            channel1        True
2            channel2        False
3            channel3        False
4            channel4        True

这是一个理想的结果,但因为我完全不知道怎么实现“订阅”这一列,所以我试着在用户有订阅的行中获取用户的ID,而没有订阅的地方就留空。

我现在使用的数据库引擎是sqlite3,和SQLalchemy一起用。

我已经为这个问题绞尽脑汁两天了,我可以通过订阅表把三个表连接在一起,但这样一来,用户没有订阅的频道就会被省略掉。

希望我能清楚地描述我的问题,提前谢谢大家。

编辑:我用一种稍微笨拙的方法解决了这个问题,涉及到一个子查询:

# What a messy SQL query!
stmt = query(Subscription).filter_by(userId = uid()).join((User, Subscription.userId == User.id)).filter_by(sessionId = id()).subquery()
subs = aliased(Subscription, stmt)
results = query(Channel.id, Channel.title, subs.userId).outerjoin((subs, subs.channelId == Channel.id))

不过,我会继续寻找更优雅的解决方案,所以欢迎大家的回答。

3 个回答

0

不要从用户那里查询信息,而是从频道中查询。

user = query(User).filter_by(id=1).one()
for channel in query(Channel).all():
    print channel.id, channel.title, user in channel.subscriptions.user

这样你就能获取到所有的频道,而不仅仅是和特定用户有关的那些频道。

1

为了让这个更简单,我在你的模型中添加了关系,这样你只需要使用 user.subscriptions 就可以获取所有的订阅了。

engine = create_engine('sqlite://test.db', echo=False)
SQLSession = sessionmaker(bind=engine)
Base = declarative_base()

class Channel(Base):
    __tablename__ = 'channel'

    id = Column(Integer, primary_key = True)
    title = Column(String)
    description = Column(String)
    link = Column(String)
    pubDate = Column(DateTime)

class User(Base):
    __tablename__ = 'user'

    id = Column(Integer, primary_key = True)
    username = Column(String)
    password = Column(String)
    sessionId = Column(String)

class Subscription(Base):
    __tablename__ = 'subscription'

    userId = Column(Integer, ForeignKey('user.id'), primary_key=True)
    user = relationship(User, primaryjoin=userId == User.id, backref='subscriptions')
    channelId = Column(Integer, ForeignKey('channel.id'), primary_key=True)
    channel = relationship(channel, primaryjoin=channelId == channel.id, backref='subscriptions')

results = session.query(
    Channel.id,
    Channel.title,
    Channel.subscriptions.any().label('subscribed'),
)

for channel in results:
    print channel.id, channel.title, channel.subscribed
14

选项一:

Subscription其实就是一个多对多的关系对象,我建议你把它建模成这样,而不是单独作为一个类。你可以查看SQLAlchemy/declarative的多对多关系配置文档。

用测试代码建模后变成:

from sqlalchemy import create_engine, Column, Integer, DateTime, String, ForeignKey, Table
from sqlalchemy.orm import relation, scoped_session, sessionmaker, eagerload
from sqlalchemy.ext.declarative import declarative_base

engine = create_engine('sqlite:///:memory:', echo=True)
session = scoped_session(sessionmaker(bind=engine, autoflush=True))
Base = declarative_base()

t_subscription = Table('subscription', Base.metadata,
    Column('userId', Integer, ForeignKey('user.id')),
    Column('channelId', Integer, ForeignKey('channel.id')),
)

class Channel(Base):
    __tablename__ = 'channel'

    id = Column(Integer, primary_key = True)
    title = Column(String)
    description = Column(String)
    link = Column(String)
    pubDate = Column(DateTime)

class User(Base):
    __tablename__ = 'user'

    id = Column(Integer, primary_key = True)
    username = Column(String)
    password = Column(String)
    sessionId = Column(String)

    channels = relation("Channel", secondary=t_subscription)

# NOTE: no need for this class
# class Subscription(Base):
    # ...

Base.metadata.create_all(engine)


# ######################
# Add test data
c1 = Channel()
c1.title = 'channel-1'
c2 = Channel()
c2.title = 'channel-2'
c3 = Channel()
c3.title = 'channel-3'
c4 = Channel()
c4.title = 'channel-4'
session.add(c1)
session.add(c2)
session.add(c3)
session.add(c4)
u1 = User()
u1.username ='user1'
session.add(u1)
u1.channels.append(c1)
u1.channels.append(c3)
u2 = User()
u2.username ='user2'
session.add(u2)
u2.channels.append(c2)
session.commit()


# ######################
# clean the session and test the code
session.expunge_all()

# retrieve all (I assume those are not that many)
channels = session.query(Channel).all()

# get subscription info for the user
#q = session.query(User)
# use eagerload(...) so that all 'subscription' table data is loaded with the user itself, and not as a separate query
q = session.query(User).options(eagerload(User.channels))
for u in q.all():
    for c in channels:
        print (c.id, c.title, (c in u.channels))

这段代码会产生以下输出:

(1, u'channel-1', True)
(2, u'channel-2', False)
(3, u'channel-3', True)
(4, u'channel-4', False)
(1, u'channel-1', False)
(2, u'channel-2', True)
(3, u'channel-3', False)
(4, u'channel-4', False)

请注意使用了eagerload,这样在请求channels时,只会发出1个SELECT语句,而不是每个User都发一个。

选项二:

但是如果你想保持你的模型,只是创建一个SA查询来获取你需要的列,下面的查询应该可以满足你的需求:

from sqlalchemy import and_
from sqlalchemy.sql.expression import case
#...
q = (session.query(#User.username, 
                   Channel.id, Channel.title, 
                   case([(Subscription.channelId == None, False)], else_=True)
                  ).outerjoin((Subscription, 
                                and_(Subscription.userId==User.id, 
                                     Subscription.channelId==Channel.id))
                             )
    )
# optionally filter by user
q = q.filter(User.id == uid()) # assuming uid() is the function that provides user.id
q = q.filter(User.sessionId == id()) # assuming uid() is the function that provides user.sessionId
res = q.all()
for r in res:
    print r

输出和上面选项一的结果完全一样。

撰写回答