Twisted Python中从传输读取的模式

2024-10-06 12:09:01 发布

您现在位置:Python中文网/ 问答频道 /正文

在Twisted Python中,数据被写入协议的传输,但是通过覆盖dataReceived方法来接收。从交通工具上读取数据有规律吗?当使用inlineCallbacks实现状态时,这会很有帮助

例如:

class SomeProtocol(Protocol):
    @defer.inlineCallbacks
    def login(self):
        self.transport.write('login')
        resp = yield self.transport.read(5, timeout=1) # this doesn't exist
        if resp != 'user:':
            raise SomeException()
        self.transport.write('admin')
        resp = transport.read(9, timeout=1)
        if resp != 'password:':
            raise SomeException()
        self.transport.write('hunter2')
        # ... etc

Tags: 数据self协议readiftimeouttwistedlogin
2条回答

这些年来,已经有几次尝试实现这样的api。没有人获得任何吸引力。我想他们现在都被抛弃了。你知道吗

原则上,这并不难实现。您只是将dataReceived回调(push样式的API)转换为pull样式的API。你知道吗

实际上,生成的代码是脆弱的,并且往往包含更多的bug。你知道吗

我认为您试图解决的问题是dataReceived是解析字节流的非常低级的原语。你知道吗

有许多可能的解决办法。您可以尝试构建一个更高级别的基于协议的工具,它了解协议的各个方面(Twisted中的所有协议实现基本上都是这样做的)。您还可以查看像tubes这样的第三方库(它为处理字节流提供了不同的抽象)。你知道吗

最后,当数据到达时,我维护了一个推迟回调的列表,并缓冲传入的数据,直到它满足列表中第一个推迟回调所需的数据长度。你知道吗

class SomeProtocol(Protocol):

    # initialise self.buf and self.readers in __init__

    def deferred_read(self, count, timeout=None):
        """Return a deferred that fires when data becomes available"""
        d = defer.Deferred()
        reader = [d, count]
        timeout_cb = None
        if timeout is not None:
            timeout_cb = self.reactor.callLater(timeout, self.deferred_read_timeout, reader)
        reader.append(timeout_cb)
        self.readers.append(reader)
        self.check_readers()
        return d

    def deferred_read_timeout(self, reader):
        """Timeout this reader and check if others now match"""
        d, count, timeout_cb = reader
        self.readers.remove(reader)
        d.errback(TimeoutException()) # defined elsewhere
        self.check_readers()

    def check_readers(self):
        """Check if there is enough data to satisfy first reader"""
        try:
            while 1:
                reader = self.readers[0]
                d, count, timeout_cb = reader
                if len(self.buf) < count:
                    break
                data = self.buf[:count]
                self.buf = self.buf[count:]
                self.readers.remove(reader)
                try:
                    timeout_cb.cancel()
                except: pass
                d.callback(data)
        except IndexError: pass

    def dataReceived(self, data):
        self.buf += data
        self.check_readers()

它当前要求count为非零。最好扩展它以支持返回当前在读取缓冲区中的内容,并在有超时但不计数的情况下读取,以便在超时后返回缓冲区中的内容。你知道吗

相关问题 更多 >