如何使用SQLAlchemy从查询表达式生成SQL文件以批量插入到DBMS中?

12 投票
3 回答
6092 浏览
提问于 2025-04-15 22:58

请耐心听我解释一下问题,我是怎么尝试解决的,最后会有我的问题,关于如何改进。

我有一个包含10万行的CSV文件,这是一个离线批处理的结果,我需要把它插入到数据库中,按照正确的模型来放。通常情况下,如果这个加载过程比较简单,只需要把CSV文件处理一下,让它符合数据库的格式就行了;但是,我需要进行一些外部处理,这需要查询数据,使用SQLAlchemy来生成我想要的数据会方便很多。

我想要的数据有3个模型,分别对应数据库中3个已经存在的表,而且每个模型都依赖于前一个模型。比如:

Model C --> Foreign Key --> Model B --> Foreign Key --> Model A

所以,这些模型必须按照A、B、C的顺序插入。我想出了一个生产者/消费者的方法:

 - instantiate a multiprocessing.Process which contains a
 threadpool of 50 persister threads that have a threadlocal 
 connection to a database

 - read a line from the file using the csv DictReader

 - enqueue the dictionary to the process, where each thread creates
 the appropriate models by querying the right values and each
 thread persists the models in the appropriate order

这个方法比不使用线程的读取和保存要快,但还是比直接把文件批量加载到数据库慢得多。这个任务大约花了45分钟才完成保存。为了好玩,我决定用SQL语句来写,只花了5分钟

不过,写这些SQL语句花了我几个小时。所以我的问题是,我能不能用一种更快的方法来用SQLAlchemy插入数据?我了解到,SQLAlchemy并不是为了批量插入操作而设计的,所以这并不是最理想的选择。

接下来我的问题是,有没有办法用SQLAlchemy生成SQL语句,把它们放到一个文件里,然后直接用批量加载的方式插入到数据库?我知道str(model_object)这个方法,但它并不显示插入的具体值。

我会很感激任何关于如何更快做到这一点的建议。

谢谢!

3 个回答

0

我想说,Python脚本中花费的时间主要是在每条记录上传的部分。要确定这一点,你可以选择把结果写入CSV文件,或者直接丢弃结果,而不是上传新的记录。这样可以帮助你找出瓶颈在哪里;至少从查找和插入的角度来看。如果我猜得没错,瓶颈确实在这里,你可以利用大多数数据库系统提供的批量导入功能。在这种情况下,逐条插入记录是没有必要的,甚至还有一些反对的理由。

批量导入通常会进行一些有趣的优化,比如把所有操作当作一个事务来处理,而不是每条记录都提交一次(仅仅这样做就能显著减少运行时间);所以在处理大量记录时,我建议使用批量插入。你仍然可以使用生产者/消费者的方式,但消费者可以把值存储在内存中或文件里,然后再调用特定于你使用的数据库的批量导入语句。如果你需要对CSV文件中的每条记录进行处理,这可能是一个不错的选择。如果是这样,我还建议考虑一下有多少数据可以缓存并在记录之间共享。

还有一种可能性是,瓶颈出现在使用SQLAlchemy上。不是说它本身有什么问题,但考虑到你正在做的事情,它可能需要比必要的处理更多的资源——这从运行时间的8倍差异就能看出来。

为了好玩,既然你已经知道SQL了,可以尝试在Python中使用直接的DBAPI模块来操作,并比较一下运行时间。

3

一般来说,不行,你无法获取包含值的查询。

不过,你在用什么数据库呢?因为很多数据库都有处理CSV文件的批量加载功能。

如果你愿意接受某些值可能没有正确处理的情况,那么你可以使用我为调试目的写的这个小技巧:

'''Replace the parameter placeholders with values'''
params = compiler.params.items()
params.sort(key=lambda (k, v): len(str(k)), reverse=True)
for k, v in params:
    '''Some types don't need escaping'''
    if isinstance(v, (int, long, float, bool)):
        v = unicode(v)
    else:
        v = "'%s'" % v

    '''Replace the placeholders with values
    Works both with :1 and %(foo)s type placeholders'''
    query = query.replace(':%s' % k, v)
    query = query.replace('%%(%s)s' % k, v)
2

首先,除非你真的有一台有50个CPU核心的机器,否则使用50个线程或进程并不会提高性能,反而会让事情变得更慢。

其次,我觉得如果你使用SQLAlchemy的方式来一次性插入多个值,会比一个一个创建ORM对象并保存要快得多。

撰写回答