我应该如何处理中的重新连接twisted.application.internet.客户端服务?

2024-10-02 22:36:31 发布

您现在位置:Python中文网/ 问答频道 /正文

我试图在twisted应用程序中使用recently introduced ^{}类,该应用程序使用pymodbus进行简单的modbus tcp轮询。我觉得我的问题与我正在使用的modbus协议没有任何关系,因为我已经使用较低级别的twisted API创建了许多其他工作原型;但是这个新的ClientService看起来正好符合我的需要,因此如果我能让它工作的话,它应该会减少我的代码占用并保持整洁。在

我的测试显示ClientService处理重新连接的方式与预期的一样,而且我可以轻松访问第一个连接Protocol。我遇到的问题是为重新连接获取后续的Protocol对象。以下是我遇到问题的代码的简化版本:

from twisted.application import internet, service
from twisted.internet.protocol import ClientFactory
from twisted.internet import reactor, endpoints
from pymodbus.client.async import ModbusClientProtocol

class ModbusPollingService(internet.ClientService):
    def __init__(self, addrstr, numregs=5):
        self.numregs=numregs
        internet.ClientService.__init__(self,
            endpoints.clientFromString(reactor, addrstr),
            ClientFactory.forProtocol(ModbusClientProtocol))

    def startService(self):
        internet.ClientService.startService(self)
        self._pollWhenConnected()

    def _pollWhenConnected(self):
        d = self.whenConnected()
        d.addCallback(self._connected)
        d.addErrback(self._connfail)

    def _connected(self, p):
        self._log.debug("connected: {p}", p=p)
        self._mbp = p
        self._poll()
        return True

    def _connfail(self, failstat):
        self._log.failure('connection failure', failure=failstat)
        self._mbp = None
        self._pollWhenConnected()

    def _poll(self):
        self._log.debug("poll: {n}", n=self.numregs)
        d = self._mbp.read_holding_registers(0, self.numregs)
        d.addCallback(self._regs)
        d.addErrback(self._connfail)

    def _regs(self, res):
        self._log.debug("regs: {r}", r=res.registers)
        # Do real work of dealing storing registers here
        reactor.callLater(1, self._poll)
        return res

application = service.Application("ModBus Polling Test")
mbpollsvc = ModbusPollingService('tcp:127.0.0.1:502')
mbpollsvc.setServiceParent(application)

当连接失败(无论什么原因),从read_holding_registers()返回的deferrederrback将被调用,目的是我的服务可以放弃Protocol,并返回到等待whenConnected()回调返回的新连接Protocol的状态。。。然而,似乎正在发生的是,ClientService还没有意识到连接已断开,并返回相同的断开协议,给我一个日志,其中包含:

^{pr2}$

或者非常相似,请注意重复的ModbusClientProtocol对象地址。在

我很确定我可能只是为这个API选择了一个糟糕的模式,但是我已经迭代了一些不同的可能性,比如基于ModbusClientProtocol创建我自己的Protocol和{},并完全在该类中处理轮询机制;但是传递持久配置和机制来存储有点混乱通过这种方式,在ClientService级别或更高级别处理这些数据似乎是一种更干净的方法,但我无法找到跟踪当前连接的协议的最佳方法。我想我真正想要的是一个关于在扩展轮询情况下使用ClientService类的最佳实践建议。在


Tags: fromimportselflog协议applicationdeftwisted
2条回答

你没打电话来自传输丢失连接()根据你的投票结果,我能看到的任何地方,就twisted所知,你实际上并没有断开连接。可能以后,当你停止在旧的交通工具上做任何事情时,但到那时你已经失去了对事物的了解。在

这是个老问题。但是,希望它能帮助其他人。在

The problem that I am having is getting hold of subsequent Protocol objects for the reconnections.

提供可调用的prepareConnection构造函数。它将提供电流连接。在

在下面的示例中,MyService将自身附加到MyFactory。主要原因是MyFactory可以让MyService知道ClientService何时断开连接。这是可能的,因为ClientService在断开连接时调用Factory.stopFactory。在

下次ClientService重新连接时,它将调用其提供当前协议实例的prepareConnection。在

(重新连接)客户端服务:

# clientservice.py
# twistd -y clientservice.py

from twisted.application import service, internet
from twisted.internet.protocol import Factory
from twisted.internet import endpoints, reactor
from twisted.protocols import basic
from twisted.logger import Logger


class MyProtocol(basic.Int16StringReceiver):
    _log = Logger()

    def stringReceived(self, data):
        self._log.info('Received data from {peer}, data={data}',
                       peer=self.transport.getPeer(),
                       data=data)


class MyFactory(Factory):
    _log = Logger()
    protocol = MyProtocol

    def stopFactory(self):
        # Let service know that its current connection is stale
        self.service.on_connection_lost()


class MyService(internet.ClientService):
    def __init__(self, endpoint, factory):
        internet.ClientService.__init__(self,
            endpoint,
            factory,
            prepareConnection=self.on_prepare_connection)

        factory.service = self # Attach this service to factory
        self.connection = None # Future protocol instance

    def on_prepare_connection(self, connection):
        self.connection = connection # Attach protocol to service
        self._log.info('Connected to {peer}',
                       peer=self.connection.transport.getPeer())
        self.send_message('Hello from prepare connection!')

    def on_connection_lost(self):
        if self.connection is None:
            return

        self._log.info('Disconnected from {peer}',
                       peer=self.connection.transport.getPeer())
        self.connection = None

    def send_message(self, message):
        if self.connection is None:
            raise Exception('Service is not available')

        self.connection.sendString(bytes(message, 'utf-8'))


application = service.Application('MyApplication')
my_endpoint = endpoints.clientFromString(reactor, 'tcp:localhost:22222')
my_factory = MyFactory()
my_service = MyService(my_endpoint, my_factory)
my_service.setServiceParent(application)

根据twisted示例对echo服务器稍作修改:

^{pr2}$

相关问题 更多 >