使用SQLAlchemy筛选相互间隔不超过两秒的对象

5 投票
3 回答
3422 浏览
提问于 2025-04-15 23:18

我有两个表格,其中都有一个叫“date”的列。第一个表格存储的是(名字,日期),而第二个表格存储的是(日期,p1,p2)。现在,给定一个名字,我想用第一个表格中的日期去查询第二个表格中的p1和p2;匹配的条件是第一个表格的日期和第二个表格的日期相差不超过两秒。

我该如何使用SQLAlchemy来实现这个功能呢?

我尝试过使用between操作符,但没有成功,写了类似这样的条件:

td = datetime.timedelta(seconds=2)
q = session.query(table1, table2).filter(table1.name=='my_name').\
    filter(between(table1.date, table2.date - td, table2.date + td))

有没有什么想法呢?

编辑: 我已经找到了解决这个问题的方法,使用了以下的方式:

from sqlalchemy.sql import between
import datetime
# [all other relevant imports]

td = datetime.timedelta(seconds=2)
t1_entry = session.query(table_1).filter(table_1.name == 'the_name').first()
if t1_entry is not None:
 tmin = t1_entry.date - td
 tmax = t1_entry.date + td
 t2_entry = session.query(table_2).filter(between(table_2.date, tmin, tmax)).first()
 return (t1_entry, t2_entry)
return None

所以比较是可以做到的,但我不确定这个方法是否高效。

3 个回答

1

这个方法是把日期转换成 Unix 时间戳。

在我最近的代码中,我成功地用了以下几行:

from sqlalchemy.sql import func
...
q = q.join(q2, func.abs(func.unix_timestamp(rssi1.datetime)-func.unix_timestamp(q2.c.datetime)) <=2 )

不过要注意,func.xxx 只是把 xxx 作为字符串复制到查询中,所以数据库必须支持这个 xxx 函数。这个例子是针对 MySQL 的。

3

首先,我来解释一下你尝试的做法为什么不奏效。SQLAlchemy其实就是一个方便的工具,用来写SQL查询,但所有的查询还是在远程数据库上进行。SQLAlchemy中的列是一些特殊的对象,它们的 __eq____gt__ 等方法被重写了,不是返回 TrueFalse,而是返回其他特殊对象,这些对象会记住它们比较的对象,并且可以在之后生成合适的SQL语句。添加等操作也是一样:自定义的 __add____sub__ 方法不会返回一个数字或拼接的字符串,而是返回一个可以生成SQL语句的对象。你可以把它们和字符串、整数、其他列、选择语句、MySQL函数调用等进行比较或相加,但不能和像timedelta这样的特殊Python对象进行比较。(简化了,可能技术上不完全正确;)

所以你可以做的是:

  • 把数据库中的值设为整数,比如unix时间戳。这样你的 between 查询就能正常工作(用 2 代替时间差)
  • 使用数据库端的函数把日期时间格式转换为unix时间戳,然后再进行比较。

更新:我稍微试了一下,不知怎么的,它确实能工作,甚至还有一个 Interval 数据类型。不过在这里它似乎没有正常工作:

MySQL:

>>> db.session.execute(db.select([User.date_joined, User.date_joined + timedelta(seconds=2)], limit=1)).fetchall()
[(datetime.datetime(2009, 7, 10, 20, 47, 33), 20090710204733.0)]
>>> db.session.execute(db.select([User.date_joined, User.date_joined + 2], limit=1)).fetchall()
[(datetime.datetime(2009, 7, 10, 20, 47, 33), 20090710204735.0)]
>>> db.session.execute(db.select([User.date_joined+0, User.date_joined + 2], limit=1)).fetchall()
[(20090710204733.0, 20090710204735.0)]

SQLite:

>>> db.session.execute(db.select([User.date_joined, User.date_joined + timedelta(seconds=2)], limit=1)).fetchall()
TypeError: expected string or buffer
>>> db.session.execute(db.select([User.date_joined, User.date_joined + 2], limit=1)).fetchall()
[(datetime.datetime(2010, 5, 28, 23, 8, 22, 476708), 2012)]
>>> db.session.execute(db.select([User.date_joined+0, User.date_joined + 2], limit=1)).fetchall()
[(2010, 2012)]

我不知道为什么第一个在MySQL上失败了,为什么返回的是浮点数。SQLite的错误似乎是因为SQLite 没有DATETIME数据类型,而SQLAlchemy把它存储为字符串

你可能需要多试试,或许能找到一个可行的方法,但我觉得为了保持真正的数据库独立,使用整数的方法可能是唯一可行的方式。

0

在这个问题讨论了一段时间后,我想出了目前的解决方法:

from sqlalchemy.sql import between
import datetime
# [all other relevant imports]

td = datetime.timedelta(seconds=2)
t1_entry = session.query(table_1).filter(table_1.name == 'the_name').first()
if t1_entry is not None:
 tmin = t1_entry.date - td
 tmax = t1_entry.date + td
 t2_entry = session.query(table_2).filter(between(table_2.date, tmin, tmax)).first()
 return (t1_entry, t2_entry)
return None

如果你有更好的想法,我会接受你的回答。

撰写回答