如何在循环中更新SQL Alchemy中的记录
我正在尝试使用SQLSoup,这是SQLAlchemy的一个扩展,来更新SQL Server 2008数据库中的记录。我使用pyodbc来进行连接。但是有很多问题,让我很难找到相关的例子。
我正在对一个非常大的表(超过200万条记录)中的几何字段进行重投影,所以很多标准的更新字段的方法都无法使用。我需要从几何字段中提取坐标,转换成文本,然后再传回去。这个过程都没问题,所有的单独步骤都能正常工作。
不过,我想在遍历每一条记录时,对每一行执行一个SQL更新语句。我猜这会对记录集加锁,或者连接正在使用中——因为如果我使用下面的代码,它在成功更新第一条记录后就会卡住。
如果有关于如何创建新连接、重用现有连接或用其他方式完成这个任务的建议,我会很感激。
s = select([text("%s as fid" % id_field),
text("%s.STAsText() as wkt" % geom_field)],
from_obj=[feature_table])
rs = s.execute()
for row in rs:
new_wkt = ReprojectFeature(row.wkt)
update_value = "geometry :: STGeomFromText('%s',%s)" % (new_wkt, "3785")
update_sql = ("update %s set GEOM3785 = %s where %s = %i" %
(full_name, update_value, id_field, row.fid))
conn = db.connection()
conn.execute(update_sql)
conn.close() #or not - no effect..
更新后的代码现在看起来是这样的。它在处理几条记录时工作得很好,但在处理整个表时就会卡住,所以我猜是读取的数据太多了。
db = SqlSoup(conn_string)
#create outer query
Session = sessionmaker(autoflush=False, bind=db.engine)
session = Session()
rs = session.execute(s)
for row in rs:
#create update sql...
session.execute(update_sql)
session.commit()
现在我遇到了连接忙的错误。
DBAPIError: (Error) ('HY000', '[HY000] [Microsoft][ODBC SQL Server Driver]Connection is busy with results for another hstmt (0) (SQLExecDirectW)')
看起来这可能是ODBC驱动的问题 - http://sourceitsoftware.blogspot.com/2008/06/connection-is-busy-with-results-for.html
进一步更新:
在服务器上使用分析工具时,它显示选择语句和第一个更新语句都在“开始”状态,但都没有完成。如果我将选择语句设置为返回前10条记录,那么它就会完成,更新也会执行。
SQL: Batch Starting Select...
SQL: Batch Starting Update...
我认为这是与pyodbc和SQL Server驱动有关的问题。如果我去掉SQLAlchemy,直接用pyodbc执行相同的SQL,它也会卡住。即使我为更新创建一个新的连接对象。
我还尝试了SQL Server Native Client 10.0驱动,它本来是为了支持MARS - 多个活动结果集,但没有任何改善。最后,我不得不“分页结果”,使用pyodbc和SQL更新这些批次(见下文),不过我原本以为SQLAlchemy可以自动为我完成这个任务。
3 个回答
在我找到其他解决方案之前,我正在使用一个连接和自定义的SQL语句来返回一组记录,并且以批量的方式更新这些记录。我觉得我做的这个方法并不是特别独特,所以我不明白为什么我不能同时处理多个结果集。
下面的代码可以运行,但速度非常非常慢……
cnxn = pyodbc.connect(conn_string, autocommit=True)
cursor = cnxn.cursor()
#get total recs in the database
s = "select count(fid) as count from table"
count = cursor.execute(s).fetchone().count
#choose number of records to update in each iteration
batch_size = 100
for i in range(1,count, batch_size):
#sql to bring back relevant records in each batch
s = """SELECT fid, wkt from(select ROW_NUMBER() OVER(ORDER BY FID ASC) AS 'RowNumber'
,FID
,GEOM29902.STAsText() as wkt
FROM %s) features
where RowNumber >= %i and RowNumber <= %i""" % (full_name,i,i+batch_size)
rs = cursor.execute(s).fetchall()
for row in rs:
new_wkt = ReprojectFeature(row.wkt)
#...create update sql statement for the record
cursor.execute(update_sql)
counter += 1
cursor.close()
cnxn.close()
试着使用一个 会话。
rs = s.execute()
现在变成了 session.execute(rs)
,你可以把最后三行替换成 session.execute(update_sql)
。我还建议你把会话设置为自动提交关闭,并在最后调用 session.commit()
。
我建议你在程序卡住的时候,可以在SQL服务器上运行一下sp_who2
命令,看看发生了什么。检查一下有没有被阻塞的进程ID(spid),看看SQL代码里有没有什么能提示你问题的线索。如果你发现有一个进程在阻塞其他进程,可以用dbcc inputbuffer(*spidid*)
命令,看看它执行的查询是什么。除此之外,你还可以使用SQL Profiler来跟踪你的调用。
有时候,SQL服务器上的并行处理也可能导致阻塞。如果这不是一个数据仓库,我建议你把最大并行度(Max DOP)关掉,设置为1。告诉我,如果明天我再检查的时候你需要帮助,我会很乐意帮忙。