在Spark dataframe列中获取最大值的最佳方法

2024-06-28 19:45:46 发布

您现在位置:Python中文网/ 问答频道 /正文

我试图找出在Spark数据帧列中获得最大值的最佳方法。

请考虑以下示例:

df = spark.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"])
df.show()

它创造了:

+---+---+
|  A|  B|
+---+---+
|1.0|4.0|
|2.0|5.0|
|3.0|6.0|
+---+---+

我的目标是在A列中找到最大的值(通过检查,这是3.0)。使用PySpark,我可以想到四种方法:

# Method 1: Use describe()
float(df.describe("A").filter("summary = 'max'").select("A").first().asDict()['A'])

# Method 2: Use SQL
df.registerTempTable("df_table")
spark.sql("SELECT MAX(A) as maxval FROM df_table").first().asDict()['maxval']

# Method 3: Use groupby()
df.groupby().max('A').first().asDict()['max(A)']

# Method 4: Convert to RDD
df.select("A").rdd.max()[0]

上面的每一个都给出了正确的答案,但是在没有火花分析工具的情况下,我无法判断哪一个是最好的。

从直觉或经验主义的观点来看,上述哪种方法在Spark运行时或资源使用方面最有效,或者是否有比上述方法更直接的方法?


Tags: 数据方法dfusetableselectmethodmax
3条回答

备注:Spark的目标是研究大数据分布式计算。示例数据帧的大小非常小,因此可以相对于小示例更改实际示例的顺序。

最慢:方法1,因为.descripe(“A”)计算min、max、mean、stddev和count(整列5次计算)

Medium:Method_4,因为.rdd(DF到rdd转换)减慢了进程。

更快:Method_3~ Method_2~ Method_5,因为逻辑非常相似,所以Spark的catalyst优化器以最少的操作数遵循非常相似的逻辑(获取特定列的最大值,收集单个值的数据帧);(.asDict()比3,2到5多了一点时间)

import pandas as pd
import time

time_dict = {}

dfff = self.spark.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"])
#--  For bigger/realistic dataframe just uncomment the following 3 lines
#lst = list(np.random.normal(0.0, 100.0, 100000))
#pdf = pd.DataFrame({'A': lst, 'B': lst, 'C': lst, 'D': lst})
#dfff = self.sqlContext.createDataFrame(pdf)

tic1 = int(round(time.time() * 1000))
# Method 1: Use describe()
max_val = float(dfff.describe("A").filter("summary = 'max'").select("A").collect()[0].asDict()['A'])
tac1 = int(round(time.time() * 1000))
time_dict['m1']= tac1 - tic1
print (max_val)

tic2 = int(round(time.time() * 1000))
# Method 2: Use SQL
dfff.registerTempTable("df_table")
max_val = self.sqlContext.sql("SELECT MAX(A) as maxval FROM df_table").collect()[0].asDict()['maxval']
tac2 = int(round(time.time() * 1000))
time_dict['m2']= tac2 - tic2
print (max_val)

tic3 = int(round(time.time() * 1000))
# Method 3: Use groupby()
max_val = dfff.groupby().max('A').collect()[0].asDict()['max(A)']
tac3 = int(round(time.time() * 1000))
time_dict['m3']= tac3 - tic3
print (max_val)

tic4 = int(round(time.time() * 1000))
# Method 4: Convert to RDD
max_val = dfff.select("A").rdd.max()[0]
tac4 = int(round(time.time() * 1000))
time_dict['m4']= tac4 - tic4
print (max_val)

tic5 = int(round(time.time() * 1000))
# Method 4: Convert to RDD
max_val = dfff.agg({"A": "max"}).collect()[0][0]
tac5 = int(round(time.time() * 1000))
time_dict['m5']= tac5 - tic5
print (max_val)

print time_dict

群集边缘节点上的结果(毫秒):

小DF(ms):{'m1':7096,'m2':205,'m3':165,'m4':211,'m5':180}

较大的DF(ms):{'m1':10260,'m2':452,'m3':465,'m4':916,'m5':373}

使用-

your_max_value = df.agg({"your-column": "max"}).collect()[0][0]

>df1.show()
+-----+--------------------+--------+----------+-----------+
|floor|           timestamp|     uid|         x|          y|
+-----+--------------------+--------+----------+-----------+
|    1|2014-07-19T16:00:...|600dfbe2| 103.79211|71.50419418|
|    1|2014-07-19T16:00:...|5e7b40e1| 110.33613|100.6828393|
|    1|2014-07-19T16:00:...|285d22e4|110.066315|86.48873585|
|    1|2014-07-19T16:00:...|74d917a1| 103.78499|71.45633073|

>row1 = df1.agg({"x": "max"}).collect()[0]
>print row1
Row(max(x)=110.33613)
>print row1["max(x)"]
110.33613

答案几乎与方法3相同。但似乎method3中的“asDict()”可以删除

相关问题 更多 >