我目前正在把数据分析和诊断套件搬到达斯克
程序的结构是这样的:主进程分配传入的工作 给一群工人的包裹。工人们根据这些数据计算各种诊断
一个简单的电话可能看起来像:
def calculate(self, dask_client, fft_future):
def coherence(fft_data, ch_it):
"""Kernel that calculates the coherence between two channels.
Input:
======
fft_data: ndarray, float: Contains the fourier-transformed data.
dim0: channel, dim1: Fourier Coefficients. dim2: STFT bins
ch_it: iterable, Iterator over a list of channels we wish to perform our computation on
Returns:
========
coherence, float.
"""
import numpy as np
c1_idx = np.array([cc[0].ch1.idx() for cc in ch_it])
c2_idx = np.array([cc[0].ch2.idx() for cc in ch_it])
X = fft_data[c1_idx, :, :]
Y = fft_data[c2_idx, :, :]
Pxx = X * X.conj()
Pyy = Y * Y.conj()
Gxy = np.mean((X * Y.conj()) / np.sqrt(Pxx * Pyy), axis=1)
Gxy = np.fabs(Gxy).real
return(Gxy)
self.futures_list = [dask_client.submit(coherence, fft_future, ch_iter) for ch_iter in self.get_dispatch_sequence()]
在上面的代码中,numpy是在函数“coherence”中导入的。我想加速 当我创建worker时,通过导入numpy执行。这可能吗
我相信Numpy在默认情况下已经被导入了,如果它在导入
dask.distributed
时存在的话不过,一般来说,如果您想触发代码在辅助进程启动时运行,这里会提到各种机制:https://docs.dask.org/en/latest/setup/custom-startup.html
相关问题 更多 >
编程相关推荐