Python:如何在Python中运行嵌套并行进程?

2024-10-01 15:39:43 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个交易员交易的数据集df。 我有两个级别的for循环,如下所示:

smartTrader =[]

for asset in range(len(Assets)):
    df = df[df['Assets'] == asset]
    # I have some more calculations here
    for trader in range(len(df['TraderID'])):
        # I have some calculations here, If trader is successful, I add his ID  
        # to the list as follows
        smartTrader.append(df['TraderID'][trader])

    # some more calculations here which are related to the first for loop.

我想对Assets中的每种资产的计算进行并行化,并且我还想对每种资产的每个交易员的计算进行并行化。在所有这些计算完成之后,我想根据smartTrader列表进行额外的分析。在

这是我第一次尝试并行处理,所以请耐心等待,我感谢您的帮助。在


Tags: indfforlenheremorehaverange
3条回答

嵌套并行可以用Ray优雅地完成,这个系统允许您轻松地并行化和分发Python代码。在

假设要并行化以下嵌套程序

def inner_calculation(asset, trader):
    return trader

def outer_calculation(asset):
    return  asset, [inner_calculation(asset, trader) for trader in range(5)]

inner_results = []
outer_results = []

for asset in range(10):
    outer_result, inner_result = outer_calculation(asset)
    outer_results.append(outer_result)
    inner_results.append(inner_result)

# Then you can filter inner_results to get the final output.

下面是与上述代码并行的Ray代码:

  • 对于要在其自身进程中同时执行的每个函数,请使用@ray.remote decorator。远程函数返回未来(即结果的标识符),而不是结果本身。在
  • 当调用远程函数f()时,remote修饰符,即f.remote()
  • 使用ids_to_vals()助手函数将id的嵌套列表转换为值。在

注意程序结构是相同的。您只需要添加remote,然后使用ids_to_vals()助手函数将远程函数返回的futures(id)转换为值。在

^{pr2}$

multiprocessing模块相比,使用Ray有许多优点。特别是,相同的代码将在一台机器上运行,也可以在多台机器上运行。有关Ray的更多优点,请参见this related post。在

如果使用pathos,它提供了multiprocessing的分支,那么可以轻松地嵌套并行映射。pathos是为了方便地测试嵌套并行映射的组合而构建的,这些映射是嵌套for循环的直接转换。 它提供了一系列映射,这些映射是分块的、非阻塞的、迭代的、异步的、串行的、并行的和分布式的。在

>>> from pathos.pools import ProcessPool, ThreadPool
>>> amap = ProcessPool().amap
>>> tmap = ThreadPool().map
>>> from math import sin, cos
>>> print amap(tmap, [sin,cos], [range(10),range(10)]).get()
[[0.0, 0.8414709848078965, 0.9092974268256817, 0.1411200080598672, -0.7568024953079282, -0.9589242746631385, -0.27941549819892586, 0.6569865987187891, 0.9893582466233818, 0.4121184852417566], [1.0, 0.5403023058681398, -0.4161468365471424, -0.9899924966004454, -0.6536436208636119, 0.2836621854632263, 0.9601702866503661, 0.7539022543433046, -0.14550003380861354, -0.9111302618846769]]

在这里,这个例子使用一个处理池和一个线程池,其中线程映射调用是阻塞的,而处理映射调用是异步的(注意最后一行末尾的get)。在

获取pathos此处:https://github.com/uqfoundation 或与: $ pip install git+https://github.com/uqfoundation/pathos.git@master

标准python库中的线程化可能是最方便的方法:

import threading

def worker(id):
    #Do you calculations here
    return

threads = []
for asset in range(len(Assets)):
    df = df[df['Assets'] == asset]
    for trader in range(len(df['TraderID'])):
        t = threading.Thread(target=worker, args=(trader,))
        threads.append(t)
        t.start()
    #add semaphore here if you need synchronize results for all traders.

相关问题 更多 >

    热门问题