daskpytorch已被重命名为daskpytorchddp
dask-pytorch的Python项目详细描述
达斯克Pythorch ddp
注意:dask Pythorch已更名为dask Pythorch ddp!在
我们发布了dask Pythorch v0.1.2的最终版本,它与dask Pythorch DDPv0.2.0相同,以简化这种过渡。你可以在PyPI上的https://pypi.python.org/pypi/dask-pytorch-ddp/找到dask pytorch ddp。在
dask-pytorch-ddp
是一个pythorch包,它使得使用分布式数据并行在Dask集群上训练PyTorch模型变得很容易。项目的预期范围是
- 在达斯克集群上引导Pythorch工人
- 使用分布式数据存储(如S3)作为普通PyTorch数据集
- 跟踪和记录中间结果、训练统计信息和检查点的机制。在
在这一点上,这个库和提供的示例是为计算机视觉任务而定制的,但是这个库对于任何类型的PyTorch任务都是有用的。真正特定于图像处理的只有S3ImageFolder
数据集类。在图像之外实现Pythorch数据集(假设地图样式的随机访问)目前需要实现__getitem__(self, idx: int):
和{
典型的非dask工作流
非dask PyTorch用法的典型示例如下:
加载数据
创建一个数据集(ImageFolder
),并将其包装在DataLoader
transform=transforms.Compose([transforms.Resize(256),transforms.CenterCrop(250),transforms.ToTensor()])whole_dataset=ImageFolder(path,transform=transform)batch_size=100num_workers=64indices=list(range(len(data)))np.random.shuffle(indices)train_idx=indices[:num]test_idx=indices[num:num+num]train_sampler=SubsetRandomSampler(train_idx)train_loader=DataLoader(data,sampler=train_sampler,batch_size=batch_size,num_workers=num_workers)
训练模型
在数据集上循环,并通过单步执行优化器来训练模型
^{pr2}$现在在达斯克
通过dask_Pythorch_ddp和Pythorch分布式数据并行,我们可以对多个工人进行如下培训:
加载数据
从S3加载数据集,并显式设置多处理上下文(Dask默认为spawn,但pytorch通常配置为使用fork)
fromdask_pytorch_ddp.dataimportS3ImageFolderwhole_dataset=S3ImageFolder(bucket,prefix,transform=transform)train_loader=torch.utils.data.DataLoader(whole_dataset,sampler=train_sampler,batch_size=batch_size,num_workers=num_workers,multiprocessing_context=mp.get_context('fork'))
并行训练
将训练循环包装在函数中(并添加度量日志记录)。不需要,但非常有用)。将模型转换为Pythorch分布式数据并行(DDP
)模型,该模型知道如何在工人之间同步渐变。在
importuuidimportpickleimportloggingimportjsonkey=uuid.uuid4().hexrh=DaskResultsHandler(key)defrun_transfer_learning(bucket,prefix,samplesize,n_epochs,batch_size,num_workers,train_sampler):worker_rank=int(dist.get_rank())device=torch.device(0)net=models.resnet18(pretrained=False)model=net.to(device)model=DDP(model,device_ids=[0])criterion=nn.CrossEntropyLoss().cuda()lr=0.001optimizer=optim.SGD(model.parameters(),lr=lr,momentum=0.9)whole_dataset=S3ImageFolder(bucket,prefix,transform=transform)train_loader=torch.utils.data.DataLoader(whole_dataset,sampler=train_sampler,batch_size=batch_size,num_workers=num_workers,multiprocessing_context=mp.get_context('fork'))count=0forepochinrange(n_epochs):# Each epoch has a training and validation phasemodel.train()# Set model to training modeforinputs,labelsintrain_loader:dt=datetime.datetime.now().isoformat()inputs=inputs.to(device)labels=labels.to(device)outputs=model(inputs)_,preds=torch.max(outputs,1)loss=criterion(outputs,labels)# zero the parameter gradientsoptimizer.zero_grad()loss.backward()optimizer.step()count+=1# statisticsrh.submit_result(f"worker/{worker_rank}/data-{dt}.json",json.dumps({'loss':loss.item(),'epoch':epoch,'count':count,'worker':worker_rank}))if(count%100)==0andworker_rank==0:rh.submit_result(f"checkpoint-{dt}.pkl",pickle.dumps(model.state_dict()))
它是如何工作的?在
dask-pytorch-ddp
很大程度上是现有pytorch
功能的包装器。pytorch.distributed
为Distributed Data Parallel(DDP)提供基础设施。在
在DDP中,创建N个worker,第0个worker是“master”,协调缓冲区和渐变的同步。在SGD中,梯度通常在批处理中的所有数据点之间取平均值。通过在多个worker上运行批处理,并平均梯度,DDP使您能够以更大的批大小运行SGD (N * batch_size)
dask-pytorch-ddp
设置一些环境变量来配置“master”主机和端口,然后在训练前调用init_process_group
,训练后调用destroy_process_group
。这与通常由数据科学家手动完成的过程相同。在
多GPU机器
dask_cuda_worker
为它创建的每个worker自动旋转CUDA_VISIBLE_DEVICES
(通常每个GPU一个)。因此,Pythorch代码应该始终以第0个GPU开头。在
例如,如果我有一台8gpu机器,那么第三个worker将把CUDA_VISIBLE_DEVICES
设置为2,3,4,5,6,7,0,1
。在那个worker上,如果我调用torch.device(0)
,我将得到gpu2。在
还有什么?在
{{cds3>也实现了DaskResultsHandler
,但计划了一个基于S3的结果处理程序。在
一些注意事项
Dask通常产生进程。Pythorch通常是叉子。使用支持多处理的数据加载器时,最好传递Fork
多处理上下文,以强制在数据加载器中使用分叉。在
某些部署不允许生成Dask进程。要覆盖此设置,您可以更改distributed.worker.daemon设置。在
环境变量是一种方便的方法:
DASK_DISTRIBUTED__WORKER__DAEMON=False
- 项目
标签: