Why is my pandas UDF not being parallelized?

Posted 2024-10-03 04:33:18


I have data from many IoT sensors. For each individual sensor there are only about 100 rows in the DataFrame, so the data is not skewed. I am training a separate machine learning model for each sensor.

I am using a pandas UDF to train the different models and log their metrics with mlflow, supposedly in parallel, as taught here.

Using Databricks on Azure with a single-node cluster (Standard_DS3_v2 - 14 GB memory - 4 cores), I was able to complete all the training in about 23 minutes.

Since a pandas UDF supposedly computes each group in parallel, I thought the training would finish faster on a single-node cluster with more cores, or on a cluster with more workers. So I tried running the same notebook with:

  1. A cluster with 1 driver + 3 workers, all (Standard_DS3_v2 - 14 GB memory - 4 cores)
  2. A single-node cluster with (Standard_DS5_v2 - 56 GB memory - 16 cores)

To my surprise, the training time did not decrease: 23 minutes for option 1 and 26.5 minutes for option 2.

I also tried using the newer applyInPandas, but the result was roughly the same.

Note: following @Chris's answer, I checked the stage detail page in the web UI (for the cluster with 1 driver + 3 workers) and found that there is only one stage responsible for the pandas UDF training, and it took 20 minutes. Drilling into that stage's details, I saw it had only one task, which took the entire 20 minutes. Screenshots are below.

So @Chris has identified the problem: the training is not being parallelized.

To help understand why applyInPandas (or the pandas UDF) is not being parallelized, I have put the code below (the applyInPandas version). Note that my goal is only to log the trained models' metrics with mlflow, so the function simply returns the same DataFrame it receives.

Also note that the code works as expected: mlflow logs every training run successfully. My only question is why it is not being parallelized.

I have a feeling the problem lies in the for loop, since it differs from the tutorial.

import pyspark.sql.functions as f
import mlflow
import pandas as pd
import numpy as np
from sklearn.metrics import mean_squared_error, mean_absolute_error
import pmdarima as pm
from statsmodels.tsa.statespace.sarimax import SARIMAX

def train_model(df_pandas):
  '''
  Trains a model on grouped instances
  '''
  original_df = df_pandas.copy() #the original df will be returned in the end
  PA = df_pandas['Princípio_Ativo'].iloc[0]
  run_id = df_pandas['run_id'].iloc[0] # Pulls run ID to do a nested run
  
  
  observacoes_no_teste = 12
  horizonte = 1
  observacoes_total = len(df_pandas.index)
  observacoes_no_train = len(df_pandas.index) - observacoes_no_teste
  
  try:
    #train test split
    X = df_pandas[:observacoes_no_train]['Demanda']
    y = df_pandas[observacoes_no_train:]['Demanda']

    # Train the model
    model = pm.auto_arima(X, seasonal=True, m=12)

    order = model.get_params()['order']
    seasonal_order = model.get_params()['seasonal_order']


  except:
    pass
 
  # Resume the top-level training
  with mlflow.start_run(run_id=run_id, experiment_id=1333367041812290):
    # Create a nested run for the specific device
    with mlflow.start_run(run_name=str(PA), nested=True, experiment_id=1333367041812290) as run:
      
      mae_list = []
      mse_list = []
      previsoes_list = []
      teste_list = []
      predictions_list = []

    
      try:
        #the purpose of the following loop is to do backtesting: the model is trained with n observations, and the (n+1)th is predicted. n is increased by 1 on each iteration.
        for i in range(observacoes_total-observacoes_no_train-horizonte+1):
          #train test split
          X = df_pandas[:observacoes_no_train+i]['Demanda']
          y = df_pandas[observacoes_no_train+i:observacoes_no_train+i+horizonte]['Demanda']
          #train model
          model = SARIMAX(X, order=order, seasonal_order=seasonal_order)
          model = model.fit()
          #make predictions
          predictions = model.predict(start=observacoes_no_train + i, end=(observacoes_no_train + i + horizonte-1))

          predictions_list.append(predictions)

          mse = round(mean_squared_error(y, predictions),2)
          mae = round(mean_absolute_error(y, predictions),2)

          mse_list.append(mse)
          mae_list.append(mae)

        #series with predictions
        in_sample_predictions = pd.concat(predictions_list)
        in_sample_predictions.name = 'in_sample'
        #out of sample predictions
        hp = 3
        out_of_sample_predictions = model.predict(start=observacoes_total, end=(observacoes_total + hp - 1))
        out_of_sample_predictions.name = 'out_sample'
        #in sample + out of sample predictions
        df_predictions = pd.concat([df_pandas.drop('run_id',axis=1), in_sample_predictions,out_of_sample_predictions], axis=1)
        #save df with predictions to be logged as an artifact by mlflow
        df_predictions.to_csv('df_predictions.csv')

        #mlflow logging
        mlflow.log_param("Princípio_Ativo", PA)
        mlflow.log_param("mae_list", str(mae_list))
        mlflow.log_param("mse_list", str(mse_list))
        mlflow.log_param("status_sucesso", 'sim')
        mlflow.log_artifact('df_predictions.csv')
      except:
        mlflow.log_param("status_falha", 'sim')

  return original_df.drop('run_id', axis=1) 

with mlflow.start_run(run_name="SARIMA", experiment_id=1333367041812290) as run:
  run_id = run.info.run_uuid

  modelDirectoriesDF = (df
    .withColumn("run_id", f.lit(run_id)) # Add run_id
    .groupby("Princípio_Ativo")
    .applyInPandas(train_model, schema=df.schema)
  )
  
combinedDF = (df
  .join(modelDirectoriesDF, on="Princípio_Ativo", how="left")
)

display(combinedDF)

Screenshots from the Spark UI: picture1, picture2, picture3


2 answers

Correct. A stage with only one task is not parallelized, which explains why the runtime does not go down when you add more cores or more nodes to the cluster.

Your input dataset is small (69 KB), so unless you explicitly repartition it, Spark will read it into a single partition as long as the partition size is left at the default of 128 MB (set by the spark.sql.files.maxPartitionBytes parameter). It will therefore be assigned to a single task.
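
For example, a quick way to confirm this (a minimal sketch; df is assumed to be the input DataFrame from the question and spark the active SparkSession) is to check the partition count and the relevant setting:

# Number of partitions of the input DataFrame; with a 69 KB dataset and default
# settings this will typically print 1, i.e. a single task for the whole stage.
print(df.rdd.getNumPartitions())

# Default maximum partition size Spark uses when reading files (128 MB).
print(spark.conf.get("spark.sql.files.maxPartitionBytes"))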

Repartitioning the input by the device column should give you the parallel training you are looking for.
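
As a rough sketch of that suggestion applied to the question's code (assuming Princípio_Ativo is the device/grouping column, as in the original snippet), the repartition would go just before the groupby:

modelDirectoriesDF = (df
  .withColumn("run_id", f.lit(run_id))
  .repartition("Princípio_Ativo")  # spread the groups across multiple partitions, and therefore multiple tasks
  .groupby("Princípio_Ativo")
  .applyInPandas(train_model, schema=df.schema)
)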

I think you can improve parallelization by calling sort() on combinedDF before running the final display command. Something like:

import pyspark.sql.functions as F
display(combinedDF.sort(F.col("Princípio_Ativo").asc()))

The snippet above prevents the display function from evaluating only some of the rows in combinedDF. The idea is that by sorting the rows before display, Spark has to evaluate all of them to know the row order for display, so everything gets scheduled to run in parallel. When Spark runs display without sorting first, it schedules an exponentially growing number of tasks per stage until enough rows have been computed to fill the display.
