<p>I have data from many IoT sensors. For each individual sensor there are only about 100 rows in the DataFrame, so the data is not skewed. I am training a separate machine-learning model for each sensor.</p>
<p>I am using a <code>pandas udf</code> to train and log the mlflow metrics of the different models in parallel (supposedly), as taught <a href="https://databricks.com/blog/2020/05/19/manage-and-scale-machine-learning-models-for-iot-devices.html" rel="nofollow noreferrer">here</a>.</p>
<p>Using Databricks on Azure with a single-node cluster (Standard_DS3_v2, 14 GB memory, 4 cores), I was able to complete all the training in about <strong>23 minutes</strong>.</p>
<p>Since a <code>pandas udf</code> supposedly computes each group in parallel, I thought the training could finish faster on a single-node cluster with more cores, or on a cluster with more workers. So I tried running the same notebook with:</p>
<ol>
<li>A cluster of machines: 1 driver + 3 workers, all (Standard_DS3_v2, 14 GB memory, 4 cores)</li>
<li>A single-node cluster with (Standard_DS5_v2, 56 GB memory, 16 cores)</li>
</ol>
<p>To my surprise, the training time did not decrease: <strong>23 minutes</strong> for option 1 and <strong>26.5 minutes</strong> for option 2.</p>
<p>I tried using the <a href="https://spark.apache.org/docs/3.0.1/sql-pyspark-pandas-with-arrow.html#pandas-function-apis" rel="nofollow noreferrer">newer</a> <code>applyInPandas</code>, but got roughly the same results.</p>
<p>Note: following @Chris's answer, I checked the stage detail page in the Web UI (for the cluster with 1 driver + 3 workers) and found that there is only one stage responsible for the pandas udf training. It took 20 minutes. Drilling into that stage, I found it had a single task that took the whole 20 minutes. Screenshots are below.</p>
<p>So @Chris has identified the problem: the training is not being parallelized.</p>
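A single long-running task suggests every group ended up in the same shuffle partition, since the parallelism of a grouped pandas udf is capped by the number of non-empty partitions after the group-by shuffle, not by the number of cores. A toy illustration of that mapping (pure Python; real Spark uses Murmur3 hashing rather than Python's `hash`, and the key/partition counts here are made up, so this is only a sketch):

```python
from collections import Counter

# hypothetical: 30 sensor keys spread over 8 shuffle partitions
keys = [f"sensor_{i}" for i in range(30)]
num_partitions = 8

# each group key is hashed to one partition; a whole group never splits
partition_of = {k: hash(k) % num_partitions for k in keys}
occupancy = Counter(partition_of.values())

# the maximum achievable parallelism is the number of non-empty partitions,
# regardless of how many cores the cluster has
print(f"non-empty partitions: {len(occupancy)} (upper bound on parallel tasks)")
```

If everything hashes into one partition (or the upstream DataFrame only ever had one partition), all groups run in one task, which would match the single 20-minute task in the screenshots.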
<p>To understand why <code>applyInPandas</code> (or a <code>pandas udf</code>) is not being parallelized, I put my code below (the <code>applyInPandas</code> version). Note that my goal is only to log the metrics of the trained models with <code>mlflow</code>, so the function returns nothing but the original df it receives.</p>
<p>Also note that the code works as expected: mlflow logs the training successfully. My only question is why it is not being parallelized.</p>
<p>I have a feeling the problem is the <code>for loop</code>, since it differs from the <a href="https://databricks.com/blog/2020/05/19/manage-and-scale-machine-learning-models-for-iot-devices.html" rel="nofollow noreferrer">tutorial</a>.</p>
<pre><code>import pyspark.sql.functions as f
import mlflow
import pandas as pd
import numpy as np
from sklearn.metrics import mean_squared_error, mean_absolute_error
import pmdarima as pm
from statsmodels.tsa.statespace.sarimax import SARIMAX

def train_model(df_pandas):
    '''
    Trains a model on grouped instances
    '''
    original_df = df_pandas.copy()  # the original df will be returned in the end
    PA = df_pandas['Princípio_Ativo'].iloc[0]
    run_id = df_pandas['run_id'].iloc[0]  # Pulls run ID to do a nested run

    observacoes_no_teste = 12
    horizonte = 1
    observacoes_total = len(df_pandas.index)
    observacoes_no_train = len(df_pandas.index) - observacoes_no_teste

    try:
        # train test split
        X = df_pandas[:observacoes_no_train]['Demanda']
        y = df_pandas[observacoes_no_train:]['Demanda']
        # Train the model
        model = pm.auto_arima(X, seasonal=True, m=12)
        order = model.get_params()['order']
        seasonal_order = model.get_params()['seasonal_order']
    except:
        pass

    # Resume the top-level training
    with mlflow.start_run(run_id=run_id, experiment_id=1333367041812290):
        # Create a nested run for the specific device
        with mlflow.start_run(run_name=str(PA), nested=True, experiment_id=1333367041812290) as run:
            mae_list = []
            mse_list = []
            previsoes_list = []
            teste_list = []
            predictions_list = []

            try:
                # the purpose of the following loop is to do backtesting: the model is trained
                # with n observations, and the (n+1)th is predicted. n is increased by 1 on each iteration.
                for i in range(observacoes_total - observacoes_no_train - horizonte + 1):
                    # train test split
                    X = df_pandas[:observacoes_no_train + i]['Demanda']
                    y = df_pandas[observacoes_no_train + i:observacoes_no_train + i + horizonte]['Demanda']
                    # train model
                    model = SARIMAX(X, order=order, seasonal_order=seasonal_order)
                    model = model.fit()
                    # make predictions
                    predictions = model.predict(start=observacoes_no_train + i, end=(observacoes_no_train + i + horizonte - 1))
                    predictions_list.append(predictions)

                    mse = round(mean_squared_error(y, predictions), 2)
                    mae = round(mean_absolute_error(y, predictions), 2)
                    mse_list.append(mse)
                    mae_list.append(mae)

                # series with predictions
                in_sample_predictions = pd.concat(predictions_list)
                in_sample_predictions.name = 'in_sample'
                # out of sample predictions
                hp = 3
                out_of_sample_predictions = model.predict(start=observacoes_total, end=(observacoes_total + hp - 1))
                out_of_sample_predictions.name = 'out_sample'
                # in sample + out of sample predictions
                df_predictions = pd.concat([df_pandas.drop('run_id', axis=1), in_sample_predictions, out_of_sample_predictions], axis=1)
                # save df with predictions to be logged as an artifact by mlflow
                df_predictions.to_csv('df_predictions.csv')

                # mlflow logging
                mlflow.log_param("Princípio_Ativo", PA)
                mlflow.log_param("mae_list", str(mae_list))
                mlflow.log_param("mse_list", str(mse_list))
                mlflow.log_param("status_sucesso", 'sim')
                mlflow.log_artifact('df_predictions.csv')
            except:
                mlflow.log_param("status_falha", 'sim')

    return original_df.drop('run_id', axis=1)
</code></pre>
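The backtesting loop in <code>train_model</code> uses an expanding window: with 12 test observations and a horizon of 1, it refits SARIMAX once per held-out observation, sequentially within each group. A pure-Python sketch of the index arithmetic (the total of 100 rows is an assumed size, matching "about 100 rows" per sensor):

```python
# assumed sizes mirroring train_model
observacoes_total = 100
observacoes_no_teste = 12
horizonte = 1
observacoes_no_train = observacoes_total - observacoes_no_teste  # 88

windows = []
for i in range(observacoes_total - observacoes_no_train - horizonte + 1):
    train_end = observacoes_no_train + i              # training uses rows [0, train_end)
    target = range(train_end, train_end + horizonte)  # one-step-ahead prediction
    windows.append((train_end, list(target)))

print(len(windows))  # 12 refits per group
print(windows[-1])   # the last window trains on 99 rows and predicts row 99
```

This per-group loop would not by itself prevent parallelism across groups, but it does fix each task's minimum wall time at 12 sequential fits.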
<pre><code>with mlflow.start_run(run_name="SARIMA", experiment_id=1333367041812290) as run:
    run_id = run.info.run_uuid

    modelDirectoriesDF = (df
        .withColumn("run_id", f.lit(run_id))  # Add run_id
        .groupby("Princípio_Ativo")
        .applyInPandas(train_model, schema=df.schema)
    )

combinedDF = (df
    .join(modelDirectoriesDF, on="Princípio_Ativo", how="left")
)

display(combinedDF)
</code></pre>
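Semantically, <code>applyInPandas</code> behaves like a per-group pandas apply: each group arrives at the function as one plain pandas DataFrame, and the returned frames are unioned into the result. A local pandas-only stand-in (with <code>train_model</code> replaced by a hypothetical pass-through stub, since the real one needs mlflow and a fitted model):

```python
import pandas as pd

# tiny synthetic frame with two groups, standing in for the Spark DataFrame
df_local = pd.DataFrame({
    "Princípio_Ativo": ["A"] * 3 + ["B"] * 3,
    "Demanda": [10, 12, 11, 5, 6, 7],
})

def train_model_stub(group: pd.DataFrame) -> pd.DataFrame:
    # stand-in for train_model: would fit and log here, then return the input unchanged
    return group

# each group is handed to the function whole, as applyInPandas does per task
out = pd.concat(
    train_model_stub(g) for _, g in df_local.groupby("Princípio_Ativo")
)
print(out.shape)  # same shape as the input: (6, 2)
```

Whether those per-group calls actually run concurrently in Spark depends on how the groups are distributed across tasks, which is exactly what the single-task stage above shows is not happening.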
<p>Screenshots from the Spark UI:
<a href="https://i.stack.imgur.com/9bMNu.png" rel="nofollow noreferrer"><img src="https://i.stack.imgur.com/9bMNu.png" alt="Picture1"/></a>
<a href="https://i.stack.imgur.com/qOOza.png" rel="nofollow noreferrer"><img src="https://i.stack.imgur.com/qOOza.png" alt="picture2"/></a>
<a href="https://i.stack.imgur.com/Yc0V5.png" rel="nofollow noreferrer"><img src="https://i.stack.imgur.com/Yc0V5.png" alt="picture3"/></a></p>