“从子线程”调用主线程的方法

2024-10-03 11:17:53 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在制作一个与测量设备通信的数据采集程序。需要定期(例如,每0.1秒)检查设备的状态,以查看采集是否完成。此外,程序必须具有“中止”方法,因为采集有时需要超过几分钟的时间。因此,我需要使用多线程

我附上了流程图和示例代码。但是我不知道如何从子线程中让主线程去执行某个方法


python 3.7.2 wxpython 4.0.6


Flow Chart

import wx
import time
from threading import Thread

class TestFrame(wx.Frame):
    """Frame that runs a simulated, abortable measurement on a worker thread.

    "Start Measurement" asks (on the console) how many data points to
    acquire and then acquires them one at a time; each acquisition runs in
    its own worker thread so the GUI stays responsive.  "Abort Measurement"
    sets ``DoneFlag`` so the worker stops at its next check.
    """

    def __init__(self):
        wx.Frame.__init__(self, None, title="Test Frame")
        panel = wx.Panel(self)

        self.Btn1 = wx.Button(panel, label="Start Measurement")
        self.Btn1.Bind(wx.EVT_BUTTON, self.OnStart)
        self.Btn2 = wx.Button(panel, label="Abort Measurement")
        self.Btn2.Bind(wx.EVT_BUTTON, self.OnAbort)
        self.Btn2.Enable(False)

        self.DoneFlag = False
        # A threading.Thread object can only be started once, so the worker
        # is created anew in Start() for every acquisition cycle.
        self.SubThread = None

        sizer = wx.BoxSizer(wx.VERTICAL)
        sizer.Add(self.Btn1, 0, wx.EXPAND)
        sizer.Add(self.Btn2, 0, wx.EXPAND)
        panel.SetSizer(sizer)

    def OnStart(self, event):
        """Read the requested number of points and begin the first acquisition."""
        try:
            n_max = int(input("How many data points do yo want? (greater than 1) : "))
        except ValueError:
            # Leave the buttons untouched so the user can simply try again.
            print("Please enter a whole number.")
            return

        # self.N is the number of data points acquired so far
        self.N = 0
        # self.N_max is the number of data points that is going to be acquired
        self.N_max = n_max
        self.DoneFlag = False
        self.Btn1.Enable(False)
        self.Btn2.Enable(True)
        self.Start()

    def OnAbort(self, event):
        """Request that the running measurement stop at its next check."""
        self.DoneFlag = True

    def Start(self):
        """Launch a fresh worker thread for one acquisition cycle."""
        self.SubThread = Thread(target=self.Check, daemon=True)
        self.SubThread.start()

    def Done(self):
        """Finish the measurement or start the next cycle (main thread only)."""
        if self.DoneFlag is True:
            self.Finish()
        elif self.DoneFlag is False:
            self.Start()

    def Finish(self):
        """Report the final count and restore the buttons to the idle state."""
        print("Measurement done (N = {})\n".format(self.N))
        self.Btn1.Enable(True)
        self.Btn2.Enable(False)

    def Check(self):
        """Worker-thread body: wait for one acquisition, then hand back to the GUI."""
        # In the actual program, this method communicates with a data acquisition device to check its status
        # For example,
        # "RunningStatus" is True when the device is still running (acquisition has not been done yet),
        #                 is False when the device is in idle state (acquisition has done)
        #
        #     [Structure of the actual program]
        #     while True:
        #         RunningStatus = GetStatusFromDevice()
        #         if RunningStatus is False or self.DoneFlag is True:
        #             break
        #         else:
        #             time.sleep(0.1)

        # In the code below, it just counts down for a few seconds and then
        # assumes the acquisition is done
        t = time.time()
        time.sleep(1)
        for i in range(3):
            if self.DoneFlag is True:
                break
            print("{} sec left".format(int(5-time.time()+t)))
            time.sleep(1)

        # Proceed to the next steps after the acquisition is done.
        if self.DoneFlag is False:
            self.N += 1
            print("Data acquired (N = {})\n".format(self.N))
            # ">=" rather than "==" so a request of zero or fewer points
            # still terminates after the first acquisition.
            if self.N >= self.N_max:
                self.DoneFlag = True

        # Done() touches widgets, so it must run in the main (GUI) thread.
        # wx.CallAfter queues the call onto the main event loop instead of
        # executing it here in the worker thread.
        wx.CallAfter(self.Done)
        
if __name__ == "__main__":
    # The application object has to exist before any window is constructed.
    application = wx.App()
    main_window = TestFrame()
    main_window.Show()
    # Hand control to the wx event loop.
    application.MainLoop()

Tags: the self false true if time is enable
1条回答
网友
1楼 · 发布于 2024-10-03 11:17:53

使用GUI时,不建议从其他线程调用GUI函数,请参阅: https://docs.wxwidgets.org/trunk/overview_thread.html

您的选择之一是使用events跟踪正在发生的事情。
一个函数在某些事情发生时创建并分派一个event,例如表示进度,而另一个函数侦听特定的event并对其作出反应。
所以,这类似于pubsub,只不过使用的是wxPython原生的事件机制。
在这里,我使用一个事件发布进度信息,另一个事件发布结果信息,但目标不同。
它当然不会完全适合您的场景,但应该提供足够的信息来制定您自己的解决方案

import time
import wx
from threading import Thread

