(记忆有效)将“排序”作为生成器实现

2024-10-04 17:14:04 发布

您现在位置:Python中文网/ 问答频道 /正文

我想这不是一个特别新的主题,我想有比我更好的实现:我正在寻找(a)我正在处理的算法类型-它的实际名称或类似名称-和(b)可能更好的实现

一般的问题是:想象一个列表a,它非常长-太长,无法多次放入内存。该列表包含允许排序的事物的“随机”序列(<>==正在工作)。我想按升序遍历列表中的所有条目,包括重复条目,但不复制列表或生成类似“极端”长度的任何内容。我还希望保持a中条目的原始顺序,即,排除就地sort。因此,我基本上希望最小化排序所需的内存,同时不修改原始数据源

Python的sorted不会触及原始数据,而是生成一个新列表,其大小/长度与原始列表相同。因此,我的基本想法是将sorted重新实现为一个生成器:

def sorted_nocopy_generator(data_list):
    state_max = max(data_list)
    state = min(data_list)
    state_count = data_list.count(state)
    for _ in range(state_count):
        yield state
    index = state_count
    while index < len(data_list):
        new_state = state_max
        for entry in data_list:
            if state < entry < new_state:
                new_state = entry
        state = new_state
        state_count = data_list.count(state)
        for _ in range(state_count):
            yield state
        index += state_count

它可以按如下方式进行测试:

import random

a_min = 0
a_max = 10000
a = list(range(a_min, a_max)) # test data
a.extend((random.randint(a_min, a_max - 1) for _ in range(len(a) // 10))) # some double entries
random.shuffle(a) # "random" order
a_control = a.copy() # for verification that a is not altered

a_test_sorted_nocopy_generator = list(sorted_nocopy_generator(a))
assert a == a_control
a_test_sorted = sorted(a)
assert a == a_control
assert a_test_sorted == a_test_sorted_nocopy_generator

它的标度为O(N^2),就像bubblesort那样。我在寻找什么样的算法?如何优化这件事(可能通过交易一些内存)


Tags: intest列表newfordatacountrange
2条回答

假设您在一个单独的列表中有k事物的内存。然后,以下内容将像O((n + n^2/k) log(k))一样缩放。在k常数的极端情况下,它是二次的,如果kn的固定分数,那么它是O(n log(n))

其思想是为尚未返回的k最小的东西创建一个缓冲区。将它转换为一个最小堆,然后使用堆操作在每个返回的元素O(log(k))的时间内从中返回内容。当缓冲区为空时,重新填充缓冲区,然后再次像以前一样继续操作。(缓冲区开始为空。)

要创建缓冲区,请扫描数组,放入比上次返回的值大的值,直到到达缓冲区中的k个值为止。然后将它转换为一个最大堆,当堆中的内容大于返回的内容,小于已经存在的内容时,替换堆中的内容。重新填充后,将其重新组织为最小堆

您需要重新填充缓冲区n/k次。每次都是O(n log(k))操作的最坏情况。(如果k << n且数组是随机顺序的,则平均运行时间为O(n),因为几乎所有内容都与堆中的最大值进行比较,然后丢弃。)

重要的边缘情况:如果您有重复的,您需要考虑这样一种情况,即您的缓冲区只包含最大事物的一些副本。一种方法是跟踪堆中应该有多少个最大对象的副本,但实际上没有存储在那里。这样,在清空堆之后,也可以返回所有丢失的副本,然后继续扫描较大的元素

在这里画草图。其中N = len(data_list)S = sqrt(N)使用O(S)额外内存并占用最坏情况下的O(N*log(N))时间:

  • 对于原始数据中长度为S的每个连续切片,将该切片复制到临时列表中,使用.sort()对其进行适当排序,然后将结果写入唯一的临时文件。总共将有大约S个临时文件

  • 将这些临时文件馈送到heapq.merge()。这是一个生成器,只跟踪当前跨S输入的S最小值,因此此部分也有O(S)内存负担

  • 删除临时文件

您可以使用的内存越多,所需的临时文件就越少,运行速度也就越快

削减常数因子

正如评论中所指出的,次二次时间算法的希望渺茫。但是,您可以通过减少数据的传递次数来减少原始算法中的常数因子。这里有一种方法,在每次传递数据时生成下一个K条目。不过,总的来说,它仍然是二次时间

def sorted_nocopy_generator(data_list, K=100):
    import itertools
    from bisect import insort

    assert K >= 1
    total = 0
    too_small = None

    while total < len(data_list):
        active = [] # hold the next K entries
        entry2count = {}
        for entry in data_list:
            if entry in entry2count:
                entry2count[entry] += 1
            elif ((too_small is None or too_small < entry) and
                  (len(active) < K or entry < active[-1])):
                insort(active, entry)
                entry2count[entry] = 1
                if len(active) > K: # forget the largest
                    del entry2count[active.pop()]
        for entry in active:
            count = entry2count[entry]
            yield from itertools.repeat(entry, count)
            total += count
        too_small = active[-1]

消除最坏的情况

正如@btilly的回答一样,上面代码中最糟糕的情况可以通过使用max堆来避免。然后将新条目添加到active具有最坏情况时间O(log(K)),而不是O(K)

幸运的是,heapq模块已经提供了一些可用于此目的的东西。但是,处理重复数据就成了一件令人头痛的事情——没有公开max heapheapq所使用的隐藏式堆

因此,下面的特殊情况是感兴趣的最小K项中的最大项,使用.count()(如在原始程序中)进行完整传递以计算有多少个

但是,它不需要对每个惟一元素执行该操作,而只需要对每个K元素执行一次

额外内存使用与K成正比

def sorted_nocopy_generator(data_list, K=100):
    import itertools
    from heapq import nsmallest

    assert K >= 1
    too_small = None
    ntodo = len(data_list)

    while ntodo:
        if too_small is None:
            active = nsmallest(K, data_list)
        else:
            active = nsmallest(K, (x for x in data_list
                                     if x > too_small))
        too_small = active[-1]
        for x in active:
            if x == too_small:
                break
            yield x
            ntodo -= 1
        count = data_list.count(too_small)
        yield from itertools.repeat(too_small, count)
        ntodo -= count
        assert ntodo >= 0

相关问题 更多 >

    热门问题