如何在spark上优化此代码?

2024-10-01 02:25:18 发布

您现在位置:Python中文网/ 问答频道 /正文

如何使这段代码在Spark中更有效?
我需要根据数据计算最小值、最大值、计数和平均值。
这是我的样本数据

Name Shop Money
A Shop001 99.99
A Shop001 87.15
B Shop001 3.99
...

现在,我尝试组织我的数据,生成mean、min、max、count by Name+Shop(key)。
然后通过collect()获得结果。
这是我在spark中的代码



def tupleDivide(y):
    return float(y[0])/y[1]

def smin(a, b):
    return min(a, b)

def smax(a, b):
    return max(a, b)

raw = sgRDD.map(lambda x: getVar(parserLine(x),list_C+list_N)).cache()
cnt = raw.map(lambda (x,y,z): (x+"_"+y, 1)).countByKey()
sum = raw.map(lambda (x,y,z): (x+"_"+y, z)).reduceByKey(add)
min = raw.map(lambda (x,y,z): (x+"_"+y, z)).reduceByKey(smin)
max = raw.map(lambda (x,y,z): (x+"_"+y, z)).reduceByKey(smax)
raw_cntRDD = sc.parallelize(cnt.items(),3)
raw_mean = sum.join(raw_cntRDD).map(lambda (x, y): (x, tupleDivide(y))) 

有人能提供一些关于优雅的编码风格的建议吗?
谢谢!在


Tags: 数据lambda代码namemaprawreturndef
1条回答
网友
1楼 · 发布于 2024-10-01 02:25:18

您应该使用aggregateByKey来获得更优化的处理。其思想是存储由count、min、max和sum组成的state向量,并使用聚合函数来获得最终值。另外,可以使用元组作为键,不必将键连接到单个字符串中。在

data = [
        ['x', 'shop1', 1],
        ['x', 'shop1', 2],
        ['x', 'shop2', 3],
        ['x', 'shop2', 4],
        ['x', 'shop3', 5],
        ['y', 'shop4', 6],
        ['y', 'shop4', 7],
        ['y', 'shop4', 8]
    ]

def add(state, x):
    state[0] += 1
    state[1] = min(state[1], x)
    state[2] = max(state[2], x)
    state[3] += x
    return state

def merge(state1, state2):
    state1[0] += state2[0]
    state1[1] = min(state1[1], state2[1])
    state1[2] = max(state1[2], state2[2])
    state1[3] += state2[3]
    return state1

res = sc.parallelize(data).map(lambda x: ((x[0], x[1]), x[2])).aggregateByKey([0, 10000, 0, 0], add, merge)

for x in res.collect():
    print 'Client "%s" shop "%s" : count %d min %f max %f avg %f' % (
        x[0][0], x[0][1],
        x[1][0], x[1][1], x[1][2], float(x[1][3])/float(x[1][0])
    )

相关问题 更多 >