In Python, what is the fastest way to pass messages between two processes?

Posted 2024-09-23 00:26:57


I'm looking for the fastest way (in terms of latency) to signal from one process to another that an event has occurred.

More precisely, I have a numpy array in shared memory; one process (the producer) writes updates to the array and another process (the consumer) reads them.
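For reference, here is a minimal sketch of the kind of setup described, assuming the array is shared through `multiprocessing.shared_memory` (Python 3.8+). The names `SHAPE`, `DTYPE`, `create_shared_array`, and `attach_shared_array` are hypothetical and only for illustration; the real array may well be shared in a different way.

```python
import numpy as np
from multiprocessing import shared_memory

# Assumed layout for illustration only.
SHAPE = (4,)
DTYPE = np.float64


def create_shared_array():
    """Producer side: allocate a shared block and view it as a numpy array."""
    nbytes = int(np.prod(SHAPE)) * np.dtype(DTYPE).itemsize
    shm = shared_memory.SharedMemory(create=True, size=nbytes)
    arr = np.ndarray(SHAPE, dtype=DTYPE, buffer=shm.buf)
    arr[:] = 0.0
    return shm, arr


def attach_shared_array(name):
    """Consumer side: attach to an existing block by name (no copy is made)."""
    shm = shared_memory.SharedMemory(name=name)
    arr = np.ndarray(SHAPE, dtype=DTYPE, buffer=shm.buf)
    return shm, arr
```

The producer would pass `shm.name` to the consumer process. Each side should drop its array view before calling `shm.close()`, and the creator should call `shm.unlink()` once at the end.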

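Multiprocessing is required because we need to get around the GIL. The producer is a CPU/IO-intensive process that listens to a data stream and does some data processing.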

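The consumer is very lightweight and idle most of the time, but we need to wake it as quickly as possible whenever the producer updates the array.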

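One more thing: triggering the consumer with minimal latency matters more than delivering every message. (For example, if the producer sends three messages in a row and the consumer receives only the first one and loses the other two, that is fine.)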
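Because losing messages is acceptable, the consumer could collapse a backlog of notifications into a single wake-up. A minimal sketch, assuming the notifications travel over a `multiprocessing.Pipe` connection; `wait_and_coalesce` is a hypothetical helper name, not part of any library:

```python
import multiprocessing as mp


def wait_and_coalesce(conn):
    """Block until one notification arrives, then discard any backlog.

    Returns how many notifications were consumed in total.
    """
    conn.recv()          # blocks until the producer signals
    consumed = 1
    while conn.poll():   # poll() with the default timeout of 0 does not block
        conn.recv()      # drop notifications that queued up in the meantime
        consumed += 1
    return consumed
```

After the call returns, the consumer reads the shared array once and sees the latest state, no matter how many updates were signalled.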


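To this end I tried the multiprocessing primitives Pipe, Queue, and Event, and they appear to be nearly identical in terms of latency. Pipe is the most stable.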

multiprocessing.Pipe

import multiprocessing as mp
import numpy as np
import random
import time

ITER_COUNT = 1000


def get_mcs_diff(ts):
    return round((time.time() - ts) * 1e6, 0)


def main(v, input_pipe):
    for _ in range(ITER_COUNT):
        v.value = time.time()
        input_pipe.send(None)
        time.sleep((0.1 + random.random()) / 100)


if __name__ == "__main__":
    v = mp.Value('d', time.time())
    (ip, op) = mp.Pipe()
    p = mp.Process(target=main, args=(v, ip,))
    measurements = []

    p.start()
    i = 0
    while i < ITER_COUNT:
        op.recv()
        measurements.append(get_mcs_diff(v.value))
        i += 1

    print(np.percentile(measurements, [50, 90, 95, 99], axis=0))
    p.join()

# Output
# 50, 90, 95, 99 percentiles in microseconds
# > [138.   206.1   238.   383.21]

multiprocessing.Queue

import multiprocessing as mp
import numpy as np
import random
import time

ITER_COUNT = 1000


def get_mcs_diff(ts):
    return round((time.time() - ts) * 1e6, 0)


def main(v, q):
    for _ in range(ITER_COUNT):
        v.value = time.time()
        q.put(None)
        time.sleep((0.1 + random.random()) / 100)


if __name__ == "__main__":
    v = mp.Value('d', time.time())
    q = mp.Queue()
    p = mp.Process(target=main, args=(v, q,))
    measurements = []

    p.start()
    i = 0
    while i < ITER_COUNT:
        q.get()
        measurements.append(get_mcs_diff(v.value))
        i += 1

    print(np.percentile(measurements, [50, 90, 95, 99], axis=0))
    p.join()

# Output
# 50, 90, 95, 99 percentiles in microseconds
# > [187.   266.   299.05  444.06]

multiprocessing.Event

import multiprocessing as mp
import numpy as np
import random
import time

ITER_COUNT = 1000


def get_mcs_diff(ts):
    return round((time.time() - ts) * 1e6, 0)


def main(v, e):
    for _ in range(ITER_COUNT):
        v.value = time.time()
        e.set()
        time.sleep((0.1 + random.random()) / 100)


if __name__ == "__main__":
    v = mp.Value('d', time.time())
    e = mp.Event()
    p = mp.Process(target=main, args=(v, e,))
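    measurements = []

    p.start()
    i = 0
    while i < ITER_COUNT:
        e.wait()
        measurements.append(get_mcs_diff(v.value))
        i += 1
        # Reset the flag so the next wait() blocks again; a set() that lands
        # between wait() returning and this clear() is lost.
        e.clear()
    print(np.percentile(measurements, [50, 90, 95, 99], axis=0))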
    p.join()
# Output
# 50, 90, 95, 99 percentiles in microseconds
# > [142.  222.1  256.05  1754.77]

Busy loop with while True

import multiprocessing as mp
import numpy as np
import random
import time

ITER_COUNT = 1000


def get_mcs_diff(ts):
    return round((time.time() - ts) * 1e6, 0)


def main(v):
    time.sleep(1)
    for _ in range(ITER_COUNT):
        v.value = time.time()
        # print(v.value)
        time.sleep((0.1 + random.random()) / 100)


if __name__ == "__main__":
    v = mp.Value('d', time.time())
    p = mp.Process(target=main, args=(v,))
    measurements = []

    p.start()
    i = 0
    v_prev = 0
    while i < ITER_COUNT:
        # Read the shared value once per iteration so that the comparison,
        # the measurement, and v_prev all use the same timestamp.
        current = v.value
        if v_prev < current:
            measurements.append(get_mcs_diff(current))
            v_prev = current
            i += 1
    print(np.percentile(measurements, [50, 90, 95, 99], axis=0))
    p.join()
# Output
# 50, 90, 95, 99 percentiles in microseconds
# > [ 33.    65.    81.   128.05]

So far the busy loop is by far the fastest option, but I would like to avoid it for obvious reasons.
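One possible middle ground, shown only as a sketch and not benchmarked here, is to spin for a short bounded window and then fall back to a blocking receive, so that a core is only kept busy for a limited time. `wait_hybrid` and its `spin_seconds` parameter are hypothetical names, and the example assumes the notification arrives over a `multiprocessing.Pipe` connection:

```python
import multiprocessing as mp
import time


def wait_hybrid(conn, spin_seconds=0.0002):
    """Spin on conn.poll() for up to spin_seconds, then block in recv()."""
    deadline = time.perf_counter() + spin_seconds
    while time.perf_counter() < deadline:
        if conn.poll():      # non-blocking check for a pending notification
            return conn.recv()
    return conn.recv()       # nothing arrived in the window: sleep in the kernel
```

Whether this helps depends on how often the producer fires relative to the spin window; with updates several milliseconds apart, most wake-ups would still go through the blocking path.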

