用Python每日更新PostgreSQL数据库股票价格
我在QuantState网站上找到了一段很棒的脚本,它详细讲解了如何建立自己的证券数据库,并加载历史价格信息。不过,我想对这个脚本进行一些修改,让它能够每天运行,并添加最新的股票报价。
我调整了初始数据加载的部分,只下载了一周的历史数据,但在写SQL语句时遇到了一些问题,想要在添加数据之前先检查一下这行数据是否已经存在。有没有人能帮我解决这个问题?以下是我目前的代码:
def insert_daily_data_into_db(data_vendor_id, symbol_id, daily_data):
"""Takes a list of tuples of daily data and adds it to the
database. Appends the vendor ID and symbol ID to the data.
daily_data: List of tuples of the OHLC data (with
adj_close and volume)"""
# Create the time now
now = datetime.datetime.utcnow()
# Amend the data to include the vendor ID and symbol ID
daily_data = [(data_vendor_id, symbol_id, d[0], now, now,
d[1], d[2], d[3], d[4], d[5], d[6]) for d in daily_data]
# Create the insert strings
column_str = """data_vendor_id, symbol_id, price_date, created_date,
last_updated_date, open_price, high_price, low_price,
close_price, volume, adj_close_price"""
insert_str = ("%s, " * 11)[:-2]
final_str = "INSERT INTO daily_price (%s) VALUES (%s) WHERE NOT EXISTS (SELECT 1 FROM daily_price WHERE symbol_id = symbol_id AND price_date = insert_str[2])" % (column_str, insert_str)
# Using the postgre connection, carry out an INSERT INTO for every symbol
with con:
cur = con.cursor()
cur.executemany(final_str, daily_data)
1 个回答
1
关于你上面的代码,有几点需要注意:
一般来说,尽量在纯 SQL 中使用 now()
,而不是在 Python 中处理时间,这样可以避免很多与时区、库差异等相关的问题。
如果你构建一个 列 的列表,可以根据列表的大小动态生成一串 %s
,这样就不需要把长度硬编码到一个重复的字符串中,然后再进行切片。
看起来 insert_daily_data_into_db
是打算在一个循环中按股票来调用的,所以我觉得你不应该在这里使用 executemany
,因为它需要一个 元组 的 列表,这在语义上是很不同的。
你在子 select
中把 symbol_id 和它自己进行比较,而不是和一个特定的值比较(这就意味着它总是为真)。
为了防止可能的 SQL 注入,你应该始终在 WHERE
子句中插入值,包括子 select
。
注意:我假设你在使用 psycopg2 来访问 Postgres,并且表的主键是一个 (symbol_id, price_date)
的元组。如果不是,下面的代码可能需要稍微调整一下。
考虑到这些点,可以尝试这样的代码(未经测试,因为我没有你的数据、数据库等,但它在语法上是有效的 Python):
def insert_daily_data_into_db(data_vendor_id, symbol_id, daily_data):
"""Takes a list of tuples of daily data and adds it to the
database. Appends the vendor ID and symbol ID to the data.
daily_data: List of tuples of the OHLC data (with
adj_close and volume)"""
column_list = ["data_vendor_id", "symbol_id", "price_date", "created_date",
"last_updated_date", "open_price", "high_price", "low_price",
"close_price", "volume", "adj_close_price"]
insert_list = ['%s'] * len(column_str)
values_tuple = (data_vendor_id, symbol_id, daily_data[0], 'now()', 'now()', daily_data[1],
daily_data[2], daily_data[3], daily_data[4], daily_data[5], daily_data[6])
final_str = """INSERT INTO daily_price ({0})
VALUES ({1})
WHERE NOT EXISTS (SELECT 1
FROM daily_price
WHERE symbol_id = %s
AND price_date = %s)""".format(', '.join(column_list), ', '.join(insert_list))
# Using the postgre connection, carry out an INSERT INTO for every symbol
with con:
cur = con.cursor()
cur.execute(final_str, values_tuple, values_tuple[1], values_tuple[2])