我有下面的示例输入数据帧,但值(clm以m开头)列可以是n个数字。此外,我还将customer_id用作主键(但是,根据输入数据,我可以有更多的主键号)
customer_id|month_id|m1 |m2 |m3 ....to....m_n
1001 | 01 |10 |20
1002 | 01 |20 |30
1003 | 01 |30 |40
1001 | 02 |40 |50
1002 | 02 |50 |60
1003 | 02 |60 |70
1001 | 03 |70 |80
1002 | 03 |80 |90
1003 | 03 |90 |100
现在,基于输入值列-我必须基于累积和或平均值计算新列。让我们考虑一个例子:
cumulative sum on [m1, ......, m10] and
cumulative avg on [m11, ......., m20] columns
基于此,我必须计算新列。我在windows函数的基础上尝试了它,并且能够计算新的列。但是,我的问题是因为数据的大小,我正在用更新的数据框和新列一个接一个地进行计算
我的尝试:
a = [m1, ......, m10]
b = [m11, ......, m20]
rnum = (Window.partitionBy("partner_id").orderBy("month_id").rangeBetween(Window.unboundedPreceding, 0))
for item in a:
var = n
df = df.withColumn(var + item[1:], F.sum(item).over(rnum))
for item in b:
var = n
df = df.withColumn(var + item[1:], F.avg(item).over(rnum))
输出数据:
customer_id|month_id|m1 |m2 |m11 |m12 |n1 |n2 |n11 |n12
1001 | 01 |10 |20 |10 |20 |10 |20 |10 |20
1002 | 01 |20 |30 |10 |20 |20 |30 |10 |20
1003 | 01 |30 |40 |10 |20 |30 |40 |10 |20
1001 | 02 |40 |50 |10 |20 |50 |35 |10 |20
1002 | 02 |50 |60 |10 |20 |70 |55 |10 |20
1003 | 02 |60 |70 |10 |20 |90 |75 |10 |20
1001 | 03 |70 |80 |10 |20 |120 |75 |10 |20
1002 | 03 |80 |90 |10 |20 |150 |105 |10 |20
1003 | 03 |90 |100 |10 |20 |180 |135 |10 |20
但是,我们可以通过将数据帧拆分为两个来执行相同的操作吗?一个数据帧中包含累积和列,另一个数据帧中包含累积平均列以及主键,然后执行该操作,然后计算出数据帧
DF1方法优化逻辑计划
DF方法优化逻辑计划
如果您看到上面的
DF Approach Optimized Logical Plan
,它在平均值计算期间有总和计算计划,这可能是低效的只要有可能,您可以缩小数据帧的大小并继续计算。同时,为DF1优化逻辑计划中的两个数据集添加了
join
计划。在许多情况下,连接总是很慢,因此最好通过code - repartition & cache
configs - executor, driver, memoryOverhead, number of cores
我用
m1,m2,m3,m4
列尝试过的代码根据您的问题,我的理解是,您正在尝试拆分操作以并行执行任务并节省时间
您不必并行化执行,因为当您对已创建的数据帧执行任何操作(如collect()、show()、count()、write)时,执行将在spark中自动并行化。这是由于spark的懒惰执行
如果出于其他原因仍要拆分操作,可以使用线程。下面的文章将为您提供有关pyspark中线程的更多信息:https://medium.com/@everisUS/threads-in-pyspark-a6e8005f6017
相关问题 更多 >
编程相关推荐