在Pyramid中使用SQLAlchemy创建带有相关表的查询

2 投票
1 回答
651 浏览
提问于 2025-04-18 18:43

我在Pyramid中定义了几个表,像这样:

# coding: utf-8
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Integer, Float, DateTime, ForeignKey, ForeignKeyConstraint, String, Column
from sqlalchemy.orm import scoped_session, sessionmaker, relationship, backref,
from zope.sqlalchemy import ZopeTransactionExtension

DBSession = scoped_session(sessionmaker(extension=ZopeTransactionExtension()))
Base = declarative_base()


class Codes(Base):
    __tablename__ = 'Code'
    __table_args__ = {u'schema': 'Locations'}

    id = Column(Integer, nullable=False)
    code_str = Column(String(9), primary_key=True)
    name = Column(String(100))

    incoming = relationship(u'Voyages', primaryjoin='Voyage.call == Codes.code_str', backref=backref('Code'))


class Locations(Base):
    __tablename__ = 'Location'
    __table_args__ = {u'schema': 'Locations'}

    unit_id = Column(ForeignKey(u'Structure.Definition.unit_id', ondelete=u'RESTRICT', onupdate=u'CASCADE'), primary_key=True, nullable=False)
    timestamp = Column(DateTime, primary_key=True, nullable=False)
    longitude = Column(Float)
    latitude = Column(Float)


class Voyages(Base):
    __tablename__ = 'Voyage'
    __table_args__ = (ForeignKeyConstraint(['unit_id', 'Voyage_id'], [u'Locations.Voyages.unit_id', u'Locations.Voyages.voyage_id'], ondelete=u'RESTRICT', onupdate=u'CASCADE'), {u'schema': 'Locations'}
    )

    uid = Column(Integer, primary_key=True)
    unit_id = Column(Integer)
    voyage_id = Column(Integer)
    departure = Column(ForeignKey(u'Locations.Code.code_str', ondelete=u'RESTRICT', onupdate=u'CASCADE'))
    call = Column(ForeignKey(u'Locations.Code.code_str', ondelete=u'RESTRICT', onupdate=u'CASCADE'))
    departure_date = Column(DateTime)

    voyage_departure = relationship(u'Codes', primaryjoin='Voyage.departure == Codes.code_str')
    voyage_call = relationship(u'Codes', primaryjoin='Voyage.call == Codes.code_str')


class Definitions(Base):
    __tablename__ = 'Definition'
    __table_args__ = {u'schema': 'Structure'}

    unit_id = Column(Integer, primary_key=True)
    name = Column(String(90))
    type = Column(ForeignKey(u'Structure.Type.id', ondelete=u'RESTRICT', onupdate=u'CASCADE'))

    locations = relationship(u'Locations', backref=backref('Definition'))
    dimensions = relationship(u'Dimensions', backref=backref('Definition'))
    types = relationship(u'Types', backref=backref('Definition'))
    voyages = relationship(u'Voyages', backref=backref('Definition'))


class Dimensions(Base):
    __tablename__ = 'Dimension'
    __table_args__ = {u'schema': 'Structure'}

    unit_id = Column(ForeignKey(u'Structure.Definition.unit_id', ondelete=u'RESTRICT', onupdate=u'CASCADE'), primary_key=True, nullable=False)
    length = Column(Float)


class Types(Base):
    __tablename__ = 'Type'
    __table_args__ = {u'schema': 'Structure'}

    id = Column(SmallInteger, primary_key=True)
    type_name = Column(String(255))
    type_description = Column(String(255))

我想做的是从Codes表中找到一行特定的数据(通过code_str来过滤),然后返回所有相关的表格,但有几个条件:Location表只返回最新的一行数据(根据timestamp),Voyage表也只返回最新的一行(根据departure),而且必须包含Definitions表中的所有信息。

我开始从头创建一个查询,结果遇到了这样的情况:

string_to_search = request.matchdict.get('code')

sub_dest = DBSession.query(func.max(Voyage.departure).label('latest_voyage_timestamp'), Voyage.unit_id, Voyage.call.label('destination_call')).\
    filter(Voyage.call== string_to_search).\
    group_by(Voyage.unit_id, Voyage.call).\
    subquery()

query = DBSession.query(Codes, Voyage).\
    join(sub_dest, sub_dest.c.destination_call == Codes.code_str).\
    outerjoin(Voyage, sub_dest.c.latest_voyage_timestamp == Voyage.departure_date)

但我注意到,当我遍历结果时(比如用for code, voyage in query),我实际上是在遍历每一个返回的Voyage。理论上这对我来说不是个大问题,但我想构建一个包含Codes表基本信息的json响应,这个响应应该包括所有可能的Voyages(如果有的话)。

举个例子:

code_data = {}
all_units = []

for code, voyage in query:
    if code_data is not {}:
        code_data = {
            'code_id': code.id,
            'code_str': code.code_str,
            'code_name': code.name,
        }

    single_unit = {
        'unit_id': voyage.unit_id,
        'unit_departure': str(voyage.departure_date) if voyage.departure_date else None,
    }
    all_units.append(single_unit)

return {
    'code_data':  exception.message if exception else code_data,
    'voyages': exception.message if exception else all_units,
}

现在,这样做似乎有点不对,因为我不喜欢在每次循环中都重写这个code_data,所以我在这里加了if code_data is not {}这一行,但我觉得用类似下面的方式来遍历会更好(更合理):

for code in query:
    code_data = {
        'code_id': code.id,
        'code_str': code.code_str,
        'code_name': code.name,
    }
    for voyage in code.voyages:
        single_unit = {
            'unit_id': voyage.unit_id,
            'unit_departure': str(voyage.departure) if voyage.departure else None,
        }
        all_units.append(single_unit)

return {
    'code_data':  exception.message if exception else code_data,
}

所以,我想只返回一个Code(因为我查询了数据库中的这个特定Code),然后这个Code会有所有相关的Voyages作为嵌套值,当然,每个Voyage中也会包含与特定UnitDefinition相关的所有其他信息……

我这种方法一开始就好吗?我该如何构建我的查询,以便以第二种方式进行遍历呢?

我使用的是Python 2.7.6,SQLAlchemy 0.9.7和Pyramid 1.5.1,数据库是Postgres。

谢谢!

1 个回答

1

试着把外层查询改成这样:

query = DBSession.query(Codes).options(contains_eager('incoming')).\
    join(sub_dest, sub_dest.c.destination_call == Codes.code_str).\
    outerjoin(Voyage, sub_dest.c.latest_voyage_timestamp == Voyage.departure_date)

如果遇到问题,可以尝试这样调用 options(...) 部分:

(...) .options(contains_eager(Codes.incoming)). (...)

这样应该会返回一个单独的 Codes 实例,并且可以通过你定义的关系(incoming)访问到 Voyages 对象,所以你可以继续进行:

results = query.all()
for code in results:
    print code 
    # do something with code.incoming
    # actually, you should get only one code so if it proves to work, you should 
    # use query.one() so that in case something else than a single Code is returned,
    # an exception is thrown

当然,你需要导入一些东西,比如:from sqlalchemy.orm import contains_eager

撰写回答