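I am looking for the fastest way (in terms of latency) to signal between two processes that an event has occurred.
More precisely, I have a numpy array in shared memory; one process (the producer) writes updates to the array and another process (the consumer) reads them.
Multiprocessing is required because we need to get around the GIL. The producer is a CPU/IO-bound process that listens to a data stream and does some data processing.
The consumer is very lightweight and mostly idle, but we need to wake it up as quickly as possible whenever the producer updates the array.
One more thing: triggering the consumer with minimal latency matters more than delivering every message. (For example, if the producer sends three messages in a row and the consumer receives only the first one and loses the other two, that is fine.)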
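To make the "losing messages is fine" requirement concrete, here is a minimal sketch of a consumer that wakes on the first notification and then throws away whatever backlog has accumulated, so a burst of three signals causes a single wake-up. The helper name `wait_and_coalesce` is hypothetical and not part of `multiprocessing`; the sketch assumes the notifications carry no payload, as in the benchmarks in this question.

```python
import multiprocessing as mp


def wait_and_coalesce(conn):
    """Block until one notification arrives, then discard any backlog.

    Returns how many notifications were collapsed into this wake-up.
    (Hypothetical helper, for illustration only.)
    """
    conn.recv()              # blocks until the producer signals
    collapsed = 1
    while conn.poll():       # poll() with no timeout returns immediately
        conn.recv()          # drop notifications that piled up meanwhile
        collapsed += 1
    return collapsed


if __name__ == "__main__":
    # duplex=False gives (receive-only end, send-only end)
    recv_end, send_end = mp.Pipe(duplex=False)
    for _ in range(3):       # simulate a burst of three updates
        send_end.send(None)
    print("notifications collapsed into one wake-up:",
          wait_and_coalesce(recv_end))
```

After the call returns, the consumer would read the current state of the shared array once, which already reflects every update in the burst.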
For this I tried the multiprocessing primitives Pipe, Queue, and Event, and they look almost identical in terms of latency. Pipe is the most stable.
# Pipe
import multiprocessing as mp
import numpy as np
import random
import time

ITER_COUNT = 1000


def get_mcs_diff(ts):
    # Microseconds elapsed since timestamp ts
    return round((time.time() - ts) * 1e6, 0)


def main(v, input_pipe):
    # Producer: store the send time in shared memory, then notify
    for _ in range(ITER_COUNT):
        v.value = time.time()
        input_pipe.send(None)
        time.sleep((0.1 + random.random()) / 100)


if __name__ == "__main__":
    v = mp.Value('d', time.time())
    (ip, op) = mp.Pipe()
    p = mp.Process(target=main, args=(v, ip,))
    measurements = []
    p.start()
    i = 0
    while i < ITER_COUNT:
        op.recv()  # blocks until the producer sends
        measurements.append(get_mcs_diff(v.value))
        i += 1
    print(np.percentile(measurements, [50, 90, 95, 99], axis=0))
    p.join()

# Output
# 50, 90, 95, 99 percentiles in microseconds
# > [138. 206.1 238. 383.21]

# Queue
import multiprocessing as mp
import numpy as np
import random
import time

ITER_COUNT = 1000


def get_mcs_diff(ts):
    # Microseconds elapsed since timestamp ts
    return round((time.time() - ts) * 1e6, 0)


def main(v, q):
    # Producer: store the send time in shared memory, then notify
    for _ in range(ITER_COUNT):
        v.value = time.time()
        q.put(None)
        time.sleep((0.1 + random.random()) / 100)


if __name__ == "__main__":
    v = mp.Value('d', time.time())
    q = mp.Queue()
    p = mp.Process(target=main, args=(v, q,))
    measurements = []
    p.start()
    i = 0
    while i < ITER_COUNT:
        q.get()  # blocks until the producer puts an item
        measurements.append(get_mcs_diff(v.value))
        i += 1
    print(np.percentile(measurements, [50, 90, 95, 99], axis=0))
    p.join()

# Output
# 50, 90, 95, 99 percentiles in microseconds
# > [187. 266. 299.05 444.06]

# Event
import multiprocessing as mp
import numpy as np
import random
import time

ITER_COUNT = 1000


def get_mcs_diff(ts):
    # Microseconds elapsed since timestamp ts
    return round((time.time() - ts) * 1e6, 0)


def main(v, e):
    # Producer: store the send time in shared memory, then notify
    for _ in range(ITER_COUNT):
        v.value = time.time()
        e.set()
        time.sleep((0.1 + random.random()) / 100)


if __name__ == "__main__":
    v = mp.Value('d', time.time())
    e = mp.Event()
    p = mp.Process(target=main, args=(v, e,))
    measurements = []
    p.start()
    i = 0
    while i < ITER_COUNT:
        e.wait()  # blocks until the producer sets the event
        measurements.append(get_mcs_diff(v.value))
        i += 1
        # Note: a set() arriving between wait() and clear() would be lost;
        # the producer's sleep makes that unlikely in this benchmark.
        e.clear()
    print(np.percentile(measurements, [50, 90, 95, 99], axis=0))
    p.join()

# Output
# 50, 90, 95, 99 percentiles in microseconds
# > [142. 222.1 256.05 1754.77]

# Busy loop
import multiprocessing as mp
import numpy as np
import random
import time

ITER_COUNT = 1000


def get_mcs_diff(ts):
    # Microseconds elapsed since timestamp ts
    return round((time.time() - ts) * 1e6, 0)


def main(v):
    # Producer: give the consumer time to start polling, then publish timestamps
    time.sleep(1)
    for _ in range(ITER_COUNT):
        v.value = time.time()
        time.sleep((0.1 + random.random()) / 100)


if __name__ == "__main__":
    v = mp.Value('d', time.time())
    p = mp.Process(target=main, args=(v,))
    measurements = []
    p.start()
    i = 0
    v_prev = 0
    while i < ITER_COUNT:
        # Spin on the shared value; a newer timestamp means a new update
        if v_prev < v.value:
            measurements.append(get_mcs_diff(v.value))
            v_prev = float(v.value)
            i += 1
    print(np.percentile(measurements, [50, 90, 95, 99], axis=0))
    p.join()

# Output
# 50, 90, 95, 99 percentiles in microseconds
# > [ 33. 65. 81. 128.05]
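The busy loop is by far the fastest option. But I would like to avoid it, for obvious reasons: it keeps a CPU core fully busy even though the consumer is idle most of the time.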
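One possible middle ground, shown here only as an untested-for-latency sketch, is to spin for a short, bounded time and then fall back to a blocking wait. It assumes the producer bumps a shared sequence counter and sets an `Event` on every update; the names `publish`, `wait_for_update`, and `spin_seconds`, and the 0.5 ms spin budget, are all hypothetical choices, not measured recommendations.

```python
import multiprocessing as mp
import time


def publish(seq, event):
    """Producer side: bump the sequence number first, then signal."""
    with seq.get_lock():
        seq.value += 1
    event.set()


def wait_for_update(seq, event, last_seen, spin_seconds=0.0005):
    """Consumer side: spin briefly, then block until seq moves past last_seen.

    Returns the newest sequence number observed. Intermediate updates are
    coalesced, which matches the 'losing messages is fine' requirement.
    """
    deadline = time.perf_counter() + spin_seconds
    while time.perf_counter() < deadline:      # bounded busy loop
        current = seq.value
        if current != last_seen:
            return current
    while True:                                # blocking fallback
        event.wait()
        event.clear()
        # Read AFTER clear(): an update signalled before the clear is
        # still visible in seq, and one signalled after re-sets the event.
        current = seq.value
        if current != last_seen:
            return current
        # Otherwise the event was left over from an update we already saw.


def producer(seq, event, n_updates):
    for _ in range(n_updates):
        time.sleep(0.002)                      # stand-in for real work
        publish(seq, event)


if __name__ == "__main__":
    seq = mp.Value("Q", 0)                     # unsigned 64-bit counter
    event = mp.Event()
    p = mp.Process(target=producer, args=(seq, event, 5))
    p.start()
    seen = 0
    while seen < 5:
        seen = wait_for_update(seq, event, seen)
    p.join()
    print("last sequence number seen:", seen)
```

The spin budget trades CPU time for latency: updates that arrive within the budget are picked up at roughly busy-loop speed, while longer gaps cost one blocking wake-up. Whether this helps depends on how updates are spaced in time, so it would need to be benchmarked the same way as the variants above.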