PySpark optimization tip: Jaccard similarity

Published 2024-09-29 17:24:14


I am trying to build an item–item similarity sparse matrix in PySpark (using Jaccard similarity). I have a product input file that looks like this:

{"LEVEL2_NM": "Lvl2", "product_index": 1, "LEVEL3_NM": "Lvl3_Nm_1", "LEVEL2_CD": "10001", "ATTRIBUTES": {"COLOR": ["Red", "Blue"], "SEASON": null, "BRAND": "Nike", "NEW_COLOR_CD": ["01", "02"]}}

{"LEVEL2_NM": "Lvl2", "product_index": 2, "LEVEL3_NM": "Lvl3_Nm_2", "LEVEL2_CD": "10001", "ATTRIBUTES": {"COLOR": ["Red", "Green"], "SEASON": null, "BRAND": "Adidas", "NEW_COLOR_CD": ["01", "03"]}}

This file is about 200 MB. The desired final output is, for each product index, a JSON map from every other product index in the same LEVEL2_NM group to its Jaccard similarity.
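To make the scoring concrete, here is a small pure-Python sketch (variable names are mine, not from the post) of the per-attribute Jaccard score that the code below computes for the two sample records above: the set Jaccard of each shared attribute key, averaged over the shared keys.

```python
def jaccard_sim(p, q):
    # Jaccard similarity of two collections treated as sets
    p, q = set(p), set(q)
    return len(p & q) / len(p | q)

def jaccard_dict(p, q):
    # Average the per-key Jaccard over the attribute keys both products share
    shared = set(p) & set(q)
    total = 0.0
    for k in shared:
        a = p[k] if isinstance(p[k], list) else [p[k]]
        b = q[k] if isinstance(q[k], list) else [q[k]]
        total += jaccard_sim(a, b)
    return total / len(shared)

p1 = {"COLOR": ["Red", "Blue"], "SEASON": None, "BRAND": "Nike",
      "NEW_COLOR_CD": ["01", "02"]}
p2 = {"COLOR": ["Red", "Green"], "SEASON": None, "BRAND": "Adidas",
      "NEW_COLOR_CD": ["01", "03"]}

# COLOR: 1/3, SEASON: 1/1, BRAND: 0/2, NEW_COLOR_CD: 1/3  ->  (5/3) / 4
print(round(jaccard_dict(p1, p2), 4))  # -> 0.4167
```

Note that scalar attributes (BRAND, SEASON) are wrapped in a one-element list so the same set formula applies; a `null` SEASON on both sides then contributes a similarity of 1.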

I have written the PySpark code below. It runs fine on a subset of the data (~10K records), but when I try to run it on 2M records it runs forever.

from pyspark import SparkConf, SparkContext
import json

conf = SparkConf().setMaster("local[4]").setAppName("ProductMatrix")
sc = SparkContext(conf=conf)
lines = sc.textFile("/..../product.txt")
out_file = "/..../product_similarity"  # placeholder output path

def parseJsonLineMap(line):
    # Key by LEVEL2_NM. Keep the product index as an int (so the <= filter
    # below compares numerically, not lexicographically) together with the
    # full ATTRIBUTES dict, which jaccard_dict() expects.
    jsonVal = json.loads(line)
    return (jsonVal['LEVEL2_NM'],
            (int(jsonVal['product_index']), jsonVal['ATTRIBUTES']))

def jaccard_sim(p, q):
    # Set-based Jaccard similarity of two collections
    p, q = set(p), set(q)
    return len(p & q) / len(p | q)

def jaccard_dict(p, q):
    # Average the per-key Jaccard over the attribute keys shared by p and q
    pq_intersect = set(p.keys()) & set(q.keys())
    if not pq_intersect:
        return 0.0  # avoid division by zero when no keys are shared
    nume = 0.0
    for i in pq_intersect:
        if isinstance(p[i], list):
            nume += jaccard_sim(p[i], q[i])
        else:
            nume += jaccard_sim([p[i]], [q[i]])
    return nume / len(pq_intersect)

rdd0 = lines.map(parseJsonLineMap)
# LEVEL2_NM keys are strings such as "Lvl2", so use the default hash
# partitioner; int(x[0]) would raise ValueError on a non-numeric key.
rdd1 = rdd0.partitionBy(100).persist()

rdd2 = rdd1.join(rdd1)                                   # all pairs within a LEVEL2_NM group
rdd3 = rdd2.filter(lambda x: x[1][0][0] <= x[1][1][0])   # keep each unordered pair once
rdd4 = rdd3.map(lambda x: (x[1][0][0],
                           {x[1][1][0]: jaccard_dict(x[1][0][1], x[1][1][1])}))
rdd5 = rdd4.reduceByKey(lambda x, y: {**x, **y}) \
           .map(lambda x: json.dumps({x[0]: x[1]}))
rdd5.saveAsTextFile(out_file)
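The likely bottleneck is `rdd1.join(rdd1)`: when one LEVEL2_NM value dominates (both sample records share "Lvl2"), the self-join materializes roughly N² pairs on a single key — on the order of 4×10¹² pairs for 2M records — which no amount of partitioning will save. The usual way out is an approximate method such as MinHash-based locality-sensitive hashing, which buckets probably-similar items so only candidates in the same bucket are compared. A minimal pure-Python sketch of the MinHash idea (all names and constants here are my own illustration, not the original code):

```python
import random
import zlib

NUM_HASHES = 128
PRIME = (1 << 31) - 1

# Fixed (a, b) pairs define a family of hash functions over attribute values;
# zlib.crc32 gives a deterministic base hash for strings.
rng = random.Random(7)
coeffs = [(rng.randrange(1, PRIME), rng.randrange(PRIME))
          for _ in range(NUM_HASHES)]

def h(x, a, b):
    return (a * zlib.crc32(x.encode()) + b) % PRIME

def minhash_signature(items):
    # One minimum per hash function; the probability that two signatures
    # agree in a given position equals the Jaccard similarity of the sets.
    return [min(h(x, a, b) for x in items) for a, b in coeffs]

# Flattened attribute-value sets for two hypothetical products
p = {"Red", "Blue", "Nike", "01", "02"}
q = {"Red", "Green", "Adidas", "01", "03"}

sig_p, sig_q = minhash_signature(p), minhash_signature(q)
est = sum(a == b for a, b in zip(sig_p, sig_q)) / NUM_HASHES
exact = len(p & q) / len(p | q)  # 2/8 = 0.25
print(exact, est)  # the estimate should land close to 0.25
```

In Spark itself this idea is available off the shelf: `pyspark.ml.feature.MinHashLSH` with `approxSimilarityJoin` returns pairs within a distance threshold without materializing the full cross product, which is a much better fit for 2M records than an exact self-join.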

