我正试图为标普500 ETF创建一个包含30分钟数据的PostgreSQL表 (spy30new,用于测试新插入的数据)来自一个包含15分钟数据(全部15分钟)的多个股票的表格。所有15个都有一个关于“dt”(时间戳)和“instr”(股票符号)的索引。我想让spy30new有一个关于“dt”的索引。
import numpy as np
import pandas as pd
from datetime import datetime, date, time, timedelta
from dateutil import parser
from sqlalchemy import create_engine
# Query all15
engine = create_engine('postgresql://user:passwd@localhost:5432/stocks')
new15Df = (pd.read_sql_query("SELECT dt, o, h, l, c, v FROM all15 WHERE (instr = 'SPY') AND (date(dt) BETWEEN '2016-06-27' AND '2016-07-15');", engine)).sort_values('dt')
# Correct for Time Zone.
new15Df['dt'] = (new15Df['dt'].copy()).apply(lambda d: d + timedelta(hours=-4))
# spy0030Df contains the 15-minute data at 00 & 30 minute time points
# spy1545Df contains the 15-minute data at 15 & 45 minute time points
spy0030Df = (new15Df[new15Df['dt'].apply(lambda d: d.minute % 30) == 0]).reset_index(drop=True)
spy1545Df = (new15Df[new15Df['dt'].apply(lambda d: d.minute % 30) == 15]).reset_index(drop=True)
high = pd.concat([spy1545Df['h'], spy0030Df['h']], axis=1).max(axis=1)
low = pd.concat([spy1545Df['l'], spy0030Df['l']], axis=1).min(axis=1)
volume = spy1545Df['v'] + spy0030Df['v']
# spy30Df assembled and pushed to PostgreSQL as table spy30new
spy30Df = pd.concat([spy0030Df['dt'], spy1545Df['o'], high, low, spy0030Df['c'], volume], ignore_index = True, axis=1)
spy30Df.columns = ['d', 'o', 'h', 'l', 'c', 'v']
spy30Df.set_index(['dt'], inplace=True)
spy30Df.to_sql('spy30new', engine, if_exists='append', index_label='dt')
这会产生错误“ValueError:无法将DatetimeIndex强制转换为dtype datetime64[us]”
到目前为止我已经尝试过(我已经成功地使用pandas将CSV文件推送到PG。但这里的来源是一个PG数据库):
不在'dt'
上放置索引
spy30Df.set_index(['dt'], inplace=True) # Remove this line
spy30Df.to_sql('spy30new', engine, if_exists='append') # Delete the index_label option
使用to_pydatetime()
将'dt'从pandas.tslib.Timestamp类型转换为datetime.datetime(以防psycopg2可以使用python dt,但不能使用pandas Timestamp)
u = (spy0030Df['dt']).tolist()
timesAsPyDt = np.asarray(map((lambda d: d.to_pydatetime()), u))
spy30Df = pd.concat([spy1545Df['o'], high, low, spy0030Df['c'], volume], ignore_index = True, axis=1)
newArray = np.c_[timesAsPyDt, spy30Df.values]
colNames = ['dt', 'o', 'h', 'l', 'c', 'v']
newDf = pd.DataFrame(newArray, columns=colNames)
newDf.set_index(['dt'], inplace=True)
newDf.to_sql('spy30new', engine, if_exists='append', index_label='dt')
使用datetime.utcfromtimestamp()
timesAsDt = (spy0030Df['dt']).apply(lambda d: datetime.utcfromtimestamp(d.tolist()/1e9))
使用pd.to_datetime()
timesAsDt = pd.to_datetime(spy0030Df['dt'])
实际上,这是我的数据框。
我试图插入到SQL数据库中,但得到了与上述问题相同的错误。我所做的是,将数据帧的索引转换为带有标签“index”的列。
使用此代码将列名“index”重命名为“DateTime”。
将数据类型更改为“datetime64”。
使用这些代码将其存储在sql数据库中。
对每个工作的元素使用pd.to_datetime()。选项4不起作用,将pd.to_datetime()应用于整个序列。或许Postgres驱动程序理解python的datetime,但不理解pandas&numpy中的datetime64。选项4产生了正确的输出,但是我在将DF发送到Postgres时得到ValueError(参见标题)
我也有同样的问题,在每个元素上应用
pd.to_datetime()
也可以。但它比在整个序列上运行pd.to_datetime()
要慢几个数量级。对于超过100万行的数据帧:大约需要70秒
以及
大约需要0.01秒
实际的问题是正在包括时区信息。要删除它:
这应该快得多!
相关问题 更多 >
编程相关推荐