所以我一直致力于用不同的参数组合来分布和并行执行不同的ML算法。我正在集群环境上进行实验,我希望尽可能多地利用所有可用资源。 更多信息:How are tasks distributed within a Spark cluster?
我的输入包括JSON格式的算法和参数列表。基于这种情况,我有两种方法:
对于我拥有的每个ML算法,我创建一个SparkGridSearch对象(从official integration)并将其放入一个列表中。然后,我在不同的线程上执行这些网格搜索对象,如下所示:
...
for experiment in experiments:
# experiment contains the algorithm and its list of parameters
gridSearchCV = SparkGridSearchCV(spark_context, experiment[0], experiment[1], scoring=experiment[2], cv=experiment[3])
experiment_thread = Thread(target=run_experiment, args=(bcast_dataframe, gridSearchCV))
experiment_thread.start()
输入:
^{pr2}$我还创建了一个列表,其中每个元素都是一个带有一个参数组合的算法。因此,这个列表的大小比前面的方法大得多,因为每个组合都是列表的一个元素。在本例中,我只需在此列表中使用spark.parallelize
,并将其映射到运行每个ML算法的函数:
def run_experiment(dataframe, experiment):
...
model = estimator.fit(train_data, train_target)
predicted = model.predict(test_data)
score = accuracy_score(predicted, expected)
return [model, score]
def main():
...
bcast_dataset = spark_context.broadcast(df)
experimentsRDD = spark_context.parallelize(experiments_dict)
print experimentsRDD.map(lambda experiment: run_experiment(bcast_dataset, experiment)).collect()
bcast_dataset.unpersist()
输入:
experiments_dict = {
'Decision Tree': {'num_folds': 2, 'evaluation': 'accuracy', 'parameters': {'criterion': ['entropy'], 'class_weight': ['balanced']}},
'Naive Bayes': {'num_folds': 2, 'evaluation': 'accuracy', 'parameters': {}},
'Random Forests': {'num_folds': 2, 'evaluation': 'accuracy', 'parameters': {'n_estimators': [5]}},
'Random Forests_1_2': {'num_folds': 2, 'evaluation': 'accuracy', 'parameters': {'n_estimators': [7]}},
'Random Forests_2': {'num_folds': 3, 'evaluation': 'accuracy', 'parameters': {'bootstrap': [True, False], 'criterion': ['entropy'], 'class_weight': ['balanced']}},
'Random Forests_2_1': {'num_folds': 3, 'evaluation': 'accuracy', 'parameters': {'bootstrap': [False], 'criterion': ['entropy'], 'class_weight': ['balanced']}},
'Decision Tree_2': {'num_folds': 3, 'evaluation': 'accuracy', 'parameters': {'splitter': ['random'], 'max_leaf_nodes': [2], 'max_depth': [None], 'class_weight': ['balanced']}},
'Decision Tree_2_1': {'num_folds': 3, 'evaluation': 'accuracy', 'parameters': {'splitter': ['random'], 'max_leaf_nodes': [3], 'max_depth': [None], 'class_weight': ['balanced']}},
'Decision Tree_2_2': {'num_folds': 3, 'evaluation': 'accuracy', 'parameters': {'splitter': ['random'], 'max_leaf_nodes': [None], 'max_depth': [None], 'class_weight': ['balanced']}},
'Decision Tree_2_3': {'num_folds': 3, 'evaluation': 'accuracy', 'parameters': {'splitter': ['random'], 'max_leaf_nodes': [2], 'max_depth': [None], 'class_weight': ['None']}},
'Decision Tree_2_4': {'num_folds': 3, 'evaluation': 'accuracy', 'parameters': {'splitter': ['random'], 'max_leaf_nodes': [3], 'max_depth': [None], 'class_weight': ['None']}},
'Decision Tree_2_5': {'num_folds': 3, 'evaluation': 'accuracy', 'parameters': {'splitter': ['random'], 'max_leaf_nodes': [None], 'max_depth': [None], 'class_weight': ['None']}},
...
}
我用第二种方法做了一些实验,取得了较好的效果。但是,当改变分区的数量(等于实验的数量,或者2到3倍的可用核心数量)时,执行时间没有变化。 在多线程方法中使用Spark广播数据集时,我也遇到了一些问题,但显然这是一个已知的issue。在
我的问题:这些结果有意义吗?为什么分区的数量不影响执行时间?我是缺少了什么,还是有什么可以改进的?在
谢谢!在
目前没有回答
相关问题 更多 >
编程相关推荐