如何使函数在特定时间和特定间隔开始工作?

2024-09-29 05:30:51 发布

您现在位置:Python中文网/ 问答频道 /正文

我试图让函数print1启动函数print2(每30分钟运行一次) 但我不能在某个时候停止这个过程。 该功能保持运行。 我需要每天早上6点运行我的功能(例如print1)。每隔30分钟计算一次。晚上10点结束。第二天又一次

import schedule
from schedule import every, repeat, run_pending


def print2():
    print(" interval 30 sec")
    # return schedule.CancelJob


def print1():
    schedule.every(30).minutes.do(print2)
    print("run")
    return schedule.CancelJob


schedule.every().day.at('6:00').do(print1)
job = schedule.every().day.at('10:00').do(print1)
schedule.cancel_job(job)

while True:
    schedule.run_pending()
    time.sleep(1)

Tags: 函数runimport功能returndefjobdo
1条回答
网友
1楼 · 发布于 2024-09-29 05:30:51

下面的代码创建一个每天6:00运行的作业,该作业创建一个重复作业,然后在10:00停止。主6:00作业将每天运行。要工作,需要向main添加一个循环并调用schedule.run_pending()

示例#1

from datetime import datetime, timedelta
import schedule
import time

class Job:

    def __init__(self):
        self.running = False
        self.job = None

    def print2(self):
        if debug:
            print(">> interval 1 sec")
        else:
            print(">> interval 30 mins")

    def print1(self):
        print("started")
        self.running = True
        if debug:
            self.job = schedule.every(1).seconds.do(self.print2)
            stop = (datetime.now() + timedelta(seconds=30)).strftime("%H:%M:%S")
            print("Stop at:", stop)
            schedule.every().day.at(stop).do(self.cancel)
        else:
            self.job = schedule.every(30).minutes.do(self.print2)
            schedule.every().day.at('10:00').do(self.cancel)

    def cancel(self):
        print("cancel job")
        schedule.cancel_job(self.job)
        print("Stopped at ", datetime.now().strftime("%H:%M:%S"))
        self.running = False
        return schedule.CancelJob

# main

# debug = True runs 10 seconds from now then stops repeating job after 30 seconds
# set debug = False to run at 6:00 and stop at 10:00
debug = True

job = Job()

if debug:
    now = datetime.now()
    start = now + timedelta(seconds=10)
    start = start.strftime("%H:%M:%S")
    print("current:", now.strftime("%H:%M:%S"))
    print("start:  ", start)
    schedule.every().day.at(start).do(job.print1)
else:
    schedule.every().day.at('6:00').do(job.print1)

while True:
    if not job.running:
        print("waiting")
    schedule.run_pending()
    time.sleep(1)

输出:

current: 10:29:38
start:   10:29:48
waiting
waiting ... printed 10x
started
Stop at: 10:30:18
>> interval 1 sec
>> interval 1 sec ... print 30x
cancel job
>> interval 1 sec
Stopped at  10:30:18
waiting
waiting
...

另一种方法不是使用调度来启动和停止作业,而是将单个作业调度为每天运行。当计划作业被激活时,它将启动一个新线程,该线程以设置的间隔执行函数,然后在特定时间停止运行

示例#2

from datetime import datetime, timedelta
import schedule
import time
from threading import Thread

class Job:

    def __init__(self):
        self.running = False

    def print2(self):
        if debug:
            print(">> interval 1 sec")
        else:
            print(">> interval 30 mins")
        # do something

    def check_job(self):
        now = datetime.now()
        if debug:
            stop = now + timedelta(seconds=10)
            sleep_time = 1  # every 1 second
        else:
            # stop at 10am
            stop = now.replace(hour=10, minute=0, second=0)
            sleep_time = 600  # every 10 minutes
        print("Stop at:", stop.strftime("%H:%M:%S"))
        while True:
            # perform the periodic function to do something
            self.print2()
            time.sleep(sleep_time)
            if datetime.now() >= stop:
                print("Stopped at ", datetime.now().strftime("%H:%M:%S"))
                self.running = False
                break

    def print1(self):
        print("started")
        self.running = True
        # create and start a new Thread that runs check_job() function
        t = Thread(target=self.check_job, daemon=True)
        t.start()

# uses same main as example #1 above

相关问题 更多 >