腾讯云海外购

batch-compute & GPU分布式机器学习

当用户提交一些机器学习任务时,往往需要大规模的计算资源,但是对于响应时间并没有严格的要求。在这种场景下,首先使用的batch-compute(批量计算)产品来自动化提交用户的任务,然后使用分布式+gpu的方式解决算力问题,在任务完成后通知用户,是一个可行的解决方案。

本文将分成2部分:首先通过一个demo介绍上述过程的实现,从仅使用gpu、不考虑并行的简单情况开始,扩展至并行+gpu的情况,并简要介绍batch-compute的使用方法;然后介绍一些技术的实现原理(部分资料来源于知乎和博客,仅供参考)。

一个简单的Demo

使用pytorch,利用torch.Tensor对cuda的支持进行数据和模型的迁移。先不考虑并行,仅考虑如何将传统的基于cpu的机器学习任务迁移到gpu上。

  1. 定义一个简单的模型ConvNet:
class ConvNet(nn.Module):     def __init__(self, num_classes=10):         super(ConvNet, self).__init__()         self.layer1 = nn.Sequential(             nn.Conv2d(1, 16, kernel_size=5, stride=1,             padding=2),             nn.BatchNorm2d(16),             nn.ReLU(),             nn.MaxPool2d(kernel_size=2, stride=2))         self.layer2 = nn.Sequential(             nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2),             nn.BatchNorm2d(32),             nn.ReLU(),             nn.MaxPool2d(kernel_size=2, stride=2))         self.fc = nn.Linear(7*7*32, num_classes)      def forward(self, x):         out = self.layer1(x)         out = self.layer2(out)         out = out.reshape(out.size(0), -1)         out = self.fc(out)         return out

2. 进行基于gpu的训练

def train(gpu, args):     torch.manual_seed(0)     model = ConvNet()     torch.cuda.set_device(gpu)  # set default gpu     model.cuda(gpu) # move model to gpu     batch_size = 100     criterion = nn.CrossEntropyLoss().cuda(gpu)     # move loss function to gpu     optimizer = torch.optim.SGD(model.parameters(), 1e-4)     # Data loading code     train_dataset = torchvision.datasets.MNIST(root='./data',                                                train=True,                                       transform=transforms.ToTensor(),                                                download=True)     train_loader = torch.utils.data.DataLoader(dataset=train_dataset,                                                batch_size=batch_size,                                                shuffle=True,                                                num_workers=0,                                                pin_memory=True)      start = datetime.now()     total_step = len(train_loader)     for epoch in range(args.epochs):         for i, (images, labels) in enumerate(train_loader):             images = images.cuda(non_blocking=True)             labels = labels.cuda(non_blocking=True)             outputs = model(images)             loss = criterion(outputs, labels)              optimizer.zero_grad()             loss.backward()             optimizer.step()

上面代码中的train函数接收一个gpu的编号gpu作为参数,并且在第4行用其指定torch默认使用的gpu。

在第5行,将模型迁移到gpu上。cuda()函数会返回将调用该函数的对象拷贝一份到cuda memory中并返回该拷贝。如果该对象已经存在cuda memory或是正确的gpu中,则直接返回原对象。

在第7行,将损失函数迁移到gpu上(如果不明白为什么函数也要迁移,可以查看github上这个issue)。

这样,机器学习任务就迁移到了gpu上。

然后来考虑并行。这里假设有多个节点,每个节点上有多个gpu,每个进程使用一块gpu。pytorch提供了分布式训练的包torch.distributed,并且支持跨节点训练。

  1. 在脚本中设置master节点的ip和port
import torch.multiprocessing as mp def main():     ...     args.world_size = args.gpus * args.nodes     os.environ['MASTER_ADDR'] = 'xxx.xxx.xxx.xxx'     os.environ['MASTER_PORT'] = '8888'     mp.spawn(train, nprocs=args.gpus, args=(args,))

第5,6行通过环境变量的方式设置了master的ip和端口,之后master将在该端口监听worker的连接请求并完成初始化、广播等操作。

第7行通过spawn函数在本地启动了数量等于gpu数的进程,并且每个进程中运行相同的函数train。如果一个进程异常退出,那么其他进程也会被终止。

2. 初始化本地进程,并等待其他进程初始化完毕

import torch.distributed as dist def train(gpu, args):     rank = args.nr * args.gpus + gpu     print('starting making group.......')     dist.init_process_group(         backend='nccl', init_method='env://', world_size=args.world_size, rank=rank)     print('all processes have been started!')     torch.manual_seed(0)     ...

