如何在功能上构建期货?

2024-06-17 02:19:40 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个线程对象,无法在ProcessPoolExecutor中分发,但希望返回未来。如果我已经有了未来,有没有一种方法可以应用于它的完成值,例如,Future a -> (a -> b) -> Future b

import concurrent.futures
import threading

def three(x):
    return 2+x


if __name__ == '__main__':
    trackedItem = (3, threading.Event())
    pool = concurrent.futures.ProcessPoolExecutor(3)
    poolJob = (q.submit(three, trackedItem[0]),trackedItem[1]) #(Future(int), Event)
    *** something magic goes here ***
    #Trying to transform it into Future(int,Event)

Tags: 对象方法importeventreturndeffuture线程
2条回答

@kaya3提供了一个很好的答案,但我在为其添加异常处理以关闭池时遇到了问题。您可以在下面找到我的示例cpchung_example,以了解如何在功能上组合future。它仍然需要添加异常处理,我还没有一个好的解决方案

为了进行比较,我将它们全部放在一个文件中:



from concurrent.futures import ProcessPoolExecutor, Future
from concurrent.futures.thread import ThreadPoolExecutor


def map_future(future_a, func):
    future_b = Future()
    future_b.set_running_or_notify_cancel()

    def callback(f):
        try:
            x = f.result()
            y = func(x)
            future_b.set_result(y)
        except Exception as e:

            future_b.set_exception(e)

    future_a.add_done_callback(callback)
    return future_b


def func_a(x):
    return 2 + x


def func_b(x):
    return 3 * x


def func_c(x):
    raise NameError('Hi There')
    return 4 * x


def kaya3_example():
    future_a = pool.submit(func_a, 3)

    future_b = Future()
    future_b.set_running_or_notify_cancel()

    def callback(f):
        x = f.result()
        y = func_b(x)
        future_b.set_result(y)

    future_a.add_done_callback(callback)

    print(future_b.result())  # 50


def exception_handling():
    try:

        future_a = pool.submit(func_a, 3)
        future_b = map_future(future_a, func_b)
        future_c = map_future(future_b, func_c)

        print(future_c.result())

    except Exception as e:
        pool.shutdown()

    pool.shutdown()


def f(x, y):
    return x * y


def cpchung_example():
    with ThreadPoolExecutor(max_workers=1) as executor:
        a = executor.submit(f, 2, 3)
        b = executor.submit(f, 4, 5)
        c = executor.submit(f, a.result(), b.result())

        print(c.result())


if __name__ == '__main__':
    pool = ProcessPoolExecutor(3)

    kaya3_example()

    cpchung_example()

    # exception_handling()  # not working, still wip

这里有一种使用更简单的设置代码的方法,没有threading.Event,因为解决问题似乎不需要这样做。基本上,您可以自己创建future_b作为一个新的Future(),并使用future_a上的add_done_callback方法设置future_b的结果。这里,func_a是计算future_a的结果的计算,func_b是使用future_a的结果计算future_b的结果的计算

from concurrent.futures import ProcessPoolExecutor, Future

def func_a(x):
    return 2 + x

def func_b(x):
    return 10 * x

if __name__ == '__main__':
    pool = ProcessPoolExecutor(3)
    future_a = pool.submit(func_a, 3)

    future_b = Future()
    future_b.set_running_or_notify_cancel()

    def callback(f):
        x = f.result()
        y = func_b(x)
        future_b.set_result(y)

    future_a.add_done_callback(callback)

    print(future_b.result()) # 50

如果您想要一个助手函数来完成这项工作,您可以编写一个:map_future接受一个future和一个mapping函数,并根据需要返回新的映射future。此版本在f.result()func_b抛出异常时处理异常:

def map_future(future_a, func):
    future_b = Future()
    future_b.set_running_or_notify_cancel()

    def callback(f):
        try:
            x = f.result()
            y = func(x)
            future_b.set_result(y)
        except Exception as e:
            future_b.set_exception(e)

    future_a.add_done_callback(callback)
    return future_b

警告:这与Future类文档中的建议背道而驰,文档中说:

Future instances are created by Executor.submit() and should not be created directly except for testing.

此外,如果您在回调中有任何不是Exception子类的错误,根据文档,它们将被“记录并忽略”。为了简单起见,我选择在这段代码中只捕获Exception,但是您可能更喜欢捕获所有可能引发的事件的sys.exc_info()[0]方式

相关问题 更多 >