daskpytorch已被重命名为daskpytorchddp

dask-pytorch的Python项目详细描述


达斯克Pythorch ddp

注意:dask Pythorch已更名为dask Pythorch ddp!在

我们发布了dask Pythorch v0.1.2的最终版本,它与dask Pythorch DDPv0.2.0相同,以简化这种过渡。你可以在PyPI上的https://pypi.python.org/pypi/dask-pytorch-ddp/找到dask pytorch ddp。在

dask-pytorch-ddp是一个pythorch包,它使得使用分布式数据并行在Dask集群上训练PyTorch模型变得很容易。项目的预期范围是

  • 在达斯克集群上引导Pythorch工人
  • 使用分布式数据存储(如S3)作为普通PyTorch数据集
  • 跟踪和记录中间结果、训练统计信息和检查点的机制。在

在这一点上,这个库和提供的示例是为计算机视觉任务而定制的,但是这个库对于任何类型的PyTorch任务都是有用的。真正特定于图像处理的只有S3ImageFolder数据集类。在图像之外实现Pythorch数据集(假设地图样式的随机访问)目前需要实现__getitem__(self, idx: int):和{}。我们计划在未来为其他用例添加更多不同的示例,并欢迎PRs扩展功能。在

典型的非dask工作流

非dask PyTorch用法的典型示例如下:

加载数据

创建一个数据集(ImageFolder),并将其包装在DataLoader

transform=transforms.Compose([transforms.Resize(256),transforms.CenterCrop(250),transforms.ToTensor()])whole_dataset=ImageFolder(path,transform=transform)batch_size=100num_workers=64indices=list(range(len(data)))np.random.shuffle(indices)train_idx=indices[:num]test_idx=indices[num:num+num]train_sampler=SubsetRandomSampler(train_idx)train_loader=DataLoader(data,sampler=train_sampler,batch_size=batch_size,num_workers=num_workers)

训练模型

在数据集上循环,并通过单步执行优化器来训练模型

^{pr2}$

现在在达斯克

通过dask_Pythorch_ddp和Pythorch分布式数据并行,我们可以对多个工人进行如下培训:

加载数据

从S3加载数据集,并显式设置多处理上下文(Dask默认为spawn,但pytorch通常配置为使用fork)

fromdask_pytorch_ddp.dataimportS3ImageFolderwhole_dataset=S3ImageFolder(bucket,prefix,transform=transform)train_loader=torch.utils.data.DataLoader(whole_dataset,sampler=train_sampler,batch_size=batch_size,num_workers=num_workers,multiprocessing_context=mp.get_context('fork'))

并行训练

将训练循环包装在函数中(并添加度量日志记录)。不需要,但非常有用)。将模型转换为Pythorch分布式数据并行(DDP)模型,该模型知道如何在工人之间同步渐变。在

importuuidimportpickleimportloggingimportjsonkey=uuid.uuid4().hexrh=DaskResultsHandler(key)defrun_transfer_learning(bucket,prefix,samplesize,n_epochs,batch_size,num_workers,train_sampler):worker_rank=int(dist.get_rank())device=torch.device(0)net=models.resnet18(pretrained=False)model=net.to(device)model=DDP(model,device_ids=[0])criterion=nn.CrossEntropyLoss().cuda()lr=0.001optimizer=optim.SGD(model.parameters(),lr=lr,momentum=0.9)whole_dataset=S3ImageFolder(bucket,prefix,transform=transform)train_loader=torch.utils.data.DataLoader(whole_dataset,sampler=train_sampler,batch_size=batch_size,num_workers=num_workers,multiprocessing_context=mp.get_context('fork'))count=0forepochinrange(n_epochs):# Each epoch has a training and validation phasemodel.train()# Set model to training modeforinputs,labelsintrain_loader:dt=datetime.datetime.now().isoformat()inputs=inputs.to(device)labels=labels.to(device)outputs=model(inputs)_,preds=torch.max(outputs,1)loss=criterion(outputs,labels)# zero the parameter gradientsoptimizer.zero_grad()loss.backward()optimizer.step()count+=1# statisticsrh.submit_result(f"worker/{worker_rank}/data-{dt}.json",json.dumps({'loss':loss.item(),'epoch':epoch,'count':count,'worker':worker_rank}))if(count%100)==0andworker_rank==0:rh.submit_result(f"checkpoint-{dt}.pkl",pickle.dumps(model.state_dict()))

它是如何工作的?在

dask-pytorch-ddp很大程度上是现有pytorch功能的包装器。pytorch.distributedDistributed Data Parallel(DDP)提供基础设施。在

在DDP中,创建N个worker,第0个worker是“master”,协调缓冲区和渐变的同步。在SGD中,梯度通常在批处理中的所有数据点之间取平均值。通过在多个worker上运行批处理,并平均梯度,DDP使您能够以更大的批大小运行SGD (N * batch_size)

dask-pytorch-ddp设置一些环境变量来配置“master”主机和端口,然后在训练前调用init_process_group,训练后调用destroy_process_group。这与通常由数据科学家手动完成的过程相同。在

多GPU机器

dask_cuda_worker为它创建的每个worker自动旋转CUDA_VISIBLE_DEVICES(通常每个GPU一个)。因此,Pythorch代码应该始终以第0个GPU开头。在

例如,如果我有一台8gpu机器,那么第三个worker将把CUDA_VISIBLE_DEVICES设置为2,3,4,5,6,7,0,1。在那个worker上,如果我调用torch.device(0),我将得到gpu2。在

还有什么?在

{{cds3>也实现了dask-pytorch-ddp还实现了一个基本的结果聚合框架,这样就可以很容易地跨不同的员工收集培训指标。目前,只实现了利用Dask pub-sub communication protocolsDaskResultsHandler,但计划了一个基于S3的结果处理程序。在

一些注意事项

Dask通常产生进程。Pythorch通常是叉子。使用支持多处理的数据加载器时,最好传递Fork多处理上下文,以强制在数据加载器中使用分叉。在

某些部署不允许生成Dask进程。要覆盖此设置,您可以更改distributed.worker.daemon设置。在

环境变量是一种方便的方法:

DASK_DISTRIBUTED__WORKER__DAEMON=False

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
尝试连接到Red5服务器时出现java问题   java实现Runnable的类被认为是ExecutorServices的“Runnable任务”?   java struts2类中的多个@validation   java未能应用插件[class'org.gradle.api.plugins.scala.ScalaBasePlugin']:gradle v2。13   如何使用Java流仅收集长度最大的元素?   从spring引导应用程序连接到firestore的java引发空指针异常   java从SQLite插入和获取真实数据类型会为连续插入获取空值吗?   当存在未知数量的空格时,使用java替代正向查找   部署如何为当今的浏览器部署java小程序(小程序、嵌入、对象)?   @OneToMany和@ManyToOne@Formula之间的java双向关系返回null   java为什么在我的例子中,协议缓冲区的性能比JSON差?   如何部署混合C++/Java(JNI)应用程序?   java如何在程序中显示字符串的完整信息。反恐精英?   java在Hibernate中从持久性上下文中分离实体中的实体