I have the following utility class:
import numpy as np

class RunningStatisticsVar:
    def __init__(self, ddof=0):
        self.mean = 0
        self.var = 0
        self.std = 0
        self._n = 0  # number of values seen so far
        self._s = 0  # running sum of squared deviations from the mean
        self._ddof = ddof

    def update(self, value):
        self._n += 1
        old_mean = self.mean
        self.mean += (value - old_mean) / self._n
        self._s += (value - old_mean) * (value - self.mean)
        self.var = self._s / (self._n - self._ddof) if self._n > self._ddof else 0
        self.std = np.sqrt(self.var)
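It computes and stores the running mean and standard deviation of a (long) stream of numbers. It works fine, but since I am putting the class into my personal library, I would like to make it robust to concurrent execution. For example, I would like to be able to do the following: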
from joblib.parallel import Parallel, delayed

def execute_and_update(var):
    a = do_stuff()
    var.update(a)
    b, c = do_more_stuff()
    var.update(b)
    var.update(c)

stat = RunningStatisticsVar()
Parallel()(delayed(execute_and_update)(stat) for _ in range(1000))
and have the update calls be thread-safe.
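Google gives me plenty of ways to execute code concurrently, but I have not found a way to make a class safe for concurrent execution. In Java, IIRC, this can be done with atomic methods/classes, but I don't think Python has them.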
Following the comments, I have updated the code, but I now get an error when trying to call my method from Parallel:
from joblib.parallel import Parallel, delayed
import numpy as np
from threading import Lock

class RunningStatisticsVar:
    def __init__(self, ddof=0):
        self.mean = 0
        self.var = 0
        self.std = 0
        self._n = 0
        self._s = 0
        self._ddof = ddof
        self._lock = Lock()

    def update(self, value):
        with self._lock:
            self._n += 1
            old_mean = self.mean
            self.mean += (value - old_mean) / self._n
            self._s += (value - old_mean) * (value - self.mean)
            self.var = self._s / (self._n - self._ddof) if self._n > self._ddof else 0
            self.std = np.sqrt(self.var)

samples = np.random.uniform(0, 100, [1000])
s1 = RunningStatisticsVar()
s2 = RunningStatisticsVar()

for i in samples:
    s1.update(i)

Parallel(n_jobs=-1)(delayed(lambda x: s2.update(x))(i) for i in samples) #

print(s1.mean, s1.std)
print(s2.mean, s2.std)
When I try to run the code above, I get the following error on the line marked with #:
TypeError: can't pickle _thread.lock objects
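
For context, the error is consistent with joblib using process-based workers here: the lambda closes over s2, so s2 (including its Lock) has to be pickled to reach the workers, and a threading.Lock cannot be pickled. Even if it could, each worker process would update its own copy, so a threading.Lock only helps when the workers are threads in the same process. Below is a minimal sketch of that situation, using only the standard library (ThreadPoolExecutor in place of joblib, math.sqrt in place of np.sqrt; both are substitutions made for illustration, not part of the original code). With joblib itself, requesting thread-based workers via Parallel(n_jobs=-1, backend="threading") or prefer="threads" should have a similar effect, though I have not verified that against this exact setup.

```python
import pickle
import threading
from concurrent.futures import ThreadPoolExecutor
from math import sqrt


class RunningStatisticsVar:
    def __init__(self, ddof=0):
        self.mean = 0.0
        self.var = 0.0
        self.std = 0.0
        self._n = 0
        self._s = 0.0
        self._ddof = ddof
        self._lock = threading.Lock()

    def update(self, value):
        # The lock makes the whole read-modify-write sequence atomic
        # with respect to other threads calling update().
        with self._lock:
            self._n += 1
            old_mean = self.mean
            self.mean += (value - old_mean) / self._n
            self._s += (value - old_mean) * (value - self.mean)
            self.var = self._s / (self._n - self._ddof) if self._n > self._ddof else 0.0
            self.std = sqrt(self.var)


# Sending the object to another process requires pickling it,
# and that fails because of the Lock attribute.
try:
    pickle.dumps(RunningStatisticsVar())
    pickled_ok = True
except TypeError:
    pickled_ok = False

# Threads share the same object, so nothing is pickled and the
# lock actually guards the shared state.
samples = [float(i % 101) for i in range(1000)]
stat = RunningStatisticsVar()
with ThreadPoolExecutor(max_workers=8) as pool:
    list(pool.map(stat.update, samples))

print(pickled_ok, stat._n, stat.mean, stat.std)
```

Because the order in which threads apply their updates is not fixed, the resulting mean and std can differ from a sequential run in the last few floating-point digits, so comparisons between s1 and s2 should use a tolerance rather than exact equality.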