使用“mpi4py.futures”、“ipyparallel”或“dask mpi”在多个核心(>10k)上运行许多“adaptive.learner”。

adaptive-scheduler的Python项目详细描述


PyPICondaDownloadsBuild StatusDocumentation Status

使用mpi4py.futuresipyparalleldask.distributed在多个核心(>;10K)上运行多个adaptive.Learners。

这是什么?

Adaptive Scheduler解决了以下问题:您需要运行的学习者比使用单个运行器和/或使用>;1K内核运行的学习者多。

ipyparalleldask.distributed为交互式会话提供了非常强大的引擎。但是,当您想连接到>;1K核心时,它会开始挣扎。除此之外,在共享集群上,经常会出现在有足够可用空间的情况下启动交互式会话的问题。

我们的方法是为每个adaptive.Learner调度不同的作业。这些作业的创建和运行由adaptive-scheduler管理。这意味着您的计算肯定会运行,即使此时集群可能已满。由于这种方法,几乎没有限制要使用多少内核。在调度数百个作业时,可以为1个作业(learner)使用10个节点,也可以为1个作业(learner)使用1个核心。

所有事物都是这样写的,即计算最大限度地是局部的。这意味着这是一个作业崩溃,没有问题,它将自动调度一个新的作业,并在停止时继续计算(因为adaptive具有定期保存功能)。即使中央“作业管理器”死亡,作业仍将继续运行(尽管不会安排新作业)。

设计目标

  1. 需要能够在30K核上高效运行
  2. 可与自适应软件包无缝配合使用
  3. 文件系统的最小负载
  4. 删除与调度程序一起工作的所有样板文件
    1. 编写作业脚本
    2. (重新)提交作业脚本
  5. 以最小的数据丢失处理随机崩溃(或节点逐出)
  6. 在作业中保留python内核和变量(与为每个参数提交作业不同)
  7. 将仿真定义代码与运行仿真的代码分开
  8. 最大化计算局部性,当主进程死亡
  9. 时,作业继续运行

它是如何工作的?

创建一个文件,在其中定义一组learners和相应的fnames,以便可以导入它们,例如:

# learners_file.pyimportadaptivefromfunctoolsimportpartialdefh(x,pow,a):returna*x**powcombos=adaptive.utils.named_product(pow=[0,1,2,3,4,5,6,7,8,9],a=[0.1,0.5],)# returns list of dicts, cartesian product of all valueslearners=[adaptive.Learner1D(partial(h,**combo),bounds=(-1,1))forcomboincombos]fnames=[f"data/{combo}"forcomboincombos]

然后开始一个过程,创建和提交尽可能多的作业脚本。例如:

importadaptive_schedulerdefgoal(learner):returnlearner.npoints>200run_manager=adaptive_scheduler.server_support.RunManager(learners_file="learners_file.py",goal=goal,cores_per_job=12,# every learner is one joblog_interval=30,#  write info such as npoints, cpu_usage, time, etc. to the job log filesave_interval=300,# save the data every 300 seconds)run_manager.start()

就这样!您可以运行run_manager.info(),它将显示一个交互式的ipywidget,显示正在运行、挂起和完成的作业的数量、取消作业的按钮以及其他有用信息。

Widget demo

但是是如何工作的呢?

~adaptive_scheduler.server_support.runmanager基本上执行下面写的操作。 因此,您需要创建一个learners_file.py来定义learnersfnames(如上面的部分所示)。 然后,一个“作业管理器”编写和提交的作业数量与学习者的数量一样多,但是不知道它将运行哪个学习者! 这是“数据库管理器”的职责,它将数据库保存为job_id <--> learner

在另一个python文件(在节点上运行的文件)中,我们执行如下操作:

# run_learner.pyimportadaptivefromadaptive_schedulerimportclient_supportfrommpi4py.futuresimportMPIPoolExecutor# the file that defines the learners we created abovefromlearners_fileimportlearners,fnamesif__name__=="__main__":# ← use this, see warning @ https://bit.ly/2HAk0GG# the address of the "database manager"url="tcp://10.75.0.5:37371"# ask the database for a learner that we can runlearner,fname=client_support.get_learner(url,learners,fnames)# load the datalearner.load(fname)# run until `some_goal` is reached with an `MPIPoolExecutor`# you can also use a ipyparallel.Client, or dask.distributed.Clientrunner=adaptive.Runner(learner,executor=MPIPoolExecutor(),shutdown_executor=True,goal=some_goal)# periodically save the data (in case the job dies)runner.start_periodic_saving(dict(fname=fname),interval=600)# log progress info in the job output script, optionalclient_support.log_info(runner,interval=600)# block until runner goal reachedrunner.ioloop.run_until_complete(runner.task)# tell the database that this learner has reached its goalclient_support.tell_done(url,fname)

在Jupyter笔记本中,我们可以启动“作业管理器”和“数据库管理器”,如:

fromadaptive_schedulerimportserver_supportfromlearners_fileimportlearners,fnames# create a new databasedb_fname="running.json"server_support.create_empty_db(db_fname,fnames)# create unique names for the jobsn_jobs=len(learners)job_names=[f"test-job-{i}"foriinrange(n_jobs)]# start the "job manager" and the "database manager"database_task=server_support.start_database_manager("tcp://10.75.0.5:37371",db_fname)job_task=server_support.start_job_manager(job_names,db_fname,cores=200,# number of cores per jobrun_script="run_learner.py",)

总之,您有三个文件:

  • learners_file.py定义学习者及其文件名
  • run_learner.py选择学习者并运行它
  • 运行“数据库管理器”和“作业管理器”的Jupyter笔记本

实际上,你不必离开jupter笔记本,看看example notebook

Jupyter笔记本示例

example.ipynb

安装

警告:this仍然是α前发展阶段。

使用(推荐)安装conda的最新稳定版

conda install adaptive-scheduler

或者从pypi中使用

pip install adaptive_scheduler

或使用

安装master
pip install -U https://github.com/basnijholt/adaptive-scheduler/archive/master.zip

或者克隆存储库并进行dev安装(推荐用于dev)

git clone git@github.com:basnijholt/adaptive-scheduler.git
cd adaptive-scheduler
pip install -e .

开发

为了不污染笔记本的输出历史,请通过执行设置git过滤器

python ipynb_filter.py

在仓库里。

我们还使用pre-commit,所以pip install pre_commit并运行

pre-commit install

在仓库里。

限制

现在adaptive_scheduler只适用于slurm和pbs,但是只有adaptive_scheduler/slurm.py中的函数才需要为另一种类型的调度程序实现。还有根本没有测试

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
如何将java ArrayList对象转换为实际值   web服务如何在JAVA类uisng JAXBElement中解组SOAP UI值   包含无值参数的java HttpClient表单URL   运行简单Camel Restlet演示项目的java问题?   带有自定义图标的java简单SWT警报消息?   java Netbeans 6.8:LibImport可以工作,但编译时“包不存在”   java如何获得绘画完成的通知?   java Hazelcast客户端模式群集故障后如何恢复?   Neo4J中Shapefile的java批插入   为什么ThreadPoolExecutor在Eclipse和从命令行运行Java程序时表现不同?   java在Android中计算两个坐标之间的距离时得到了荒谬的值   java在CardLayout中显示卡本身的下一张卡