ThreadPoolExecutor().map与ThreadPoolExecutor().submit有何不同?

2024-05-20 11:00:01 发布

您现在位置:Python中文网/ 问答频道 /正文

我只是被我写的一些代码弄糊涂了。我惊讶地发现:

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(f, iterable))

以及

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    results = list(map(lambda x: executor.submit(f, x), iterable))

产生不同的结果。第一个生成一个f返回的任何类型的列表,第二个生成一个concurrent.futures.Future对象的列表,然后需要用它们的result()方法对这些对象求值,以便获得f返回的值。

我主要担心的是,这意味着executor.map无法利用concurrent.futures.as_completed,这似乎是一种非常方便的方法来评估对数据库的一些长时间运行的调用的结果,这些调用在可用时我正在进行。

我根本不清楚concurrent.futures.ThreadPoolExecutor对象是如何工作的——天真地说,我更喜欢(更详细一些):

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    result_futures = list(map(lambda x: executor.submit(f, x), iterable))
    results = [f.result() for f in futures.as_completed(result_futures)]

而不是更简洁的executor.map,以便利用性能上的可能提高。我这样做是错的吗?


Tags: 对象lambdamapaswithresultiterableresults
2条回答

问题是将ThreadPoolExecutor.map的结果转换为列表。如果不这样做,而是直接在生成的生成器上迭代,结果仍按原始顺序生成,但循环在所有结果就绪之前继续。可以通过以下示例对此进行测试:

import time
import concurrent.futures

e = concurrent.futures.ThreadPoolExecutor(4)
s = range(10)
for i in e.map(time.sleep, s):
    print(i)

保持这个顺序的原因可能是因为有时候你得到的结果和你给地图的顺序一样重要。结果可能不会包装在将来的对象中,因为在某些情况下,如果需要的话,在列表上执行另一个映射可能需要太长时间才能获得所有结果。毕竟,在大多数情况下,下一个值很可能在循环处理第一个值之前就准备好了。这在本例中演示:

import concurrent.futures

executor = concurrent.futures.ThreadPoolExecutor() # Or ProcessPoolExecutor
data = some_huge_list()
results = executor.map(crunch_number, data)
finals = []

for value in results:
    finals.append(do_some_stuff(value))

在本例中,do_some_stuff可能需要比crunch_number更长的时间,如果真是这样,那么在保持map的易用性的同时,性能损失并不大。

另外,由于工作线程(/processes)从列表的开头开始处理,并一直工作到您提交的列表的末尾,因此结果应该按照迭代器已经生成的顺序完成。这意味着在大多数情况下executor.map是很好的,但是在某些情况下,例如,如果处理值的顺序无关紧要,并且传递给map的函数需要非常不同的时间来运行,那么future.as_completed可能会更快。

除了在这里的答案中的解释外,直接找到来源是有帮助的。它重申另一个答复中的声明:

  • .map()按提交结果的顺序给出结果,而
  • ^{}遍历Future对象列表不会保证这种顺序,因为这是as_completed()的本质

.map()在基类中定义,^{}

class Executor(object):
    def submit(self, fn, *args, **kwargs):
        raise NotImplementedError()

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        if timeout is not None:
            end_time = timeout + time.monotonic()

        fs = [self.submit(fn, *args) for args in zip(*iterables)]  # <!!!!!!!!

        def result_iterator():
            try:
                # reverse to keep finishing order
                fs.reverse()  # <!!!!!!!!
                while fs:
                    # Careful not to keep a reference to the popped future
                    if timeout is None:
                        yield fs.pop().result()  # <!!!!!!!!
                    else:
                        yield fs.pop().result(end_time - time.monotonic())
            finally:
                for future in fs:
                    future.cancel()
        return result_iterator()

正如您所提到的,还有.submit(),它留在子类中定义,即ProcessPoolExecutor^{},并返回一个_base.Future实例,您需要调用该实例才能实际执行任何操作。

.map()开始的重要行可以归结为:

fs = [self.submit(fn, *args) for args in zip(*iterables)]
fs.reverse()
while fs:
    yield fs.pop().result()

.reverse()加上.pop()是获得第一个提交结果(来自iterables)的方法,该结果首先被生成,第二个提交结果其次被生成,依此类推。结果迭代器的元素不是Futures;它们本身就是实际的结果。

相关问题 更多 >