SSH端口转发和与SQL Server的连接在单独的线程中冻结

2024-10-03 06:29:04 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在尝试将通过SSH与远程主机连接的模块合并到应用程序中,建立端口转发(转发sqlserver正在侦听的远程端口),然后建立到sqlserver实例的连接。你知道吗

以下是代码的提取部分:

  1. 定义GUI(在QT5中)并为插槽分配操作
  2. 提供端口转发框架
  3. 提供数据库连接框架
  4. 执行应用程序

第一部分定义了一个简单的窗口,提供用户名和密码来加载rsa密钥,然后通过ssh连接到远程主机,最后建立端口转发。应用程序只连接到远程主机并设置端口转发时工作。我已经用数据库浏览器(一个单独的应用程序)检查过了。当我运行ssh连接和端口转发部分时,我可以轻松地使用数据库浏览器,就像使用putty with port forwarding选项一样。你知道吗

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import paramiko
import socket
import select
import socketserver as SocketServer
import sys
import traceback

import sqlalchemy as ORM

from PyQt5 import QtCore
from PyQt5 import QtGui
from PyQt5 import QtWidgets

"""
Define main window
"""
class guiWindowLogin(QtWidgets.QDialog):

    def __init__(self):
        super(guiWindowLogin, self).__init__()

        # Initialize imported variables and objects
        self.WorkerPool = QtCore.QThreadPool(self)
        self.ssh_forwarder = SSHForwarder()
        self.dbh_mssql = dbhMSSQL()

        # Initialize window and widgets
        self.init_winMain()

        # Initialize widgets
        self.init_lblUserName()
        self.init_ledUserName()
        self.init_lblUserPass()
        self.init_ledUserPass()
        self.init_lblConnection()
        self.init_btnLogin()
        self.init_btnWindowClose()

        # Show window
        self.open()

    def init_winMain(self):
        self.left = 200
        self.top = 100
        self.width = 340
        self.height = 170
        self.setWindowTitle("Login")
        self.setFixedSize(self.width, self.height)
        self.move(self.left, self.top)

    def init_lblUserName(self):
        self.lblUserName = QtWidgets.QLabel(self)
        self.lblUserName.setGeometry(10, 10, 120, 30)
        self.lblUserName.setFont(QtGui.QFont("Microsoft Sans Serif", 10))
        self.lblUserName.setText("Username:")

    def init_ledUserName(self):
        self.ledUserName = QtWidgets.QLineEdit(self)
        self.ledUserName.setFont(QtGui.QFont("Microsoft Sans Serif", 10))
        self.ledUserName.setGeometry(100, 10, 230, 30)

    def init_lblUserPass(self):
        self.lblUserPass = QtWidgets.QLabel(self)
        self.lblUserPass.setGeometry(10, 50, 120, 30)
        self.lblUserPass.setFont(QtGui.QFont("Microsoft Sans Serif", 10))
        self.lblUserPass.setText("Password:")

    def init_ledUserPass(self):
        self.ledUserPass = QtWidgets.QLineEdit(self)
        self.ledUserPass.setGeometry(100, 50, 230, 30)
        self.ledUserPass.setFont(QtGui.QFont("Microsoft Sans Serif", 10))
        self.ledUserPass.setEchoMode(QtWidgets.QLineEdit.Password)

    def init_lblConnection(self):
        self.lblConnection = QtWidgets.QLabel(self)
        self.lblConnection.setGeometry(10, 90, 320, 30)
        self.lblConnection.setFont(QtGui.QFont("Microsoft Sans Serif", 10, weight = QtGui.QFont.Bold))
        self.lblConnection.setAlignment(QtCore.Qt.AlignCenter)
        self.lblConnection.setFrameStyle(QtWidgets.QFrame.Panel)
        self.lblConnection.setFrameShadow(QtWidgets.QFrame.Sunken)

    def init_btnLogin(self):
        self.btnLogin = QtWidgets.QPushButton(self)
        self.btnLogin.setGeometry(10, 130, 150, 30)
        self.btnLogin.setFont(QtGui.QFont("Microsoft Sans Serif", 10))
        self.btnLogin.setText("Login")
        self.btnLogin.clicked.connect(self.slot_login)

    def init_btnWindowClose(self):
        self.btnWindowClose = QtWidgets.QPushButton(self)
        self.btnWindowClose.setGeometry(180, 130, 150, 30)
        self.btnWindowClose.setFont(QtGui.QFont("Microsoft Sans Serif", 11))
        self.btnWindowClose.setText("Close window")
        self.btnWindowClose.clicked.connect(self.slot_window_close)

    def slot_login(self):
        try:
            self.ssh_forwarder.key_load(self.ledUserName.text(), self.ledUserPass.text())
            self.ssh_forwarder.connect()
            self.SSHConnection = Worker(self.ssh_forwarder.tunnel_create)
            self.WorkerPool.start(self.SSHConnection)
        except Exception as ex:
            print("An exception of type {0} occurred. Arguments:\n{1!r}".format(type(ex).__name__, ex.args))
        finally:
            self.exe_ssh_connection_check()

        self.dbh_mssql.engine_create()
        try:
            self.MSSQLConnection = Worker(self.dbh_mssql.connection_check)
            self.WorkerPool.start(self.MSSQLConnection)
        except Exception as ex:
            print("An exception of type {0} occurred. Arguments:\n{1!r}".format(type(ex).__name__, ex.args))


    def slot_window_close(self):
        self.close()

    def exe_ssh_connection_check(self):
        hostname = self.ssh_forwarder.connection_check()
        if hostname == "unknown":
            self.lblConnection.setStyleSheet("color: rgb(0, 0, 0); background-color: rgb(255, 153, 153)")
            self.lblConnection.setText("No connection")
        else:
            self.lblConnection.setStyleSheet("color: rgb(0, 0, 0); background-color: rgb(153, 255, 102)")
            self.lblConnection.setText("Connected to: {}".format(hostname))
        self.lblConnection.repaint()

