我试图count
使用Spark API对mllib的FP growth生成的频繁项集。我的火花是1.5.1版。以下是我的代码:
#!/usr/bin/python
from pyspark.mllib.fpm import FPGrowth
from pyspark import SparkContext,SparkConf
from pyspark import HiveContext
import os
os.environ['PYSPARK_PYTHON']='/usr/bin/python'
appName = "FP_growth"
sc = SparkContext()
sql_context = HiveContext(sc)
def read_spu(prod):#prod_code):
sql = """
select
t.orderno_nosplit,
t.prod_code,
t.item_code,
sum(t.item_qty) as item_qty
from ioc_fdm.fdm_dwr_ioc_fcs_pk_spu_item_f_chain t
where t.prod_code='%s'
group by t.prod_code, t.orderno_nosplit, t.item_code """%prod
spu_result = sql_context.sql(sql)
return spu_result.cache()
if __name__ == '__main__':
spu=read_spu('6727780')
conf=0.7
trans=spu.rdd.repartition(100).map(lambda x: (x[0],x[2])).groupByKey().mapValues(list).values().cache()
model = FPGrowth.train(trans, 0.01, 100)
freq_count = model.freqItemsets().count()
print 'freq_count:',freq_count
sc.stop()
输入数据是从Hadoop读取的,数据不是很大,只有大约20000行。但是,脚本在.count
阶段的工作非常缓慢。我不知道为什么。从性能上看,似乎是因为数据倾斜。但是输出的数据不是很大(每个任务只有大约100KB)。在
集群有8个节点,320个核心,总内存为1.56t(不仅仅是一个用户)。我的spark提交脚本是spark-submit --master yarn-cluster --executor-memory 30g --num-executors 20 --executor-cores 5 FP_growth.py
附件是运行时性能的屏幕图像:
repartition(100)
看起来不是个好主意,您可以检查哪些阶段占用的时间最多。因为只有两万条记录。遣返应在每个分区将其分成200个记录。在如果数据量不是很大,你根本不需要返回。或者尝试使用40-60个分区(2或3)*执行器数量。在
相关问题 更多 >
编程相关推荐