如何在SQLAlchemy中指定多级/多重连接的表关系?

13 投票
1 回答
6179 浏览
提问于 2025-04-17 21:08

我正在尝试定义两个表之间的关系,这种关系是间接的,也就是说,它们之间的联系是通过另外两个表来实现的。

我想要的结果可以通过以下查询来获取:

(db.session.query(Telnum)
           .filter(Account.customer==customer)
           .filter(Account.account_id == Subscription.account_id)
           .filter(Telnum.sub_id == Subscription.id)
           .order_by(Telnum.telnum)
           .all()
)

这里的 customer 是一个客户对象。

我在想如何将这个关系定义出来,类似于 Customer.invoices 这种关系。我想到的一个方法是这样的:

telnums = db.relationship('Telnum',
                       primaryjoin="and_(Account.user_id==Customer.id, "
                       "Account.account_id == Subscription.account_id, " 
                       "Telnum.sub_id == Subscription.id)", 
                       backref='customer')

但是,正如这个帖子所示,这种方法并不奏效。它产生的错误信息是: sqlalchemy.exc.ArgumentError: 找不到任何简单的等式表达式,涉及到本地映射的外键列,用于主连接条件 'accounts.user_id = customers.id AND accounts.account_id = subscriptions.account_id AND pstn_numbers.sub_id = subscriptions.id' 在关系 Customer.telnums 上。请确保引用的列与 ForeignKey 或 ForeignKeyConstraint 相关联,或者在连接条件中使用 foreign() 注解进行标注。要允许使用 '==' 以外的比较运算符,可以将关系标记为 viewonly=True。

有没有人能给我一些正确的方向提示?

我有以下的表结构(简化版,除了每个表的一个相关列外,其他不相关的列都已删除):

class Customer(db.Model):
    __tablename__ = 'customers'
    id = db.Column(db.Integer, primary_key=True)
    identification_num = db.Column(db.String(10), unique=True)
    name = db.Column(db.Text)
    invoices = db.relationship('Invoice', backref='customer')
    accounts = db.relationship('Account', backref='customer')

def __init__(self):
    pass

def __repr__(self):
    return '<Customer %r>' % (self.name)

class Invoice(db.Model):
    __tablename__ = 'invoices'
    id = db.Column(db.Integer, primary_key=True)
    customer_id = db.Column(db.Integer, db.ForeignKey('customers.id'))
    active = db.Column(db.Boolean)
    accounts = db.relationship('Account', backref='invoice')

    def __repr__(self):
        return '<Invoice %r>' % (self.id)

class Account(db.Model):
    __tablename__ = 'accounts'
    id = db.Column(db.Integer, primary_key=True)
    account_id = db.Column(db.Integer, unique=True)
    invoice_id = db.Column(db.Integer, db.ForeignKey('invoices.id'))
    user_id = db.Column(db.Integer, db.ForeignKey('customers.id'))
    active = db.Column(db.Boolean)
    subscriptions = db.relationship('Subscription', backref='account')

    def __repr__(self):
        return '<Account %r>' % (self.account_id)

class Subscription(db.Model):
    __tablename__ = 'subscriptions'
    id = db.Column(db.Integer, primary_key=True)
    account_id = db.Column(db.Integer, db.ForeignKey('accounts.account_id'))
    sub_active = db.Column(db.DateTime)
    telnums = db.relationship('Telnum', backref='subscription')

    def __repr__(self):
        return '<Subscription %r>' % (self.id)

class Telnum(db.Model):
    __tablename__ = 'pstn_numbers'
    id = db.Column(db.Integer, primary_key=True)
    sub_id = db.Column(db.Integer, db.ForeignKey('subscriptions.id'))
    telnum = db.Column(db.String(64))
    holder = db.Column(db.String(10))

    def __repr__(self):
        return '<Telnum %r>' % (self.telnum)

1 个回答

13

一般来说,我不会把一个间接关系定义为一个关系,因为在你进行修改时,这些间接关系可能会不同步。你可以通过为关系指定viewonly=False参数来解决一些限制。

一个更简单、更安全、也更直接的解决方案是使用查询(或者可以查询的属性),如果你想从数据库重新加载数据的话,并使用Python的列表推导式来获取关系树的子子节点:

class Customer(Base):
    # ...

    @property
    def telnums_qry(self):
        sess = Session.object_session(self)
        return (sess.query(Telnum)
                .join(Subscription)
                .join(Account)
                .filter(Account.user_id == self.id)
                ).all()


    @property
    def telnums_mem(self):
        return [tel
                for acc in self.accounts
                for sub in acc.subscriptions
                for tel in sub.telnums
                ]


class Telnum(Base):
    # ...

    @property
    def customer(self):
        return (self.subscription
                and self.subscription.account
                and self.subscription.account.customer
                )

撰写回答