Python序列化/多进程与Tableau数据提取API问题

1 投票
1 回答
1367 浏览
提问于 2025-04-18 11:52

我刚开始学习Python,正在尝试使用多进程模块来进行一些需要大量CPU计算的数据转换。我有一大堆数据(大约40万条观察记录,300个变量),格式是csv,我想用它们通过Python API转换成Tableau数据提取文件。写一个脚本来完成这个转换其实很简单,但因为只有一个CPU在工作,所以大约需要15分钟才能完成(用Tableau Desktop只需90秒)。我需要利用我所有的8个核心来加快这个转换过程。

我最初的想法是把数据分成8块,让8个工作进程各自生成Tableau行的列表,然后把这些行合并成一个tde表格。但是,由于Tableau行对象/类是在一个单独的模块(即Tableau API)中定义的,我遇到了序列化和指针错误。这个API很复杂,还依赖于其他多个模块,所以我尝试在主全局空间重建必要的定义都失败了。

我尝试过使用Dill和PiCloud,但这两种方法仍然导致序列化或指针错误。有没有人知道在Python中,有没有有效的方法可以序列化和/或多进程处理依赖于外部包中定义的方法/对象的计算(而不需要深入研究这个包,试图在你的程序中重新创造轮子)?

下面是我想要进行多进程处理的工作程序(我参考了Brian Bickell在这里发布的工作:http://www.interworks.com/blogs/bbickell/2012/12/06/introducing-python-tableau-data-extract-api-csv-extract-example):

from sys import argv
import os, csv, datetime, time
import dataextract as tde

csv.field_size_limit(10000000)

## Functions

# This function makes adding the columns to each row in the extract a bit easier.
def add_tde_col(colnum, row, val, t):
    # Date format used below
    dateformat = '%Y-%mm-%dd %H:%M:%S.%f'  

    if t == tdeTypes['INTEGER']:
        try: 
            convert = int(val)
            row.setInteger(colnum, convert)
        except ValueError:
            #if we bomb the cast then we just add a null 
            row.setNull(colnum)

    elif t == tdeTypes['DOUBLE']:
        try: 
            convert = float(val)
            row.setDouble(colnum, convert)
        except ValueError:                        
            row.setNull(colnum)

    elif t == tdeTypes['BOOLEAN']:
        try: 
            convert = int(val)
            if convert > -1 and convert <= 1:
                row.setBoolean(colnum, convert)
        else:
            row.setNull(colnum)
        except ValueError:
            row.setNull(colnum)

    elif t == tdeTypes['DATETIME']:
        try:
            d = datetime.datetime.strptime(val, dateformat)
           row.setDate(colnum, d.year, d.month, d.day, d.hour, d.minute, d.second, d.microsecond)
        except ValueError:
            row.setNull(colnum)

    elif t == tdeTypes['CHAR_STRING']:
        row.setCharString(colnum, val)

    elif t == tdeTypes['UNICODE_STRING']:
        row.setString(colnum, val)

    else:
        print 'Error'
        row.setNull(colnum)

# define csv input      
inputFile = 'test1.csv'

## Parameters 
tdeFileName = 'tdetest1.tde'

startTime = time.clock()

# Handy dictionary of Tableau data types
tdeTypes = {'INTEGER': 7, 'DOUBLE': 10, 'BOOLEAN': 11, 'DATE': 12, 'DATETIME': 13, 'DURATION': 14, 
            'CHAR_STRING': 15, 'UNICODE_STRING': 16}

## Define CSV Schema in dict, (truncated here for brevity)
csvSchema = []
csvSchema.append({'fAsOfDate': tdeTypes['DATETIME']})
csvSchema.append({'AsOfDate_Max': tdeTypes['DATETIME']})
csvSchema.append({'LoanID': tdeTypes['INTEGER']})
csvSchema.append({'lenderdatabaseid': tdeTypes['INTEGER']})
csvSchema.append({'loanrecordid': tdeTypes['INTEGER']})
csvSchema.append({'random_num': tdeTypes['INTEGER']})


# Try to create extract, delete if found.
try:
    tdeFile = tde.Extract(tdeFileName)
except: 
    os.system('del '+tdeFileName)
    os.system('del DataExtract.log')
    tdeFile = tde.Extract(tdeFileName)

# Open CSV
csvFile = open(inputFile, "rU")
reader = csv.reader(csvFile, delimiter = '^')

print 'Reading records from %s' % (inputFile)

# Create TDE table definition
tdeTableDef = tde.TableDefinition() 

print 'Defined table schema:'

# Build TDE Table Def from csv schema dict
for index, item in enumerate(csvSchema):
    for k, v in item.items():
        print 'Column %i: %s <%s>' % (index, k, tdeTypes.keys() [tdeTypes.values().index(v)])
        tdeTableDef.addColumn(k, v)

# Add table to extract
tdeTable = tdeFile.addTable("Extract",tdeTableDef)

print 'Writing records to %s' % (tdeFileName)

# iterate through rows and columns of csv -> add to tde
rownum = 0
for row in reader:
    if rownum == 0:
        header = row
    else:
        colnum = 0
        tdeRow = tde.Row(tdeTableDef)
        for col in row:
            if colnum+1 > len(csvSchema):
                break
            add_tde_col(colnum, tdeRow, row[colnum], csvSchema[colnum].values()[0])
            colnum += 1
        tdeTable.insert(tdeRow)
        tdeRow.close()        
    rownum += 1

print '%i rows added in total in %f seconds' % (rownum-1, time.clock()-startTime)

tdeFile.close()

csvFile.close()

1 个回答

0

如果你在用 dillPiCloud 处理 ctypes.pointer 类型时遇到序列化失败的问题,那我觉得你可能会很难解决。我不知道有没有工具能处理这些类型。虽然 dill 可以处理一些 ctypes 的类型,但指针类型就不行了。我建议你在 GitHub 上给 dill 提个问题,或许会有意想不到的好结果,得到一个新的序列化类型。有了新的类型后,我会使用 pathos.multiprocessing,这样应该就能正常工作了。不过在那之前,你可能需要考虑重写一些代码来绕过这个序列化的问题。比如,你可以使用 from dill.detect import badobjects, baditems, badtypes, errors 来查看需要在哪个层级进行重写。可能只需要简单地修改一下导入的方式——不过因为涉及到 ctypes.pointer,我对这会不会简单持怀疑态度。

撰写回答