导致应用程序冻结的部分如下所示

    self.dbh_mssql.engine_create()
    try:
        self.MSSQLConnection = Worker(self.dbh_mssql.connection_check)
        self.WorkerPool.start(self.MSSQLConnection)
    except Exception as ex:
        print("An exception of type {0} occurred. Arguments:\n{1!r}".format(type(ex).__name__, ex.args))

代码的其他部分如下:

SSH和转发框架:

下面的代码部分是从

https://github.com/paramiko/paramiko/blob/master/demos/forward.py

# --- SSH and port forwarding ---#
class ForwardServer(SocketServer.ThreadingTCPServer):
    daemon_threads = True
    allow_reuse_address = True

class Handler(SocketServer.BaseRequestHandler):
    def handle(self):
        try:
            self.channel = self.ssh_transport.open_channel(
                "direct-tcpip",
                (self.chain_host, self.chain_port),
                self.request.getpeername(),
            )
        except Exception as e:
            print('Incoming request to {0}:{1} failed: %s'.format(self.chain_host, self.chain_port, repr(e)))
            return
        if self.channel is None:
            print('Incoming request to {0}:{1} was rejected by the SSH server.'.format(self.chain_host, self.chain_port))
            return

        print('Connected!  Tunnel open {0} -> {1} -> {2}'.format(
                self.request.getpeername(),
                self.channel.getpeername(),
                (self.chain_host, self.chain_port),
            )
        )
        while True:
            r, w, x = select.select([self.request, self.channel], [], [])
            if self.request in r:
                data = self.request.recv(1024)
                if len(data) == 0:
                    break
                self.channel.send(data)
            if self.channel in r:
                data = self.channel.recv(1024)
                if len(data) == 0:
                    break
                self.request.send(data)

        self.peername = self.request.getpeername()
        self.channel.close()
        self.request.close()
        print('Tunnel closed from {0}'.format(self.peername))

