SQLAlchemy:修改查询对象的from子句

1 投票
1 回答
1043 浏览
提问于 2025-04-28 03:53

我有几个类,它们都有相同的抽象基类和相同的结构,都是在数据库中指向类似的表。我的查询非常简单,没有复杂的连接,只是一些简单的过滤条件。我使用了多态身份,这样可以让类之间的联合操作变得很顺畅。

问题是,有时候我需要对多个表重复执行相同的查询,再把结果联合起来。我在SQLAlchemy中没有找到现成的办法,所以想在自定义的BaseQuery类中实现一个方法:克隆原始查询,并替换from子句所用的类或映射器,从而自动完成这些操作。

比如说,今天我需要做这样的事情:

# Current approach: the same filter is written out once per model class.
query1 = MyModel1.query.filter_by(foo=bar)
query2 = MyModel2.query.filter_by(foo=bar)
query3 = MyModel3.query.filter_by(foo=bar)

# The three per-model queries are then chained together with union().
query = query1.union(query2).union(query3)

我希望能够做到类似这样的事情:

# Desired API: state the filter once, then list the other models to union in.
query = MyModel1.query.filter_by(foo=bar).with_unions(MyModel2, MyModel3)

with_unions会像这样,其中replace_from_clause是我想要的方法:

def with_unions(self, *others):
    """Union this query with a copy of itself re-targeted at each of *others*.

    ``replace_from_clause(query, other)`` is the helper being asked for: it
    is expected to return ``query`` rebuilt against the class/mapper
    ``other``.

    Args:
        *others: model classes (or mappers) to repeat the query against.

    Returns:
        A clone of this query when ``others`` is empty, otherwise the chain
        ``base.union(q_other1).union(q_other2)...``.
    """
    base = self._clone()
    query = base

    for other in others:
        # Always re-target the untouched base query. Passing the accumulated
        # ``query`` would hand the helper a union from the second model on,
        # not the single-model query whose FROM clause should be swapped.
        query = query.union(replace_from_clause(base, other))

    return query

在SQLAlchemy中有没有类似replace_from_clause的方法,或者有什么方法可以实现它?

当然,如果有更好的方法,我非常乐意听取建议。

暂无标签

1 个回答

0

