我在项目的主类中实例化 Flask 应用程序。在项目的另一个模块中,我使用 signalslot Python 库创建了用于发送数据的信号。FlaskApp 类负责运行 Flask 服务器;在同一个类中,该信号被连接到一个 slot(槽),用于把数据发布到已连接的 WebSocket 客户端。
对于signalslot,我使用此库https://pypi.org/project/signalslot/
# Shared signal module (presumably common/signals.py, judging by the import
# used by the Flask app module -- confirm against the project layout).
import signalslot
# Signal used to hand heartbeat data to interested listeners. Emitters pass the
# payload as the keyword argument ``data``; connected slots receive it under
# the same keyword.
on_heartbeat_data_received_signal = signalslot.Signal()
# Emitter side: import the shared signal object by name. ``import signals.py``
# would try to import a submodule called ``py`` and would not bind the signal
# name used below; this form matches the import used by the Flask app module.
from common.signals import on_heartbeat_data_received_signal

# Every connected slot is called with ``data`` as a keyword argument.
on_heartbeat_data_received_signal.emit(data="hello")
class Main():
    """Application entry object that starts the Flask/Socket.IO server."""

    def __init__(self, config_path="config.yaml"):
        """Create the Flask application wrapper.

        Args:
            config_path: Path to the YAML configuration file handed to
                ``FlaskApp``. The default is only a placeholder -- TODO
                confirm the real location used by the project.
        """
        # ``FlaskApp`` must be imported into this module. Its constructor
        # requires the configuration path and starts the server itself.
        self.flask_app = FlaskApp(config_path)


if __name__ == "__main__":
    main = Main()
from flask import Flask
from flask_socketio import SocketIO, emit
from common.signals import on_heartbeat_data_received_signal
class FlaskApp():
    """Run the Flask/Socket.IO server and forward heartbeat signals to clients.

    This class relies on names that are not defined in this snippet and must
    be provided by the surrounding module: ``socketio`` (a module-level
    ``SocketIO`` instance), ``Logger``, ``yaml`` and ``CORS``.
    """

    def __init__(self, config_path):
        """Configure the app from YAML, connect the signal and start serving.

        Args:
            config_path: Path to a YAML file whose top-level ``flask`` mapping
                provides the ``host`` and ``port`` entries.

        Raises:
            yaml.YAMLError: If the configuration file cannot be parsed.
        """
        self.logger = Logger.get_instance("FlaskApp")
        self.logger.debug("started")

        # Create an application
        app = Flask(__name__)
        # NOTE(review): hard-coded secret and wildcard CORS origins are
        # development settings -- move them to the YAML configuration.
        app.config['SECRET_KEY'] = 'gjr39dkjn344_!67#'  # todo read from yaml
        socketio.init_app(app, async_mode="threading", cors_allowed_origins="*")
        CORS(app)

        # load yml to read flask configuration from config path
        with open(config_path, "r") as yaml_stream:
            try:
                self.flask_config = yaml.safe_load(yaml_stream)["flask"]
            except yaml.YAMLError as exc:
                # Without a parsed configuration there is no host/port to
                # serve on, so report the problem and stop here instead of
                # failing later on a missing ``flask_config`` attribute.
                self.logger.error(exc)
                raise

        self.logger.info(self.flask_config["host"])
        self.logger.info(self.flask_config["port"])

        # The slot must be connected before the server starts, because the
        # call to ``socketio.run`` below does not return while serving.
        self.logger.debug("connect to signal to send to UI..")
        on_heartbeat_data_received_signal.connect(self.publish_data)

        # NOTE(review): ``socketio.run`` blocks until the server stops, so the
        # two debug messages after it are only logged at shutdown, and any
        # code that emits the signal has to run in another thread.
        # ``debug=True`` is a development setting -- confirm before deploying.
        socketio.run(
            app,
            debug=True,
            host=self.flask_config["host"],
            port=self.flask_config["port"],
        )
        self.logger.debug("flask server is up....")
        self.logger.debug("ended")

    @staticmethod
    @socketio.on("connect")
    def websocket_connect():
        """Handle a new websocket client connection."""
        print("websocket connected...")
        # Reported by the author as working when emitted from this handler:
        # socketio.emit("response", "mesg from server")

    def publish_data(self, data, **kwargs):
        """Slot for the heartbeat signal: push ``data`` to connected clients.

        Args:
            data: Payload passed by the signal emitter.
            **kwargs: Any additional keyword arguments sent with the signal;
                they are ignored.
        """
        self.logger.debug("started publish data to connected websocket client..")
        self.logger.debug(data)
        try:
            # NOTE(review): the author reports that this emit does not reach
            # the client; it is called from whichever thread emits the signal,
            # outside of any Socket.IO event handler.
            socketio.emit("response-data", data)
        except Exception as e:
            # Keep a failing emit from propagating into the signal emitter.
            self.logger.error(e)

    @staticmethod
    @socketio.on("response-data")
    def handle_message(message):
        """Print a ``response-data`` event received from a client.

        NOTE(review): this handler is triggered by events that clients send
        to the server; presumably it is not invoked by the server-side
        ``socketio.emit("response-data", ...)`` call in ``publish_data`` --
        confirm the intended direction of this event.
        """
        print(message)
目前没有回答
相关问题 更多 >
编程相关推荐