如何使用pyspark管道并行化数据帧转换?

2024-09-24 06:28:13 发布

您现在位置:Python中文网/ 问答频道 /正文

我的方法如下。我认为,由于我使用for循环并分别处理每一列,所以这段代码不会分发,因为完成它需要很多小时。 我在pyspark数据帧中有8000列。你能告诉我一个正确的方法来实现这个并行/分布式的方式吗。谢谢

indexers = [ 
 StringIndexer(inputCol=c, outputCol="{}_idx".format(c)) for c in cat_string]

encoders1 = [
  OneHotEncoder(
    inputCol=idx.getOutputCol(),
    outputCol="{0}_enc".format(idx.getOutputCol())) for idx in indexers]

encoders2 = [
    OneHotEncoder(
     inputCol=c,
      outputCol="{0}_enc".format(c)) for c in cat_numeric]

label_indexer = StringIndexer(inputCol = "dep", outputCol = "label")

assembler = VectorAssembler(
 inputCols=[enc.getOutputCol() for enc in encoders1] +[enc.getOutputCol()    for enc in encoders2]+ con_columns,
outputCol="features")

pipeline = Pipeline(stages=indexers + encoders1 +encoders2+[label_indexer]+[assembler])

model = pipeline.fit(final_df)

Tags: 方法informatforlabelcatencidx