使用命名变量和批量插入的Postgres Python
我需要一些帮助,想了解一下Python和Postgres是如何处理事务和批量插入的,特别是在一次事务中插入多个数据集的情况。
- 操作系统:Windows 7 64位
- Python版本:3.2
- Postgresql版本:9.1
- 使用的库:psycopg2
这是我的情况: 我正在把一个数据库(Oracle)中的数据转换成XML字符串,然后把这些数据插入到一个新的数据库(Postgres)中。因为数据量很大,所以我想优化我的插入操作。我把这些数据看作是图书馆类型的对象,所以我有一个图书馆表,还有用于存储XML元数据和内容的表,这些数据在数据库中的字段类型都是文本。我从Oracle中提取数据,然后创建我需要插入的数据字典。我有三个插入语句,第一个插入语句在图书馆表中创建一条记录,使用一个序列ID,而这个ID在接下来的两个查询中插入XML到元数据和内容表时是必需的。下面是我所说的一个例子:
for inputKey in libDataDict.keys():
metaString = libDataDict[inputKey][0]
contentString = libDataDict[inputKey][1]
insertLibDataList.append({'objIdent':"%s" % inputKey, 'objName':"%s" % inputKey, objType':libType})
insertMetadataDataList.append({'objIdent':inputKey,'objMetadata':metaString})
insertContentDataList.append({'objIdent':inputKey, 'objContent':contentString})
dataDict['cmsLibInsert'] = insertLibDataList
dataDict['cmsLibMetadataInsert'] = insertMetadataDataList
dataDict['cmsLibContentInsert'] = insertContentDataList
sqlDict[0] = {'sqlString':"insert into cms_libraries (cms_library_ident, cms_library_name, cms_library_type_id, cms_library_status_id) \
values (%(objIdent)s, %(objName)s, (select id from cms_library_types where cms_library_type_name = %(objType)s), \
(select id from cms_library_status where cms_library_status_name = 'active'))", 'data':dataDict['cmsLibInsert']}
sqlDict[1] = {'sqlString':"insert into cms_library_metadata (cms_library_id, cms_library_metadata_data) values \
((select id from cms_libraries where cms_library_ident = %(objIdent)s), $$%(objMetadata)s$$)", \
'data':dataDict['cmsLibMetadataInsert']}
sqlDict[2] = {'sqlString':"insert into cms_library_content (cms_library_id, cms_library_content_data) values \
((select id from cms_libraries where cms_library_ident = %(objIdent)s), $$%(objContent)s$$)", \
'data':dataDict['cmsLibContentInsert']}
bulkLoadData(myConfig['pgConn'], myConfig['pgCursor'], sqlDict)
我遇到的问题是,当我运行第一个查询(sqlDict[0])并进行插入时,只要我分开执行并在运行接下来的两个查询之前提交,所有操作都能正常进行。理想情况下,我希望所有这些查询都在同一个事务中,但它失败了,因为在第二和第三个查询中找不到cms_libraries表的ID。 这是我目前的插入代码:
def bulkLoadData(dbConn, dbCursor, sqlDict):
try:
libInsertSql = sqlDict.pop(0)
dbSql = libInsertSql['sqlString']
data = libInsertSql['data']
dbCursor.executemany(dbSql, data)
dbConn.commit()
for sqlKey in sqlDict:
dbSql = sqlDict[sqlKey]['sqlString']
data = sqlDict[sqlKey]['data']
dbCursor.executemany(dbSql, data)
dbConn.commit()
之前我是在查询中追加值,然后为每个插入运行一个查询。这样做的话,我可以把所有操作放在同一个事务中,并且能找到生成的ID,一切都很好。我不明白为什么在使用executemany()进行批量插入时找不到ID?有没有办法在同一个事务中进行批量插入和其他两个查询?
我一直在阅读相关文档,并在StackOverflow和互联网上搜索,但没有找到解决我问题的答案: pyscopg文档 还有Postgres的: Postgresql字符串文档
任何帮助、建议或评论都将不胜感激。 谢谢, Mitch
1 个回答
这里你有两个选择。要么在外部生成ID(这样你可以进行批量插入),要么从序列中生成ID(这意味着你只能一个一个地插入)。我觉得外部ID生成和批量加载的过程很简单,虽然我建议你使用一个ETL工具,而不是自己用python手动编码。如果你需要从序列中获取ID,那么你应该考虑使用服务器端的预处理语句。
你的第一个语句应该像下面这样:
dbCursor.execute("""
PREPARE cms_lib_insert (bigint, text, text) AS
INSERT INTO cms_libraries (cms_library_ident, cms_library_name, cms_library_type_id, cms_library_status_id)
VALUES ($1, $2,
(select id from cms_library_types where cms_library_type_name = $3),
(select id from cms_library_status where cms_library_status_name = 'active')
)
RETURNING cms_library.id
""")
你只需要在启动时运行一次这个语句。然后你会想要在每次插入时运行下面的EXECUTE语句。
dbCursor.execute("""
EXECUTE cms_lib_insert(%(objIndent)s, %(objName)s, %(objType)s)
""", {'objIndent': 345, 'objName': 'foo', 'objType': 'bar'))
my_new_id = dbCursor.fetchone()[0]
这个语句会返回生成的序列ID。接下来,我强烈建议你不要继续现在这种试图抽象数据库通信的方式(你现在的sqlDict方法),而是采用更直接的编码方式(聪明的做法在这里反而是敌人,因为它会让性能调优变得更难)。
你需要将插入操作批量处理成一个适合性能的块大小。这意味着你需要根据实际情况调整你的BLOCK_SIZE。你的代码应该像下面这样:
BLOCK_SIZE = 500
while not_done:
dbCursor.begin()
for junk in irange(BLOCK_SIZE):
dbCursor.execute("EXECUTE cms_lib_insert(...)")
cms_lib_id = dbCursor.fetchone()[0] # you're using this below.
dbCursor.executemany("EXECUTE metadata_insert(...)")
dbCursor.executemany("EXECUTE library_insert(...)")
dbCursor.commit()
如果你需要达到更高的性能水平,下一步就是构建一个插入处理函数,它可以处理依赖表的行数组。我不推荐这样做,因为这很快会变成维护的噩梦。