Python cx_oracle多线程处理不适用于每个线程的游标

2024-05-28 19:52:50 发布

您现在位置:Python中文网/ 问答频道 /正文

我尝试使用cx_Oracle在python中并行运行完全独立的Oracle查询。在

我可以通过为每个线程设置一个新的数据库连接,然后在每个单独的线程中运行查询,这将使总时间从大约2分钟增加到1分钟20,因此它肯定可以工作。查询计时:

START_TIME                      END_TIME
17-FEB-16 22.33.28.000000000    17-FEB-16 22.33.30.000000000
17-FEB-16 22.33.30.000000000    17-FEB-16 22.33.33.000000000
17-FEB-16 22.33.33.000000000    17-FEB-16 22.33.36.000000000
17-FEB-16 22.33.36.000000000    17-FEB-16 22.33.36.000000000
17-FEB-16 22.33.36.000000000    17-FEB-16 22.34.08.000000000
17-FEB-16 22.34.08.000000000    17-FEB-16 22.34.26.000000000
17-FEB-16 22.34.26.000000000    17-FEB-16 22.34.27.000000000
17-FEB-16 22.34.27.000000000    17-FEB-16 22.34.29.000000000

但是,在每个线程中设置到数据库的连接会有开销,我很确定我应该能够为每个线程创建一个新的游标并共享连接,如下所示:

http://www.oracle.com/technetwork/articles/vasiliev-python-concurrency-087536.html

但是,当我共享连接并使用一个单独的游标时,会发生的情况是,所有查询同时开始,然后在同一时间结束,这样看来,当线程被派生时,数据库上的查询仍在按顺序运行。查询计时:

^{pr2}$

多连接代码:

for file in file_transporter.complete_file_list:
        #Get database and open connection
        the_db =      shared_lib_wrapper.get_oracle().Oracle(the_logger)
        the_db .connect(conn_str())
        #Create new thread
        thread = threading.Thread(target=Loader, args=(params, the_date, the_logger, the_db, file, file_transporter.complete_file_list[file]))
        the_logger.info("Running Thread: " + thread.getName())
        thread.start()

多光标的代码(在runLoad中有一个函数可以创建一个新的游标并执行一个过程-见下文):

for file in self.file_list:
        file_parametes = self.file_list[file]
        function_to_run = file_parametes['LOAD_PACKAGE'] + '.' + file_parametes['LOAD_FUNCTION']

        #Create new thread
        thread = threading.Thread(target=self.runLoad, args=(file_parametes['RUN_ID'], function_to_run))
        self.log.info("Spawned Thread: " + thread.getName())
        self.log.info("Running Thread: " + thread.getName())
        thread.start()

创建游标的代码:

def execute_stored_proc_with_in_and_out_params(self, proc_name, params, dbms_logging=False):
    try:
        cursor = cx_Oracle.Cursor(self.db_conn

因此,我的问题是:

1)我是否在创建光标时出错?-如果有关于如何修复它的任何想法,我已经读到cx峎u oracle是threadsafety 2:

Currently 2, which means that threads may share the module and connections, but not cursors.

2)如果我不能共享连接,那么为每个线程创建一个新的连接是否有问题,即使创建每个连接的开销也似乎仍然可以提高我的性能?在


Tags: theself数据库db线程threadfeblist
2条回答

请看下面这是一个程序的工作实现,使用相同的连接,但在每个线程中有一个单独的游标。我调用的过程是在cxu Oracle测试用例(5.2.1版本的一部分)中的,非常简单,所以我在示例中多次调用它(每个线程中的一个随机数)。输出清楚地表明线程不会同时完成。在

from __future__ import print_function

import cx_Oracle
import datetime
import random
import threading

connection = cx_Oracle.Connection("cx_Oracle/dev", threaded = True)

def TestThread(threadNum):
     startTime = datetime.datetime.today()
     cursor = connection.cursor()
     numInputs = int(random.random() * 5000)
     print("Thread", threadNum, "with", numInputs, "inputs:", startTime)
     for i in range(numInputs):
         value = bool(int(random.random() * 2))
         cursor.callfunc("pkg_TestBooleans.GetStringRep", str, (value,))
     endTime = datetime.datetime.today()
     print("Thread", threadNum, "with", numInputs, "inputs:", endTime)

 threads = []
 for i in range(8):
     thread = threading.Thread(target = TestThread, args = (i + 1,))
     threads.append(thread)
     thread.start()
 print("All threads spawned...waiting for them to complete...")
 for thread in threads:
     thread.join()

输出如下:

线程1,输入3405:2016-02-22 07:55:07.849127
线程2,输入2706个:2016-02-22 07:55:07.849998
线程3,输入4101:2016-02-22 07:55:07.850256
线程4,输入2912个:2016-02-22 07:55:07.850937
线程5,3747个输入:2016-02-22 07:55:07.851275
线程6,输入4318个:2016-02-22 07:55:07.851534
线程7,1453个输入:2016-02-22 07:55:07.852649
线程8,输入3304:2016-02-22 07:55:07.853090
已生成所有线程…正在等待它们完成…
线程7,输入1453个:2016-02-22 07:55:09.897217
线程2,输入2706个:2016-02-22 07:55:11.446744
线程4,输入2912个:2016-02-22 07:55:11.681414
线程8,输入3304:2016-02-22 07:55:12.016809
线程1,输入3405:2016-02-22 07:55:12.081846
线程5,3747个输入:2016-02-22 07:55:12.266111
线程3,输入4101:2016-02-22 07:55:12.375623
线程6,输入4318:2016-02-22 07:55:12.409352

from concurrent.futures import ThreadPoolExecutor, as_completed
import cx_Oracle
import datetime

CONN_INFO = {
    'host': 'xxx.xx.xx.x',
    'port': 99999,
    'user': 'user_name',
    'psw': 'password',
    'service': 'abc.xyz.com',
}

CONN_STR = '{user}/{psw}@{host}:{port}/{service}'.format(**CONN_INFO)

# your long running query
QUERY = 'SELECT FROM * customer where date = :date'

def run(date):
    conn = cx_Oracle.connect(CONN_STR, threaded=True)
    cursor = conn.cursor()
    cursor.execute(QUERY, {'date': date})    
    data = cursor.fetchall()
    cursor.close()

    return data

def main():
    dates = [datetime.datetime.today() - datetime.timedelta(days=x) for x in range(0, 30)] 
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(run, d) for d in dates]

        for future in as_completed(futures):
            # process your records from each thread
            # process_records(future.result())


if __name__ == '__main__':
    main()

相关问题 更多 >

    热门问题