Parallelizing array assignment in Python

I have been trying, with no luck, to parallelize either the whole function below when it is called in main, or any individual part of it; I can't seem to get away from TypeError: 'function' object is not iterable. Any advice would be appreciated.

import multiprocessing
import numpy as np
from scipy.cluster.vq import kmeans, vq   # presumed source of the kmeans()/vq() used below
from joblib import Parallel, delayed
from multiprocessing import Pool

num_cores = multiprocessing.cpu_count()
parallel = Parallel(n_jobs=num_cores)
p = Pool(4)

def kmean(layerW,cluster):
    weights1d = np.reshape(layerW,-1)
    print(np.shape(weights1d))

    #Parallelizing Here
    centroids,_ = parallel(delayed(kmeans(weights1d, cluster)))
    idxs,_      = parallel(delayed(vq(weights1d,centroids)))

    #Here, using Parallel
    weights1d_q = parallel(delayed([centroids[idxs[i]] for i in range(len(weights1d))]))

    #OR --- using pool instead
    weights1d_q = p.map([centroids[idxs[i]] for i in range(len(weights1d))])
    weights4d_q  = np.reshape(weights1d_q, np.shape(layerW))
    return weights4d_q

1 Answer

Q : "I can't get away from the TypeError: 'function' object is not iterable"

As to the TypeError:

The TypeError exception is thrown correctly: the call to joblib.Parallel( delayed( ... ) ... ) was malformed and did not obey the documented calling-syntax constructor.

Example 1: a correct call:
This call obeys the documented syntax, down to the very last detail:

>>> from joblib import Parallel, delayed
>>> parallel = Parallel( n_jobs = -1 )
>>> import numpy as np
>>> parallel( delayed( np.sqrt ) ( i**2 ) for i in range( 10 ) )
#          ^  ^^^^^^^  ^^^^^^^    ^^^^^^           |||
#          |  |||||||  |||||||    ||||||           vvv
#JOBS(-1):-+  |||||||  |||||||    ||||||           |||
#DELAYED:-----+++++++  |||||||    ||||||           |||
#FUN( par ):-----------+++++++    ||||||           |||
#     |||                         ||||||           |||
#     +++-FUN(signature-"decl.")--++++++           |||
#     ^^^                                          |||
#     |||                                          |||
#     +++-<<<-<iterator>-<<<-<<<-<<<-<<<-<<<-<<<---+++
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

The results produced confirm that the call was fully compliant and interpretable.

Example 2: a wrong call:

>>> from joblib import Parallel, delayed
>>> parallel = Parallel( n_jobs = -1 )
>>> import numpy as np
>>> parallel( delayed( np.sqrt( 10 ) ) )          #### THIS SLOC IS KNOWINGLY WRONG
#          ^  ^^^^^^^  ^^^^^^^(????) ????   ???  ####
#          |  |||||||  |||||||               vvv  ####
#JOBS(-1):-+  |||||||  |||||||               |||  ####
#DELAYED:-----+++++++  |||||||               |||  #### DELAYED( <float64> )
#FUN( par ):-----------+++++++               |||  #### GOT NO CALLABLE FUN( par )
#     |||                                    |||  ####        BUT A NUMBER
#     +++-FUN(signature-"decl.")--?????      |||  ####        FUN( signature )
#     ^^^                                    |||  ####        NOT PRESENT
#     |||                                    |||  ####        AND FEEDER
#     +++-<<<-<iterator>-???-<<<-<<<-<<<-<<<-+++  #### <ITERATOR> MISSING
#                                                 ####
Traceback (most recent call last):                ####   FOR DETAILS, READ THE O/P
  File "<stdin>", line 1, in <module>             ####   AND EXPLANATION BELOW
  File ".../lib/python3.5/site-packages/joblib/parallel.py", line 947, in __call__
    iterator = iter(iterable)
TypeError: 'function' object is not iterable

The result confirms that the O/P's code used a syntax that is incompatible with the documented joblib.Parallel( delayed( ... ) ... ) calling interface. Q.E.D.


The remedy:

Follow the documented joblib.Parallel( delayed( ... ) ... ) syntax:

#entroids, _ = parallel( delayed( kmeans(weights1d, cluster)))
#                                 ^^^^^^(..................)
#                                 ||||||(..................)
#THIS-IS-NOT-A-CALLABLE-BUT-VALUE-++++++(..................)
#
centroids, _ = parallel( delayed( kmeans ) ( weights1d, cluster ) for ... )
#                                 ^^^^^^     ^^^^^^^^^^^^^^^^^^   |||||||
#                                 ||||||     ||||||||||||||||||   vvvvvvv
# CALLABLE FUN()------------------++++++     ||||||||||||||||||   |||||||
#          FUN( <signature> )----------------++++++++++++++++++   |||||||
#               ^^^^^^^^^^^                                       |||||||
#               |||||||||||                                       |||||||
#               +++++++++++-<<<-<<<-feeding-<iterator>-<<<-<<<----+++++++
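Applied back to the O/P's function, a minimal runnable sketch could look as follows. It assumes the kmeans / vq pair comes from scipy.cluster.vq and, since kmeans() itself is a single library call with nothing to iterate over, only the per-chunk centroid lookup is handed to Parallel(); the chunk count of 4 is an arbitrary illustrative choice:

import numpy as np
from joblib import Parallel, delayed
from scipy.cluster.vq import kmeans, vq   # assumed source of kmeans()/vq()

def lookup(centroids, idx_chunk):
    # map every cluster index in this chunk back to its centroid value
    return centroids[idx_chunk]

def kmean(layerW, cluster):
    weights1d = np.reshape(layerW, -1).astype(np.float64)

    # single library calls - nothing to iterate over, so no Parallel() here
    centroids, _ = kmeans(weights1d, cluster)
    idxs, _      = vq(weights1d, centroids)

    # the documented form: Parallel( delayed( FUN )( args ) for args in <iterator> )
    quantised = Parallel(n_jobs=4)(
        delayed(lookup)(centroids, chunk)
        for chunk in np.array_split(idxs, 4)
    )
    weights1d_q = np.concatenate(quantised)
    return np.reshape(weights1d_q, np.shape(layerW))

Note that the plain vectorised lookup centroids[idxs] does the same job in one NumPy call and will beat the process-based version here, for exactly the add-on-cost reasons discussed further below.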

The best first step:

is to re-read the detailed documentation of how joblib.Parallel was designed and which usage patterns it supports, so as to better understand the tool:

joblib.Parallel( n_jobs       = None,   # how many jobs will get instantiated
                 backend      = None,   # a method, how these will get instantiated
                 verbose      = 0,
                 timeout      = None,
                 pre_dispatch = '2 * n_jobs',
                 batch_size   = 'auto',
                 temp_folder  = None,
                 max_nbytes   = '1M',
                 mmap_mode    = 'r',
                 prefer       = None,   # None | { ‘processes’, ‘threads’ }
                 require      = None    # None | ‘sharedmem’ ~CONSTRAINTS backend
                 )

Next, one can grasp some trivial examples (and experiment with them, extending them towards the intended use case):

      Parallel(  n_jobs = 2 ) ( delayed( sqrt ) ( i ** 2 ) for i in range( 10 ) )
      #          ^              ^^^^^^^  ^^^^     ^^^^^^   |||
      #          |              |||||||  ||||     ||||||   vvv
      #JOBS:-----+              |||||||  ||||     ||||||   |||
      #DELAYED:----------------+++++++  ||||     ||||||   |||
      #FUN( par ):--------------------------++++     ||||||   |||
      #     |||                                   ||||||   |||
      #     +++-FUN(-signature-"declaration"-)----++++++   |||
      #     ^^^                                            |||
      #     |||                                            |||
      #     +++-<<<-<iterator>-<<<-<<<-<<<-<<<-<<<-<<<-<<<-+++

      Parallel(  n_jobs = -1 ) ( 
                 delayed( myTupleConsumingFUN ) ( # aFun( aTuple = ( a, b, c, d ) )
                           aTupleOfParametersGeneratingFUN( i ) )
                 for                                        i in range( 10 )
                 )
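A concrete, runnable instance of that last pattern might look like this (the two helper functions are illustrative stand-ins, not part of joblib):

from joblib import Parallel, delayed

def my_tuple_consuming_fun(aTuple):
    # aFun( aTuple = ( a, b, c, d ) )
    a, b, c, d = aTuple
    return a * b + c * d

def a_tuple_of_parameters_generating_fun(i):
    # builds one parameter tuple per task
    return (i, i + 1, i + 2, i + 3)

results = Parallel(n_jobs=-1)(
    delayed(my_tuple_consuming_fun)(a_tuple_of_parameters_generating_fun(i))
    for i in range(10)
)
print(results)   # == [i*(i+1) + (i+2)*(i+3) for i in range(10)]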

Next step: try to understand the costs and limits of the n_jobs instantiation

The default backend of joblib will run each function call in isolated Python processes, therefore they cannot mutate a common Python object defined in the main program.

However if the parallel function really needs to rely on the shared memory semantics of threads, it should be made explicit with require='sharedmem'

Keep in mind that relying on the shared-memory semantics is probably suboptimal from a performance point of view as concurrent access to a shared Python object will suffer from lock contention.
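A minimal sketch of that require = 'sharedmem' constraint, following the pattern shown in the joblib documentation:

from joblib import Parallel, delayed

shared_set = set()

def collect(x):
    # mutates an object of the main program - only possible when all
    # workers are threads living in the same address space
    shared_set.add(x)

Parallel(n_jobs=2, require='sharedmem')(
    delayed(collect)(i) for i in range(5)
)
print(sorted(shared_set))   # [0, 1, 2, 3, 4]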

Using a thread-based backend permits such "sharing", but it does so at an immense cost: the threads re-introduce GIL-stepping, which re-injects the flow of code execution into a one-after-another, GIL-lock-stepped sequence. For computing-intensive processing this yields worse performance than the original, pure serial code (whereas this mode does help in latency-masking use cases, where waiting for a network response may let a thread release the GIL lock and let other threads carry on with their work).
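A small sketch of that latency-masking case (the sleep stands in for a blocking network wait; timings are indicative only):

import time
from joblib import Parallel, delayed

def fake_network_call(i):
    time.sleep(0.5)      # a blocking wait releases the GIL ...
    return i             # ... so the other threads proceed meanwhile

start = time.perf_counter()
Parallel(n_jobs=8, prefer='threads')(
    delayed(fake_network_call)(i) for i in range(8)
)
print(f"~{time.perf_counter() - start:.2f} s")   # ~0.5 s, not the ~4 s of a serial run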

There are steps one can implement so that separate process-based computations can communicate such needs, yet these come at some add-on costs.

Computing-intensive problems therefore have to balance the need for ultimate performance (taking more cores) against the fact that each work unit is isolated (detached), and against the minimum add-on costs of parameter transfers and results returns, all of which can easily cost more than a poorly designed attempt to harness the available forms of process scheduling ever gains back.
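That trade-off is easy to measure. A sketch (assuming nothing beyond NumPy and joblib; absolute numbers will vary per machine) contrasting one vectorised serial pass against a process-based split of the same cheap work unit:

import time
import numpy as np
from joblib import Parallel, delayed

x = np.random.rand(1_000_000)

t0 = time.perf_counter()
serial = np.sqrt(x)                      # one vectorised pass, zero IPC costs
t_serial = time.perf_counter() - t0

t0 = time.perf_counter()
parts = Parallel(n_jobs=4)(              # spawn + parameter transfer + results return
    delayed(np.sqrt)(chunk) for chunk in np.array_split(x, 4)
)
t_parallel = time.perf_counter() - t0

print(f"serial: {t_serial:.4f} s   parallel: {t_parallel:.4f} s")
# for work units this cheap, the add-on overheads typically dominate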

More details on joblib.Parallel

More on the implications of achievable parallel speedups
