如何检测多处理.管道满了吗?

2024-10-01 13:27:25 发布

您现在位置:Python中文网/ 问答频道 /正文

问题描述:我正在用Python进行多处理,并使用多处理.管道()在进程之间进行通信。我已经找了很多次,但还是找不到一个方法来检测管道是否满了。例如下面的例子,writePipe进程不断地将数字放入两个不同的管道(奇数和偶数),而readPipe进程则不断地从这两个管道中读取。但是,从奇数管道读取的速度要快得多,因此偶数管道将充满。此时,writePipe进程将被阻塞,而readPipe进程仍在等待从奇数管道读取,从而导致死锁。在

我的问题:有什么方法可以检测到管道已满,这样我们就可以停止在管道中输入数字,而将数字输入到仍有空格的管道中?在

from multiprocessing import Process, Pipe


def writePipe(sendNumberOdd, sendNumberEven):
    i = 0
    while True:
        if i % 2 == 0:
            sendNumberEven.send(i)
        else:
            sendNumberOdd.send(i)
        i += 1

def readPipe(recvNumberOdd, recvNumberEven):
    countEven = 0
    while True:
        countEven += 1
        print(countEven, recvNumberEven.recv())

        countOdd = 0
        while countOdd < 50:
            countOdd += 1
            print (countOdd, recvNumberOdd.recv())



if __name__ == '__main__':
    recvNumberOdd, sendNumberOdd = Pipe(duplex=False)
    recvNumberEven, sendNumberEven = Pipe(duplex=False)

    write = Process(target=writePipe, args=(sendNumberOdd, sendNumberEven))
    read = Process(target=readPipe, args=(recvNumberOdd, recvNumberEven))
    write.start()
    read.start()

    sendNumberOdd.close()
    sendNumberEven.close()

Tags: 管道进程数字process奇数pipewhilecountodd
2条回答

您可以使用select模块中的^{}函数来实现输出管道是否已满的测试。在

import select
import multiprocessing.connection as mpc


def pipe_full(conn):
    r, w, x = select.select([], [conn], [], 0.0)
    return 0 == len(w)


i, o = mpc.Pipe(duplex=False)

n = 0
while not pipe_full(o):
    o.send(n)
    n += 1

print('{} items fit.'.format(n))

Proposal UNTESTED

class Connection(multiprocessing.Connection):
    def __init__(self, maxsize=0):
        self.__maxsize = maxsize
        self.size = 0
        self.__lock = multiprocessing.Lock

    def send(self, obj):
        with self.__lock:
            self.size += sizeof(obj)
        super().send(obj)

    def recv(self):
        _recv = super().recv()
        with self.__lock:
            self.size -= sizeof(_recv)
        return _recv

    def full(self):
        if self.__maxsize > 0:
            return self.size >= self.__maxsize
        return None

def Pipe(maxsize=0, duplex=True):
    return Connection(maxsize), Connection(maxsize)

实现poll()来检查是否有任何数据准备就绪。在

Python » Documentation: poll([timeout])

Return whether there is any data available to be read.  

例如:

if recvNumberEven.poll():
    countEven += 1
    print(countEven, recvNumberEven.recv())

两者的替代使用wait(...)

multiprocessing.connection.wait(object_list, timeout=None)

Wait till an object in object_list is ready.  
Returns the list of those objects in object_list which are ready.

相关问题 更多 >