Python 多进程(multiprocessing)问题:Pool.map()

2024-04-25 12:21:58 发布

您现在位置:Python中文网/ 问答频道 /正文

这是我系统的一部分的示意图。(图片缺失:enter image description here)

我有一个返回数据的存储过程。我需要用多组输入参数调用这个存储过程,所以我写了一个 Python 多进程(multiprocessing)脚本。但似乎有些问题。

代码:

import cx_Oracle
import dataextract as tde
import os, sys
import time
from multiprocessing import Pool
import multiprocessing, logging
from multiprocessing.pool import ApplyResult





def __enter__():
    """Open a new Oracle connection together with a cursor on it.

    Returns:
        A ``(cursor, connection)`` tuple. The caller owns both objects and
        is responsible for closing them.
    """
    connection = cx_Oracle.Connection("TEST/test123@test.net:12345/test.world")
    return connection.cursor(), connection

def __exit__(__cursor, __db):
    """Close the cursor and then the connection it belongs to.

    Args:
        __cursor: Object exposing ``close()``; closed first.
        __db: Object exposing ``close()``; closed second.

    The connection is closed even when closing the cursor raises, so a
    failing cursor cannot leak the database session. Any exception from
    either ``close()`` still propagates to the caller.
    """
    try:
        __cursor.close()
    finally:
        __db.close()

def get_context_id(__cursor, context_list):
    """Wrap ``context_list`` in a NUMBER array bind variable.

    Args:
        __cursor: Cursor whose ``arrayvar`` method creates the variable.
        context_list: Values passed to ``arrayvar`` as the array contents.

    Returns:
        The array variable produced by ``__cursor.arrayvar``.
    """
    return __cursor.arrayvar(cx_Oracle.NUMBER, context_list)


def find_DAC_detail(input):
    cntxt_id = input [0]
    min_row_id = input[1] 
    max_row_id = input[2] 
    print "[",time.asctime(), "]", cntxt_id, min_row_id, max_row_id
    print "[",time.asctime(), "] Calculating DAC Detail...", multiprocessing.current_process().name()
    __cursor, __db = __enter__()
    query_contexts = get_context_id(__cursor, cntxt_id)

    # as it comes to all complex types we need to tell Oracle Client
    # what type to expect from an OUT parameter
    l_cur = __cursor.var(cx_Oracle.CURSOR)
    list = []
    execute_proc = "BEGIN PKG_TABLEAU_FEED.ADD_CONTEXT(:query_contexts, :min_row_id, :max_row_id, :out); END;"
    print "[",time.asctime(), "] Ready to call the stored procedure: PKG_TABLEAU_FEED.ADD_CONTEXT"
    __cursor.execute(execute_proc, (query_contexts, min_row_id, max_row_id, l_cur)) 

    for row in l_cur.getvalue():
    #          print "[",time.asctime(), "]", row
        list.append(row)

    return list



def create_extract_file_detail():
    """Create the ``test_extract.tde`` extract, retrying once after deleting it.

    Returns:
        The object returned by ``tde.Extract('test_extract.tde')``.

    Raises:
        Whatever ``os.remove`` or the second ``tde.Extract`` call raises
        when the retry fails as well.
    """
    # Step 1: Create the Extract File
    try:
        tdefile = tde.Extract('test_extract.tde')
    except Exception:
        # NOTE(review): presumably the first attempt fails because a stale
        # extract file already exists -- confirm against the dataextract docs.
        # ``Exception`` (not a bare except) so Ctrl-C / SystemExit are not
        # swallowed and turned into a file deletion.
        os.remove('test_extract.tde')
        tdefile = tde.Extract('test_extract.tde')

    return tdefile


def create_table_definition():
    """Build the table definition used for the DAC extract.

    Returns:
        A ``tde.TableDefinition`` with two columns, in this order:
        ``COB_DATE`` (DATE) and ``EXCEPTION_ID`` (INTEGER).
    """
    # Step 2: Create the tableDef
    definition = tde.TableDefinition()

    # DAC Data into the tde table
    columns = (
        ('COB_DATE', tde.Type.DATE),
        ('EXCEPTION_ID', tde.Type.INTEGER),
    )
    for column_name, column_type in columns:
        definition.addColumn(column_name, column_type)

    return definition



def generate_table(tdefile, tableDef):
    """Return the "Extract" table of ``tdefile``, creating it when absent.

    Args:
        tdefile: Extract object exposing ``hasTable``, ``openTable`` and
            ``addTable``.
        tableDef: Table definition used only when the table must be added.

    Returns:
        The table returned by ``openTable`` or ``addTable``.
    """
    # Step 2: add a new table from the definition when none exists yet ...
    if not tdefile.hasTable("Extract"):
        return tdefile.addTable('Extract', tableDef)
    # ... otherwise open the table that is already there.
    return tdefile.openTable("Extract")


def generate_file(tableDef, table, detail_rowset):
    """Insert every row of ``detail_rowset`` into ``table``.

    Args:
        tableDef: Table definition handed to ``tde.Row`` for each new row.
        table: Target table; ``insert`` is called once per input row.
        detail_rowset: Iterable of indexable rows. Item 0 is written as a
            date (it must expose ``year``/``month``/``day``) unless it is
            ``None`` or ``''``; item 1 is written as an integer unless it
            is ``None``.
    """
    for line in detail_rowset:
        # A fresh Row per input row: reusing one Row object would carry the
        # previous row's value over whenever a column is skipped below.
        newrow = tde.Row(tableDef)

        cob_date = line[0]
        if cob_date is not None and cob_date != '':
            newrow.setDate(0, cob_date.year, cob_date.month, cob_date.day)

        if line[1] is not None:
            newrow.setInteger(1, int(line[1]))

        table.insert(newrow)


if __name__ == '__main__':
    try:
        start = time.time()
        __cursor, __db = __enter__()

        cntxt_id = [253, 254]

        print "[",time.asctime(), "] Connection established successfully..."      



        # start 4 worker processes
        print "[",time.asctime(), "] cpu_count =" , multiprocessing.cpu_count()
        logger = multiprocessing.log_to_stderr()
        logger.setLevel(logging.DEBUG)


        #
        # Tests
        #
        PROCESSES=4
        pool = Pool(PROCESSES)
        TASKS = [(cntxt_id, i*1000 + 1, i*1000 + 1000) for i in range(4)]
        print TASKS
        detail_rowset = pool.map(find_DAC_detail, TASKS)


#        for data in detail_rowset:
#            print data
        print "[",time.asctime(), "] Data Fetched in: ", time.time() - start

        tdefile = create_extract_file_detail()
        print "[",time.asctime(), "] TDE File Validated..."
        tableDef = create_table_definition()
        table = generate_table(tdefile, tableDef)
        print "[",time.asctime(), "] Extract Table Created..."
        generate_file(tableDef, table, detail_rowset)
        print "[",time.asctime(), "] Extract File Generated..."
        print "[",time.asctime(), "] Total Time taken: ",  time.time() - start
#    except:
#        print "[",time.asctime(), "] ERROR:", sys.exc_info()[0]
    finally:                
        __exit__(__cursor, __db)
        print "[",time.asctime(), "] Exiting program..."

说明: 我试过用 pool.apply_async,但它有很多细节上的差别。现在我尝试用 pool.map() 调用 find_DAC_detail 函数(上面的代码),但它也失败了,并出现以下错误:

错误:

^{pr2}$

任何建议都会有帮助。


Tags: the, id, db, time, def, table, extract, cursor