扭曲得到密码

2024-09-30 20:29:24 发布

您现在位置:Python中文网/ 问答频道 /正文

from twisted.internet.protocol import ClientFactory
from twisted.internet.protocol import Protocol
from twisted.internet.defer import Deferred
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks
from twisted.internet.defer import returnValue
from twisted.internet.ssl import CertificateOptions
from twisted.internet.ssl import AcceptableCiphers
from ssl import PROTOCOL_SSLv23
from ssl import DER_cert_to_PEM_cert
from OpenSSL.crypto import FILETYPE_PEM
from OpenSSL.crypto import load_certificate
import time
import json

normalCyphers = AcceptableCiphers.fromOpenSSLCipherString(
    'ECDH+AESGCM:DH+AESGCM:ECDH+AES256:DH+AES256:ECDH+AES128:DH+AES:ECDH+HIGH:'
    'DH+HIGH:ECDH+3DES:DH+3DES:RSA+AESGCM:RSA+AES:RSA+HIGH:RSA+3DES:!aNULL:'
    '!eNULL:!MD5'
)
normalCtxFac = CertificateOptions(acceptableCiphers=normalCyphers, method=PROTOCOL_SSLv23)

weakCiphers = AcceptableCiphers.fromOpenSSLCipherString('ALL:!aNULL:!eNULL')
weakCtxFac = CertificateOptions(acceptableCiphers=weakCiphers, method=PROTOCOL_SSLv23)


def asn1DateToTimestamp(asn1Date):
    expirationDate = time.strptime(asn1Date[:8], '%Y%m%d')
    return int(time.mktime(expirationDate))


class CertCheckProtocol(Protocol):

    def __init__(self, dfd, isWeakSsl):
        self.dfd = dfd
        self.isWeakSsl = isWeakSsl

    def connectionMade(self):
        reactor.callLater(0.01, self.getCert, 20)

    def getCert(self, depth):
        cert = self.transport.getPeerCertificate()
        transportHandle = self.transport.getHandle()
        if cert is None or transportHandle is None:
            if depth <= 0:
                self.transport.loseConnection()
                return
            reactor.callLater(0.01, self.getCert, depth - 1)
        else:
            cipherName = transportHandle.get_cipher_name()
            key = DER_cert_to_PEM_cert(cert)
            targetCert = load_certificate(FILETYPE_PEM, key)
            timestamp = asn1DateToTimestamp(targetCert.get_notAfter())
            expiresIn = timestamp - time.time()
            try:
                usedCipher = '  '.join(map(str, cipherName))
            except Exception:
                usedCipher = str(cipherName)

            self.dfd.callback({
                'name': 'certificate',
                'expiresIn': expiresIn,
                'sha1Digest': targetCert.digest('sha1'),
                'signatureAlgorithm': targetCert.get_signature_algorithm(),
                'issuer': targetCert.get_issuer().CN,
                'notAfter': timestamp,
                'notBefore': asn1DateToTimestamp(targetCert.get_notBefore()),
                'serialNumber': targetCert.get_serial_number(),
                'subject': targetCert.get_subject().CN,
                'sslVersion': targetCert.get_version(),
                'usedCipher': usedCipher,
                'weakCipher': self.isWeakSsl
            })

    def connectionLost(self, reason):
        if not self.dfd.called:
            self.dfd.errback(Exception('Connection lost'))


class CertCheckFactory(ClientFactory):

    def __init__(self, dfd, isWeakSsl):
        self.dfd = dfd
        self.isWeakSsl = isWeakSsl

    def clientConnectionFailed(self, connector, reason):
        self.dfd.errback(reason)

    def buildProtocol(self, addr):
        return CertCheckProtocol(self.dfd, self.isWeakSsl)


@inlineCallbacks
def getCertificateInfo(ip, port=443):
    dfd = Deferred()
    factory = CertCheckFactory(dfd, isWeakSsl=False)
    reactor.connectSSL(ip, int(port), factory, contextFactory=normalCtxFac)
    try:
        res = yield dfd
    except Exception as ex:
        if hasattr(ex, 'reason') and 'HANDSHAKE_FAILURE' in ex.reason:
            dfd = Deferred()
            factory = CertCheckFactory(dfd, isWeakSsl=True)
            reactor.connectSSL(ip, int(port), factory, contextFactory=weakCtxFac)
            res = yield dfd
        else:
            raise
    returnValue(res)


@inlineCallbacks
def testit(ip):
    res = yield getCertificateInfo(ip)
    print json.dumps(res)
    reactor.stop()

if __name__ == '__main__':
    testit('x.x.x.x')
    reactor.run()

我不确定抓住握手失败是否是正确的扭曲。仍然需要使用密码较弱的服务器来测试该部分。你知道吗

