Source code for horovod.tensorflow.gradient_aggregation

import os
import tensorflow as tf
from packaging import version
from horovod.tensorflow.mpi_ops import size_op
from horovod.tensorflow.mpi_ops import global_process_set


# True when the installed TensorFlow version compares >= 2.0.0. Used in this
# module to decide whether a variable is stored in a set/dict via `var.ref()`
# (TF2 branch) or as the variable object itself (other branch).
_IS_TF2 = version.parse(tf.__version__) >= version.parse('2.0.0')


def apply_op_to_not_none_tensors(tensor_op, tensors, *args):
    """Apply ``tensor_op`` to every entry of ``tensors`` that is not ``None``.

    Each non-``None`` entry is replaced by ``tensor_op(entry, *args)``.
    ``None`` entries are passed through untouched, so the returned list has
    the same length and ordering as ``tensors``.
    """
    results = []
    for entry in tensors:
        if entry is None:
            results.append(None)
            continue
        results.append(tensor_op(entry, *args))
    return results


def get_not_none_from_list(tensor_list):
    """Return a new list holding the entries of ``tensor_list`` that are not ``None``.

    Only identity with ``None`` is tested, so other falsy values (``0``,
    ``""``, ...) are kept. The relative order of the entries is preserved.
    """
    kept = []
    for item in tensor_list:
        if item is not None:
            kept.append(item)
    return kept


