用wss初始化两次扭曲的高速公路websocket

2024-10-01 11:19:36 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一些用Twisted实现的websocket协议,当我使用“ws”连接时,它们工作得很好,但是当我启用安全websockets时,__init__方法会被调用两次。更具体地说,它被调用一次,然后连接显然失败,调用connectionLost,然后再次调用它__init__,这次连接保持打开状态。在

下面的代码就是一个例子。当我与wss连接时,websocket协议的__init__中的日志行被调用两次,但普通websockets不会发生这种情况。在

导入套接字 从日期时间导入日期时间 从扭曲的互联网进口电抗器

from twisted.internet.ssl import DefaultOpenSSLContextFactory

from autobahn.twisted.websocket import WebSocketServerProtocol, WebSocketServerFactory, listenWS
import txaio

txaio.use_twisted()


CERT_KEY = "certificate.key"
CERT_PATH = "certificate.crt"


def log(msg):
    print("{}: {}".format(str(datetime.now()), msg))


class TestProtocol(WebSocketServerProtocol):
    def __init__(self):
        super(TestProtocol, self).__init__()
        log("Test protocol init")

    def connectionLost(self, reason):
        WebSocketServerProtocol.connectionLost(self, reason)
        log("Connection closed: Reason is {}".format(reason))


class TestProtocolFactory(WebSocketServerFactory):
    protocol = TestProtocol


def init_websocket_protocol(factory_cls, port):
    try:
        key, crt = CERT_KEY, CERT_PATH
        context_factory = DefaultOpenSSLContextFactory(key, crt)
        connection_string = "wss://localhost:{}".format(str(port))
        factory = factory_cls(connection_string)
        listenWS(factory, contextFactory=context_factory)
        log("Port {} bound to test websocket server".format(str(port)))
    except socket.error as e:
        log("Server was unable to bind to a new port: ".format(str(e)))


def main():
    init_websocket_protocol(TestProtocolFactory, 9000)
    reactor.run()


if __name__ == '__main__':
    main()

Tags: importselflogformatcertinitportfactory
1条回答
网友
1楼 · 发布于 2024-10-01 11:19:36

现在推荐的API是使用端点。另外,twisted.internet.ssl.CertificateOptions是TLS连接的首选API。因此,通过这些更改,上面的代码将如下所示:

from datetime import datetime
from autobahn.twisted.websocket import WebSocketServerProtocol, WebSocketServerFactory
from twisted.internet.ssl import CertificateOptions, PrivateCertificate, Certificate, KeyPair
from twisted.internet.endpoints import SSL4ServerEndpoint
from twisted.internet.task import react
from OpenSSL import crypto


CERT_KEY = "certificate.key"
CERT_PATH = "certificate.crt"


def log(msg):
    print("{}: {}".format(str(datetime.now()), msg))


class TestProtocol(WebSocketServerProtocol):
    def __init__(self):
        super(TestProtocol, self).__init__()
        log("Test protocol init")

    def connectionLost(self, reason):
        WebSocketServerProtocol.connectionLost(self, reason)
        log("Connection closed: Reason is {}".format(reason))



class TestProtocolFactory(WebSocketServerFactory):
    protocol = TestProtocol


def init_websocket_protocol(reactor, port):
    with open(CERT_KEY) as key_file, open(CERT_PATH) as cert_file:
        key = KeyPair.load(key_file.read(), crypto.FILETYPE_PEM).original
        cert = Certificate.loadPEM(cert_file.read()).original
    ctx = CertificateOptions(
        privateKey=key,
        certificate=cert,
    )
    return SSL4ServerEndpoint(reactor, port, ctx)


def main(reactor):
    ep = init_websocket_protocol(reactor, 9000)
    ep.listen(TestProtocolFactory())
    reactor.run()


if __name__ == '__main__':
    react(main)

当我运行这段代码并指向Firefox时,它会连接一次。您使用的浏览器端代码是什么样子的?在

相关问题 更多 >