如何在循环中更新SQL Alchemy中的记录

2 投票
3 回答
5226 浏览
提问于 2025-04-16 11:16

我正在尝试使用SQLSoup,这是SQLAlchemy的一个扩展,来更新SQL Server 2008数据库中的记录。我使用pyodbc来进行连接。但是有很多问题,让我很难找到相关的例子。

我正在对一个非常大的表(超过200万条记录)中的几何字段进行重投影,所以很多标准的更新字段的方法都无法使用。我需要从几何字段中提取坐标,转换成文本,然后再传回去。这个过程都没问题,所有的单独步骤都能正常工作。

不过,我想在遍历每一条记录时,对每一行执行一个SQL更新语句。我猜这会对记录集加锁,或者连接正在使用中——因为如果我使用下面的代码,它在成功更新第一条记录后就会卡住。

如果有关于如何创建新连接、重用现有连接或用其他方式完成这个任务的建议,我会很感激。

s = select([text("%s as fid" % id_field),
            text("%s.STAsText() as wkt" % geom_field)],
           from_obj=[feature_table])

rs = s.execute()

for row in rs:
    new_wkt = ReprojectFeature(row.wkt)

    update_value = "geometry :: STGeomFromText('%s',%s)" % (new_wkt, "3785")
    update_sql = ("update %s set GEOM3785 = %s where %s = %i" %
                  (full_name, update_value, id_field, row.fid))

    conn = db.connection()
    conn.execute(update_sql)
    conn.close() #or not - no effect..

更新后的代码现在看起来是这样的。它在处理几条记录时工作得很好,但在处理整个表时就会卡住,所以我猜是读取的数据太多了。

db = SqlSoup(conn_string)
#create outer query

Session = sessionmaker(autoflush=False, bind=db.engine)
session = Session()
rs = session.execute(s)

for row in rs: 
    #create update sql...
    session.execute(update_sql)
session.commit()

现在我遇到了连接忙的错误。

DBAPIError: (Error) ('HY000', '[HY000] [Microsoft][ODBC SQL Server Driver]Connection is busy with results for another hstmt (0) (SQLExecDirectW)')

看起来这可能是ODBC驱动的问题 - http://sourceitsoftware.blogspot.com/2008/06/connection-is-busy-with-results-for.html

进一步更新:

在服务器上使用分析工具时,它显示选择语句和第一个更新语句都在“开始”状态,但都没有完成。如果我将选择语句设置为返回前10条记录,那么它就会完成,更新也会执行。

SQL: Batch Starting   Select...
SQL: Batch Starting   Update...

我认为这是与pyodbc和SQL Server驱动有关的问题。如果我去掉SQLAlchemy,直接用pyodbc执行相同的SQL,它也会卡住。即使我为更新创建一个新的连接对象。

我还尝试了SQL Server Native Client 10.0驱动,它本来是为了支持MARS - 多个活动结果集,但没有任何改善。最后,我不得不“分页结果”,使用pyodbc和SQL更新这些批次(见下文),不过我原本以为SQLAlchemy可以自动为我完成这个任务。

3 个回答

1

在我找到其他解决方案之前,我正在使用一个连接和自定义的SQL语句来返回一组记录,并且以批量的方式更新这些记录。我觉得我做的这个方法并不是特别独特,所以我不明白为什么我不能同时处理多个结果集。

下面的代码可以运行,但速度非常非常慢……

cnxn = pyodbc.connect(conn_string, autocommit=True)
cursor = cnxn.cursor()

#get total recs in the database
s = "select count(fid) as count from table"
count = cursor.execute(s).fetchone().count

#choose number of records to update in each iteration
batch_size = 100
for i in range(1,count, batch_size):
    #sql to bring back relevant records in each batch
    s = """SELECT fid, wkt from(select ROW_NUMBER() OVER(ORDER BY FID ASC) AS 'RowNumber'
,FID
,GEOM29902.STAsText() as wkt
    FROM %s) features
    where RowNumber >= %i and RowNumber <= %i""" % (full_name,i,i+batch_size)

    rs = cursor.execute(s).fetchall()
    for row in rs:
        new_wkt = ReprojectFeature(row.wkt)
        #...create update sql statement for the record
        cursor.execute(update_sql)
        counter += 1
cursor.close()
cnxn.close()  
1

试着使用一个 会话

rs = s.execute() 现在变成了 session.execute(rs),你可以把最后三行替换成 session.execute(update_sql)。我还建议你把会话设置为自动提交关闭,并在最后调用 session.commit()

1

我建议你在程序卡住的时候,可以在SQL服务器上运行一下sp_who2命令,看看发生了什么。检查一下有没有被阻塞的进程ID(spid),看看SQL代码里有没有什么能提示你问题的线索。如果你发现有一个进程在阻塞其他进程,可以用dbcc inputbuffer(*spidid*)命令,看看它执行的查询是什么。除此之外,你还可以使用SQL Profiler来跟踪你的调用。

有时候,SQL服务器上的并行处理也可能导致阻塞。如果这不是一个数据仓库,我建议你把最大并行度(Max DOP)关掉,设置为1。告诉我,如果明天我再检查的时候你需要帮助,我会很乐意帮忙。

撰写回答