让两个函数以不同的“采样”时间周期性地运行

2024-06-25 23:24:11 发布

您现在位置:Python中文网/ 问答频道 /正文

我已经用sched包中的python调度程序管理了使用特定采样时间T定期执行一个函数:

import sched
import time

def cycle(sche, T, fun, arg):
    sche.enter(T, 1, cycle, (sche, T, fun, arg))
    fun(arg)


def fun(arg):
    print(str(time.time()))
    print(arg)


def main():
    scheduler = sched.scheduler(time.time, time.sleep)
    T = 1
    arg = "some argument"
    cycle(scheduler, T, fun, arg)
    scheduler.run()

我想做的是添加另一个函数fun2(),该函数也将使用另一个采样时间T2定期执行

什么是正确的方法


Tags: 函数importtimedef时间arg调度scheduler
1条回答
网友
1楼 · 发布于 2024-06-25 23:24:11

因此,对我来说,以下解决方案奏效了: 因为我将有两个CPU受限的任务,所以我设置了一个包含两个进程的多处理环境。每个进程启动一个自己的调度程序,该调度程序以自己的“采样”时间“永远”运行

任何比我有更多python经验的人(我刚刚开始:-D)对这种方法有什么看法?你认为这会引起什么问题吗

import time
import multiprocessing
import sched

global schedule1
global schedule2


def fun1(arg):
    print("Im the function that is executed every T1")
    time.sleep(0.05)  # do something for t < T1


def fun2(arg):
    print("Im the function that is executed every T2")
    time.sleep(0.8)  # do something for t < T2


def cycle1(scheduler1, T1, fun, arg):
    global schedule1
    try:
        schedule1.append(scheduler1.enter(T1, 1, cycle1, (scheduler1, T1, fun, arg)))
        fun1(arg)
        scheduler1.run()
    except KeyboardInterrupt:
        for event in schedule1:
            try:
                scheduler1.cancel(event)
            except ValueError:
                continue
        return


def cycle2(scheduler2, T2, fun, arg):
    global schedule2
    try:
        schedule2.append(scheduler2.enter(T2, 1, cycle2, (scheduler2, T2, fun, arg)))
        fun2(arg)
        scheduler2.run()
    except KeyboardInterrupt:
        for event in schedule2:
            try:
                scheduler2.cancel(event)
            except ValueError:
                continue
        return


def main():
    global schedule2
    global schedule1

    schedule2 = []
    schedule1 = []

    scheduler1 = sched.scheduler(time.time, time.sleep)
    scheduler2 = sched.scheduler(time.time, time.sleep)
    T1 = 0.1
    T2 = 1
    list_of_arguments_for_fun1 = []
    list_of_arguments_for_fun2 = []

    processes = []

    # set up first process
    process1 = multiprocessing.Process(target=cycle1, args=(scheduler1, T1, fun1, list_of_arguments_for_fun1))
    processes.append(process1)

    # set up second process
    process2 = multiprocessing.Process(target=cycle2, args=(scheduler2, T2, list_of_arguments_for_fun2, list_of_arguments_for_fun2))
    processes.append(process2)

    process1.start()
    process2.start()

    for process in processes:
        process.join()

    # anything below here in the main() won't be executed


if __name__ == "__main__":
    try:
        start = time.perf_counter()
        main()

    except KeyboardInterrupt:
        print('\nCancelled by User. Bye!')
        finish = time.perf_counter()
        print(f'Finished in {round(finish - start, 2)} second(s)')

相关问题 更多 >