在socketio中,线程不会对来自其他类的事件作出反应

2024-10-02 12:36:58 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个应用程序,我用烧瓶来控制一些家庭自动化。 应用程序在raspberry pi上运行。 我通过触摸屏控制应用程序。 到目前为止一切正常。 下一步,我想对“真正的按钮”做出反应 我附上了一个精简的代码片段,显示了我的问题。 我使用Raspberry GPIO库,在这里我可以定义一个回调函数,每次按下按钮时都会调用该函数 回调函数设置一个事件。 当我从主程序直接调用这个函数“taster.set_interrupt()”时,一切都正常。 如果GPIO库调用该函数,“flag.wait()”语句无法识别该事件,它将永远等待。 如果我使用轮询来检查事件状态,那么一切正常。 如果我在没有socketio的情况下使用“标准线程”,所有的工作都很好

from threading import Event, Thread
from flask import Flask
from flask_socketio import SocketIO
import eventlet
import time
import RPi.GPIO as GPIO


class PushButton:

    def __init__(self, interrupt):
        self.interrupt = interrupt
        GPIO.setmode(GPIO.BOARD)
        pin = 24
        GPIO.setup(pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)
        GPIO.add_event_detect(pin, GPIO.BOTH, callback=self.set_interrupt, bouncetime=200)

    def set_interrupt(self, channel=24):
        print("Set interrupt")
        self.interrupt.set()


async_mode = 'eventlet'
eventlet.monkey_patch()
app = Flask(__name__)
socketio = SocketIO(app, async_mode=async_mode)


def checker(flag):
    print("Checker is running")
    while True:
        flag.wait()
        print("Thread is alive !!!!",flag.isSet())
        flag.clear()


flag = Event()
flag.clear()
taster = PushButton(flag)
socketio.start_background_task(checker, flag)
socketio.sleep(2)
taster.set_interrupt()
while True:
    socketio.sleep(1)


Tags: 函数fromimportselfgpiodefpin事件

热门问题