第5行的init_process_group是一个阻塞函数,在所有进程启动完毕且socket连接建立成功后返回。这里使用了nccl作为后端(也就是通信架构),可以参考pytorch官方给出的最佳指南;init_method参数表示通过环境变量发现master;rank表示当前进程在进程组中的优先级,rank=0的进程是master进程;world_size表示进程组中总共有多少进程。

2. 模型梯度同步

model = nn.parallel.DistributedDataParallel(model, device_ids=[gpu])

参与训练的数据集被分成多份,每个进程取一份input输入神经网络,独立计算梯度,然后将各个进程的梯度求平均值,用平均值更新模型参数。

3. 将数据划分到各个gpu上

train_sampler = torch.utils.data.distributed.DistributedSampler(     train_dataset,     num_replicas=args.world_size,     rank=rank)

DistributedSampler将输入按照batch_size划分到不同的gpu上,使得每个进程能读到不同的batch,且不同进程间不会读到重复的batch。

这样,机器学习任务就可以在不同节点的多个gpu上并行地执行,不同的进程只需指定不同的rank即可。

最后将任务通过batch-compute实现自动化的任务提交和执行。

首先介绍batch-compute的概念。现代云计算有多种形式,其中常见的2种是流式计算(stream computing)和批量计算(batch computing)。流式计算处理对实时性要求高的请求,具有低延迟、持续性等特征,一般用于实时推荐、监控等服务;批量计算处理对实时性要求低但需要大量计算资源的请求,往往是耗时较长的一次性作业。机器学习任务就是一种很典型的批量计算。

利用的batch-compute(批量计算)产品,开发者需要提供计算执行的环境、命令和输入输出存放的位置,由该产品自动去根据负载获取的弹性资源,并自动调度作业执行流程,将企业和科研机构的双手从架设和配置数据中心中解放出来。

本文中使用batch-compute的python SDK,分为2步:先创建计算环境,然后提交计算作业。

  1. 创建计算环境
req = batch_models.CreateComputeEnvRequest() params = '{\"ComputeEnv\":{\"EnvName\":\"batch-concurrent-test\",\"EnvData\":{\"InstanceType\":\"GN10X.2XLARGE40\",\"ImageId\":\"img-xxxxxx\",\"SystemDisk\":{\"DiskSize\":120},\"DesiredComputeNodeCount\":2},\"Placement\":{\"Zone\":\"ap-guangzhou-3\"}}' print(params) req.from_json_string(params) resp = self.batch_client.CreateComputeEnv(req) self.computeEnvId = json.loads(resp.to_json_string())["EnvId"]

第2行指定了创建2个节点,使用带gpu的机型GN10X.2XLARGE40;通过ImageId指定cvm的镜像,在这个镜像中部署了anaconda,pytorch,nvidia driver,cuda等。

如果需要获取创建节点的ip地址,可以通过第6行获取计算环境的id查看环境的详细信息。

2. 提交计算作业

commands = [     'sudo service docker restart',     'sudo service docker status',     'set -x',     'docker run -t --network host --gpus all <image-name> bash concurrent/task.sh', ] params = '{\"Placement\":{\"Zone\":\"ap-guangzhou-3\"},\"Job\":{\"JobName\":\"test-job\",\"Tasks\":[{\"TaskName\":\"concurrent-task\",\"InputMappings\":[{\"SourcePath\":\"%s\",\"DestinationPath\":\"%s\"}],\"TaskInstanceNum\":2,\"Application\":{\"Command\":\"%s\"},\"EnvId\":\"%s\",\"RedirectInfo\":{\"StdoutRedirectPath\":\"%s\",\"StderrRedirectPath\":\"%s\"}}]}}' % (self.inPath, self.destPath, " && ".join(commands), self.computeEnvId, self.outPath, self.errPath) req.from_json_string(params) resp = self.batch_client.SubmitJob(req)

在第5行启动了一个docker容器并使用容器内装好的cuda。此处将网络设置为host模式使得可以在容器内通过host ip直接访问另一个节点上的容器;设置-t参数使得运行结果与在终端通过命令行手动执行的输出保持一致;但是不能设置-i参数,因为输入设备并不是一个真正的tty;设置cmd参数使得容器启动后执行task.sh脚本:

[[ $(hostname -I | cut -d ' ' -f 1) == "xxx.xxx.xxx.xxx" ]]; python3 concurrent/mnist-distributed.py -n 2 -nr $?

第1行判断当前节点的ip是否为master节点的ip;第二行运行执行机器学习任务的python脚本,并传入rank参数,如果是master节点则传入0,否则,传入1

