在Pyramid中使用SQLAlchemy创建带有相关表的查询
我在Pyramid中定义了几个表,像这样:
# coding: utf-8
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Integer, Float, DateTime, ForeignKey, ForeignKeyConstraint, String, Column
from sqlalchemy.orm import scoped_session, sessionmaker, relationship, backref,
from zope.sqlalchemy import ZopeTransactionExtension
DBSession = scoped_session(sessionmaker(extension=ZopeTransactionExtension()))
Base = declarative_base()
class Codes(Base):
__tablename__ = 'Code'
__table_args__ = {u'schema': 'Locations'}
id = Column(Integer, nullable=False)
code_str = Column(String(9), primary_key=True)
name = Column(String(100))
incoming = relationship(u'Voyages', primaryjoin='Voyage.call == Codes.code_str', backref=backref('Code'))
class Locations(Base):
__tablename__ = 'Location'
__table_args__ = {u'schema': 'Locations'}
unit_id = Column(ForeignKey(u'Structure.Definition.unit_id', ondelete=u'RESTRICT', onupdate=u'CASCADE'), primary_key=True, nullable=False)
timestamp = Column(DateTime, primary_key=True, nullable=False)
longitude = Column(Float)
latitude = Column(Float)
class Voyages(Base):
__tablename__ = 'Voyage'
__table_args__ = (ForeignKeyConstraint(['unit_id', 'Voyage_id'], [u'Locations.Voyages.unit_id', u'Locations.Voyages.voyage_id'], ondelete=u'RESTRICT', onupdate=u'CASCADE'), {u'schema': 'Locations'}
)
uid = Column(Integer, primary_key=True)
unit_id = Column(Integer)
voyage_id = Column(Integer)
departure = Column(ForeignKey(u'Locations.Code.code_str', ondelete=u'RESTRICT', onupdate=u'CASCADE'))
call = Column(ForeignKey(u'Locations.Code.code_str', ondelete=u'RESTRICT', onupdate=u'CASCADE'))
departure_date = Column(DateTime)
voyage_departure = relationship(u'Codes', primaryjoin='Voyage.departure == Codes.code_str')
voyage_call = relationship(u'Codes', primaryjoin='Voyage.call == Codes.code_str')
class Definitions(Base):
__tablename__ = 'Definition'
__table_args__ = {u'schema': 'Structure'}
unit_id = Column(Integer, primary_key=True)
name = Column(String(90))
type = Column(ForeignKey(u'Structure.Type.id', ondelete=u'RESTRICT', onupdate=u'CASCADE'))
locations = relationship(u'Locations', backref=backref('Definition'))
dimensions = relationship(u'Dimensions', backref=backref('Definition'))
types = relationship(u'Types', backref=backref('Definition'))
voyages = relationship(u'Voyages', backref=backref('Definition'))
class Dimensions(Base):
__tablename__ = 'Dimension'
__table_args__ = {u'schema': 'Structure'}
unit_id = Column(ForeignKey(u'Structure.Definition.unit_id', ondelete=u'RESTRICT', onupdate=u'CASCADE'), primary_key=True, nullable=False)
length = Column(Float)
class Types(Base):
__tablename__ = 'Type'
__table_args__ = {u'schema': 'Structure'}
id = Column(SmallInteger, primary_key=True)
type_name = Column(String(255))
type_description = Column(String(255))
我想做的是从Codes
表中找到一行特定的数据(通过code_str
来过滤),然后返回所有相关的表格,但有几个条件:Location
表只返回最新的一行数据(根据timestamp
),Voyage
表也只返回最新的一行(根据departure
),而且必须包含Definitions
表中的所有信息。
我开始从头创建一个查询,结果遇到了这样的情况:
string_to_search = request.matchdict.get('code')
sub_dest = DBSession.query(func.max(Voyage.departure).label('latest_voyage_timestamp'), Voyage.unit_id, Voyage.call.label('destination_call')).\
filter(Voyage.call== string_to_search).\
group_by(Voyage.unit_id, Voyage.call).\
subquery()
query = DBSession.query(Codes, Voyage).\
join(sub_dest, sub_dest.c.destination_call == Codes.code_str).\
outerjoin(Voyage, sub_dest.c.latest_voyage_timestamp == Voyage.departure_date)
但我注意到,当我遍历结果时(比如用for code, voyage in query
),我实际上是在遍历每一个返回的Voyage
。理论上这对我来说不是个大问题,但我想构建一个包含Codes
表基本信息的json响应,这个响应应该包括所有可能的Voyages
(如果有的话)。
举个例子:
code_data = {}
all_units = []
for code, voyage in query:
if code_data is not {}:
code_data = {
'code_id': code.id,
'code_str': code.code_str,
'code_name': code.name,
}
single_unit = {
'unit_id': voyage.unit_id,
'unit_departure': str(voyage.departure_date) if voyage.departure_date else None,
}
all_units.append(single_unit)
return {
'code_data': exception.message if exception else code_data,
'voyages': exception.message if exception else all_units,
}
现在,这样做似乎有点不对,因为我不喜欢在每次循环中都重写这个code_data
,所以我在这里加了if code_data is not {}
这一行,但我觉得用类似下面的方式来遍历会更好(更合理):
for code in query:
code_data = {
'code_id': code.id,
'code_str': code.code_str,
'code_name': code.name,
}
for voyage in code.voyages:
single_unit = {
'unit_id': voyage.unit_id,
'unit_departure': str(voyage.departure) if voyage.departure else None,
}
all_units.append(single_unit)
return {
'code_data': exception.message if exception else code_data,
}
所以,我想只返回一个Code
(因为我查询了数据库中的这个特定Code
),然后这个Code
会有所有相关的Voyages
作为嵌套值,当然,每个Voyage
中也会包含与特定Unit
的Definition
相关的所有其他信息……
我这种方法一开始就好吗?我该如何构建我的查询,以便以第二种方式进行遍历呢?
我使用的是Python 2.7.6,SQLAlchemy 0.9.7和Pyramid 1.5.1,数据库是Postgres。
谢谢!
1 个回答
试着把外层查询改成这样:
query = DBSession.query(Codes).options(contains_eager('incoming')).\
join(sub_dest, sub_dest.c.destination_call == Codes.code_str).\
outerjoin(Voyage, sub_dest.c.latest_voyage_timestamp == Voyage.departure_date)
如果遇到问题,可以尝试这样调用 options(...) 部分:
(...) .options(contains_eager(Codes.incoming)). (...)
这样应该会返回一个单独的 Codes
实例,并且可以通过你定义的关系(incoming
)访问到 Voyages
对象,所以你可以继续进行:
results = query.all()
for code in results:
print code
# do something with code.incoming
# actually, you should get only one code so if it proves to work, you should
# use query.one() so that in case something else than a single Code is returned,
# an exception is thrown
当然,你需要导入一些东西,比如:from sqlalchemy.orm import contains_eager