在SQLAlchemy中映射多个相似表

11 投票
3 回答
8319 浏览
提问于 2025-04-18 00:16

我有大约2000个地点,每个地点都有时间序列数据。每个时间序列的数据行数都达到了几百万。我想把这些数据存储在Postgres数据库里。现在我的做法是为每个地点的时间序列创建一个表,同时还有一个元数据表,用来存储每个地点的信息,比如坐标、高度等等。我正在使用Python和SQLAlchemy来创建和填充这些表。我希望能在元数据表和每个时间序列表之间建立关系,这样我就可以进行一些查询,比如“选择所有在日期A和日期B之间有数据的地点”以及“选择日期A的所有数据并导出一个包含坐标的CSV文件”。那么,创建很多结构相同(只有名字不同)的表,并与元数据表建立关系的最佳方法是什么?或者我应该使用不同的数据库设计呢?

目前,我使用这种方法来生成很多类似的映射:

from sqlalchemy import create_engine, MetaData
from sqlalchemy.types import Float, String, DateTime, Integer
from sqlalchemy import Column, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship, backref

Base = declarative_base()


def make_timeseries(name):
    class TimeSeries(Base):

        __tablename__ = name
        table_name = Column(String(50), ForeignKey('locations.table_name'))
        datetime = Column(DateTime, primary_key=True)
        value = Column(Float)

        location = relationship('Location', backref=backref('timeseries',
                                lazy='dynamic'))

        def __init__(self, table_name, datetime, value):
            self.table_name = table_name
            self.datetime = datetime
            self.value = value

        def __repr__(self):
            return "{}: {}".format(self.datetime, self.value)

    return TimeSeries


class Location(Base):

    __tablename__ = 'locations'
    id = Column(Integer, primary_key=True)
    table_name = Column(String(50), unique=True)
    lon = Column(Float)
    lat = Column(Float)

if __name__ == '__main__':
    connection_string = 'postgresql://user:pw@localhost/location_test'
    engine = create_engine(connection_string)
    metadata = MetaData(bind=engine)
    Session = sessionmaker(bind=engine)
    session = Session()

    TS1 = make_timeseries('ts1')
    # TS2 = make_timeseries('ts2')   # this breaks because of the foreign key
    Base.metadata.create_all(engine)
    session.add(TS1("ts1", "2001-01-01", 999))
    session.add(TS1("ts1", "2001-01-02", -555))

    qs = session.query(Location).first()
    print qs.timeseries.all()

这种方法有一些问题,最明显的是如果我创建多个TimeSeries,外键就无法正常工作。之前我用过一些变通的方法,但感觉这些都是临时解决方案,似乎应该有更好的方法来处理这个问题。我应该如何组织和访问我的数据呢?

3 个回答

3

分为两个部分:

只用两个表

没必要创建几十个或几百个一模一样的表。只需要一个 location 表和一个 location_data 表,所有的记录都要关联到 location 表。还要在 location_data 表上为 location_id 创建一个索引,这样可以提高搜索效率。

不要用sqlalchemy来创建这个

