cx-oracle中的并行执行
我最近加入了一家公司,刚开始接触Python(他们喜欢用的脚本语言),现在正在使用cx_oracle来创建一些ETL(提取、转换、加载)流程。我目前写的脚本都是单线程的,主要是从Oracle数据库中选择我需要的列,然后把结果写入一个命名管道,外部的进程会在那儿等着读取这些数据并插入到目标位置。
这个方法一直都很有效,直到我遇到一些表,行数在5亿到20亿之间。虽然这个任务还是能完成,但花费的时间却非常长,通常需要几个小时。这些大表是分区的,所以我一直在研究如何协调并行读取不同分区的方法,这样我就可以让两个或更多的线程同时工作,每个线程写入一个不同的命名管道。
在cx-oracle中,有没有简单的方法可以处理多个线程从同一张表的不同分区读取数据呢?
这是我目前的(简单的)代码:
import cx_Oracle
import csv
# connect via SQL*Net string or by each segment in a separate argument
connection = cx_Oracle.connect("user/password@TNS")
csv.register_dialect('pipe_delimited', escapechar='\\' delimiter='|',quoting=csv.QUOTE_NONE)
cursor = connection.cursor()
f = open("<path_to_named_pipe>", "w")
writer = csv.writer(f, dialect='pipe_delimited', lineterminator="\n")
r = cursor.execute("""SELECT <column_list> from <SOURCE_TABLE>""")
for row in cursor:
writer.writerow(row)
f.close()
我的一些源表有超过1000个分区,所以直接把分区名称写死在代码里并不是个好主意。我在考虑设置一个分区名称的数组,然后逐个遍历,但如果大家有其他的想法,我很想听听。
1 个回答
1
首先,你需要确认一下 *cx_Oracle* 是否支持多线程。因为它遵循了 Python 数据库 API 规范 v2.0,所以你只需要查看一下 threadsafety
这个模块的全局变量。
如果值是 2
或 3
,那就说明你可以同时打开多个数据库连接,并且可以同时执行多个查询。最简单的方法是使用 threading 模块,这个模块使用起来非常简单。这里有一篇简短易懂的文章,可以帮助你入门。
当然,不能保证将你的查询进行流水线处理一定会显著提高性能(可能是因为数据库引擎、输入输出等原因),但尝试一下是绝对值得的。祝你好运!