import wx.lib.newevent
# Event class / binder pair posted by TestThread to its ThreadFrame to report
# the current loop count and, when available, a result value.
progress_event, EVT_PROGRESS_EVENT = wx.lib.newevent.NewEvent()
# Event class / binder pair posted by ThreadFrame to its parent to forward a
# non-zero result; MyPanel binds to it.
results_event, EVT_RESULTS_EVENT = wx.lib.newevent.NewEvent()

class ThreadFrame(wx.Frame):
    """Window that owns one measurement thread and displays its progress.

    The frame starts a ``TestThread`` on construction, updates a gauge from
    the progress events the thread posts, and forwards any non-zero result
    to ``parent`` as a results event.  Pressing the button or closing the
    frame stops the thread before the frame is destroyed.
    """

    def __init__(self, title, parent=None):
        wx.Frame.__init__(self, parent=parent, title=title)
        panel = wx.Panel(self)
        self.parent = parent
        self.btn = wx.Button(panel,label='Stop Measurements', size=(200,30), pos=(10,10))
        self.btn.Bind(wx.EVT_BUTTON, self.OnExit)
        self.progress = wx.Gauge(panel,size=(240,10), pos=(10,50), range=30)

        #Bind to the progress event issued by the thread
        self.Bind(EVT_PROGRESS_EVENT, self.OnProgress)
        #Bind to Exit on frame close
        self.Bind(wx.EVT_CLOSE, self.OnExit)
        self.Show()

        # The thread starts itself in its constructor.
        self.mythread = TestThread(self)

    def OnProgress(self, event):
        """Update the gauge and forward any non-zero result to the parent."""
        self.progress.SetValue(event.count)
        #or for indeterminate progress
        #self.progress.Pulse()
        if event.result != 0:
            evt = results_event(result=event.result)
            #Send back result to main frame
            try:
                wx.PostEvent(self.parent, evt)
            except Exception:
                # Best effort: the parent may be missing or already
                # destroyed, in which case there is nobody to notify.
                pass

    def OnExit(self, event):
        """Stop the worker thread (if still running) and destroy the frame."""
        # Thread.isAlive() was removed in Python 3.9; is_alive() works on
        # every Python 3 version.
        if self.mythread.is_alive():
            self.mythread.terminate() # Shutdown the thread
            self.mythread.join() # Wait for it to finish
        self.Destroy()

class TestThread(Thread):
    """Worker thread that posts progress events to a target window.

    The thread starts itself on construction and loops until ``terminate``
    is called or posting an event to the target fails.
    """

    def __init__(self,parent_target):
        Thread.__init__(self)
        self.parent = parent_target
        self.stopthread = False
        self.start()    # start the thread

    def run(self):
        """Post a progress event every 0.1 s and a result every 30th loop."""
        curr_loop = 0
        while not self.stopthread:
            curr_loop += 1
            # Send a result roughly every 3 seconds for test purposes
            if curr_loop < 30:
                time.sleep(0.1)
                # Send back current count for the progress bar
                evt = progress_event(count=curr_loop,result=0)
            else:
                curr_loop = 0
                # Reset the count and deliver the current time as the result
                evt = progress_event(count=curr_loop,result=time.time())
            try:
                wx.PostEvent(self.parent, evt)
            except Exception:
                # The parent frame has probably been destroyed
                self.terminate()

    def terminate(self):
        """Announce the end of measurements and ask the run loop to stop."""
        evt = progress_event(count=0,result="Measurements Ended")
        try:
            wx.PostEvent(self.parent, evt)
        except Exception:
            # Best effort: the target may already be gone.
            pass
        self.stopthread = True

class MyPanel(wx.Panel):
    """Main panel: launches measurement frames and logs their results."""

    def __init__(self, parent):
        wx.Panel.__init__(self, parent)
        self.parent = parent
        # Counters used only to label new frames and log lines.
        self.thread_count = 0
        self.text_count = 0

        start_button = wx.Button(
            self, wx.ID_ANY, label='Start Measurements',
            size=(200, 30), pos=(10, 10),
        )
        start_button.Bind(wx.EVT_BUTTON, self.Thread_Frame)

        alive_button = wx.Button(
            self, wx.ID_ANY, label='Is the GUI still active?',
            size=(200, 30), pos=(10, 50),
        )
        alive_button.Bind(wx.EVT_BUTTON, self.AddText)

        self.txt = wx.TextCtrl(
            self, wx.ID_ANY, style=wx.TE_MULTILINE,
            pos=(10, 90), size=(400, 100),
        )

        # Results forwarded by the measurement frames arrive here.
        self.Bind(EVT_RESULTS_EVENT, self.OnResult)

    def Thread_Frame(self, event):
        """Open a new measurement frame with a numbered title."""
        self.thread_count += 1
        title = 'Measurement Task ' + str(self.thread_count)
        ThreadFrame(title=title, parent=self)

    def AddText(self, event):
        """Append a numbered line showing the GUI still responds."""
        self.text_count += 1
        self.txt.write("Gui is still active {}\n".format(self.text_count))

    def OnResult(self, event):
        """Append a received result to the text control."""
        self.txt.write("Result received {}\n".format(event.result))

class MainFrame(wx.Frame):
    """Top-level window hosting a single MyPanel; shows itself on creation."""

    def __init__(self):
        super().__init__(None, title='Main Frame', size=(600, 400))
        # The panel is parented to this frame, so no reference needs keeping.
        MyPanel(self)
        self.Show()


if __name__ == '__main__':
    # False: do not redirect stdout/stderr into a wx window.
    application = wx.App(False)
    # MainFrame shows itself in its constructor.
    main_window = MainFrame()
    application.MainLoop()

enter image description here

相关问题 更多 >