以特定顺序在同一线程中运行多个函数

2024-09-24 06:32:23 发布

您现在位置:Python中文网/ 问答频道 /正文

我有三个函数使用造纸机执行3个不同的jupyter笔记本,我希望第一个(job1)和第二个(job2)函数同时运行,最后一个函数(job3)只在第一个函数(job1)完成运行后运行,不会出现任何错误。我不确定为第二个函数创建新线程是否有意义,或者如何正确使用join()方法。 我在Windows上运行,出于某种原因,concurrent.futures和multiprocessing不起作用,这就是我使用线程模块的原因

def job1():

    return pm.execute_notebook('notebook1.ipynb',log_output=False)

def job2():

     return pm.execute_notebook('notebook2.ipynb',log_output=False)

def job3():

     return pm.execute_notebook('notebook3.ipynb',log_output=False)


t1 = threading.Thread(target = job1)
t2 = threading.Thread(target = job2)
t3 = threading.Thread(target = job3)


try:
   t1.start()
   t1.join()
   t2.start()

except:
   pass

finally:

   t3.start()

Tags: 函数logfalseoutputexecutereturndeft1
2条回答

我喜欢从可视化所需的流开始,我理解为:

enter image description here

这意味着t1和t2需要同时启动,然后您需要同时连接这两个节点:

   t1.start() # <- Started 
   t2.start() # <- Started
   # t1 and t2 executing concurrently

   t1.join()
   t2.join()
   # wait for both to finish

   t3.start()
   t3.join()

t1、t2连接顺序并不重要,因为程序无论如何都必须等待运行时间最长的线程。如果t1首先完成,它将在t2上阻塞,如果t2首先完成,它仍然需要等待t1,然后将在t2.join()上“no op”

我编写了一个并行运行DAG的程序,这正是您所需要的。这是作为开源项目的一部分完成的。解决方案使用进程而不是线程,但您可以调整它。其基本思想是拥有一个进程池并跟踪每个任务的状态,无论何时任何任务完成,都会发送下一个任务。通过在topological order中迭代DAG来确定顺序

如果任何任务失败,它们的所有下游依赖项都将中止。程序继续运行,直到没有更多的任务要运行,代码如下:https://github.com/ploomber/ploomber/blob/0.4.1/src/ploomber/executors/Parallel.py

我看到您正在尝试在Paperill中运行笔记本,我开发的工具正好支持这一点:它可以运行任意复杂的管道(任务可以参数化Jupyter笔记本)。看起来很适合您的用例:https://github.com/ploomber/ploomber

相关问题 更多 >