Twisted:如何在将数据写入同一个套接字后从客户端套接字读取数据?

2024-10-03 13:22:47 发布

您现在位置:Python中文网/ 问答频道 /正文

我试图用twisted编写一个简单的TCP服务器,它必须按顺序执行以下操作:

  1. 客户机连接到服务器,并且此连接的KEEPALIVE标志设置为1。在
  2. 服务器从客户端接收数据。在
  3. 然后计算一个列表响应。在
  4. 然后,服务器在等待来自客户端的显式ACK时逐个发送列表的每个项,即,在从列表发送单个项之后,服务器等待来自客户端的ACK包,并且只有在接收到ACK之后,才会以相同的方式发送其余项。在

代码如下:

class MyFactory(ServerFactory):
    protocol = MyProtocol

    def __init__(self, service):
        self.service = service

class MyProtocol(Protocol):

    def connectionMade(self):
         try:
             self.transport.setTcpKeepAlive(1)
         except AttributeError: 
             pass
         self.deferred = Deferred()
         self.deferred.addCallback(self.factory.service.compute_response)
         self.deferred.addCallback(self.send_response)

    def dataReceived(self, data):
         self.fire(data)

    def fire(self, data):
        if self.deferred is not None:
            d, self.deferred = self.deferred, None
            d.callback(data)

    def send_response(self, data):
        for item in data:
            d = Deferred()
            d.addCallback(self.transport.write)
            d.addCallback(self.wait_for_ack)
            d.callback(item)
        return   

    def wait_for_ack(self, dummy):
        try:
            self.transport.socket.recv(1024)
        except socket.error as e:
            print e
        return

在运行服务器和客户端时,我得到以下异常:

Resource temporarily unavailable

我理解这个异常的原因-我试图在非阻塞套接字上调用阻塞方法。在

请帮助我找到解决这个问题的办法。在


Tags: self服务器客户端列表fordataresponsedef
1条回答
网友
1楼 · 发布于 2024-10-03 13:22:47

你的例子有一些问题:

  1. 您没有在任何地方定义compute_response,因此我无法运行您的示例。考虑将其设为http://sscce.org
  2. 永远不要在Twisted传输的底层套接字上调用send或{};让Twisted为您调用这些方法。在recv的情况下,它将把recv的结果传递给dataReceived。在
  3. 您不能依赖dataReceived来接收整个消息;数据包可能always be arbitrarily fragmented in transit,因此您需要有一个框架协议来封装消息。在

然而,由于我的另一个答案是如此糟糕,我欠你一个更彻底的解释,如何设置你想做的。在

正如您的问题中所规定的,您的协议定义得不够完整,无法给出答案;您不能用原始的TCP片段来执行请求和响应,因为您的应用程序不知道它们从何处开始和结束(见上文第3点)。所以,我发明了一个小协议来为这个例子服务:这是一个以行分隔的协议,客户机发送"request foo\n",服务器立即发送"thinking...\n",计算响应,然后发送"response foo\n",等待客户机发送"ok";作为响应,服务器将发送下一个"response ..."行,或者一个"done\n"行,表示它已经完成了发送响应。在

以这作为我们的协议,我相信你的问题的关键是你不能“等待承认”,或就此而言,任何其他的,扭曲的。你需要做的是按照“当收到确认时……”的思路来实现一些东西。在

因此,当收到消息时,我们需要确定消息的类型:确认还是请求?在

  1. 如果是请求,我们需要计算一个响应;当计算完响应后,我们需要将响应的所有元素排队并发送第一个元素。在
  2. 如果是确认,我们需要检查响应的传出队列,如果它有任何内容,则发送它的第一个元素;否则,发送“done”。在

下面是一个完整的、可运行的示例,它实现了我用这种方式描述的协议:

from twisted.internet.protocol import ServerFactory
from twisted.internet.task import deferLater
from twisted.internet import reactor
from twisted.internet.interfaces import ITCPTransport
from twisted.protocols.basic import LineReceiver

class MyProtocol(LineReceiver):
    delimiter = "\n"
    def connectionMade(self):
        if ITCPTransport.providedBy(self.transport):
            self.transport.setTcpKeepAlive(1)
        self.pendingResponses = []

    def lineReceived(self, line):
        split = line.rstrip("\r").split(None, 1)
        command = split[0]
        if command == b"request":
            # requesting a computed response
            payload = split[1]
            self.sendLine("thinking...")
            (self.factory.service.computeResponse(payload)
             .addCallback(self.sendResponses))
        elif command == b"ok":
            # acknowledging a response; send the next response
            if self.pendingResponses:
                self.sendOneResponse()
            else:
                self.sendLine(b"done")

    def sendOneResponse(self):
        self.sendLine(b"response " + self.pendingResponses.pop(0))

    def sendResponses(self, listOfResponses):
        self.pendingResponses.extend(listOfResponses)
        self.sendOneResponse()

class MyFactory(ServerFactory):
    protocol = MyProtocol

    def __init__(self, service):
        self.service = service

class MyService(object):
    def computeResponse(self, request):
        return deferLater(
            reactor, 1.0,
            lambda: [request + b" 1", request + b" 2", request + b" 3"]
        )

from twisted.internet.endpoints import StandardIOEndpoint
endpoint = StandardIOEndpoint(reactor)

endpoint.listen(MyFactory(MyService()))
reactor.run()

我已经使它在标准I/O上可以运行,这样您就可以运行它并输入它来了解它是如何工作的;如果您想在实际的网络端口上运行它,只需用一个different type of endpoint来代替它。希望这能回答你的问题。在

相关问题 更多 >