SQLAlchemy计算列

2024-05-13 20:36:13 发布

您现在位置:Python中文网/ 问答频道 /正文

(New SQLAlchemy user alert)我有三个表:person、persons从特定日期开始的小时费率和daily time reporting。我正在寻找一种正确的方法,使费用的时间基础上的人小时费率在那一天。

是的,我可以计算创建时的值,并将其作为模型的一部分,但可以将此视为在幕后总结更复杂数据的示例。如何计算时间成本?它是混合型房地产、柱房地产还是完全不同的房地产?

class Person(Base):
    __tablename__ = 'person'
    personID = Column(Integer, primary_key=True)
    name = Column(String(30), unique=True)

class Payrate(Base):
    __tablename__ = 'payrate'
    payrateID = Column(Integer, primary_key=True)
    personID  = Column(Integer, ForeignKey('person.personID'))
    hourly    = Column(Integer)
    starting  = Column(Date)
    __tableargs__ =(UniqueConstraint('personID', 'starting',
                                     name='uc_peron_starting'))

class Time(Base):
    __tablename__ = 'entry'
    entryID  = Column(Integer, primary_key=True)
    personID = Column(Integer, ForeignKey('person.personID'))
    workedon = Column(Date)
    hours    = Column(Integer)

    person = relationship("Person")

    def __repr__(self):
        return "<{date} {hours}hrs ${0.cost:.02f}>".format(self, 
                      date=self.workedon.isoformat(), hours=to_hours(self.hours))

    @property
    def cost(self):
        '''Cost of entry
        '''
        ## This is where I am stuck in propery query creation
        return self.hours * query(Payrate).filter(
                             and_(Payrate.personID==personID,
                                  Payrate.starting<=workedon
                             ).order_by(
                               Payrate.starting.desc())

Tags: keyselftruebasecolumnintegerclassperson
2条回答

很多时候,我能给的最好的建议就是做不同的事情。像这样的多表计算列是数据库views的用途。使用计算列基于时间表(或其他任何需要的内容)构建视图,基于该视图构建模型,然后设置。这可能也会减轻数据库的压力。这也是一个很好的例子,说明了为什么将设计限制在通过自动化migrations可以完成的工作上是危险的。

您在这里遇到的问题,为了尽可能优雅地解决,使用了非常先进的SQLAlchemy技术,所以我知道您是一个初学者,但是这个答案将一直向您展示到最后。然而,解决这样的问题需要一步一个脚印地走,在我们走过的过程中,你可以用不同的方式得到你想要的答案。

在你开始研究如何混合这个或什么之前,你需要考虑一下SQL。如何查询任意行序列上的Time.cost?因为我们有一个简单的外键,所以我们可以把时间和人清晰地联系起来。但是要将时间链接到Payrate,使用这个特定的模式是很困难的,因为时间链接到Payrate不仅仅是通过person_id,而且还通过workedon-在SQL中,我们可以使用“Time.person_id=person.id和Time.workedon在Payrate.start_date和Payrate.end_date之间”来连接到它。但这里没有“结束日期”,这意味着我们也必须推导出它。这个派生是最棘手的部分,所以我想到的是这样开始的(我将列名小写):

SELECT payrate.person_id, payrate.hourly, payrate.starting, ending.ending
FROM payrate LEFT OUTER JOIN
(SELECT pa1.payrate_id, MIN(pa2.starting) as ending FROM payrate AS pa1
JOIN payrate AS pa2 ON pa1.person_id = pa2.person_id AND pa2.starting > pa1.starting
GROUP BY pa1.payrate_id
) AS ending ON payrate.payrate_id=ending.payrate_id

可能有其他的方法可以得到这个结果,但我就是这么想的-其他方法几乎肯定会有类似的事情发生(即子查询、连接)。

所以有了开始/结束的工资率,我们就可以知道查询是什么样子了。我们希望使用BETWEEN将时间项与日期范围匹配,但最新的付款率项对于“结束”日期将为空,因此解决此问题的一种方法是对非常高的日期使用COALESCE(另一种方法是使用条件):

SELECT *, entry.hours * payrate_derived.hourly
FROM entry
JOIN
    (SELECT payrate.person_id, payrate.hourly, payrate.starting, ending.ending
    FROM payrate LEFT OUTER JOIN
    (SELECT pa1.payrate_id, MIN(pa2.starting) as ending FROM payrate AS pa1
    JOIN payrate AS pa2 ON pa1.person_id = pa2.person_id AND pa2.starting > pa1.starting
    GROUP BY pa1.payrate_id
    ) AS ending ON payrate.payrate_id=ending.payrate_id) as payrate_derived
ON entry.workedon BETWEEN payrate_derived.starting AND COALESCE(payrate_derived.ending, "9999-12-31")
AND entry.person_id=payrate_derived.person_id
ORDER BY entry.person_id, entry.workedon

现在@hybrid在SQLAlchemy中可以为您做的是,当在SQL表达式级别运行时,仅仅是“entry.hours*payrate_derived.hourly”部分,仅此而已。所有的连接等,你需要提供外部的混合。

所以我们需要把那个大的子查询放在下面:

class Time(...):
    @hybrid_property
    def cost(self):
        # ....

    @cost.expression
    def cost(cls):
        return cls.hours * <SOMETHING>.hourly

所以让我们找出<SOMETHING>是什么。建立选择为对象:

from sqlalchemy.orm import aliased, join, outerjoin
from sqlalchemy import and_, func

pa1 = aliased(Payrate)
pa2 = aliased(Payrate)
ending = select([pa1.payrate_id, func.min(pa2.starting).label('ending')]).\
            select_from(join(pa1, pa2, and_(pa1.person_id == pa2.person_id, pa2.starting > pa1.starting))).\
            group_by(pa1.payrate_id).alias()

payrate_derived = select([Payrate.person_id, Payrate.hourly, Payrate.starting, ending.c.ending]).\
    select_from(outerjoin(Payrate, ending, Payrate.payrate_id == ending.c.payrate_id)).alias()

表达式端的cost()混合需要引用派生的payrate(我们将在一分钟内完成python端的工作):

class Time(...):
    @hybrid_property
    def cost(self):
        # ....

    @cost.expression
    def cost(cls):
        return cls.hours * payrate_derived.c.hourly

然后,为了使用我们的cost()混合,它必须在具有该连接的查询的上下文中。注意这里我们使用Python的datetime.date.max来获得最大日期(handy!)以下内容:

print session.query(Person.name, Time.workedon, Time.hours, Time.cost).\
                    select_from(Time).\
                    join(Time.person).\
                    join(payrate_derived,
                            and_(
                                payrate_derived.c.person_id == Time.person_id,
                                Time.workedon.between(
                                    payrate_derived.c.starting,
                                    func.coalesce(
                                        payrate_derived.c.ending,
                                        datetime.date.max
                                    )
                                )
                            )
                    ).\
                    all()

所以这个连接很大,而且很笨拙,我们需要经常这样做,更不用说当我们使用Python混合时,我们需要在Python中加载相同的集合。我们可以使用relationship()映射到它,这意味着我们必须设置自定义连接条件,但是我们还需要使用一种不太为人所知的称为非主映射器的技术实际映射到该子查询。非主映射器提供了一种将类映射到某个任意表或SELECT构造的方法,仅用于选择行。我们通常不需要使用它,因为查询已经允许我们查询任意列和子查询,但是要从relationship()中得到它,它需要一个映射。映射需要定义主键,关系还需要知道关系的哪一方是“外来的”。这是这里最高级的部分,在这种情况下,它的工作原理如下:

from sqlalchemy.orm import mapper, relationship, foreign

payrate_derived_mapping = mapper(Payrate, payrate_derived, non_primary=True,
                                        primary_key=[
                                            payrate_derived.c.person_id,
                                            payrate_derived.c.starting
                                        ])
Time.payrate = relationship(
                    payrate_derived_mapping,
                    viewonly=True,
                    uselist=False,
                    primaryjoin=and_(
                            payrate_derived.c.person_id == foreign(Time.person_id),
                            Time.workedon.between(
                                payrate_derived.c.starting,
                                func.coalesce(
                                    payrate_derived.c.ending,
                                    datetime.date.max
                                )
                            )
                        )
                    )

所以这是我们最后一次看到这种结合。我们现在可以在前面执行查询:

print session.query(Person.name, Time.workedon, Time.hours, Time.cost).\
                    select_from(Time).\
                    join(Time.person).\
                    join(Time.payrate).\
                    all()

最后,我们还可以将新的payrate关系连接到Python级别的混合体中:

class Time(Base):
    # ...

    @hybrid_property
    def cost(self):
        return self.hours * self.payrate.hourly

    @cost.expression
    def cost(cls):
        return cls.hours * payrate_derived.c.hourly

我们在这里的解决方案花费了很多努力,但至少最复杂的部分,即工资率映射,完全在一个地方,我们不需要再看它。

下面是一个完整的工作示例:

from sqlalchemy import create_engine, Column, Integer, ForeignKey, Date, \
                    UniqueConstraint, select, func, and_, String
from sqlalchemy.orm import join, outerjoin, relationship, Session, \
                    aliased, mapper, foreign
from sqlalchemy.ext.declarative import declarative_base
import datetime
from sqlalchemy.ext.hybrid import hybrid_property


Base = declarative_base()

class Person(Base):
    __tablename__ = 'person'
    person_id = Column(Integer, primary_key=True)
    name = Column(String(30), unique=True)

class Payrate(Base):
    __tablename__ = 'payrate'
    payrate_id = Column(Integer, primary_key=True)
    person_id  = Column(Integer, ForeignKey('person.person_id'))
    hourly    = Column(Integer)
    starting  = Column(Date)

    person = relationship("Person")
    __tableargs__ =(UniqueConstraint('person_id', 'starting',
                                     name='uc_peron_starting'))

class Time(Base):
    __tablename__ = 'entry'
    entry_id  = Column(Integer, primary_key=True)
    person_id = Column(Integer, ForeignKey('person.person_id'))
    workedon = Column(Date)
    hours    = Column(Integer)

    person = relationship("Person")

    @hybrid_property
    def cost(self):
        return self.hours * self.payrate.hourly

    @cost.expression
    def cost(cls):
        return cls.hours * payrate_derived.c.hourly

pa1 = aliased(Payrate)
pa2 = aliased(Payrate)
ending = select([pa1.payrate_id, func.min(pa2.starting).label('ending')]).\
            select_from(join(pa1, pa2, and_(
                                        pa1.person_id == pa2.person_id,
                                        pa2.starting > pa1.starting))).\
            group_by(pa1.payrate_id).alias()

payrate_derived = select([Payrate.person_id, Payrate.hourly, Payrate.starting, ending.c.ending]).\
    select_from(outerjoin(Payrate, ending, Payrate.payrate_id == ending.c.payrate_id)).alias()

payrate_derived_mapping = mapper(Payrate, payrate_derived, non_primary=True,
                                        primary_key=[
                                            payrate_derived.c.person_id,
                                            payrate_derived.c.starting
                                        ])
Time.payrate = relationship(
                    payrate_derived_mapping,
                    viewonly=True,
                    uselist=False,
                    primaryjoin=and_(
                            payrate_derived.c.person_id == foreign(Time.person_id),
                            Time.workedon.between(
                                payrate_derived.c.starting,
                                func.coalesce(
                                    payrate_derived.c.ending,
                                    datetime.date.max
                                )
                            )
                        )
                    )



e = create_engine("postgresql://scott:tiger@localhost/test", echo=False)
Base.metadata.drop_all(e)
Base.metadata.create_all(e)

session = Session(e)
p1 = Person(name='p1')
session.add(p1)

session.add_all([
    Payrate(hourly=10, starting=datetime.date(2013, 5, 17), person=p1),
    Payrate(hourly=15, starting=datetime.date(2013, 5, 25), person=p1),
    Payrate(hourly=20, starting=datetime.date(2013, 6, 10), person=p1),
])

session.add_all([
    Time(person=p1, workedon=datetime.date(2013, 5, 19), hours=10),
    Time(person=p1, workedon=datetime.date(2013, 5, 27), hours=5),
    Time(person=p1, workedon=datetime.date(2013, 5, 30), hours=5),
    Time(person=p1, workedon=datetime.date(2013, 6, 18), hours=12),
])
session.commit()

print session.query(Person.name, Time.workedon, Time.hours, Time.cost).\
                    select_from(Time).\
                    join(Time.person).\
                    join(Time.payrate).\
                    all()

for time in session.query(Time):
    print time.person.name, time.workedon, time.hours, time.payrate.hourly, time.cost

输出(第一行是聚合版本,其余是每个对象):

[(u'p1', datetime.date(2013, 5, 19), 10, 100), (u'p1', datetime.date(2013, 5, 27), 5, 75), (u'p1', datetime.date(2013, 5, 30), 5, 75), (u'p1', datetime.date(2013, 6, 18), 12, 240)]
p1 2013-05-19 10 10 100
p1 2013-05-27 5 15 75
p1 2013-05-30 5 15 75
p1 2013-06-18 12 20 240

相关问题 更多 >