使用“mpi4py.futures”、“ipyparallel”或“dask mpi”在多个核心(>10k)上运行许多“adaptive.learner”。
adaptive-scheduler的Python项目详细描述
使用mpi4py.futures、ipyparallel或dask.distributed在多个核心(>;10K)上运行多个adaptive.Learners。
这是什么?
Adaptive Scheduler解决了以下问题:您需要运行的学习者比使用单个运行器和/或使用>;1K内核运行的学习者多。
ipyparallel和dask.distributed为交互式会话提供了非常强大的引擎。但是,当您想连接到>;1K核心时,它会开始挣扎。除此之外,在共享集群上,经常会出现在有足够可用空间的情况下启动交互式会话的问题。
我们的方法是为每个adaptive.Learner调度不同的作业。这些作业的创建和运行由adaptive-scheduler管理。这意味着您的计算肯定会运行,即使此时集群可能已满。由于这种方法,几乎没有限制要使用多少内核。在调度数百个作业时,可以为1个作业(learner)使用10个节点,也可以为1个作业(learner)使用1个核心。
所有事物都是这样写的,即计算最大限度地是局部的。这意味着这是一个作业崩溃,没有问题,它将自动调度一个新的作业,并在停止时继续计算(因为adaptive具有定期保存功能)。即使中央“作业管理器”死亡,作业仍将继续运行(尽管不会安排新作业)。设计目标
- 需要能够在30K核上高效运行
- 可与自适应软件包无缝配合使用
- 文件系统的最小负载
- 删除与调度程序一起工作的所有样板文件
- 编写作业脚本
- (重新)提交作业脚本
- 以最小的数据丢失处理随机崩溃(或节点逐出)
- 在作业中保留python内核和变量(与为每个参数提交作业不同)
- 将仿真定义代码与运行仿真的代码分开
- 最大化计算局部性,当主进程死亡 时,作业继续运行
它是如何工作的?
创建一个文件,在其中定义一组learners和相应的fnames,以便可以导入它们,例如:
# learners_file.pyimportadaptivefromfunctoolsimportpartialdefh(x,pow,a):returna*x**powcombos=adaptive.utils.named_product(pow=[0,1,2,3,4,5,6,7,8,9],a=[0.1,0.5],)# returns list of dicts, cartesian product of all valueslearners=[adaptive.Learner1D(partial(h,**combo),bounds=(-1,1))forcomboincombos]fnames=[f"data/{combo}"forcomboincombos]
然后开始一个过程,创建和提交尽可能多的作业脚本。例如:
importadaptive_schedulerdefgoal(learner):returnlearner.npoints>200run_manager=adaptive_scheduler.server_support.RunManager(learners_file="learners_file.py",goal=goal,cores_per_job=12,# every learner is one joblog_interval=30,# write info such as npoints, cpu_usage, time, etc. to the job log filesave_interval=300,# save the data every 300 seconds)run_manager.start()
就这样!您可以运行run_manager.info(),它将显示一个交互式的ipywidget,显示正在运行、挂起和完成的作业的数量、取消作业的按钮以及其他有用信息。
但是是如何工作的呢?
~adaptive_scheduler.server_support.runmanager基本上执行下面写的操作。 因此,您需要创建一个learners_file.py来定义learners和fnames(如上面的部分所示)。 然后,一个“作业管理器”编写和提交的作业数量与学习者的数量一样多,但是不知道它将运行哪个学习者! 这是“数据库管理器”的职责,它将数据库保存为job_id <--> learner。
在另一个python文件(在节点上运行的文件)中,我们执行如下操作:
# run_learner.pyimportadaptivefromadaptive_schedulerimportclient_supportfrommpi4py.futuresimportMPIPoolExecutor# the file that defines the learners we created abovefromlearners_fileimportlearners,fnamesif__name__=="__main__":# ← use this, see warning @ https://bit.ly/2HAk0GG# the address of the "database manager"url="tcp://10.75.0.5:37371"# ask the database for a learner that we can runlearner,fname=client_support.get_learner(url,learners,fnames)# load the datalearner.load(fname)# run until `some_goal` is reached with an `MPIPoolExecutor`# you can also use a ipyparallel.Client, or dask.distributed.Clientrunner=adaptive.Runner(learner,executor=MPIPoolExecutor(),shutdown_executor=True,goal=some_goal)# periodically save the data (in case the job dies)runner.start_periodic_saving(dict(fname=fname),interval=600)# log progress info in the job output script, optionalclient_support.log_info(runner,interval=600)# block until runner goal reachedrunner.ioloop.run_until_complete(runner.task)# tell the database that this learner has reached its goalclient_support.tell_done(url,fname)
在Jupyter笔记本中,我们可以启动“作业管理器”和“数据库管理器”,如:
fromadaptive_schedulerimportserver_supportfromlearners_fileimportlearners,fnames# create a new databasedb_fname="running.json"server_support.create_empty_db(db_fname,fnames)# create unique names for the jobsn_jobs=len(learners)job_names=[f"test-job-{i}"foriinrange(n_jobs)]# start the "job manager" and the "database manager"database_task=server_support.start_database_manager("tcp://10.75.0.5:37371",db_fname)job_task=server_support.start_job_manager(job_names,db_fname,cores=200,# number of cores per jobrun_script="run_learner.py",)
总之,您有三个文件:
- learners_file.py定义学习者及其文件名
- run_learner.py选择学习者并运行它
- 运行“数据库管理器”和“作业管理器”的Jupyter笔记本
实际上,你不必离开jupter笔记本,看看example notebook。
Jupyter笔记本示例
安装
警告:this仍然是α前发展阶段。
使用(推荐)安装conda的最新稳定版
conda install adaptive-scheduler
或者从pypi中使用
pip install adaptive_scheduler
或使用
安装master。pip install -U https://github.com/basnijholt/adaptive-scheduler/archive/master.zip
或者克隆存储库并进行dev安装(推荐用于dev)
git clone git@github.com:basnijholt/adaptive-scheduler.git cd adaptive-scheduler pip install -e .
开发
为了不污染笔记本的输出历史,请通过执行设置git过滤器
python ipynb_filter.py
在仓库里。
我们还使用pre-commit,所以pip install pre_commit并运行
pre-commit install
在仓库里。
限制
现在adaptive_scheduler只适用于slurm和pbs,但是只有adaptive_scheduler/slurm.py中的函数才需要为另一种类型的调度程序实现。还有根本没有测试!