16 Distributed Training How to Speed Up Your Model Training

Hello, I’m Fang Yuan.

In the previous courses, we have learned the essential components of deep learning, including building networks, loss functions, optimization methods, etc. Once we have mastered these aspects, we can train models for many scenarios.

However, sometimes our models are large, or we have a lot of training data, which can slow down the training process. So what can we do in such cases? This is where distributed training comes in. With distributed training, we can greatly accelerate our training process.

In this lesson, I will introduce you to the basics of distributed training, allowing you to understand the working principles behind it. Finally, I will combine it with a practical project, giving you a chance to apply what you have learned and deepen your understanding of this topic.

Principle of Distributed Training #

Before we dive into the specifics of distributed training, let’s briefly understand why deep learning uses GPUs.

In general, a program keeps the data of its processes and threads in memory and performs its computations on the CPU. Typical programs contain a lot of branching logic, such as if-else statements, which CPUs handle well.

In deep learning, however, training and inference involve little branching. They consist mainly of matrix and vector calculations, which GPUs are well suited for because they can stream large amounts of data through many arithmetic units in parallel.

But just as a single-cylinder engine, however well built, cannot match a 12-cylinder one, a single GPU is often not fast enough. This is why distributed training uses multiple GPUs working together. The key to distributed training lies in the following two questions:

  1. What is being distributed? The answer is two parts: data and models.
  2. How is the distribution done? Again, the answer is two parts: single machine multiple cards and multiple machines multiple cards.

In other words, distributed training in deep learning means spreading the data and the model across different GPUs, on either a single machine with multiple cards or multiple machines with multiple cards. Before that, let’s start with the simple case of a single machine with a single card to understand how training on a GPU works.

Single Machine Single Card #

Imagine if you were asked to push data or models to a GPU, what steps would you need to take? Let’s start with the case of a single GPU.

The first step is to determine how many GPUs you have. In PyTorch, you can use the torch.cuda.is_available() function to determine if there are available GPUs on the current machine, and the torch.cuda.device_count() function to get the number of available GPUs.

The second step is to obtain an instance of the GPU. For example, the following line of code:

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

Here, torch.device is an object that represents the device on which a torch.Tensor is allocated, in this case the GPU. cuda:0 refers to the first GPU. You can also omit :0 and write cuda, which refers to the current CUDA device (the first GPU by default). If no GPU is available (torch.cuda.is_available() returns False), we fall back to the CPU.

The third step is to push the data or models to the GPU. This process is called migration.

In PyTorch, this process is highly encapsulated. In other words, as long as the content to be pushed to the GPU is a tensor or model, you can quickly implement it using the to() function. For example:

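import torch

data = torch.ones((3, 3))
print(data.device)
# Output: cpu

# Get the device (this assumes at least one GPU is available)
device = torch.device("cuda:0")

# Push data to the GPU; to() returns a new tensor on the target device
data_gpu = data.to(device)
print(data_gpu.device)
# Output: cuda:0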

In the code above, we first create a regular tensor data; its device attribute shows that it lives on the CPU. Then the to() function copies it to the GPU. Note that to() returns a new tensor, data_gpu, whose device attribute is cuda:0, while the original data stays on the CPU.

So, is it the same for models? The answer is yes. Let’s look at an example:

net = nn.Sequential(nn.Linear(3, 3))
net.to(device)

Here, we can still use the to() function. Unlike a tensor, a model is moved in place, so there is no need to reassign net.
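If you want to check the difference between moving a tensor and moving a model for yourself, here is a minimal sketch. It falls back to the CPU when no GPU is present, so it should run on any machine; the names data_dev and same_net are only for illustration.

```python
import torch
import torch.nn as nn

# Fall back to the CPU so the sketch also runs on machines without a GPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print("visible GPUs:", torch.cuda.device_count())

# Tensor.to() returns a tensor on the target device; `data` itself is unchanged.
data = torch.ones((3, 3))
data_dev = data.to(device)
print(data.device, "->", data_dev.device)

# Module.to() moves the parameters in place and returns the same module object.
net = nn.Sequential(nn.Linear(3, 3))
same_net = net.to(device)
print(same_net is net)

# Inputs and model must live on the same device before the forward pass.
out = net(data_dev)
print(out.shape, out.device)
```

The last two lines are the practical takeaway: if the input and the model sit on different devices, the forward pass raises an error.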

The single machine single card mode is like having a batch of products to process, and assigning it to one worker and one machine to complete. This case is manageable for a small quantity, but when the number of products becomes large, you need more workers and machines to finish the job quickly.

Deep learning is the same. In many scenarios, such as recommendation models and language models, there are often millions, tens of millions, or even billions of training samples, and a single GPU is simply not enough. This is why single-machine multi-card and multi-machine multi-card solutions were created.

Single Machine Multiple Cards #

So, how does single machine multiple card training work in PyTorch? In fact, PyTorch provides several solutions, but let’s start with the simplest and most commonly used method: nn.DataParallel(). It is defined as follows:

torch.nn.DataParallel(module, device_ids=None, output_device=None, dim=0)

Here, module is the model you defined, device_ids lists the GPUs used for training (by default, all visible GPUs), and output_device is the device on which the outputs are gathered. It defaults to device_ids[0], which is normally the first card.

We can use the nvidia-smi command to check GPU usage. If you pay close attention, you will notice that when using multiple cards for training, the card used for output_device clearly takes up more video memory.

If we keep observing, we will find that DataParallel is data-parallel: each batch is split evenly, so every card receives the same amount of data. The outputs, however, are gathered on the output_device card, and the loss is computed there, which further increases the load on that card.

Figure 1

Is that it? Yes, it’s that simple: wrapping the model in DataParallel is all it takes to run it on multiple GPUs. Still, we need to understand the underlying logic. Only then can we use multiple GPUs to their full advantage when we later face questions of training-time estimation, resource estimation, optimization, and debugging.

During the forward pass, each batch is divided into several chunks, which are pushed to different GPUs for computation. The model, by contrast, is not divided: a full copy is placed on every GPU. Let’s take a look at the code below:

import os
import torch
import torch.nn as nn

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

class ASimpleNet(nn.Module):
    def __init__(self, layers=3):
        super(ASimpleNet, self).__init__()
        self.linears = nn.ModuleList([nn.Linear(3, 3, bias=False) for i in range(layers)])

    def forward(self, x):
        print("forward batchsize is: {}".format(x.size()[0]))
        # nn.ModuleList is not callable, so apply the layers one by one
        for linear in self.linears:
            x = linear(x)
        x = torch.relu(x)
        return x

batch_size = 16
inputs = torch.randn(batch_size, 3)
labels = torch.randn(batch_size, 3)
inputs, labels = inputs.to(device), labels.to(device)
net = ASimpleNet()
net = nn.DataParallel(net)  # replicates the model on every visible GPU
net.to(device)
# .get() avoids a KeyError when CUDA_VISIBLE_DEVICES is not set
print("CUDA_VISIBLE_DEVICES: {}".format(os.environ.get("CUDA_VISIBLE_DEVICES")))
for epoch in range(1):
    outputs = net(inputs)

# Output on a machine with 4 visible GPUs:
# CUDA_VISIBLE_DEVICES: 3, 2, 1, 0
# forward batchsize is: 4
# forward batchsize is: 4
# forward batchsize is: 4
# forward batchsize is: 4

In the code above, CUDA_VISIBLE_DEVICES shows that 4 GPUs are visible to the program, and the batch size is 16. The message printed by each replica’s forward pass shows that every GPU receives 4 samples. In other words, DataParallel automatically splits the batch, loads the chunks onto the GPUs, copies the model to each GPU, and runs the forward pass there; during training, it also aggregates the gradients computed in the backward pass.
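To get a feel for how a batch is cut up when it does not divide evenly, here is a minimal sketch in plain Python. It assumes that DataParallel chunks the batch along the first dimension the way torch.chunk does; split_batch is a hypothetical helper written for this lesson, not a PyTorch function.

```python
import math

def split_batch(batch_size, num_gpus):
    """Sizes of the per-GPU slices when a batch is cut into equal chunks.

    Assumption: chunking follows torch.chunk semantics, i.e. every chunk
    has ceil(batch_size / num_gpus) samples except possibly the last one.
    """
    chunk = math.ceil(batch_size / num_gpus)
    sizes = []
    remaining = batch_size
    while remaining > 0:
        take = min(chunk, remaining)
        sizes.append(take)
        remaining -= take
    return sizes

print(split_batch(16, 4))  # [4, 4, 4, 4]
print(split_batch(10, 4))  # [3, 3, 3, 1]
print(split_batch(2, 4))   # [1, 1]: two of the four GPUs stay idle
```

Under this assumption, a batch size that is a multiple of the number of GPUs keeps the load even, which is one reason to choose batch sizes such as 16 for 4 cards.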

Multi-Machine Multi-GPU #

Multi-machine multi-GPU training is generally run on a cluster for large-scale training. It involves many aspects, but in this lesson we will only discuss the basic principles and methods. In practice, you may run into other problems related to the network or the environment, and you will need to solve them case by case.

DP and DDP #

As mentioned earlier, for single machine multi-GPU training, there is a simplest method called DataParallel. In addition to DataParallel, PyTorch also has another main API for data parallelism, called DistributedDataParallel. DistributedDataParallel (DDP) is the key API for implementing multi-machine multi-GPU training.

DataParallel is abbreviated as DP, while DistributedDataParallel is abbreviated as DDP. Let’s take a closer look at the differences between DP and DDP.

Let’s start with DP. DP controls multiple GPUs with a single process. As we can see from the previous code, DP splits a batch of input data into n parts (n is the number of GPUs actually used) and sends them to the corresponding GPUs for computation.

In each forward pass, the model is replicated from the main GPU to the other GPUs. In the backward pass, the gradients from every GPU are accumulated on the main GPU, where the model parameters are updated; the updated model is then copied to the other GPUs again. This achieves parallelism.

Since the main GPU needs to aggregate gradients, update the model, and distribute computational tasks to other GPUs, the load and utilization of the main GPU are higher than the other GPUs, resulting in GPU load imbalance.

Now let’s talk about DDP. DDP controls multiple GPUs with multiple processes. A separate process is created for each GPU, without a main GPU, and each GPU performs the same tasks. DDP uses the DistributedSampler to load data to ensure that there is no overlap of data between processes.

During the backward pass, after each GPU finishes its gradient calculation, the gradients are averaged across all processes (an all-reduce operation), so every process ends up with the same averaged gradients. Each process then updates its own copy of the parameters, which keeps the model identical on every GPU. Since the model does not have to be copied between GPUs at every step, DDP transfers less data and runs faster.
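Why is averaging the gradients enough? The following plain-Python sketch simulates it for a toy model pred = w * x with a mean-squared-error loss. The model, the data, and the helper grad_mse are made up for illustration; the point is only that averaging the gradients of equally sized shards gives the same result as computing the gradient on the whole batch.

```python
def grad_mse(w, xs, ys):
    """Gradient of mean((w*x - y)^2) with respect to w over one shard."""
    return sum(2 * (w * x - y) * x for x, y in zip(xs, ys)) / len(xs)

w = 0.5
xs = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]
ys = [2.0, 4.0, 6.0, 8.0, 10.0, 12.0, 14.0, 16.0]

world_size = 4
shard = len(xs) // world_size

# Each simulated process computes a gradient on its own shard only.
local_grads = [
    grad_mse(w, xs[r * shard:(r + 1) * shard], ys[r * shard:(r + 1) * shard])
    for r in range(world_size)
]

# Averaging the local gradients plays the role of the all-reduce step.
averaged = sum(local_grads) / world_size
full_batch = grad_mse(w, xs, ys)
print(averaged, full_batch)
```

Both numbers come out the same, so every process can apply the averaged gradient and stay in step with the others without ever seeing the full batch.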

DistributedDataParallel can be used for both single machine multi-GPU and multi-machine multi-GPU training, and it can solve the problems of slow DataParallel and GPU load imbalance. Therefore, it is recommended to use DistributedDataParallel for distributed training, which is the topic of the following section.

DDP Training #

DistributedDataParallel is mainly designed for multi-machine multi-GPU training, but it can also be used on a single machine.

To understand the training mechanism of DDP, we need to understand several concepts in distributed computing:

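  • group: a process group. By default, there is only one group, which contains all processes (the “world”).
  • world_size: the total number of processes taking part in training. With one process per GPU, this equals the total number of GPUs across all nodes (machines); it equals the number of nodes only when each node runs a single process.
  • rank: the unique index of a process, from 0 to world_size - 1, used for inter-process communication. It is an identifier rather than a priority. By convention, the process with rank 0 acts as the master.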
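The relationship between nodes, GPUs, rank, and world_size can be sketched with a little arithmetic. The cluster below (2 machines with 4 GPUs each) is hypothetical, and global_rank is an illustrative helper; it assumes one process per GPU.

```python
def global_rank(node_rank, local_rank, gpus_per_node):
    """Global rank of the process driving GPU `local_rank` on node `node_rank`."""
    return node_rank * gpus_per_node + local_rank

num_nodes = 2        # hypothetical cluster: 2 machines
gpus_per_node = 4    # with 4 GPUs each
world_size = num_nodes * gpus_per_node

all_ranks = []
for node in range(num_nodes):
    ranks = [global_rank(node, gpu, gpus_per_node) for gpu in range(gpus_per_node)]
    all_ranks.extend(ranks)
    print("node", node, "-> ranks", ranks)
print("world_size =", world_size)
```

Every process gets a distinct rank between 0 and world_size - 1, and the first GPU on the first node ends up with rank 0.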

The specific steps for performing distributed training with DDP are as follows. We will go through each step and implement it accordingly.

Image

The first step is to initialize the process group. We use the init_process_group function to initialize distributed training, with the following definition:

torch.distributed.init_process_group(backend, init_method=None, world_size=-1, rank=-1, group_name='')

Let’s take a closer look at the parameters:

  • backend: specifies the communication backend to be used, which can be “nccl” or “gloo”. Generally, nccl is used for GPU distributed training, and gloo is used for CPU distributed training.
  • init_method: a string representing the initialization method, it can be “env://” (default) indicating initialization from environment variables, or it can be specified as a TCP URL or a shared file system URL.
  • world_size: the total number of processes taking part in training (with one process per GPU, the total number of GPUs across all nodes).
  • rank: the index of the current process, from 0 to world_size - 1. It identifies the process; it is not a priority.
  • group_name: the name of the process group.

Here is an example of using the nccl backend:

torch.distributed.init_process_group(backend="nccl")
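The call above passes no init_method, so the default env:// is used, and the rendezvous settings are looked up in environment variables: MASTER_ADDR, MASTER_PORT, WORLD_SIZE, and RANK. The sketch below only illustrates which values must be present; read_dist_env is a hypothetical helper written for this lesson, not part of PyTorch, and the example values are made up.

```python
import os

REQUIRED = ("MASTER_ADDR", "MASTER_PORT", "WORLD_SIZE", "RANK")

def read_dist_env(env=None):
    """Collect the rendezvous settings that env:// initialization looks up.

    `read_dist_env` is a hypothetical helper, not a PyTorch function.
    """
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED if name not in env]
    if missing:
        raise KeyError("missing environment variables: " + ", ".join(missing))
    return {
        "master_addr": env["MASTER_ADDR"],
        "master_port": int(env["MASTER_PORT"]),
        "world_size": int(env["WORLD_SIZE"]),
        "rank": int(env["RANK"]),
    }

# Example values for the first process of a 2-process job on one machine.
example_env = {"MASTER_ADDR": "127.0.0.1", "MASTER_PORT": "29500",
               "WORLD_SIZE": "2", "RANK": "0"}
cfg = read_dist_env(example_env)
print(cfg)
```

In practice you rarely set these by hand: a launcher such as torchrun exports them for every process it starts.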

After initialization, the second step is to parallelize the model. As mentioned before, we can use DistributedDataParallel to distribute the model across multiple GPUs. The function definition is as follows:

torch.nn.parallel.DistributedDataParallel(module, device_ids=None, output_device=None, dim=0)

The parameters of DistributedDataParallel are similar to DataParallel, so to parallelize the model, we only need to replace the DataParallel function with DistributedDataParallel. Here is the code:

net = torch.nn.parallel.DistributedDataParallel(net)

The third and last step is to create a distributed data sampler. In multi-machine multi-GPU training, data loading also needs care, because each GPU should read different data.

In DP, a batch of data is directly divided across different GPUs, but frequent data transfers between multiple machines and GPUs can seriously affect efficiency. To address this, we use the distributed data sampler DistributedSampler, which assigns each sub-process a portion of the dataset to ensure that DataLoader only loads a specific subset of data for each sub-process, avoiding any duplicate data between processes.

Here is the code for creating and using the distributed data sampler:

train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
data_loader = DataLoader(train_dataset, batch_size=batch_size, sampler=train_sampler)

Let me explain the code to you. First, we assign the train_dataset to the DistributedSampler and create a distributed data sampler train_sampler.

When constructing the DataLoader, we pass sampler=train_sampler, so that each process loads only its own subset of the data. In other words, with DDP every process reads the data for its own GPU itself, instead of a main GPU distributing data to the other GPUs.
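You can look at what the sampler hands to each process without starting any processes at all, by passing num_replicas and rank explicitly. This is a minimal sketch with a toy dataset of 10 items and 4 simulated ranks; shuffle=False is set only to make the pattern easy to read.

```python
from torch.utils.data.distributed import DistributedSampler

dataset = list(range(10))   # any object with __len__ works for this demo
world_size = 4

# Passing num_replicas and rank explicitly avoids needing a process group here.
shards = [
    list(DistributedSampler(dataset, num_replicas=world_size, rank=r, shuffle=False))
    for r in range(world_size)
]
for r, idx in enumerate(shards):
    print("rank", r, "->", idx)
```

Each rank receives every fourth index, and all ranks receive the same number of samples. Because 10 is not divisible by 4, the sampler pads the index list by repeating a couple of indices from the start, so the subsets are strictly disjoint only when the dataset size is a multiple of the number of processes (or when drop_last=True is used).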

This is where we will end the discussion of why and how to use distributed training. I believe you now have a preliminary understanding of data parallelism and of how DP and DDP implement it.

Trying Something New #

Below we will explain an official ImageNet example from PyTorch. In the future, you can use this small project as a template for distributed training.

This example can be used with either DP or DDP. Now let’s take a look at the core code together.

if args.distributed:
    if args.dist_url == "env://" and args.rank == -1:
        args.rank = int(os.environ["RANK"])
    if args.multiprocessing_distributed:
        # For multiprocessing distributed training, rank needs to be the global rank among all the processes
        args.rank = args.rank * ngpus_per_node + gpu
    dist.init_process_group(backend=args.dist_backend, init_method=args.dist_url,
                            world_size=args.world_size, rank=args.rank)

Here, you can pay attention to the args.distributed parameter in the example code. If args.distributed is True, it means DDP is being used. Otherwise, it means DP is being used.

The code above is the DDP initialization in the main_worker function. If DDP is being used, the init_process_group function initializes the process group. ngpus_per_node is the number of GPUs per node.

Next, let’s look at the following logic, which is also part of the main_worker function.

if not torch.cuda.is_available():
    print('using CPU, this will be slow')
elif args.distributed:
    # For multiprocessing distributed, DistributedDataParallel constructor should always set the single device scope, otherwise, DistributedDataParallel will use all available devices.
    if args.gpu is not None:
        torch.cuda.set_device(args.gpu)
        model.cuda(args.gpu)
        # When using a single GPU per process and per DistributedDataParallel, we need to divide the batch size ourselves based on the total number of GPUs we have
        args.batch_size = int(args.batch_size / ngpus_per_node)
        args.workers = int((args.workers + ngpus_per_node - 1) / ngpus_per_node)
        model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.gpu])
    else:
        model.cuda()
        # DistributedDataParallel will divide and allocate batch_size to all available GPUs if device_ids are not set
        model = torch.nn.parallel.DistributedDataParallel(model)
elif args.gpu is not None:
    torch.cuda.set_device(args.gpu)
    model = model.cuda(args.gpu)
else:
    # DataParallel will divide and allocate batch_size to all available GPUs
    if args.arch.startswith('alexnet') or args.arch.startswith('vgg'):
        model.features = torch.nn.DataParallel(model.features)
        model.cuda()
    else:
        model = torch.nn.DataParallel(model).cuda()

This code handles the logic of whether to use CPU or GPU, and if GPU is being used, whether to use DP or DDP. As we can see, the DistributedDataParallel or DataParallel functions are used here to parallelize the model. After parallelization, we need to create a distributed data sampler. The specific code is as follows.

if args.distributed:
    train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
else:
    train_sampler = None

train_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=args.batch_size, shuffle=(train_sampler is None),
    num_workers=args.workers, pin_memory=True, sampler=train_sampler)

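Note that when a sampler is passed to the DataLoader, shuffle must be left as False, because sampler and shuffle=True are mutually exclusive. This is why the code sets shuffle=(train_sampler is None). Shuffling is then handled by the sampler itself.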
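Here is a small sketch of both points, using a toy dataset and a single simulated rank. It relies on DistributedSampler.set_epoch, which reseeds the sampler's shuffling for each epoch; the dataset, batch size, and seed are arbitrary choices for illustration.

```python
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

dataset = list(range(8))
sampler = DistributedSampler(dataset, num_replicas=2, rank=0, shuffle=True, seed=0)

# shuffle is left at its default (False) because a sampler is supplied.
loader = DataLoader(dataset, batch_size=2, sampler=sampler)

orders = {}
for epoch in range(2):
    # Without set_epoch, every epoch would reuse the same permutation.
    sampler.set_epoch(epoch)
    orders[epoch] = [int(i) for batch in loader for i in batch]
    print("epoch", epoch, "->", orders[epoch])

# Passing both a sampler and shuffle=True is rejected by DataLoader.
try:
    DataLoader(dataset, batch_size=2, sampler=sampler, shuffle=True)
    conflict_rejected = False
except ValueError:
    conflict_rejected = True
print("conflict rejected:", conflict_rejected)
```

In a real training loop, the call to set_epoch goes at the top of each epoch, before iterating over the loader.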

Finally, we need to start a process for each GPU on each machine node. PyTorch provides the torch.multiprocessing.spawn function to launch distributed processes on a node. The specific code is as follows.

ngpus_per_node = torch.cuda.device_count()
if args.multiprocessing_distributed:
    # Since we have ngpus_per_node processes per node, the total world_size needs to be adjusted accordingly
    args.world_size = ngpus_per_node * args.world_size
    # Use torch.multiprocessing.spawn to launch distributed processes: the main_worker process function
    mp.spawn(main_worker, nprocs=ngpus_per_node, args=(ngpus_per_node, args))
else:
    # Simply call main_worker function
    main_worker(args.gpu, ngpus_per_node, args)

Let’s summarize the key points of the code above. As mentioned before, main_worker is the function executed by each process. ngpus_per_node is the number of GPUs per node, assumed to be the same on every node. args.world_size initially holds the number of nodes, so with one process per GPU, ngpus_per_node * args.world_size is the total number of GPUs across all nodes, which is also the total number of processes. Generally, process 0 is the main process; for example, it is the one that saves the model or prints log information.

When the number of nodes is 1, it’s actually single machine multiple GPUs. Therefore, DDP supports both multi-node multi-GPU and single-node multi-GPU.

The main_worker function is called as follows.

main_worker(args.gpu, ngpus_per_node, args)

Here, args.gpu represents the ID of the GPU being used. After calling mp.spawn, a process is started for each GPU on each node, and each process runs main_worker(i, ngpus_per_node, args), where i is from 0 to ngpus_per_node-1. The code for saving the model is as follows.

if not args.multiprocessing_distributed or (args.multiprocessing_distributed and args.rank % ngpus_per_node == 0):
    save_checkpoint({
        'epoch': epoch + 1,
        'arch': args.arch,
        'state_dict': model.state_dict(),
        'best_acc1': best_acc1,
        'optimizer' : optimizer.state_dict(),
    }, is_best)

Note that DDP runs multiple processes. If we saved the model unconditionally, every process would write the checkpoint. The condition above therefore lets only one process per node save the model: the one whose rank is a multiple of ngpus_per_node, that is, the process driving the first GPU on that node.
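To see which processes pass this check, the condition can be pulled out into a small function and evaluated for every rank. The function name should_save and the 2 x 4 cluster are assumptions made for illustration.

```python
def should_save(multiprocessing_distributed, rank, ngpus_per_node):
    """Mirror of the checkpoint condition: one writer per node."""
    if not multiprocessing_distributed:
        return True
    return rank % ngpus_per_node == 0

ngpus_per_node = 4
world_size = 8   # hypothetical: 2 nodes x 4 GPUs
writers = [r for r in range(world_size) if should_save(True, r, ngpus_per_node)]
print(writers)   # [0, 4]
```

With two nodes, ranks 0 and 4 both write a checkpoint, one on each machine. If all nodes share one file system and you want a single writer, checking rank == 0 instead is a common alternative.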

Alright, with this, we have covered the key aspects of distributed training in this example.

Summary #

Congratulations on making it this far. In this lesson, we have completed our study on distributed training, and now let’s summarize what we have learned.

Today, we not only learned why we should use distributed training and the principles behind it, but also worked on a hands-on project on distributed training.

In distributed training, there are mainly two modes: DP and DDP. DP is not a complete form of distributed computing: it spreads the computation over multiple GPUs, but it still suffers from the “one card in trouble, all cards suffer” problem, because every GPU depends on the main card. As a result, DP can have issues such as load imbalance and low efficiency. DDP, on the other hand, solves these issues and can be used for both single-machine multi-GPU and multi-machine multi-GPU scenarios, which makes it the better solution for distributed training.

You can consider the examples we discussed today as a template for distributed training. It includes the complete usage process of DP and DDP, as well as how to save models when using DDP. However, there are more details in the code of this example, so it is recommended that you spend some spare time after the lesson to go through the code in depth, search for additional resources, and practice and think more in order to consolidate what you have learned today.

Practice for Each Lesson #

In the torch.distributed.init_process_group(backend="nccl") function, what are the optional backends for the backend parameter, and what are their respective differences?

I recommend that you carefully study today’s distributed training demo. Feel free to record your learning insights or questions in the comment section. I’ll be waiting for you there.