pysp中比较相邻记录的高效内存方法

2024-10-01 13:37:33 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在寻找一种内存效率高的方法来使用函数在pyspark DF中创建列,该函数将“相邻”(即,当数据集已在特定列上排序)记录的值作为参数,这些记录是特定哈希的排列。你知道吗

也就是说,对于一个特定的dataframe,它有几个‘hash\ n’(最多5个)列,我希望对每个hash列进行排序,并基于‘this’列中的hash函数以及接下来的几个(最多15个)列创建一个新列。该函数本质上是比较散列的“相似性”,如果相似性高于某个阈值,则附加“other”列的“index”。你知道吗

最初我是用一个window函数和一个pyspark UDF来实现这个功能的,但是遇到了内存不足的问题,现在我转换成RDD,通过增加索引创建一个“附近”RDD的字典,合并得到的字典值,并通过reduceByKey将函数应用到这个联合中。这种方法似乎是一种改进,尽管我仍然遇到内存问题(由于executor使用了太多内存而导致容器死机;尝试了几个设置,但都无法解决这个问题)。你知道吗

下面是我正在使用的代码的相关部分(它稍微复杂一些,因为它必须考虑到一个列的可能性,在这个列上对数据进行分区(就数据的处理方式而言——如果哈希值在同一个分区中,我们只对它们进行比较)数据“partnCol”,或者相同的列表“partnCols”;实际上,我们总是至少有一个分区列);这里的参数是nPerms,散列被置换的次数(因此我们总共有nPerms+1个散列),以及B,要查找匹配项的相邻记录数。你知道吗

HSHTBLDict = {}; rdd0Dict = {}; rddDictDict = {}
possMtchsDict = {}; mtchsDict = {}
for nn in range(nPerms+1):
    if (partnCol):
        HSHTBLDict[nn] = _addNamedDFIndex(HSHTBL.orderBy\
                            (partnCol, 'hash_{0}'.format(nn)), 'newIdx')
        rdd0Dict[nn] = HSHTBLDict[nn].select('newIdx', partnCol, '__index__', \
                                         'hash_{0}'.format(nn))\
             .orderBy(partnCol, 'hash_{0}'.format(nn)).rdd.map(tuple)\
                .map(lambda kv: ((kv[0], kv[1]), (kv[2:])))                
    elif (partnCols):
        HSHTBLDict[nn] = _addNamedDFIndex(HSHTBL.orderBy(\
                        *(partnCols+['hash_{0}'.format(nn)])), 'newIdx')
        rdd0Dict[nn] = HSHTBLDict[nn].select('newIdx', \
                    *(partnCols+['__index__', 'hash_{0}'.format(nn)]))\
             .orderBy(*(partnCols+['hash_{0}'.format(nn)])).rdd.map(tuple)\
                .map(lambda kv: ((kv[:len(partnCols)+1]), \
                                 (kv[len(partnCols)+1:])))                
    else:
        HSHTBLDict[nn] = _addNamedDFIndex(HSHTBL.orderBy(\
                                        'hash_{0}'.format(nn)), 'newIdx')
        rdd0Dict[nn] = HSHTBLDict[nn].select('newIdx', '__index__', \
                                             'hash_{0}'.format(nn))\
             .orderBy('hash_{0}'.format(nn)).rdd.map(tuple)\
                .map(lambda kv: ((kv[0], ), (kv[1:])))                                     
    rddDictDict[nn] = {}
    for b in range(1, B+1):
        def funcPos(r, b=b):
            return r.map(lambda kv: (tuple([kv[0][x] + b if x==0 else kv[0][x] \
                                        for x in range(len(kv[0]))]), kv[1]))
        rddDictDict[nn][b] = funcPos(rdd0Dict[nn])
    possMtchsDict[nn] = sc.union([rdd0Dict[nn]] + rddDictDict[nn].values())\
              .reduceByKey(lambda x,y: x+y, numPartitions=rddParts)\
              .mapValues(lambda v: tuple(v[i:i+2] \
                            for i in range(0, len(v), 2)))
    mtchsDict[nn] = possMtchsDict[nn].mapValues(lambda v: tuple([ tuple([ v[x][0],  \
                        [p[0] for p in v if _hashSim(v[x][1], p[1]) > thr]])\
                                        for x in range(len(v)) ]) )
# union together all combinations from all hash columns
Mtchs = sc.union(mtchsDict.values())
# map to rdd of (__index__, matchList) pairs
mls = Mtchs.flatMap(lambda kv: kv[1]).reduceByKey(lambda x,y: list(set(x+y)), \
                                                  numPartitions=rddParts)

如果有人有任何建议/想法,我会非常乐意听取他们的意见。你知道吗

我确实尝试过减少每个执行器的核心数,我发现虽然过程(相对来说)很慢,但我可以完成它,尽管我还没有找到一个适合我想要实现的所有nPerms/B的设置组合。为了做到这一点,可能需要重新编写代码/方法。我还想更好地理解纱线被允许用于洗牌之类的事情的记忆量是如何决定的。我发现减少执行器内存和增加火花线执行器.memoryOverhead似乎有帮助,但我仍然不清楚如何计算可用的内存量。你知道吗


Tags: lambda函数informatmapforhashnn