我正在制作一个与测量设备通信的数据采集程序。需要定期(例如,每0.1秒)检查设备的状态,以查看采集是否完成。此外,程序必须具有“中止”方法,因为采集有时需要超过几分钟的时间。因此,我需要使用多线程
我附上了流程图和示例代码。但是我不知道如何调用主线程来从子线程执行方法
python 3.7.2 wxpython 4.0.6
import wx
import time
from threading import Thread
class TestFrame(wx.Frame):
def __init__(self):
wx.Frame.__init__(self, None, title="Test Frame")
panel = wx.Panel(self)
self.Btn1 = wx.Button(panel, label="Start Measurement")
self.Btn1.Bind(wx.EVT_BUTTON, self.OnStart)
self.Btn2 = wx.Button(panel, label="Abort Measurement")
self.Btn2.Bind(wx.EVT_BUTTON, self.OnAbort)
self.Btn2.Enable(False)
self.DoneFlag = False
self.SubThread = Thread(target=self.Check, daemon=True)
sizer = wx.BoxSizer(wx.VERTICAL)
sizer.Add(self.Btn1, 0, wx.EXPAND)
sizer.Add(self.Btn2, 0, wx.EXPAND)
panel.SetSizer(sizer)
def OnStart(self, event):
# self.N is the number of data points
self.N = 0
# self.N_max is the number of data points that is going to be acquired
self.N_max = int(input("How many data points do yo want? (greater than 1) : "))
self.DoneFlag = False
self.Btn1.Enable(False)
self.Btn2.Enable(True)
self.Start()
def OnAbort(self, event):
self.DoneFlag = True
def Start(self):
self.SubThread.start()
def Done(self):
if self.DoneFlag is True:
self.Finish()
elif self.DoneFlag is False:
self.Start()
def Finish(self):
print("Measurement done (N = {})\n".format(self.N))
self.Btn1.Enable(True)
self.Btn2.Enable(False)
def Check(self):
# In the actual program, this method communicates with a data acquisition device to check its status
# For example,
# "RunningStatus" is True when the device is still running (acquisition has not been done yet),
# is False when the device is in idle state (acquisition has done)
#
# [Structure of the actual program]
# while True:
# RunningStatus = GetStatusFromDevice()
# if RunningStatus is False or self.DoneFlag is True:
# break
# else:
# time.sleep(0.1)
# In below code, it just waits 3 seconds then assumes the acqusition is done
t = time.time()
time.sleep(1)
for i in range(3):
if self.DoneFlag is True:
break
print("{} sec left".format(int(5-time.time()+t)))
time.sleep(1)
# Proceed to the next steps after the acquisition is done.
if self.DoneFlag is False:
self.N += 1
print("Data acquired (N = {})\n".format(self.N))
if self.N == self.N_max:
self.DoneFlag = True
self.Done() # This method should be excuted in the main thread
if __name__ == "__main__":
app = wx.App()
frame = TestFrame()
frame.Show()
app.MainLoop()
使用GUI时,不建议从其他线程调用GUI函数,请参阅: https://docs.wxwidgets.org/trunk/overview_thread.html
您的选择之一是使用
events
跟踪正在发生的事情。一个函数在某些事情发生时创建并分派一个
event
,例如表示进度,而另一个函数侦听特定的event
并对其作出反应。所以,就像
pubsub
但是是本地的。在这里,我使用一个事件发布进度信息,另一个事件发布结果信息,但目标不同。
它当然不会完全适合您的场景,但应该提供足够的信息来制定您自己的解决方案
相关问题 更多 >
编程相关推荐