跟踪ssh登录尝试的IP扭曲的海螺

2024-09-27 07:25:46 发布

您现在位置:Python中文网/ 问答频道 /正文

我试图用一个简单的ssh服务器捕捉ssh暴力的尝试,代码如下所示,但我无法将用户名/密码组合与它们的原始IP地址相匹配。在

到目前为止,这是我能得到的最简单的,它还没有记录到stdout。我使用buildProtocol定义在建立新连接时打印出它的IP地址。不过,我想获得IP地址以及用户名和密码凭据,这样我就可以同时跟踪来自不同客户端的多个ssh暴力尝试。目前,我只能在建立连接时获得IP地址,这使得同时跟踪多个连接成为不可能。在

只是澄清一下,当我提到一个或多个连接时,我并不是指一个成功的登录,这在我的代码中是不可能的,也不是有意的。我指的是到ssh服务器的单个连接,它允许在重新连接之前尝试3次密码。在

from zope.interface import implements
from twisted.conch.unix import UnixSSHRealm
from twisted.cred import portal
from twisted.cred.credentials import IUsernamePassword
from twisted.cred.checkers import ICredentialsChecker
from twisted.cred.error import UnauthorizedLogin
from twisted.conch.ssh import factory, userauth, keys, session
from twisted.internet import reactor, defer

with open('id_rsa') as privateBlobFile:
    privateKey = privateBlobFile.read()

with open('id_rsa.pub') as publicBlobFile:
    publicKey = publicBlobFile.read()

class FailDB:
    credentialInterfaces = IUsernamePassword, implements(ICredentialsChecker)

    def requestAvatarId(self, credentials):
        print"%s:%s" % ( credentials.username, credentials.password)
        return defer.fail(UnauthorizedLogin("invalid password"))

class UnixSSHdFactory(factory.SSHFactory):
    publicKeys = {
        'ssh-rsa': keys.Key.fromString(data=publicKey)
    }
    privateKeys = {
        'ssh-rsa': keys.Key.fromString(data=privateKey)
    }
    services = {
        'ssh-userauth': userauth.SSHUserAuthServer
    }

    def buildProtocol(self, addr):
        print addr
        return factory.SSHFactory.buildProtocol(self, addr)

if __name__ == '__main__':
    portal = portal.Portal(UnixSSHRealm())
    portal.registerChecker(FailDB())
    UnixSSHdFactory.portal = portal
    reactor.listenTCP(2022, UnixSSHdFactory())
    reactor.run()

输出示例:

^{pr2}$

Tags: fromimportself密码factorytwistedkeysrsa
1条回答
网友
1楼 · 发布于 2024-09-27 07:25:46

通过从SSHUserAuthServer派生并重写_ebBadAuth方法,可以从失败的登录尝试中获取地址。示例:

class AuthServer(userauth.SSHUserAuthServer):
    def _ebBadAuth(self, reason):
        addr = self.transport.getPeer().address.host
        print("addr {} failed to log in as {} using {}"
              .format(addr, self.user, self.method))
        userauth.SSHUserAuthServer._ebBadAuth(self, reason)

如果您还希望允许连接成功,则不必重写SSHFactory.services,而只需使用派生类(如factory.services.update({'ssh-userauth': AuthServer}))更新它。在

相关问题 更多 >

    热门问题