Source code for horovod.mxnet

# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

from horovod.common.util import check_extension, split_list

check_extension('horovod.mxnet', 'HOROVOD_WITH_MXNET',
                __file__, 'mpi_lib')

from horovod.mxnet.compression import Compression
from horovod.mxnet.functions import allgather_object, broadcast_object
from horovod.mxnet.mpi_ops import allgather, grouped_allgather
from horovod.mxnet.mpi_ops import allreduce, allreduce_, grouped_allreduce, grouped_allreduce_
from horovod.mxnet.mpi_ops import alltoall
from horovod.mxnet.mpi_ops import broadcast, broadcast_
from horovod.mxnet.mpi_ops import reducescatter, grouped_reducescatter
from horovod.mxnet.mpi_ops import init, shutdown
from horovod.mxnet.mpi_ops import is_initialized, start_timeline, stop_timeline
from horovod.mxnet.mpi_ops import size, local_size, cross_size, rank, local_rank, cross_rank
from horovod.mxnet.mpi_ops import mpi_threads_supported, mpi_enabled, mpi_built
from horovod.mxnet.mpi_ops import gloo_enabled, gloo_built
from horovod.mxnet.mpi_ops import nccl_built, ddl_built, ccl_built, cuda_built, rocm_built
from horovod.mxnet.mpi_ops import ProcessSet, global_process_set, add_process_set, remove_process_set
from horovod.mxnet.mpi_ops import Average, Sum, Adasum, Min, Max, Product

import mxnet as mx
from collections import OrderedDict, defaultdict
import types
import warnings


# This is where Horovod's DistributedOptimizer wrapper for MXNet goes
class DistributedOptimizer(mx.optimizer.Optimizer):
    def __init__(self, optimizer, gradient_predivide_factor=1.0, num_groups=0, process_set=global_process_set):
        if gradient_predivide_factor != 1.0 and rocm_built():
            raise ValueError('gradient_predivide_factor not supported yet with ROCm')

        self._optimizer = optimizer
        self._gradient_predivide_factor = gradient_predivide_factor
        self._average_in_framework = False
        # C++ backend will apply additional 1 / size() factor to postscale_factor for op == Average.
        self._postscale_factor = self._gradient_predivide_factor
        if rocm_built() or nccl_built() < 21000:
            # Perform the average in the framework via rescale_grad for ROCm or
            # older NCCL versions without average support.
            self._optimizer.rescale_grad *= (gradient_predivide_factor / process_set.size())
            self._postscale_factor = 1.0
            self._average_in_framework = True
        self._num_groups = num_groups
        self._process_set = process_set

    def __getattr__(self, item):
        return getattr(self._optimizer, item)

    def create_state(self, index, weight):
        return self._optimizer.create_state(index, weight)

    def create_state_multi_precision(self, index, weight):
        return self._optimizer.create_state_multi_precision(index, weight)

    def _do_allreduce(self, index, grad):
        if self._process_set.size() == 1: return

        if isinstance(index, (tuple, list)):
            if self._num_groups > 0:
                grad_split = split_list(grad, self._num_groups)
                index_split = split_list(index, self._num_groups)

                for i, (grads, indices) in enumerate(zip(grad_split, index_split)):
                    grouped_allreduce_(tensors=grads, average=not self._average_in_framework,
                                       name="{}:{}".format(indices[0], indices[-1]), priority=-i,
                                       prescale_factor=1.0 / self._gradient_predivide_factor,
                                       postscale_factor=self._postscale_factor,
                                       process_set=self._process_set)
            else:
                for i in range(len(index)):
                    allreduce_(grad[i], average=not self._average_in_framework,
                               name=str(index[i]), priority=-i,
                               prescale_factor=1.0 / self._gradient_predivide_factor,
                               postscale_factor=self._postscale_factor,
                               process_set=self._process_set)
        else:
            allreduce_(grad, average=not self._average_in_framework, name=str(index),
                       prescale_factor=1.0 / self._gradient_predivide_factor,
                       postscale_factor=self._postscale_factor,
                       process_set=self._process_set)

    def update(self, index, weight, grad, state):
        if self._process_set.included():
            self._do_allreduce(index, grad)
        self._optimizer.update(index, weight, grad, state)

    def update_multi_precision(self, index, weight, grad, state):
        if self._process_set.included():
            self._do_allreduce(index, grad)
        self._optimizer.update_multi_precision(index, weight, grad, state)

    def set_learning_rate(self, lr):
        self._optimizer.set_learning_rate(lr)

    def set_lr_mult(self, args_lr_mult):
        self._optimizer.set_lr_mult(args_lr_mult)

    def set_wd_mult(self, args_wd_mult):
        self._optimizer.set_wd_mult(args_wd_mult)
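
# Illustrative usage sketch (not part of the library source): wrapping a plain
# MXNet optimizer with DistributedOptimizer so each update() call performs a
# Horovod allreduce on the gradients first. `model` below is an assumed
# user-side mx.mod.Module that has already been bound and initialized.
#
#   import horovod.mxnet as hvd
#   hvd.init()
#   opt = mx.optimizer.create('sgd', learning_rate=0.01 * hvd.size())
#   opt = hvd.DistributedOptimizer(opt)
#   model.init_optimizer(optimizer=opt, kvstore=None)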


# DistributedTrainer, a subclass of MXNet gluon.Trainer.
# There are two differences between DistributedTrainer and Trainer:
# 1. DistributedTrainer aggregates gradients using the Horovod allreduce
#    API, while Trainer uses the kvstore push/pull APIs;
# 2. DistributedTrainer performs allreduce (summation) and averaging,
#    while Trainer only performs allreduce (summation).
class DistributedTrainer(mx.gluon.Trainer):
    """The distributed trainer for data parallel training.

    Arguments:
        params: dict of parameters to train
        optimizer: mx.optimizer.Optimizer. The choice of optimizer.
        optimizer_params: hyper-parameter of the chosen optimizer
        compression: Compression algorithm used during allreduce to reduce the amount
                     of data sent during each parameter update step. Defaults to
                     not using compression.
        gradient_predivide_factor: gradient_predivide_factor splits the averaging
              before and after the sum. Gradients are scaled by
              1.0 / gradient_predivide_factor before the sum and
              gradient_predivide_factor / size after the sum.
        prefix: the prefix of the parameters this trainer manages.
              If multiple trainers are used in the same program,
              they must be specified by different prefixes to avoid tensor name collision.
    """
    def __init__(self, params, optimizer, optimizer_params=None,
                 compression=Compression.none,
                 gradient_predivide_factor=1.0, prefix=None,
                 num_groups=0, process_set=global_process_set):
        self._compression = compression
        self._process_set = process_set

        if gradient_predivide_factor != 1.0 and rocm_built():
            raise ValueError('gradient_predivide_factor not supported yet with ROCm')
        if isinstance(optimizer, DistributedOptimizer):
            optimizer = optimizer._optimizer
            warnings.warn("DistributedTrainer does not take DistributedOptimizer "
                          "as its optimizer. We have unwrapped it for you.")

        # To ensure consistent parameter ordering across workers, sort params before
        # passing to base Trainer constructor. This logic is consistent with trainer.py
        # since v1.6, but we do it here for backward compatibility.
        if isinstance(params, dict):
            params = OrderedDict(params)
        elif isinstance(params, (list, tuple)):
            params = sorted(params)

        super(DistributedTrainer, self).__init__(
            params, optimizer, optimizer_params=optimizer_params, kvstore=None)

        self._gradient_predivide_factor = gradient_predivide_factor
        self._average_in_framework = False
        # C++ backend will apply additional 1 / size() factor to postscale_factor for op == Average.
        self._postscale_factor = self._gradient_predivide_factor
        if rocm_built() or nccl_built() < 21000:
            # Perform the average in the framework via rescale_grad for ROCm or
            # older NCCL versions without average support.
            self._scale *= (gradient_predivide_factor / process_set.size())
            self._postscale_factor = 1.0
            self._average_in_framework = True
        assert prefix is None or isinstance(prefix, str)
        self._prefix = prefix if prefix else ""
        self._num_groups = num_groups

    def _allreduce_grads(self):
        if self._process_set.size() == 1: return
        if not self._process_set.included(): return

        if self._num_groups > 0:
            grads = []
            names = []
            tensors_compressed = []
            ctxs = []

            for i, param in enumerate(self._params):
                if param.grad_req != 'null':
                    tensor_compressed, ctx = self._compression.compress(param.list_grad()[0])
                    grads.append(tensor_compressed)
                    tensors_compressed.append(tensor_compressed)
                    ctxs.append(ctx)
                    names.append(self._prefix + str(i))

            grads_split = split_list(grads, self._num_groups)
            names_split = split_list(names, self._num_groups)

            for i, (group_grads, group_names) in enumerate(zip(grads_split, names_split)):
                # For better performance, enqueue groups in separate grouped_allreduce calls by dtype.
                entries_by_dtype = defaultdict(list)
                for grad, name in zip(group_grads, group_names):
                    entries_by_dtype[grad.dtype].append((grad, name))

                for entries in entries_by_dtype.values():
                    dtype_grads, dtype_names = zip(*entries)
                    grouped_allreduce_(tensors=dtype_grads, average=not self._average_in_framework,
                                       name="{}:{}".format(dtype_names[0], dtype_names[-1]), priority=-i,
                                       prescale_factor=1.0 / self._gradient_predivide_factor,
                                       postscale_factor=self._postscale_factor,
                                       process_set=self._process_set)

            if self._compression != Compression.none:
                for i, param in enumerate(self._params):
                    if param.grad_req != 'null':
                        param.list_grad()[0][:] = self._compression.decompress(tensors_compressed.pop(0), ctxs.pop(0))
        else:
            # In MXNet 2.0, param.name is no longer unique.
            # Meanwhile, since Horovod requires Python 3.6+, there is no need to sort
            # self._params, as enumerating a Python dict is deterministic.
            for i, param in enumerate(self._params):
                if param.grad_req != 'null':
                    tensor_compressed, ctx = self._compression.compress(param.list_grad()[0])
                    allreduce_(tensor_compressed, average=not self._average_in_framework,
                               name=self._prefix + str(i), priority=-i,
                               prescale_factor=1.0 / self._gradient_predivide_factor,
                               postscale_factor=self._postscale_factor,
                               process_set=self._process_set)

                    if self._compression != Compression.none:
                        param.list_grad()[0][:] = self._compression.decompress(tensor_compressed, ctx)

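# Illustrative usage sketch (not part of the library source): training a Gluon
# model with DistributedTrainer. `net` is an assumed user-side gluon.Block; the
# learning rate is scaled by the number of workers, and the initial parameters
# are broadcast from rank 0 so every worker starts from the same state.
#
#   import horovod.mxnet as hvd
#   hvd.init()
#   params = net.collect_params()
#   trainer = hvd.DistributedTrainer(params, 'sgd',
#                                    {'learning_rate': 0.001 * hvd.size()})
#   broadcast_parameters(params, root_rank=0)
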
# Wrapper to inject Horovod broadcast after parameter initialization
def _append_broadcast_init(param, root_rank, name):
    init_impl = getattr(param, '_init_impl')
    def wrapped_init_impl(self, *args, **kwargs):
        init_impl(*args, **kwargs)
        broadcast_(self.data(), root_rank=root_rank, name=name)
    return wrapped_init_impl


def broadcast_parameters(params, root_rank=0, prefix=None):
    """Broadcasts the parameters from root rank to all other processes.
    Typical usage is to broadcast the `Module.get_params()` or the
    `Block.collect_params()`.

    Arguments:
        params: One of the following:
            - dict of parameters to broadcast
            - ParameterDict to broadcast
        root_rank: The rank of the process from which parameters will be
            broadcasted to all other processes.
        prefix: The prefix of the parameters to broadcast.
            If multiple `broadcast_parameters` are called in the same program,
            they must be specified by different prefixes to avoid tensor name collision.
    """
    if size() == 1: return

    tensors = []
    names = []
    assert prefix is None or isinstance(prefix, str)
    prefix = prefix if prefix else ""
    try:
        from mxnet.gluon.parameter import ParameterDict
        valid_types = (dict, ParameterDict)
    except ImportError:
        valid_types = (dict,)
    if isinstance(params, valid_types):
        for name, p in sorted(params.items()):
            try:
                if isinstance(p, mx.gluon.parameter.Parameter):
                    tensors.append(p.data())
                else:
                    tensors.append(p)
                names.append(prefix + str(name))
            except mx.gluon.parameter.DeferredInitializationError:
                # Inject wrapper method with post-initialization broadcast to
                # handle parameters with deferred initialization.
                # We use the key of params instead of param.name, since
                # param.name is no longer unique in MXNet 2.0.
                new_init = _append_broadcast_init(p, root_rank, prefix + str(name))
                p._init_impl = types.MethodType(new_init, p)
    else:
        raise ValueError('invalid params of type: %s' % type(params))

    # Run broadcasts.
    for tensor, name in zip(tensors, names):
        broadcast_(tensor, root_rank, name=name)
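
# Illustrative usage sketch (not part of the library source): broadcasting the
# parameters of an mx.mod.Module from rank 0 so that all workers start from
# identical weights. `model` is an assumed user-side Module that has already
# been bound and initialized.
#
#   arg_params, aux_params = model.get_params()
#   broadcast_parameters(arg_params, root_rank=0)
#   broadcast_parameters(aux_params, root_rank=0)
#   model.set_params(arg_params, aux_params)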