如何在tkinter python中停止线程

2024-09-25 00:25:19 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在使用Tkinter和threads进行一个项目。该项目是一种测试电子产品的机器

该产品位于带有打开/关闭传感器的盒子内。该产品由220伏供电

我使用一个线程来监控传感器的状态,因此当外壳打开时(如果值=0),我应该关闭电源以保护用户,这是正常工作的

但是,当盒子打开时,所有测试序列都应该停止,测试结果应该是失败的,但这不会发生,并且管理产品测试的线程保持运行(电源关闭,但测试仍在执行)

我必须找到一个解决方案,这样当盒子打开时,测试序列停止并返回初始状态

任何帮助都将不胜感激

谢谢

import tkinter as tk
import threading
from pyModbusTCP.client import ModbusClient
import configparser
import serial


read_config = configparser.ConfigParser()
read_config.read("config.ini")
SERVER_HOST = read_config.get("IOLOGIK", "IP")
SERVER_PORT = read_config.get("IOLOGIK", "Port")
c = ModbusClient()
c.host(SERVER_HOST)
c.port(SERVER_PORT)

port = read_config.get("Configuration MOXA", "port")

address_c = int(read_config.get("I_O_U1", "adresse_cloche"))
adresse_alim = int(read_config.get("I_O_U1", "adresse_alim"))

class InputThread(threading.Thread):
    def __init__(self, threadID):
        super(InputThread, self).__init__()
        self.daemon = False
        self.c = []
        self.running = None
        self.id = threadID

    def run(self):
        if self.id == 1:
            while True:
                try:
                   self.c = c.read_discrete_inputs(address_c, 1)  # reading my variable from the iologik
                   if not self.c[0]:
                       x = c.write_single_coil(adresse_alim, 0)  # alim off
                       ser.close()
                       self.running = False
                       finish.configure(bg="red")
                       break
                   else:
                       self.running = True
                except:
                    break
        elif self.id == 2:
            start_test()


def task_serial(ser, cmd):
    if ser.isOpen:
        try:
            ser.write(cmd)
            x = ser.read()
            line = ser.readline()
            return line
        except:
            return 0


def connection_Product():
    global ser
    ser = serial.Serial()
    ser.port = port
    ser.baudrate = 19200
    ser_open = False
    while not ser_open:
        try:
            ser.open()
            ser_open = True
        except:
            ser_open = False
    res = task_serial(ser, "hh ")
    return ser


def Product_test():
    res = task_serial(ser, "aa")
    res = task_serial(ser, "bb")
    res = task_serial(ser, "cc")
    res = task_serial(ser, "dd")
    res = task_serial(ser, "ee")
    res = task_serial(ser, "ff")
    res = task_serial(ser, "hh")
    res = task_serial(ser, "ii")
    res = task_serial(ser, "jj")
    res = task_serial(ser, "kk")
    res = task_serial(ser, "ll")
    res = task_serial(ser, "mm")
    res = task_serial(ser, "nn")
    res = task_serial(ser, "oo")
    res = task_serial(ser, "pp")
    res = task_serial(ser, "qq")


def test():
    thread = InputThread(2)
    thread.start()


def start_test():
    it = InputThread()
    it.start()
    x = c.write_single_coil(adresse_alim, 1) # alim on
    while it.running:
        ser = connection_Product()
        Product_test()


window = tk.Tk()
window.geometry("2000x700")
window.title("Test")
departure = tk.Button(window, text="Depart Test", fg="#0F8DCB",font="serif 13 bold", width=15, command=test)
departure.place(x=1100, y=160)
finish = tk.Button(window, text="Fin Test", fg="#0F8DCB", font="serif 13 bold", width=15)
finish.place(x=1100, y=210)
window.mainloop()

Tags: testimportselfconfigtaskreadgetport