在SQLAlchemy中按类内的方法排序

1 投票
2 回答
1092 浏览
提问于 2025-04-18 16:05

我现在正在做一个模型,用来判断一篇文章的相关性。这是参考了Hacker News的算法。这里是我在 app/articles/models.py 中的文章模型。

from app.extensions import db

class Article(db.Model):
    """ database representation of an article """
    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(128))
    subtitle = db.Column(db.String(512))
    body = db.Column(db.Text())
    votes = db.Column(db.Integer, default=1)
    views = db.Column(db.Integer, default=1)
    timestamp = db.Column(db.DateTime, default=datetime.utcnow)

    def popularity(self, gravity=1.8):
        """ uses hacker news popularity rating """
        submit_delta = (self.timestamp - datetime.utcnow()).total_seconds()
        time_decay = submit_delta / 60 / 60
        popularity = (self.views - 1) / (time_decay + 2) ** gravity
        return popularity

目前,我正在尝试根据 popularity 的结果来排序。

>>> from app.articles.models import Article
>>> Article.query.order_by(Article.popularity()).all()

但是这样做不行。我该如何根据文章的受欢迎程度来排序呢?

2 个回答

1

如果你想在数据库中用“排序”功能来计算受欢迎程度,你需要把这个计算过程用sql表达式重写。还有其他选择,比如把所有文章都取出来,然后在python里排序(但对于数据量大的时候,这样做效率不高),或者提前计算好所有的受欢迎程度值,然后把它们存储在数据库的一个数字字段里,再进行排序。

举个例子(这个例子是针对Postgres的,我没有用Flask的写法,但你应该能明白意思):

order_exp = "(article.views - 1) / power(2 + extract(epoch from (now() at time zone 'UTC' - timestamp))/3600, :gravity)"
order = sqlalchemy.text(order_exp).bindparams(gravity=1.8)
print(session.query(Article).order_by(order).all())
3

你可以使用混合方法来创建一个方法,这个方法在类被调用时会生成一个SQL表达式(用于查询),而在实例被调用时则表现得像一个普通的方法。

下面是一个实际的例子。它会打印出通过Python和数据库计算的受欢迎程度。这两个结果会因为时间和四舍五入的原因而略有不同。

from datetime import datetime
from sqlalchemy import create_engine, Integer, Column, DateTime, func
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.hybrid import hybrid_method
from sqlalchemy.orm import Session

engine = create_engine('postgresql:///example', echo=True)
Base = declarative_base(bind=engine)
session = Session(bind=engine)


class Article(Base):
    __tablename__ = 'article'

    id = Column(Integer, primary_key=True)
    views = Column(Integer, nullable=False, default=1)
    ts = Column(DateTime, nullable=False, default=datetime.utcnow)

    @hybrid_method
    def popularity(self, gravity=1.8):
        seconds = (self.ts - datetime.utcnow()).total_seconds()
        hours = seconds / 3600

        return (self.views - 1) / (hours + 2) ** gravity

    @popularity.expression
    def popularity(self, gravity=1.8):
        seconds = func.extract('epoch', self.ts - func.now())
        hours = seconds / 3600

        return (self.views - 1) / func.power((hours + 2), gravity)


Base.metadata.create_all()

a1 = Article(views=100)
a2 = Article(views=200)

session.add_all((a1, a2))
session.commit()

comparison = session.query(Article, Article.popularity()).all()

for a, pop in comparison:
    print 'py: {} db: {}'.format(a.popularity(), pop)

这个方法在PostgreSQL中有效,但func.powerfunc.extract在其他数据库中的表现可能会有所不同。特别是SQLite没有power,而extract的实现方式也不同。

撰写回答