非阻塞fi

2024-10-01 15:38:31 发布

您现在位置:Python中文网/ 问答频道 /正文

如何在两个python进程之间创建fifo,以便在读取器无法处理输入时删除行?在

  • 如果读卡器试图以readreadline的速度写入,则应该阻止。在
  • 如果读卡器的工作速度不能达到写入器的速度,则写器不应阻塞。行不应该被缓冲(一次只有一行除外),读取器在下一次尝试readline时只接收最后一行。在

有没有可能用一个命名的fifo,或者有其他简单的方法来实现这一点?在


Tags: 方法readreadline进程读取器命名速度fifo
2条回答

下面的代码使用一个命名的FIFO来允许两个脚本之间的通信。在

  • 如果读卡器试图以read的速度比writer快,它就会阻塞。在
  • 如果读写器跟不上编写器,则编写器不会阻塞。在
  • 操作是面向缓冲区的。目前没有实现面向生产线的操作。在
  • 本规范应被视为概念证明。延迟和缓冲区大小是任意的。在

编码

import argparse
import errno
import os
from select import select
import time

class OneFifo(object):
    def __init__(self, name):
        self.name = name

    def __enter__(self):
        if os.path.exists(self.name):
            os.unlink(self.name)
        os.mkfifo(self.name)
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        if os.path.exists(self.name):
            os.unlink(self.name)

    def write(self, data):
        print "Waiting for client to open FIFO..."
        try:
            server_file = os.open(self.name, os.O_WRONLY | os.O_NONBLOCK)
        except OSError as exc:
            if exc.errno == errno.ENXIO:
                server_file = None
            else:
                raise
        if server_file is not None:
            print "Writing line to FIFO..."
            try:
                os.write(server_file, data)
                print "Done."
            except OSError as exc:
                if exc.errno == errno.EPIPE:
                    pass
                else:
                    raise
            os.close(server_file)

    def read_nonblocking(self):
        result = None
        try:
            client_file = os.open(self.name, os.O_RDONLY | os.O_NONBLOCK)
        except OSError as exc:
            if exc.errno == errno.ENOENT:
                client_file = None
            else:
                raise
        if client_file is not None:
            try:
                rlist = [client_file]
                wlist = []
                xlist = []
                rlist, wlist, xlist = select(rlist, wlist, xlist, 0.01)
                if client_file in rlist:
                    result = os.read(client_file, 1024)
            except OSError as exc:
                if exc.errno == errno.EAGAIN or exc.errno == errno.EWOULDBLOCK:
                    result = None
                else:
                    raise
            os.close(client_file)
        return result

    def read(self):
        try:
            with open(self.name, 'r') as client_file:
                result = client_file.read()
        except OSError as exc:
            if exc.errno == errno.ENOENT:
                result = None
            else:
                raise
        if not len(result):
            result = None
        return result

def parse_argument():
    parser = argparse.ArgumentParser()
    parser.add_argument('-c', ' client', action='store_true',
                        help='Set this flag for the client')
    parser.add_argument('-n', ' non-blocking', action='store_true',
                        help='Set this flag to read without blocking')
    result = parser.parse_args()
    return result

if __name__ == '__main__':
    args = parse_argument()
    if not args.client:
        with OneFifo('known_name') as one_fifo:
            while True:
                one_fifo.write('one line')
                time.sleep(0.1)
    else:
        one_fifo = OneFifo('known_name')
        while True:
            if args.non_blocking:
                result = one_fifo.read_nonblocking()
            else:
                result = one_fifo.read()
            if result is not None:
                print result

server检查client是否已打开FIFO。如果client已打开FIFO,server将写入一行。否则,server继续运行。我实现了一个非阻塞读取,因为阻塞读取会导致一个问题:如果server重新启动,client大部分时间都处于阻塞状态,并且永远不会恢复。使用非阻塞client,更容易容忍server重新启动。在

输出

^{pr2}$

注意事项

启动时,如果server检测到FIFO已存在,则将其删除。这是通知clients已重新启动的最简单方法。此通知通常被client的阻塞版本忽略。在

嗯,据我所知,这实际上不是一个FIFO(队列),它是一个单一变量。我想如果您设置一个最大大小为1的队列或管道,它可能是可以实现的,但是似乎在其中一个进程中的单个对象上使用^{}会更好,而另一个进程通过proxy object引用该对象。读卡器在每次读取时都会将其设置为None,而编写器则会在每次写入时重写内容。在

通过将对象的代理和锁的代理作为参数传递给所有相关进程,可以将这些传递给其他进程。为了更方便地获取它,您可以使用^{},它提供了一个带有代理的对象,您可以传入它,它包含并提供任何其他对象(包括锁)的代理。This answer提供了一个有用的示例,说明如何正确使用管理器将对象传递到新进程中。在

相关问题 更多 >

    热门问题