我对Python比较陌生,正在努力使用多处理模块来执行一些CPU密集型的数据转换。我有大量的csv格式的数据(大约400000个观察值,大约300个变量),我想用pythonapi将其转换为Tableau数据提取。编写一个脚本来进行转换是很简单的,但是它需要大约15分钟才能完成,因为只有一个CPU在做这个工作(使用Tableau桌面只需要大约90秒)。我需要利用我所有的8个核心,使这个转换进行得更快。在
我最初的想法是将数据分成8块,让8个工人用多处理模块生成Tableau行的列表,然后将这些行合并成一个tde表。但是,由于Tableau行对象/类是在一个单独的模块(Tableau API)中定义的,所以我得到了pickle和指针错误。这个API很复杂,并且从许多其他模块中提取,所以我在主全局空间中重建必要定义的尝试都失败了。在
我尝试过使用Dill和PiCloud,但这两种尝试都会导致pickle或指针错误。有没有人知道在Python中序列化和/或多处理一个依赖于外部包中定义的方法/对象的计算的有效方法(不必深入到包中试图在程序中重新创建轮子)?在
下面是我想要多处理的工作程序(我大量地借鉴了Brian Bickell在这里发表的工作http://www.interworks.com/blogs/bbickell/2012/12/06/introducing-python-tableau-data-extract-api-csv-extract-example):
from sys import argv
import os, csv, datetime, time
import dataextract as tde
csv.field_size_limit(10000000)
## Functions
# This function makes adding the columns to each row in the extract a bit easier.
def add_tde_col(colnum, row, val, t):
# Date format used below
dateformat = '%Y-%mm-%dd %H:%M:%S.%f'
if t == tdeTypes['INTEGER']:
try:
convert = int(val)
row.setInteger(colnum, convert)
except ValueError:
#if we bomb the cast then we just add a null
row.setNull(colnum)
elif t == tdeTypes['DOUBLE']:
try:
convert = float(val)
row.setDouble(colnum, convert)
except ValueError:
row.setNull(colnum)
elif t == tdeTypes['BOOLEAN']:
try:
convert = int(val)
if convert > -1 and convert <= 1:
row.setBoolean(colnum, convert)
else:
row.setNull(colnum)
except ValueError:
row.setNull(colnum)
elif t == tdeTypes['DATETIME']:
try:
d = datetime.datetime.strptime(val, dateformat)
row.setDate(colnum, d.year, d.month, d.day, d.hour, d.minute, d.second, d.microsecond)
except ValueError:
row.setNull(colnum)
elif t == tdeTypes['CHAR_STRING']:
row.setCharString(colnum, val)
elif t == tdeTypes['UNICODE_STRING']:
row.setString(colnum, val)
else:
print 'Error'
row.setNull(colnum)
# define csv input
inputFile = 'test1.csv'
## Parameters
tdeFileName = 'tdetest1.tde'
startTime = time.clock()
# Handy dictionary of Tableau data types
tdeTypes = {'INTEGER': 7, 'DOUBLE': 10, 'BOOLEAN': 11, 'DATE': 12, 'DATETIME': 13, 'DURATION': 14,
'CHAR_STRING': 15, 'UNICODE_STRING': 16}
## Define CSV Schema in dict, (truncated here for brevity)
csvSchema = []
csvSchema.append({'fAsOfDate': tdeTypes['DATETIME']})
csvSchema.append({'AsOfDate_Max': tdeTypes['DATETIME']})
csvSchema.append({'LoanID': tdeTypes['INTEGER']})
csvSchema.append({'lenderdatabaseid': tdeTypes['INTEGER']})
csvSchema.append({'loanrecordid': tdeTypes['INTEGER']})
csvSchema.append({'random_num': tdeTypes['INTEGER']})
# Try to create extract, delete if found.
try:
tdeFile = tde.Extract(tdeFileName)
except:
os.system('del '+tdeFileName)
os.system('del DataExtract.log')
tdeFile = tde.Extract(tdeFileName)
# Open CSV
csvFile = open(inputFile, "rU")
reader = csv.reader(csvFile, delimiter = '^')
print 'Reading records from %s' % (inputFile)
# Create TDE table definition
tdeTableDef = tde.TableDefinition()
print 'Defined table schema:'
# Build TDE Table Def from csv schema dict
for index, item in enumerate(csvSchema):
for k, v in item.items():
print 'Column %i: %s <%s>' % (index, k, tdeTypes.keys() [tdeTypes.values().index(v)])
tdeTableDef.addColumn(k, v)
# Add table to extract
tdeTable = tdeFile.addTable("Extract",tdeTableDef)
print 'Writing records to %s' % (tdeFileName)
# iterate through rows and columns of csv -> add to tde
rownum = 0
for row in reader:
if rownum == 0:
header = row
else:
colnum = 0
tdeRow = tde.Row(tdeTableDef)
for col in row:
if colnum+1 > len(csvSchema):
break
add_tde_col(colnum, tdeRow, row[colnum], csvSchema[colnum].values()[0])
colnum += 1
tdeTable.insert(tdeRow)
tdeRow.close()
rownum += 1
print '%i rows added in total in %f seconds' % (rownum-1, time.clock()-startTime)
tdeFile.close()
csvFile.close()
如果您未能在}序列化,那么我认为您卡住了。我不知道有没有序列化程序可以处理这些类型。
ctypes.pointer
类型中用dill
和{dill
可以处理某些ctypes
类型,但不能处理指针类型。我建议在github上为dill
添加一个问题,也许会发生神奇的事情,您将得到一个新的序列化类型。有了新类型,我将使用pathos.multiprocessing
,它应该可以工作。不过,在重写之前,您可能要先看看问题。例如,如果您使用from dill.detect import badobjects, baditems, badtypes, errors
,您可能可以看到您需要在多大的深度进行重写。这可能和修改导入的方式一样简单,但是,因为有一个ctypes.pointer
,我怀疑这是否容易。在相关问题 更多 >
编程相关推荐