3. 运行结果

为了直观地演示并行机器学习的输出结果,笔者在两台cvm上手动执行了脚本:

如图,首先通过ip地址判断脚本输入参数中的rank值,并且等待所有进程启动成功。

然后开始训练,可以看见每个节点上进行了3个epoch,batch_size为300,耗时8秒左右。

为了对比使用并行前后的差距,在一个节点上启动任务。

如图,进行了3个epoch,batch_size为600,耗时为12秒左右。

至此,机器学习的任务就通过batch-compute产品提交并且在2台云服务器上并行地执行了,以下搬运一些pytorch文档/博客/知乎上关于分布式训练的原理实现。

原理

  1. DDP(DistributedDataParallel)的构造函数

每个进程都有一个模型(module)。在构造函数中,DDP首先获得该module的引用,然后将module.state_dict()从master进程广播到全体进程,使得所有进程具有相同的初始状态。state_dict的返回值是buffer等不在参数列表中但是代表了网络状态的数据,例如batch normalization中的running_mean。

不同进程间梯度的汇总、求和和同步是通过一个Reducer类实现的。在构造函数中初始化了一个Reducer对象,并通过该对象管理梯度计算。

在Reducer对象的构造函数中,首先将所有的参数装进若干个bucket(桶),之后一桶一桶地计算可以提高效率。参数进入桶的顺序和其在数组Model.parameters中的顺序相反,后向传播中最后一层的梯度是最先被计算完毕的,因此应该最先参加求和。

然后,Reducer为每个参数注册了一个autograd_hook,在该参数被计算完毕后触发。

2. 前向传播

前向传播没有涉及梯度计算,但是设计一个corner case——如果用户定义了某些参数但是没有将其加入模型之中(即神经网络中存在孤立节点),那么autograd_hook永远不会被触发。为此,DDP的构造函数中提供了find_unused_parameters,如果被设置为True,则在前向传播完毕后会找出这些节点并直接将其标记为已完成计算。当然这一操作会引入额外的开销,因此作为一个参数。

3. 后向传播

当所有节点上的同一编号的bucket中所有梯度均计算完成后,启动异步函数all_reduce求和。本地计算梯度和跨节点求平均值可以并行地进行,因为后向传播中用到的只是本地的计算结果(因为前向传播中的output就是只用local input算出来的)。

4. all_reduce实现细节

all_reduce实现了跨节点的求和计算。一种主流的实现方式是Parameter Server,即一个master节点接收其他节点发送的数值并求和,然后将结果发送给其他节点。

但是这样会引入单点故障,因此Pytorch 1.x使用了一种名为Ring AllReduce的算法(Uber的开源分布式框架Horovord也采用了这一算法)。正如其名字所表现的,所有节点排成一个环,每个节点从作邻居接收数据,在本地完成一部分求和工作,然后向右邻居发送数据。所有节点是平等的,没有master节点。

Ring AllReduce算法分成2个阶段:Share-Reduce阶段和Share-Only阶段。

在Share-Reduce阶段结束后,每个节点上会得到一部分位置的求和结果。

如图,经过3轮迭代后,WorkerA将得到a1+b1+c1+d1,WorkerB将得到a2+b2+c2+d2,WorkerC将得到a3+b3+c3+d3,WorkerD将得到a0+b0+c0+d0。

在Share-Only阶段,节点间共享这些和,使得所有节点最终拥有所有位置的求和结果。

如图,经过3轮迭代后,每个节点都会拥有全部4个位置的和。

5. Master进程有何意义?

既然使用了Ring AllReduce算法,那么在使用torch.distributed包时一定要指定的master ip&port有什么作用呢?

Master的主要作用时在初始化时为各个进程建立连接。具体而言,Master会创建一个守护线程,在这个线程中为所有worker各自创建一个socket,然后等待worker的连接,并在连上后发送其他进程所在的位置。

Worker则创建和master通信的socket,并主动连接master,在连上后获取其他进程的位置信息并报告自己的位置,然后和其他进程建立连接。

参考文献

1. https://pytorch.org/docs/stable/data.html

2. https://yangkky.github.io/2019/07/08/distributed-pytorch-tutorial.html

3. https://pytorch.org/docs/master/generated/torch.nn.parallel.DistributedDataParallel.html

4. https://pytorch.org/docs/stable/notes/ddp.html

5. https://towardsdatascience.com/visual-intuition-on-ring-allreduce-for-distributed-deep-learning-d1f34b4911da

6. https://zhuanlan.zhihu.com/p/76638962

7. https://www.zhihu.com/question/306242771/answer/825668022