Replacing child-process source files in Python

Posted on 2024-10-05 13:20:52


I am trying to write a program that consists of two files, one called launcher.py and the other called sysupdate.py. launcher spawns child processes that run concurrently (sysupdate among them), and sysupdate listens on the network for zipped software-update files. When sysupdate receives an update file, it needs to be able to kill/pause the other processes (the ones created by launcher), replace their source files, and then restart them. I am struggling to find a clean way of doing this and would like to know whether anyone has suggestions on how to achieve it.

I should mention that these child processes are designed as infinite loops, so I cannot simply wait for them to exit. Unfortunately, I need to be able to kill them manually, replace their source files, and then restart them.

While the child processes are running, I need the launcher to be able to keep them alive, so that if they die for any reason they are restarted. Obviously I need to pause this behaviour while they are being killed for a software update. This code is for an always-on sensor system, so I need consistent looping and restarting.

For example:

launcher.py:

def launch_threads():   
    # Reading thread
    try:
        readthread = Process(target=read_loop, args=(sendqueue, mqttqueue))
        processes.append(readthread)
    except Exception as ex:
        log("Read process creation failed: " + str(ex), 3)
        
    # ..... Other threads/processes here
    
    # System Update Thread
    try:
        global updatethread
        updatethread = Process(target=update_loop, args=(updatequeue,))
        processes.append(updatethread)
    except Exception as ex:
        log("Software updater process creation failed: " + str(ex), 3)

    return processes


if __name__ == '__main__':
    processes = launch_threads()
    for p in processes:
        p.start()
    for p in processes:              # Here I have it trying to keep processes alive permanently, ..
        p.join()                     # .. I need a way to 'pause' this
        if not p.is_alive():
            p.start()

sysupdate.py:

def update_loop():

    wait_for_zip_on_network()
    extract_zip()
    
    kill_processes()           # Need sysupdate to be able to tell 'launcher' to kill/pause the processes

    replace_source_files()

    resume_processes()         # Tell 'launcher' to resume/restart the processes

1 Answer

Posted on 2024-10-05 13:20:52

launch_threads is probably a misnomer, since you are launching processes rather than threads. I will assume you are launching some number of processes, which we can assign to a variable N_TASKS, plus one additional process represented by update_loop, so the total number of processes is N_TASKS + 1. I will further assume that, in the absence of a source-code update, those N_TASKS processes would eventually complete. My suggestion is to use a multiprocessing pool, which conveniently provides a few tools that make the job simpler. I will also use a modified version of update_loop that merely listens for a change, updates the source code and then terminates, but which can be restarted:

sysupdate.py

def modified_update():
    zip_file = wait_for_zip_on_network()
    return zip_file

We then use the Pool class from the multiprocessing module together with callbacks, so that we know when the various submitted tasks have completed. We want to wait either for the modified_update task to finish or for all of the "regular" tasks to finish. In either case we terminate any outstanding tasks, but in the first case we restart everything, whereas in the second case we are done:

launcher.py

from multiprocessing import Pool
from threading import Event

# the number of processes that need to run besides the modified_update process:
N_TASKS = 4

completed_event = None
completed_count = 0

def regular_task_completed_callback(result):
    global completed_count, completed_event
    completed_count += 1
    if completed_count == N_TASKS:
        completed_event.set() # we are through with all the tasks

def new_source_files_callback(zip_file):
    global completed_event
    extract_zip(zip_file)
    replace_source_files()
    completed_event.set()

def launch_threads():
    global completed_event, completed_count
    POOLSIZE = N_TASKS + 1
    while True:
        completed_event = Event()
        completed_count = 0
        pool = Pool(POOLSIZE)
        # start the "regular" processes:
        pool.apply_async(read_loop, args=(sendqueue, mqttqueue), callback=regular_task_completed_callback)
        # etc.
        # start modified update_loop:
        pool.apply_async(modified_update, callback=new_source_files_callback)
        # wait for either the source files to have changed or the "regular" tasks to have completed:
        completed_event.wait()
        # terminate all outstanding tasks
        pool.terminate()
        if completed_count == N_TASKS: # all the "regular" tasks have completed
            return # we are done
        # else we start all over again


if __name__ == '__main__':
    launch_threads()

Update

If the "regular" tasks never terminate, the logic is greatly simplified. modified_update becomes:

sysupdate.py

def modified_update():
    zip_file = wait_for_zip_on_network()
    extract_zip(zip_file)
    replace_source_files()

Then:

launcher.py

from multiprocessing import Pool


def launch_threads():
    # the number of processes that need to run besides the modified_update process:
    N_TASKS = 4
    POOLSIZE = N_TASKS + 1
    while True:
        pool = Pool(POOLSIZE)
        # start the "regular" processes:
        pool.apply_async(read_loop, args=(sendqueue, mqttqueue))
        # etc.
        # start modified_update:
        result = pool.apply_async(modified_update)
        result.get() # wait for modified_update to complete
        # terminate all outstanding (i.e. "regular") tasks
        pool.terminate()
        # and start all over


if __name__ == '__main__':
    launch_threads()

Note

Since far less of the Pool machinery is being used now, you could go back to starting individual Process instances. The gist of what is being done is:

  1. modified_update no longer loops; instead it terminates once the source-code update has been applied.
  2. launch_threads consists of a loop that starts the "regular" and modified_update processes and waits for the modified_update process to complete, which signals that a source update has taken place. All the "regular" processes must then be terminated and everything started over. Using a pool simply makes it easy to keep track of all the processes and terminate them with a single call (a Process-based variant is sketched below).
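
To make that note concrete, here is a minimal sketch of the Process-based variant, assuming the simplified modified_update above. read_loop, sendqueue and mqttqueue are the question's own names and are not defined here, and supervise is a hypothetical helper introduced only for illustration:

from multiprocessing import Process


def supervise(worker_specs, updater_target):
    # worker_specs: list of (target, args) tuples for the "regular" workers
    while True:
        # Fresh Process objects every round: a Process can only be start()ed once.
        workers = [Process(target=target, args=args) for target, args in worker_specs]
        updater = Process(target=updater_target)

        for p in workers:
            p.start()
        updater.start()

        # Block until the updater finishes, i.e. until an update has been
        # downloaded, extracted and the source files replaced.
        updater.join()

        # Kill any workers that are still running so they can be restarted.
        for p in workers:
            p.terminate()
            p.join()
        # ... and the while-loop starts everything over.


if __name__ == '__main__':
    # Hypothetical wiring using the names from the question; read_loop,
    # sendqueue and mqttqueue are not defined in this sketch.
    supervise([(read_loop, (sendqueue, mqttqueue))], modified_update)

Compared with the Pool version you lose the one-call pool.terminate(), but in exchange you keep direct control over each Process object, which is closer to what the original launcher.py already does.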