据我所知(这也是我的经验,并且与这个StackOverflow回答一致:https://stackoverflow.com/a/10612690/3329834),ORM并不支持以这种方式做联合查询。

我设法实现了你想要的语法(或多或少),并在返回时将所有内容加载回ORM。关于联合查询的常规注意事项(比如列数要相同等)依然适用,还有更多要求(需要使用相同的列名进行过滤)。此外,我觉得在实际应用中我可能永远不会使用这个……

from functools import partial
import sqlalchemy
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import *
from sqlalchemy import orm
from sqlalchemy import sql

# Engine built from the URL 'sqlite://' (no database file path is given).
engine = sqlalchemy.create_engine('sqlite://')
# A connection is opened on that engine at import time; nothing else in this
# listing reads `connection`.
connection = engine.connect()


# Declarative base class that the mapped model classes in this file subclass.
Base = declarative_base()


class Student(Base):
    """Declarative model mapped to the ``students`` table."""

    __tablename__ = "students"

    # Columns, in declaration order: integer primary key, unique name,
    # caretaker label.
    id = Column(Integer, primary_key=True)
    name = Column(String(767), unique=True)
    caretaker = Column(String(50))

    def __repr__(self):
        """Return a short text form showing ``name`` and ``caretaker``."""
        # NOTE(review): the template has no closing parenthesis. It is kept
        # byte-for-byte because the sample output shown for this script
        # prints exactly this text.
        template = 'Student(name={s.name}, caretaker={s.caretaker}'
        return template.format(s=self)


class Patient(Base):
    """Declarative model mapped to the ``patients`` table."""

    __tablename__ = "patients"

    # Columns, in declaration order: integer primary key, unique name,
    # caretaker label.
    id = Column(Integer, primary_key=True)
    name = Column(String(767), unique=True)
    caretaker = Column(String(50))

    def __repr__(self):
        """Return a short text form showing ``name`` and ``caretaker``."""
        # NOTE(review): the template has no closing parenthesis. It is kept
        # byte-for-byte because the sample output shown for this script
        # prints exactly this text.
        template = 'Patient(name={s.name}, caretaker={s.caretaker}'
        return template.format(s=self)

class StagedOperation(object):
    """Record of one deferred method call: a method name plus its arguments.

    The instance only stores data; calling it never executes anything.
    ``args`` and ``kwargs`` do not exist until the instance has been called.
    """

    def __init__(self, attr):
        # Name of the method that will be looked up and invoked later.
        self.attr = attr

    def __call__(self, *args, **kwargs):
        """Remember the positional and keyword arguments; return ``None``."""
        self.args, self.kwargs = args, kwargs


class StagedQuery(object):
    """Record Query method calls once and replay them against several models.

    Any method name that exists on ``orm.query.Query`` can be called on a
    ``StagedQuery``; the call is stored as a ``StagedOperation`` and the
    ``StagedQuery`` itself is returned so calls can be chained. ``query()``
    later builds one Query per model, replays the stored calls on each and
    unions the results. ``one()``, ``first()`` and ``all()`` run that union
    and rebuild model instances from the returned rows.
    """

    def __init__(self, model, session=None):
        """Start staging against ``model``.

        Args:
            model: mapped class providing ``__table__`` and ``__tablename__``.
            session: optional session used when the query is executed; it
                can also be supplied later through ``with_session``.
        """
        self.session = session
        self.models = [model]
        # Column names are taken from the first model only; models added via
        # with_unions() are queried with this same list of names.
        self.columns = [e.name for e in model.__table__.columns]
        self.ops = []

    def __getattr__(self, attr):
        """Return a stager for the Query method named ``attr``.

        ``__getattr__`` fires only when normal attribute lookup fails. The
        returned callable records its arguments and returns this
        ``StagedQuery`` to allow chaining, so this only makes sense for
        method calls, not for plain attribute access.

        Raises:
            AttributeError: if ``orm.query.Query`` has no attribute ``attr``.
        """
        if not hasattr(orm.query.Query, attr):
            # Falling through would return None, which makes hasattr() report
            # every name as present and turns typos into
            # "'NoneType' object is not callable".
            raise AttributeError(
                "{0!r} object has no attribute {1!r}".format(
                    type(self).__name__, attr))

        def _stage(*args, **kwargs):
            # Record the operation only once it is actually called, so a bare
            # attribute access cannot leave an argument-less entry in
            # self.ops for query() to trip over.
            op = StagedOperation(attr)
            op(*args, **kwargs)
            self.ops.append(op)
            return self

        return _stage

    def with_unions(self, *models):
        """Add ``models`` to the set of models to query; return ``self``."""
        self.models.extend(models)
        return self

    def with_session(self, session):
        """Set the session used for execution; return ``self``."""
        self.session = session
        return self

    def query(self):
        """Build the union of one Query per model with all staged calls applied.

        Each per-model Query selects ``self.columns`` plus a literal column
        labelled ``tablename`` holding that model's ``__tablename__``.
        """
        q = None
        for model in self.models:
            id_col = sql.literal(model.__tablename__).label('tablename')
            columns = self.columns + [id_col]
            mq = orm.query.Query(columns).select_from(model)
            for op in self.ops:
                mq = getattr(mq, op.attr)(*op.args, **op.kwargs)
            # Compare against None explicitly rather than relying on the
            # truthiness of a Query object.
            q = mq if q is None else q.union(mq)
        return q

    def _deserialize_row(self, row):
        """Construct an instance of the model named by ``row.tablename``.

        The instance is created through the model constructor with one
        keyword argument per name in ``self.columns``; this method does not
        add it to any session.

        Raises:
            KeyError: if ``row.tablename`` matches none of ``self.models``.
        """
        ref = {e.__tablename__: e for e in self.models}
        return ref[row.tablename](**{k: getattr(row, k) for k in self.columns})

    def one(self):
        """Run the union via ``Query.one()`` and return the rebuilt instance."""
        return self._deserialize_row(
            self.query().with_session(self.session).one())

    def first(self):
        """Run the union via ``Query.first()``.

        Returns:
            The rebuilt instance, or ``None`` when no row was returned.
        """
        r = self.query().with_session(self.session).first()
        if r is None:
            return None
        return self._deserialize_row(r)

    def all(self):
        """Run the union and return a list of rebuilt instances."""
        return [
            self._deserialize_row(e) for e in
            self.query().with_session(self.session).all()]


if __name__ == '__main__':
    # Demo script: create the tables, insert a few rows into both tables and
    # print the results of two staged union queries.
    engine = create_engine('sqlite://')
    Session = orm.sessionmaker(bind=engine)
    # Pass the engine explicitly instead of assigning Base.metadata.bind and
    # calling create_all() with no argument, so table creation does not
    # depend on implicitly bound metadata.
    Base.metadata.create_all(engine)

    session = Session()
    try:
        #
        # Insert some objects
        #

        stu = Student(id=1, name='John', caretaker='Mother')
        stu2 = Student(id=2, name='Sally', caretaker='Mother')
        stu3 = Student(id=3, name='Scott', caretaker='Father')

        pat = Patient(id=1, name='Susan', caretaker='Mother')
        pat2 = Patient(id=2, name='Sally', caretaker='Father')
        pat3 = Patient(id=3, name='Turnip', caretaker='Father')

        session.add_all([stu, stu2, stu3, pat, pat2, pat3])
        # flush() sends the INSERTs so the queries below see the rows; the
        # script never commits.
        session.flush()

        # Some usage options

        # Session supplied at the end of the chain.
        print(
            StagedQuery(Student)
            .filter_by(caretaker='Mother')
            .with_unions(Patient)
            .with_session(session)
            .all())

        # Session supplied up front, with two staged filters.
        print(
            StagedQuery(Student, session=session)
            .filter_by(caretaker='Mother')
            .filter_by(name='Sally')
            .with_unions(Patient)
            .all())
    finally:
        # Release the session's connection even if a query above fails.
        session.close()

打印结果……

[Student(name=John, caretaker=Mother, Patient(name=Susan, caretaker=Mother, Student(name=Sally, caretaker=Mother]
[Student(name=Sally, caretaker=Mother]

撰写回答