class LocalGradientAggregationHelper:
    """
    LocalGradientAggregationHelper aggregates gradient updates locally,
    and communicates the updates across machines only once
    every backward_passes_per_step. Only supports graph mode execution.
    """

    _OPTIMIZER_TYPE_KERAS = "optimizer_type_keras"
    _OPTIMIZER_TYPE_LEGACY = "optimizer_type_legacy"

    def __init__(
            self,
            backward_passes_per_step,
            allreduce_func,
            sparse_as_dense,
            average_aggregated_gradients,
            rank,
            optimizer_type,
            process_set=global_process_set,
            scale_local_gradients=True,
            name=""):
        """Store the aggregation configuration; no TensorFlow ops are created here.

        Args:
            backward_passes_per_step: Number of `compute_gradients` passes
                that are accumulated before the gradients are allreduced.
                Must be > 0.
            allreduce_func: Callable invoked as `allreduce_func(grads, vars)`.
                Its results are paired positionally with the variables that
                were passed in.
            sparse_as_dense: If true, `tf.IndexedSlices` gradients are
                converted to dense tensors; otherwise they are rejected with
                a `ValueError`.
            average_aggregated_gradients: If true, the allreduced gradients
                are divided by `backward_passes_per_step`.
            rank: Used as a suffix of the variable scope name that holds the
                aggregation variables.
            optimizer_type: Compared against `_OPTIMIZER_TYPE_KERAS` to decide
                whether the aggregation variables are initialized through the
                Keras backend session.
            process_set: Object providing `process_set_id` and `size()`, used
                to obtain the divisor for local gradients.
            scale_local_gradients: If true, gradients of registered local
                variables are divided by the process set size.
            name: Extra component of the variable scope name.

        Raises:
            ValueError: If `backward_passes_per_step` is not positive.
        """
        self._allreduce_grads = allreduce_func
        self.name = name

        # backward_passes_per_step controls how often gradient updates are
        # synchronized.
        self.backward_passes_per_step = backward_passes_per_step
        if self.backward_passes_per_step <= 0:
            raise ValueError("backward_passes_per_step must be > 0")

        # average_aggregated_gradients controls whether gradient updates that are
        # aggregated, should be divided by `backward_passes_per_step`.
        self.average_aggregated_gradients = average_aggregated_gradients

        # This is going to be [N] data structure holding the aggregated gradient updates
        # N is the number of parameters.
        self.locally_aggregated_grads = []

        # Used to know when to allreduce and apply gradients. We allreduce when `self.counter`
        # is equal to `self.backward_passes_per_step`. We apply gradients when `self.counter` is
        # equal to 0.
        self.counter = None

        self.sparse_as_dense = sparse_as_dense
        self.rank = rank
        self.optimizer_type = optimizer_type

        # Contains the mapping of indexes of grad updates that are not None to their index in
        # locally_aggregated_grads which only contains not None gradients. When performing
        # gradient aggregation we have to remove them from the list of grads prior to passing
        # the list into a tf.cond().
        self.not_none_indexes = {}
        self.num_none_grad_updates = 0

        self.process_set = process_set
        self.scale_local_gradients = scale_local_gradients
        self._local_vars = set()

    @staticmethod
    def _var_key(var):
        """Return the hashable key under which `var` is tracked in sets/dicts.

        `var.ref()` is used when `_IS_TF2` is true, the variable itself
        otherwise.
        """
        return var.ref() if _IS_TF2 else var

    def register_local_var(self, var):
        """Registers a source/variable as worker local. Horovod will not perform any global
        operations on gradients corresponding to these sources and will instead return the local
        gradient."""
        self._local_vars.add(self._var_key(var))

    def _maybe_convert_grad(self, grad):
        """Return `grad` as a dense tensor if it is a `tf.IndexedSlices`.

        Any other value (including `None`) is returned unchanged.

        Raises:
            ValueError: If `grad` is a `tf.IndexedSlices` and
                `sparse_as_dense` is false.
        """
        # Handle IndexedSlices.
        if isinstance(grad, tf.IndexedSlices):
            if self.sparse_as_dense:
                return tf.convert_to_tensor(grad)
            else:
                raise ValueError(
                    "IndexedSlices are not supported when "
                    "`backward_passes_per_step` > 1 and "
                    "`sparse_as_dense` is False."
                )

        return grad

    def _init_aggregation_vars(self, grads):
        """
        Initializes the counter that is used when to communicate and aggregate gradients
        and the tensorflow variables that store the locally aggregated gradients.

        One zero-initialized shadow variable is created per non-None gradient,
        and `not_none_indexes` / `num_none_grad_updates` record where the None
        gradients were.
        """
        variable_scope_name = "aggregation_variables_" + self.name + str(self.rank)
        with tf.compat.v1.variable_scope(variable_scope_name, reuse=tf.compat.v1.AUTO_REUSE):
            self.counter = tf.compat.v1.get_variable(
                "aggregation_counter", shape=(), dtype=tf.int32,
                trainable=False, initializer=tf.compat.v1.zeros_initializer(),
                collections=[tf.compat.v1.GraphKeys.LOCAL_VARIABLES],
            )
            for idx, grad in enumerate(grads):
                grad = self._maybe_convert_grad(grad)

                # Handle grads that are None.
                if grad is None:
                    self.num_none_grad_updates += 1
                    continue
                self.not_none_indexes[idx] = len(self.locally_aggregated_grads)

                # Create shadow variable.
                grad_aggregation_variable_name = str(idx)
                zero_grad = tf.zeros(
                    shape=grad.get_shape().as_list(),
                    dtype=grad.dtype)
                grad_aggregation_variable = tf.compat.v1.get_variable(
                    grad_aggregation_variable_name,
                    trainable=False,
                    initializer=zero_grad,
                    collections=[
                        tf.compat.v1.GraphKeys.LOCAL_VARIABLES,
                        "aggregating_collection"],
                )
                self.locally_aggregated_grads.append(grad_aggregation_variable)
            assert len(self.locally_aggregated_grads) + \
                self.num_none_grad_updates == len(grads)

        # For the `tf.keras` optimizer type the variables are initialized here
        # by running their initializer in the Keras backend session.
        if self.optimizer_type == self._OPTIMIZER_TYPE_KERAS:
            session = tf.compat.v1.keras.backend.get_session(op_input_list=())
            vars_init_op = tf.compat.v1.variables_initializer(
                [self.counter, *get_not_none_from_list(self.locally_aggregated_grads)]
            )
            session.run(vars_init_op)

    def _clear_grads(self):
        """Return an op that resets every aggregation variable to its initial value."""
        clear_ops_list = [
            grad_aggregator.assign(grad_aggregator.initial_value)
            for grad_aggregator in self.locally_aggregated_grads
        ]
        return tf.group(*clear_ops_list)

    def _aggregate_grads(self, grads):
        """Return the list of ops adding each non-None gradient to its aggregation variable."""
        aggregation_ops_list = []
        grads = get_not_none_from_list(grads)
        assert len(grads) == len(self.locally_aggregated_grads)

        # Apply new gradient updates to the local copy.
        for idx, grad in enumerate(grads):
            if self.sparse_as_dense and isinstance(grad, tf.IndexedSlices):
                grad = tf.convert_to_tensor(grad)

            updated_grad_aggregator = self.locally_aggregated_grads[idx].assign_add(
                grad)
            aggregation_ops_list.append(updated_grad_aggregator)

        return aggregation_ops_list

    def _allreduce_grads_helper(self, vars):
        """Allreduce the locally aggregated gradients and reset the counter.

        Args:
            vars: The variables the gradients belong to. Either one variable
                per aggregation variable, or the full variable list that also
                contains the variables whose gradient was None.

        Returns:
            One gradient per aggregation variable, divided by
            `backward_passes_per_step` when `average_aggregated_gradients`
            is true.
        """
        # The aggregated gradients only cover variables whose gradient was not
        # None. If the caller handed in the full variable list, drop the
        # variables without a gradient so that variables and gradients are
        # paired position by position.
        if self.num_none_grad_updates and \
                len(vars) == len(self.locally_aggregated_grads) + self.num_none_grad_updates:
            vars = [var for idx, var in enumerate(vars)
                    if idx in self.not_none_indexes]

        def _filtered_reduce_grads(grads, variables):
            # Map every variable to its gradient, allreduce only the gradients
            # of variables that are not registered as local, then write the
            # reduced gradients back into the map.
            var_to_grad = {self._var_key(var): grad
                           for var, grad in zip(variables, grads)}
            reduce_vars = []
            reduce_grads = []
            for var, grad in zip(variables, grads):
                if self._var_key(var) not in self._local_vars:
                    reduce_vars.append(var)
                    reduce_grads.append(grad)

            reduced_grads = self._allreduce_grads(reduce_grads, reduce_vars)
            horovod_size = size_op(process_set_id=self.process_set.process_set_id) if int(
                os.environ.get("HOROVOD_ELASTIC", 0)) else self.process_set.size()

            for var, grad in zip(reduce_vars, reduced_grads):
                var_to_grad[self._var_key(var)] = grad

            if self.scale_local_gradients and self._local_vars:
                # Scale local gradients by a size factor. See pull/3695 and discussions/3705 for context.
                for key in var_to_grad:
                    if key in self._local_vars and var_to_grad[key] is not None:
                        var_to_grad[key] /= float(horovod_size)

            return [var_to_grad[self._var_key(var)] for var in variables]

        # Read in latest variables values.
        aggregated_grads = [
            locally_aggregated_grad.read_value()
            for locally_aggregated_grad in self.locally_aggregated_grads
        ]
        aggregation_read_ops = tf.group(*aggregated_grads)

        with tf.control_dependencies([aggregation_read_ops]):
            averaged_gradients = _filtered_reduce_grads(aggregated_grads, vars)

            # Reset counter.
            with tf.control_dependencies([g.op for g in averaged_gradients if g is not None]):
                reset_op = self.counter.assign(
                    tf.constant(0), use_locking=True)

            # Divide by backward_passes_per_step if
            # average_aggregated_gradients is True.
            with tf.control_dependencies([reset_op]):
                gradient_divisor = self.backward_passes_per_step if \
                    self.average_aggregated_gradients else 1

                averaged_gradients = apply_op_to_not_none_tensors(
                    tf.divide,
                    averaged_gradients,
                    gradient_divisor,
                )
                return averaged_gradients

    def compute_gradients(self, grads, vars):
        """
        Applies the new gradient updates to the locally aggregated gradients, and
        performs cross-machine communication every backward_passes_per_step
        times it is called.

        Args:
            grads: Gradients of the current pass; entries may be None.
            vars: The variables the gradients belong to.

        Returns:
            A list with one entry per element of `grads`. Positions whose
            gradient was None stay None. On passes without an allreduce the
            (possibly densified) input gradients are returned.
        """
        self._init_aggregation_vars(grads)

        # Clear the locally aggregated gradients when the counter is at zero.
        clear_op = tf.cond(
            pred=tf.equal(self.counter, 0),
            true_fn=lambda: self._clear_grads(),
            false_fn=tf.no_op
        )

        # Add new gradients to the locally aggregated gradients.
        with tf.control_dependencies([clear_op]):
            aggregation_ops_list = self._aggregate_grads(grads)

        # Increment the counter once new gradients have been applied.
        aggregation_ops = tf.group(*aggregation_ops_list)
        with tf.control_dependencies([aggregation_ops]):
            update_counter = self.counter.assign_add(tf.constant(1))

        with tf.control_dependencies([update_counter]):
            grads = get_not_none_from_list(grads)
            assert len(grads) == len(self.locally_aggregated_grads)

            # Allreduce locally aggregated gradients when the counter is equivalent to
            # `backward_passes_per_step`. If the condition is true, it also resets
            # the counter back to 0.
            allreduced_grads = tf.cond(
                tf.equal(self.counter, self.backward_passes_per_step),
                lambda: self._allreduce_grads_helper(vars),
                lambda: [self._maybe_convert_grad(g) for g in grads]
            )

        # Handle case where there is only one variable.
        if not isinstance(allreduced_grads, (list, tuple)):
            allreduced_grads = (allreduced_grads,)
        assert len(allreduced_grads) == len(self.locally_aggregated_grads)

        # Insert gradients that are None back in.
        allreduced_grads = [
            allreduced_grads[self.not_none_indexes[idx]] if idx in self.not_none_indexes else None
            for idx in range(len(self.locally_aggregated_grads) + self.num_none_grad_updates)
        ]
        assert len(allreduced_grads) == len(
            self.locally_aggregated_grads) + self.num_none_grad_updates

        # If gradients have not been allreduced this batch, we return the gradients
        # that were submitted as the updates (the input).
        return allreduced_grads

    def apply_gradients(self, apply_grads_closure, optimizer, *args, **kwargs):
        """
        Apply updates every backward_passes_per_step, which lines up with
        the batches on which we communicated the locally aggregated gradients.

        Args:
            apply_grads_closure: Callable run (through `tf.cond`) when the
                counter equals 0.
            optimizer: If it has a non-None `iterations` attribute, that
                attribute is incremented on passes where
                `apply_grads_closure` is not run.
            *args: `args[0]` must be an iterable of tuples (presumably
                (gradient, variable) pairs -- confirm against the caller);
                their non-None items are used as control dependencies.
            **kwargs: Only `global_step` is read here; if given, it is
                incremented on passes where `apply_grads_closure` is not run.

        Returns:
            The result of the final `tf.cond`, which is ordered after the
            conditional apply / iteration-increment op.
        """
        flattened_args0 = [item for tup in args[0] for item in tup]

        # If optimizer tracks iterations, we increment it on steps where we
        # are not going to call `apply_gradients()`.
        def increment_optimizer_iteration():
            # (kvignesh1420): Since all `tf.OptimizerV2` instances have the `iterations`
            # property for modifying the underlying `optimizer._iterations`, it is safe to use
            # the property instead of the private variable. For instance, the keras
            # `LossScaleOptimizer` inherits `tf.Optimizer` and exposes the cleaner `iterations`
            # property instead of the unsafe `_iterations`.
            if hasattr(optimizer, "iterations") and optimizer.iterations is not None:
                return optimizer.iterations.assign_add(1).op
            return tf.no_op()

        with tf.control_dependencies([tf.group(*get_not_none_from_list(flattened_args0))]):
            train_op = tf.cond(
                pred=tf.equal(self.counter, 0),
                true_fn=apply_grads_closure,
                false_fn=increment_optimizer_iteration,
            )

        # Since we skip applying updates when the counter is not at zero we
        # still want to increment the global step if it is being tracked
        # (e.g., Tensorflow Estimators).
        def increment_global_step_counter():
            global_step_counter = kwargs.get('global_step')
            if global_step_counter is None:
                return tf.no_op()
            return global_step_counter.assign_add(
                tf.constant(1, dtype=tf.int64),
                use_locking=True,
                read_value=False
            )

        with tf.control_dependencies([train_op]):
            # Increment global step on iterations where we don't call `apply_gradients()`.
            return tf.cond(
                pred=tf.equal(self.counter, 0),
                true_fn=tf.no_op,
                false_fn=increment_global_step_counter,
            )