用Python每日更新PostgreSQL数据库股票价格

0 投票
1 回答
2199 浏览
提问于 2025-04-18 17:43

我在QuantState网站上找到了一段很棒的脚本,它详细讲解了如何建立自己的证券数据库,并加载历史价格信息。不过,我想对这个脚本进行一些修改,让它能够每天运行,并添加最新的股票报价。

我调整了初始数据加载的部分,只下载了一周的历史数据,但在写SQL语句时遇到了一些问题,想要在添加数据之前先检查一下这行数据是否已经存在。有没有人能帮我解决这个问题?以下是我目前的代码:

def insert_daily_data_into_db(data_vendor_id, symbol_id, daily_data):
  """Takes a list of tuples of daily data and adds it to the
   database. Appends the vendor ID and symbol ID to the data.

  daily_data: List of tuples of the OHLC data (with 
  adj_close and volume)"""

  # Create the time now
  now = datetime.datetime.utcnow()

  # Amend the data to include the vendor ID and symbol ID
  daily_data = [(data_vendor_id, symbol_id, d[0], now, now,
    d[1], d[2], d[3], d[4], d[5], d[6]) for d in daily_data]

  # Create the insert strings
  column_str = """data_vendor_id, symbol_id, price_date, created_date, 
          last_updated_date, open_price, high_price, low_price, 
          close_price, volume, adj_close_price"""
  insert_str = ("%s, " * 11)[:-2]
  final_str = "INSERT INTO daily_price (%s) VALUES (%s) WHERE NOT EXISTS (SELECT 1 FROM daily_price WHERE symbol_id = symbol_id AND price_date = insert_str[2])" % (column_str, insert_str)

  # Using the postgre connection, carry out an INSERT INTO for every symbol
  with con: 
    cur = con.cursor()
    cur.executemany(final_str, daily_data)

1 个回答

1

关于你上面的代码,有几点需要注意:

一般来说,尽量在纯 SQL 中使用 now(),而不是在 Python 中处理时间,这样可以避免很多与时区、库差异等相关的问题。

如果你构建一个 的列表,可以根据列表的大小动态生成一串 %s,这样就不需要把长度硬编码到一个重复的字符串中,然后再进行切片。

看起来 insert_daily_data_into_db 是打算在一个循环中按股票来调用的,所以我觉得你不应该在这里使用 executemany,因为它需要一个 元组列表,这在语义上是很不同的。

你在子 select 中把 symbol_id 和它自己进行比较,而不是和一个特定的值比较(这就意味着它总是为真)。

为了防止可能的 SQL 注入,你应该始终在 WHERE 子句中插入值,包括子 select

注意:我假设你在使用 psycopg2 来访问 Postgres,并且表的主键是一个 (symbol_id, price_date) 的元组。如果不是,下面的代码可能需要稍微调整一下。

考虑到这些点,可以尝试这样的代码(未经测试,因为我没有你的数据、数据库等,但它在语法上是有效的 Python):

def insert_daily_data_into_db(data_vendor_id, symbol_id, daily_data):
    """Takes a list of tuples of daily data and adds it to the
    database. Appends the vendor ID and symbol ID to the data.

    daily_data: List of tuples of the OHLC data (with
    adj_close and volume)"""


    column_list = ["data_vendor_id", "symbol_id", "price_date", "created_date",
                    "last_updated_date", "open_price", "high_price", "low_price",
                    "close_price", "volume", "adj_close_price"]

    insert_list = ['%s'] * len(column_str)

    values_tuple = (data_vendor_id, symbol_id, daily_data[0], 'now()', 'now()', daily_data[1],
                    daily_data[2], daily_data[3], daily_data[4], daily_data[5], daily_data[6])

    final_str = """INSERT INTO daily_price ({0})
                     VALUES ({1})
                     WHERE NOT EXISTS (SELECT 1
                                       FROM daily_price
                                       WHERE symbol_id = %s
                                         AND price_date = %s)""".format(', '.join(column_list), ', '.join(insert_list))

    # Using the postgre connection, carry out an INSERT INTO for every symbol
    with con:
        cur = con.cursor()
        cur.execute(final_str, values_tuple, values_tuple[1], values_tuple[2])

撰写回答