这是stacktrace,表示传输句柄的self.\u socket为none

  File "C:\Python27\lib\site-packages\twisted\internet\base.py", line 825, in runUntilCurrent
    call.func(*call.args, **call.kw)
  File "C:\Users\sjuul\workspace\meuk\soCertQuestion.py", line 50, in getCert
    cipherName = transportHandle.get_cipher_name()
  File "C:\Python27\lib\site-packages\OpenSSL\SSL.py", line 838, in __getattr__
    return getattr(self._socket, name)
exceptions.AttributeError: 'NoneType' object has no attribute 'get_cipher_name'

Tags: namefromimportselfgetcerttimedef
1条回答
网友
1楼 · 发布于 2024-09-30 20:29:24

它不是完全公开的-请随意在Twisted上提交一个bug-但是您可以通过pyopensslapi转义舱口获得它,使用self.transport.getHandle().get_cipher_name()。你知道吗

当我修改您的示例以从标准库ssl和pyOpenSSL OpenSSL模块中删除虚假导入时,它运行良好,并告诉我谷歌网站正在使用ECDHE-RSA-AES128-GCM-SHA256

from twisted.internet.protocol import ClientFactory
from twisted.internet.protocol import Protocol
from twisted.internet.defer import Deferred
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks
from twisted.internet.defer import returnValue
from twisted.internet.ssl import CertificateOptions
from twisted.internet.ssl import AcceptableCiphers

import time
import json

normalCyphers = AcceptableCiphers.fromOpenSSLCipherString(
    'ECDH+AESGCM:DH+AESGCM:ECDH+AES256:DH+AES256:ECDH+AES128:DH+AES:ECDH+HIGH:'
    'DH+HIGH:ECDH+3DES:DH+3DES:RSA+AESGCM:RSA+AES:RSA+HIGH:RSA+3DES:!aNULL:'
    '!eNULL:!MD5'
)
normalCtxFac = CertificateOptions(acceptableCiphers=normalCyphers)

weakCiphers = AcceptableCiphers.fromOpenSSLCipherString('ALL:!aNULL:!eNULL')
weakCtxFac = CertificateOptions(acceptableCiphers=weakCiphers)


def asn1DateToTimestamp(asn1Date):
    expirationDate = time.strptime(asn1Date[:8], '%Y%m%d')
    return int(time.mktime(expirationDate))


class CertCheckProtocol(Protocol):

    def __init__(self, dfd, isWeakSsl):
        self.dfd = dfd
        self.isWeakSsl = isWeakSsl

    def connectionMade(self):
        reactor.callLater(0.01, self.getCert, 20)

    def getCert(self, depth):
        cert = self.transport.getPeerCertificate()
        transportHandle = self.transport.getHandle()
        if cert is None or transportHandle is None:
            if depth <= 0:
                self.transport.loseConnection()
                return
            reactor.callLater(0.01, self.getCert, depth - 1)
        else:
            cipherName = transportHandle.get_cipher_name()
            timestamp = asn1DateToTimestamp(cert.get_notAfter())
            expiresIn = timestamp - time.time()

            self.dfd.callback({
                'name': 'certificate',
                'expiresIn': expiresIn,
                'sha1Digest': cert.digest('sha1'),
                'signatureAlgorithm': cert.get_signature_algorithm(),
                'issuer': cert.get_issuer().CN,
                'notAfter': timestamp,
                'notBefore': asn1DateToTimestamp(cert.get_notBefore()),
                'serialNumber': cert.get_serial_number(),
                'subject': cert.get_subject().CN,
                'sslVersion': cert.get_version(),
                'usedCipher': cipherName,
                'weakCipher': self.isWeakSsl
            })

    def connectionLost(self, reason):
        if not self.dfd.called:
            self.dfd.errback(Exception('Connection lost'))


class CertCheckFactory(ClientFactory):

    def __init__(self, dfd, isWeakSsl):
        self.dfd = dfd
        self.isWeakSsl = isWeakSsl

    def clientConnectionFailed(self, connector, reason):
        self.dfd.errback(reason)

    def buildProtocol(self, addr):
        return CertCheckProtocol(self.dfd, self.isWeakSsl)


@inlineCallbacks
def getCertificateInfo(ip, port=443):
    dfd = Deferred()
    factory = CertCheckFactory(dfd, isWeakSsl=False)
    reactor.connectSSL(ip, int(port), factory, contextFactory=normalCtxFac)
    try:
        res = yield dfd
    except Exception as ex:
        if hasattr(ex, 'reason') and 'HANDSHAKE_FAILURE' in ex.reason:
            dfd = Deferred()
            factory = CertCheckFactory(dfd, isWeakSsl=True)
            reactor.connectSSL(ip, int(port), factory,
                               contextFactory=weakCtxFac)
            res = yield dfd
        else:
            raise
    returnValue(res)


@inlineCallbacks
def testit(ip):
    res = yield getCertificateInfo(ip)
    print json.dumps(res)
    reactor.stop()

if __name__ == '__main__':
    testit('google.com')
    reactor.run()

相关问题 更多 >