我很喜欢sqlalchemy,每天都在用。它非常适合管理数据库和添加一些记录,但在初始设置时,如果数据量有几百万条,就不建议使用它。你应该生成一个与Postgres的“COPY”语句兼容的文件 [ http://www.postgresql.org/docs/9.2/static/sql-copy.html ]。COPY可以快速导入大量数据;在备份和恢复操作时也会用到它。

sqlalchemy在查询数据和添加新记录时会非常好用。如果你有批量操作,应该使用COPY。

3

我建议你不要采用你提到的那种数据库设计。我对你正在处理的数据了解不多,但听起来你应该有两个表。一个表用来存放位置,另一个子表用来存放位置的数据。位置表会存储你提到的那些信息,比如坐标和海拔。位置数据表则会存储位置表中的位置ID,以及你想要跟踪的时间序列数据。

这样做可以避免每次添加新位置时都要改变数据库结构和代码,也能让你进行你想要的查询。

14

替代方案1:表分区

一看到完全相同的表结构,我就想到了分区。我不是数据库管理员,也没有太多实际使用经验(尤其是在PostgreSQL上),但请你看看PostgreSQL - 分区的文档。表分区正好可以解决你遇到的问题,不过有超过1000个表/分区听起来有点棘手;所以建议你在论坛或StackOverflow上多查查关于这个话题的可扩展性问题。

考虑到你最常用的搜索条件中,datetime这个部分非常重要,因此必须有一个可靠的索引策略。如果你决定走分区这条路,显而易见的分区策略就是根据日期范围来分区。这可能让你把旧数据和最新数据分成不同的块,尤其是考虑到旧数据几乎不会被更新,这样物理布局会更紧凑和高效;而对于“最近”的数据,你可以采用另一种策略。

替代方案2:欺骗SQLAlchemy

这基本上是通过欺骗SQLAlchemy,让它认为所有的TimeSeries都是一个实体的子类来让你的示例代码正常工作。下面的代码是自包含的,会创建50个表,并且里面有最少的数据。但如果你已经有一个数据库,这应该能让你快速检查性能,这样你就可以决定这是否是一个可行的选择。

from datetime import date, datetime

from sqlalchemy import create_engine, Column, String, Integer, DateTime, Float, ForeignKey, func
from sqlalchemy.orm import sessionmaker, relationship, configure_mappers, joinedload
from sqlalchemy.ext.declarative import declarative_base, declared_attr
from sqlalchemy.ext.declarative import AbstractConcreteBase, ConcreteBase


engine = create_engine('sqlite:///:memory:', echo=True)
Session = sessionmaker(bind=engine)
session = Session()
Base = declarative_base(engine)


# MODEL
class Location(Base):
    __tablename__ = 'locations'
    id = Column(Integer, primary_key=True)
    table_name = Column(String(50), unique=True)
    lon = Column(Float)
    lat = Column(Float)


class TSBase(AbstractConcreteBase, Base):
    @declared_attr
    def table_name(cls):
        return Column(String(50), ForeignKey('locations.table_name'))


def make_timeseries(name):
    class TimeSeries(TSBase):
        __tablename__ = name
        __mapper_args__ = { 'polymorphic_identity': name, 'concrete':True}

        datetime = Column(DateTime, primary_key=True)
        value = Column(Float)

        def __init__(self, datetime, value, table_name=name ):
            self.table_name = table_name
            self.datetime = datetime
            self.value = value

    return TimeSeries


def _test_model():
    _NUM = 50
    # 0. generate classes for all tables
    TS_list = [make_timeseries('ts{}'.format(1+i)) for i in range(_NUM)]
    TS1, TS2, TS3 = TS_list[:3] # just to have some named ones
    Base.metadata.create_all()
    print('-'*80)

    # 1. configure mappers
    configure_mappers()

    # 2. define relationship
    Location.timeseries = relationship(TSBase, lazy="dynamic")
    print('-'*80)

    # 3. add some test data
    session.add_all([Location(table_name='ts{}'.format(1+i), lat=5+i, lon=1+i*2)
        for i in range(_NUM)])
    session.commit()
    print('-'*80)

    session.add(TS1(datetime(2001,1,1,3), 999))
    session.add(TS1(datetime(2001,1,2,2), 1))
    session.add(TS2(datetime(2001,1,2,8), 33))
    session.add(TS2(datetime(2002,1,2,18,50), -555))
    session.add(TS3(datetime(2005,1,3,3,33), 8))
    session.commit()


    # Query-1: get all timeseries of one Location
    #qs = session.query(Location).first()
    qs = session.query(Location).filter(Location.table_name == "ts1").first()
    print(qs)
    print(qs.timeseries.all())
    assert 2 == len(qs.timeseries.all())
    print('-'*80)


    # Query-2: select all location with data between date-A and date-B
    dateA, dateB = date(2001,1,1), date(2003,12,31)
    qs = (session.query(Location)
            .join(TSBase, Location.timeseries)
            .filter(TSBase.datetime >= dateA)
            .filter(TSBase.datetime <= dateB)
            ).all()
    print(qs)
    assert 2 == len(qs)
    print('-'*80)


    # Query-3: select all data (including coordinates) for date A
    dateA = date(2001,1,1)
    qs = (session.query(Location.lat, Location.lon, TSBase.datetime, TSBase.value)
            .join(TSBase, Location.timeseries)
            .filter(func.date(TSBase.datetime) == dateA)
            ).all()
    print(qs)
    # @note: qs is list of tuples; easy export to CSV
    assert 1 == len(qs)
    print('-'*80)


if __name__ == '__main__':
    _test_model()

替代方案3:大数据风格

如果你在使用数据库时遇到性能问题,我可能会尝试:

  • 像现在这样,仍然把数据保存在不同的表/数据库/模式中
  • 使用数据库引擎提供的“原生”解决方案进行批量导入数据
  • 使用MapReduce风格的分析。
    • 在这里,我会继续使用Python和SQLAlchemy,自己实现分布式查询和聚合(或者找一些现成的)。显然,这只有在你不需要直接在数据库上生成这些结果的情况下才有效。

编辑1: 替代方案4:时间序列数据库

我在大规模使用这些方面没有经验,但绝对是一个值得考虑的选项。


如果你能在之后分享你的发现和整个决策过程,那就太棒了。

撰写回答