我有三个函数使用造纸机执行3个不同的jupyter笔记本,我希望第一个(job1)和第二个(job2)函数同时运行,最后一个函数(job3)只在第一个函数(job1)完成运行后运行,不会出现任何错误。我不确定为第二个函数创建新线程是否有意义,或者如何正确使用join()方法。 我在Windows上运行,出于某种原因,concurrent.futures和multiprocessing不起作用,这就是我使用线程模块的原因
def job1():
return pm.execute_notebook('notebook1.ipynb',log_output=False)
def job2():
return pm.execute_notebook('notebook2.ipynb',log_output=False)
def job3():
return pm.execute_notebook('notebook3.ipynb',log_output=False)
t1 = threading.Thread(target = job1)
t2 = threading.Thread(target = job2)
t3 = threading.Thread(target = job3)
try:
t1.start()
t1.join()
t2.start()
except:
pass
finally:
t3.start()
我喜欢从可视化所需的流开始,我理解为:
这意味着t1和t2需要同时启动,然后您需要同时连接这两个节点:
t1、t2连接顺序并不重要,因为程序无论如何都必须等待运行时间最长的线程。如果t1首先完成,它将在t2上阻塞,如果t2首先完成,它仍然需要等待t1,然后将在t2.join()上“no op”
我编写了一个并行运行DAG的程序,这正是您所需要的。这是作为开源项目的一部分完成的。解决方案使用进程而不是线程,但您可以调整它。其基本思想是拥有一个进程池并跟踪每个任务的状态,无论何时任何任务完成,都会发送下一个任务。通过在topological order中迭代DAG来确定顺序
如果任何任务失败,它们的所有下游依赖项都将中止。程序继续运行,直到没有更多的任务要运行,代码如下:https://github.com/ploomber/ploomber/blob/0.4.1/src/ploomber/executors/Parallel.py
我看到您正在尝试在Paperill中运行笔记本,我开发的工具正好支持这一点:它可以运行任意复杂的管道(任务可以参数化Jupyter笔记本)。看起来很适合您的用例:https://github.com/ploomber/ploomber
相关问题 更多 >
编程相关推荐