I'm trying to build an item-item similarity sparse matrix in PySpark using Jaccard similarity. My product input file looks like this:
{"LEVEL2_NM": "Lvl2", "product_index": 1, "LEVEL3_NM": "Lvl3_Nm_1", "LEVEL2_CD": "10001", "ATTRIBUTES": {"COLOR": ["Red", "Blue"], "SEASON": null, "BRAND": "Nike", "NEW_COLOR_CD": ["01", "02"]}}
{"LEVEL2_NM": "Lvl2", "product_index": 2, "LEVEL3_NM": "Lvl3_Nm_2", "LEVEL2_CD": "10001", "ATTRIBUTES": {"COLOR": ["Red", "Green"], "SEASON": null, "BRAND": "Adidas", "NEW_COLOR_CD": ["01", "03"]}}
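The following shows how the score that jaccard_dict in the code below appears to be meant to work, assuming it receives the two ATTRIBUTES dictionaries. It is the plain Jaccard index J, averaged over the attribute keys that both products have. K_p (the attribute keys of product p) and A_p(k) (the values of attribute k, where a scalar or null is treated as a one-element set) are notation introduced here, not taken from the question. For the two sample products the four terms are COLOR, SEASON (null vs null counts as a match in the original code), BRAND and NEW_COLOR_CD:

```latex
\begin{aligned}
J(P, Q) &= \frac{|P \cap Q|}{|P \cup Q|} \\
\mathrm{sim}(p, q) &= \frac{1}{|K_p \cap K_q|} \sum_{k \in K_p \cap K_q} J\bigl(A_p(k), A_q(k)\bigr) \\
\mathrm{sim}(1, 2) &= \tfrac{1}{4}\bigl(\tfrac{1}{3} + 1 + 0 + \tfrac{1}{3}\bigr) = \tfrac{5}{12} \approx 0.417
\end{aligned}
```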
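The file is about 200 MB. The final output should be one JSON line per product, mapping its product_index to a dictionary of {other product_index: Jaccard similarity} for the products in the same LEVEL2_NM group.
I wrote the PySpark code below. It works fine on a subset of the data (~10K records), but when I run it on all 2M records it never finishes.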
from pyspark import SparkConf, SparkContext
import json
# Local mode with 4 worker threads.
conf = SparkConf().setMaster("local[4]").setAppName("ProductMatrix")
sc = SparkContext(conf=conf)
# Input has one JSON object per line.
lines = sc.textFile("/..../product.txt")
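def parseJsonLineMap(line):
    jsonVal = json.loads(line)
    # Key: LEVEL2_NM, so only products in the same level-2 group are compared.
    # Value: (product_index, ATTRIBUTES dict); jaccard_dict() needs the dict, not str(SEASON)/str(BRAND).
    return (str(jsonVal['LEVEL2_NM']), (str(jsonVal['product_index']), jsonVal['ATTRIBUTES']))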
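def jaccard_sim(p, q):
    # Jaccard index of two collections treated as sets: intersection size / union size.
    union = set(p) | set(q)
    if not union:
        return 1.0  # assumption: two empty lists count as identical (avoids ZeroDivisionError)
    return len(set(p) & set(q)) / len(union)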
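def jaccard_dict(p, q):
    # p, q: ATTRIBUTES dicts. Average the per-attribute Jaccard scores over the keys both share.
    common = set(p.keys()) & set(q.keys())
    if not common:
        return 0.0  # no shared attributes: avoid ZeroDivisionError
    total = 0.0
    for key in common:
        # Wrap scalars (BRAND, a null SEASON) in a list; checking only p[key] would put an
        # unhashable list into a set when one side is a list and the other a scalar.
        a = p[key] if isinstance(p[key], list) else [p[key]]
        b = q[key] if isinstance(q[key], list) else [q[key]]
        total += jaccard_sim(a, b)
    return total / len(common)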
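rdd0 = lines.map(parseJsonLineMap)
# partitionFunc receives the key (LEVEL2_NM, e.g. "Lvl2"), so int(x[0]) raised ValueError;
# use the default hash partitioner instead.
rdd1 = rdd0.partitionBy(100).persist()
# Self-join: every pair of products sharing a LEVEL2_NM.
# Element shape: (level2, ((idx_a, attrs_a), (idx_b, attrs_b)))
rdd2 = rdd1.join(rdd1)
# Keep each unordered pair once, plus self-pairs (indexes are compared as strings).
rdd3 = rdd2.filter(lambda x: x[1][0][0] <= x[1][1][0])
# (idx_a, {idx_b: "similarity"})
rdd4 = rdd3.map(lambda x: (x[1][0][0], {x[1][1][0]: str(jaccard_dict(x[1][0][1], x[1][1][1]))}))
# Merge the per-pair dicts into one dict per product, then write one JSON object per line.
rdd5 = rdd4.reduceByKey(lambda x, y: {**x, **y}).map(lambda x: json.dumps({x[0]: x[1]}))
out_file = "/..../product_matrix"  # hypothetical output directory; out_file is not defined in the original
rdd5.saveAsTextFile(out_file)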
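To check the logic without a cluster, here is a single-machine sketch in plain Python that mimics what the pipeline above does: group by LEVEL2_NM, self-join, keep idx_a <= idx_b, merge one dict per product, and emit one JSON line each. It assumes the ATTRIBUTES dictionary is what gets compared. similarity_lines, as_set and attribute_jaccard are hypothetical helper names, and scoring two empty lists as 1.0 is an assumption, not something the question specifies.

```python
import json
from collections import defaultdict
from itertools import product


def as_set(value):
    """Lists become sets; scalars, including None for JSON null, become one-element sets."""
    return set(value) if isinstance(value, list) else {value}


def attribute_jaccard(a, b):
    """Average per-attribute Jaccard similarity over the keys two ATTRIBUTES dicts share."""
    common = sorted(a.keys() & b.keys())  # sorted so the float sum is reproducible
    if not common:
        return 0.0  # no shared attributes: avoid division by zero
    total = 0.0
    for key in common:
        x, y = as_set(a[key]), as_set(b[key])
        union = x | y
        # assumption: two empty lists are treated as identical
        total += (len(x & y) / len(union)) if union else 1.0
    return total / len(common)


def similarity_lines(records):
    """Single-machine stand-in for join -> filter -> map -> reduceByKey -> json.dumps.

    records: iterable of (level2_nm, product_index, attributes) tuples.
    """
    groups = defaultdict(list)  # like keying the RDD by LEVEL2_NM
    for level2, idx, attrs in records:
        groups[level2].append((str(idx), attrs))
    merged = defaultdict(dict)  # product_index -> {other_index: similarity string}
    for members in groups.values():
        # product(members, repeat=2) is what the self-join emits for one key
        for (idx_a, attrs_a), (idx_b, attrs_b) in product(members, repeat=2):
            if idx_a <= idx_b:  # same string comparison as the filter in the question
                merged[idx_a][idx_b] = str(attribute_jaccard(attrs_a, attrs_b))
    return [json.dumps({idx: sims}) for idx, sims in merged.items()]


records = [
    ("Lvl2", 1, {"COLOR": ["Red", "Blue"], "SEASON": None, "BRAND": "Nike", "NEW_COLOR_CD": ["01", "02"]}),
    ("Lvl2", 2, {"COLOR": ["Red", "Green"], "SEASON": None, "BRAND": "Adidas", "NEW_COLOR_CD": ["01", "03"]}),
]
for line in similarity_lines(records):
    print(line)
```

For the two sample products, product 1 maps to itself ("1.0") and to product 2 (about 0.417, that is 5/12). Product 2 maps only to itself, because the <= filter keeps each pair once. Since indexes are compared as strings, "10" sorts before "2". That still keeps each pair exactly once, just not in numeric order.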
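One likely reason the 2M run never finishes is that the self-join emits every pair of products sharing a LEVEL2_NM, so the work grows with the square of the group size. This is an interpretation, not something confirmed in the question. Both sample rows are in "Lvl2". The arithmetic below hypothetically assumes that all records fall into a single group; the group sizes are illustrative, not measurements.

```python
def joined_rows(group_sizes):
    """Rows produced by rdd1.join(rdd1): a group of n records yields n * n rows."""
    return sum(n * n for n in group_sizes)


def kept_pairs(group_sizes):
    """Rows left after the idx_a <= idx_b filter (distinct indexes): n * (n + 1) / 2 per group."""
    return sum(n * (n + 1) // 2 for n in group_sizes)


for total in (10_000, 2_000_000):
    # hypothetical: every record shares one LEVEL2_NM
    print(total, joined_rows([total]), kept_pairs([total]))
# 10000 100000000 50005000
# 2000000 4000000000000 2000001000000
```

Going from 10K to 2M records in one group multiplies the joined rows by (2M / 10K)^2 = 40,000, which works out to about 2 * 10^12 jaccard_dict calls after the filter. If every record shares one key, hash partitioning should also send all of them to a single partition, so one task would do nearly all of that work. Splitting the same records into more, smaller groups lowers the total, because it is the sum of the squared group sizes (for example, 100 groups of 20K give 4 * 10^10 joined rows).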