class SSHForwarder():
    def __init__(self):
        self.client = paramiko.SSHClient()
        self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.remote_host = "1.1.1.1"
        self.remote_port_ssh = 22
        self.remote_port_forward = 53425
        self.local_port_forward = 1433
        self.pkey_rsa = None
        self.key_username = None

    def key_load(self, key_username, key_password):
        self.key_username = key_username
        key_path = os.path.join("C:\\DATA\\ssh", "{0}{1}".format(self.key_username, "_rsa"))
        self.pkey_rsa = paramiko.RSAKey.from_private_key_file(
            key_path,
            password=key_password
        )

    def tunnel_create(self, callback_progress, callback_data):
        class SubHandler(Handler):
            chain_host = self.remote_host
            chain_port = int(self.remote_port_forward)
            ssh_transport = self.client.get_transport()

        ForwardServer(("", int(self.local_port_forward)), SubHandler).serve_forever()

    def connect(self):
        try:
            self.client.connect(
                self.remote_host,
                port=int(self.remote_port_ssh),
                username=self.key_username,
                pkey=self.pkey_rsa
            )
        except Exception as e:
            sys.exit(1)

    def connection_check(self):
        try:
            hostname = (self.client.get_transport().getpeername()[0])
            return hostname
        except Exception as e:
            return "unknown"

数据库连接框架:

class dbhMSSQL(object):
    def __init__(self):
        self.dialect = "mssql"
        self.driver = "pymssql"
        self.username = "username"
        self.password = "password"
        self.host = "localhost"
        self.port = "1433"
        self.database = "Database"

    def engine_create(self):
        self.conn_string = "{0}+{1}://{2}:{3}@{4}:{5}/{6}".format(self.dialect, self.driver, self.username, self.password, self.host, self.port, self.database)
        self.engine = ORM.create_engine(self.conn_string)

    def connection_check(self, callback_progress, callback_data):
        try:
            dbc_main = self.engine.connect()
            dbc_main.execute(
                "SELECT * FROM sys.dm_exec_connections"
            ).fetchall()
            dbc_main.close()
            return True
        except Exception as ex:
            return False

线程框架:下面的代码部分是从

https://www.learnpyqt.com/courses/concurrent-execution/multithreading-pyqt-applications-qthreadpool/

class WorkerSignals(QtCore.QObject):
    error = QtCore.pyqtSignal(tuple)
    finished = QtCore.pyqtSignal()
    progress = QtCore.pyqtSignal(int)
    result = QtCore.pyqtSignal(object)
    data = QtCore.pyqtSignal(dict)


class Worker(QtCore.QRunnable):
    def __init__(self, fn, *args, **kwargs):
        super(Worker, self).__init__()

        # Store constructor arguments (re-used for processing)
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
        self.signals = WorkerSignals()

        # Add the callback to our kwargs
        self.kwargs['callback_progress'] = self.signals.progress
        self.kwargs['callback_data'] = self.signals.data

    # @pyqtSlot()
    def run(self):
        '''
        Initialise the runner function with passed args, kwargs.
        '''

        # Retrieve args/kwargs here; and fire processing using them
        try:
            result = self.fn(
                *self.args,
                **self.kwargs
            )
        except:
            traceback.print_exc()
            exctype, value = sys.exc_info()[:2]
            self.signals.error.emit((exctype, value, traceback.format_exc()))
        else:
            self.signals.result.emit(result)  # Return the result of the processing
        finally:
            self.signals.finished.emit()  # Done

应用程序执行

if __name__ == '__main__':

    app = QtWidgets.QApplication(sys.argv)
    window = guiWindowLogin()
    window.show()
    sys.exit(app.exec_())

如果与SQL Server的连接没有放入线程,应用程序也会冻结。你知道吗

编辑 经过几次试验,我发现问题出在指挥部内部 发动机连接() 这是SQLAlchemy软件包的一部分。你知道吗


Tags: keyimportselfformathostchaindatainit