python中数组行相似性计算的并行化

2024-04-24 02:46:14 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个大的ish数组artist_topic_probs(112312个项目行乘以~100个特征列),我想计算这个数组中随机行对(大样本)之间的成对余弦相似性。以下是我当前代码的相关部分

# the number of random pairs to check (10 million here)
random_sample_size=10000000

# I want to make sure they're unique, and that I'm never comparing a row to itself
# so I generate my set of comparisons like so:
np.random.seed(99)
comps = set()
while len(comps)<random_sample_size:
    a = np.random.randint(0,112312)
    b= np.random.randint(0,112312)
    if a!=b:
        comp = tuple(sorted([a,b]))
        comps.add(comp)
# convert to list at the end to ensure sort order 
# not positive if this is needed...I've seen conflicting opinions
comps = list(sorted(comps))

这将生成一个元组列表,其中每个元组是两行,我将计算它们之间的相似度。然后我就用一个简单的循环来计算所有的相似性:

^{pr2}$

(当然,cosine这里给出了距离,而不是一个相似性,但是我们可以很容易地用sim = 1.0 - dist得到。我在标题中使用了相似性,因为这是更常见的术语)

这个方法很好,但是不太快,我需要重复这个过程很多次。我有32个内核要处理,所以并行化似乎是个不错的选择,但我不确定最好的方法。我的想法是:

pool = mp.Pool(processes=32)
c_dists = [pool.apply(cosine, args=(artist_topic_probs[a],artist_topic_probs[b])) 
    for a,b in comps]

但是用一些测试数据在我的笔记本电脑上测试这种方法并没有起作用(它只是挂起,或者至少比简单的循环花费了太多的时间,以至于我厌倦了等待并终止了它)。我担心的是矩阵的索引是某种瓶颈,但我不确定。关于如何有效地并行化(或以其他方式加快进程)有什么想法?在


Tags: ofthetosample方法sizetopicso
2条回答

^{},正如您在下面的链接中看到的,它在计算中引入了一个重要的开销,因为对于每个调用,它都会根据样本的大小计算您在每次调用时分析的两个向量的范数 这相当于计算了2000万个范数,如果你提前记住了你大约10万个向量的范数,你可以节省大约60%的计算时间,因为你有一个点积,u*v,和两个范数计算,这三个运算在运算数上大致相等。在

此外,你使用显式循环,如果你能把你的逻辑放在一个向量化的numpy操作符中,你就可以再削减一大块计算时间。在

最后,你会谈到余弦相似性。。。假设scipy.spatial.distance.cosine计算的是余弦距离,关系很简单,cs = cd - 1但我在您发布的代码中没有看到这一点。在

首先,您可能希望将来使用itertools.combinationsrandom.sample来获得唯一对,但是由于内存问题,在这种情况下它将无法工作。因此,多处理不是多线程,也就是说,产生一个新的进程涉及到巨大的系统开销。为每个单独的任务生成一个进程没有什么意义。一个任务必须是值得的,以合理地开始一个新的过程,因此你最好把所有的工作分成单独的工作(分成你想要使用的核心数量的多个部分)。然后,别忘了multiprocessing实现序列化了整个名称空间并将其加载到内存中N次,其中N是进程数。如果没有足够的RAM来存储庞大阵列的N个拷贝,这可能导致密集的交换。所以你可以减少核心的数量。在

已更新以按照您的要求恢复初始订单。在

我做了一个相同向量的测试数据集,因此cosine必须返回一个零向量。在

from __future__ import division, print_function
import math
import multiprocessing as mp
from scipy.spatial.distance import cosine
from operator import itemgetter
import itertools


def worker(enumerated_comps):
    return [(ind, cosine(artist_topic_probs[a], artist_topic_probs[b])) for ind, (a, b) in enumerated_comps]


def slice_iterable(iterable, chunk):
    """
    Slices an iterable into chunks of size n
    :param chunk: the number of items per slice
    :type chunk: int
    :type iterable: collections.Iterable
    :rtype: collections.Generator
    """
    _it = iter(iterable)
    return itertools.takewhile(
        bool, (tuple(itertools.islice(_it, chunk)) for _ in itertools.count(0))
    )


# Test data
artist_topic_probs = [range(10) for _ in xrange(10)]
comps = tuple(enumerate([(1, 2), (1, 3), (1, 4), (1, 5)]))

n_cores = 2
chunksize = int(math.ceil(len(comps)/n_cores))
jobs = tuple(slice_iterable(comps, chunksize))

pool = mp.Pool(processes=n_cores)
work_res = pool.map_async(worker, jobs)
c_dists = map(itemgetter(1), sorted(itertools.chain(*work_res.get())))
print(c_dists)

输出:

^{pr2}$

这些值相当接近于零。在

p.S

来自multiprocessing.Pool.apply文档

Equivalent of the apply() built-in function. It blocks until the result is ready, so apply_async() is better suited for performing work in parallel. Additionally, func is only executed in one of the workers of the pool.

相关问题 更多 >