这是我系统的一部分的示意图。
我有一个返回数据的存储过程。我需要用多个输入参数调用这个存储过程。所以我写了python多处理脚本。但似乎有些问题。在
代码:
import cx_Oracle
import dataextract as tde
import os, sys
import time
from multiprocessing import Pool
import multiprocessing, logging
from multiprocessing.pool import ApplyResult
def __enter__():
__db = cx_Oracle.Connection("TEST/test123@test.net:12345/test.world")
__cursor = __db.cursor()
return __cursor, __db
def __exit__(__cursor, __db):
__cursor.close()
__db.close()
def get_context_id(__cursor, context_list):
context_id = __cursor.arrayvar(cx_Oracle.NUMBER, context_list)
return context_id
def find_DAC_detail(input):
cntxt_id = input [0]
min_row_id = input[1]
max_row_id = input[2]
print "[",time.asctime(), "]", cntxt_id, min_row_id, max_row_id
print "[",time.asctime(), "] Calculating DAC Detail...", multiprocessing.current_process().name()
__cursor, __db = __enter__()
query_contexts = get_context_id(__cursor, cntxt_id)
# as it comes to all complex types we need to tell Oracle Client
# what type to expect from an OUT parameter
l_cur = __cursor.var(cx_Oracle.CURSOR)
list = []
execute_proc = "BEGIN PKG_TABLEAU_FEED.ADD_CONTEXT(:query_contexts, :min_row_id, :max_row_id, :out); END;"
print "[",time.asctime(), "] Ready to call the stored procedure: PKG_TABLEAU_FEED.ADD_CONTEXT"
__cursor.execute(execute_proc, (query_contexts, min_row_id, max_row_id, l_cur))
for row in l_cur.getvalue():
# print "[",time.asctime(), "]", row
list.append(row)
return list
def create_extract_file_detail():
#Step 1: Create the Extract File and open the .csv
try:
tdefile = tde.Extract('test_extract.tde')
except:
os.remove('test_extract.tde')
tdefile = tde.Extract('test_extract.tde')
return tdefile
def create_table_definition():
#Step 2: Create the tableDef
tableDef = tde.TableDefinition()
# DAC Data into the tde table
tableDef.addColumn('COB_DATE', tde.Type.DATE)
tableDef.addColumn('EXCEPTION_ID', tde.Type.INTEGER)
return tableDef
def generate_table(tdefile, tableDef):
#Step 2: open the tableDef if the table exists
if tdefile.hasTable("Extract"):
table = tdefile.openTable("Extract")
else:
# or add new table definition if the table doesn't exist
table = tdefile.addTable('Extract',tableDef)
return table
def generate_file(tableDef, table, detail_rowset):
newrow = tde.Row(tableDef)
for line in detail_rowset:
#Starts Here...
if line[0] != None and line[0]!= '':
date = line[0]
newrow.setDate(0, date.year, date.month, date.day)
if line[1] != None:
newrow.setInteger(1,int(line[1]))
table.insert(newrow)
if __name__ == '__main__':
try:
start = time.time()
__cursor, __db = __enter__()
cntxt_id = [253, 254]
print "[",time.asctime(), "] Connection established successfully..."
# start 4 worker processes
print "[",time.asctime(), "] cpu_count =" , multiprocessing.cpu_count()
logger = multiprocessing.log_to_stderr()
logger.setLevel(logging.DEBUG)
#
# Tests
#
PROCESSES=4
pool = Pool(PROCESSES)
TASKS = [(cntxt_id, i*1000 + 1, i*1000 + 1000) for i in range(4)]
print TASKS
detail_rowset = pool.map(find_DAC_detail, TASKS)
# for data in detail_rowset:
# print data
print "[",time.asctime(), "] Data Fetched in: ", time.time() - start
tdefile = create_extract_file_detail()
print "[",time.asctime(), "] TDE File Validated..."
tableDef = create_table_definition()
table = generate_table(tdefile, tableDef)
print "[",time.asctime(), "] Extract Table Created..."
generate_file(tableDef, table, detail_rowset)
print "[",time.asctime(), "] Extract File Generated..."
print "[",time.asctime(), "] Total Time taken: ", time.time() - start
# except:
# print "[",time.asctime(), "] ERROR:", sys.exc_info()[0]
finally:
__exit__(__cursor, __db)
print "[",time.asctime(), "] Exiting program..."
说明: 我试着用pool.apply_异步但它有很多细微差别。我正在努力池.map()调用find_DAC_detail函数(上面的代码),但它也失败,并出现以下错误:
错误:
^{pr2}$任何建议都会有帮助。在
目前没有回答
相关问题 更多 >
编程相关推荐