API

horovod.tensorflow

class horovod.tensorflow.BroadcastGlobalVariablesHook(root_rank, device='')

SessionRunHook that will broadcast all global variables from root rank to all other processes during initialization.

This is necessary to ensure consistent initialization of all workers when training is started with random weights or restored from a checkpoint.

NOTE: deprecated in TensorFlow 2.0.

horovod.tensorflow.DistributedGradientTape(gradtape, device_dense='', device_sparse='', compression=<class 'horovod.tensorflow.compression.NoneCompressor'>, sparse_as_dense=False, op=Average, gradient_predivide_factor=1.0, num_groups=0, groups=None)

A tape that wraps another tf.GradientTape, using an allreduce to combine gradient values before applying gradients to model weights.

Parameters:
  • gradtape – GradientTape to use for computing gradients and applying updates.
  • device_dense – Device to be used for dense tensors. Uses GPU by default if Horovod was built with HOROVOD_GPU_OPERATIONS.
  • device_sparse – Device to be used for sparse tensors. Uses GPU by default if Horovod was built with HOROVOD_GPU_OPERATIONS.
  • compression – Compression algorithm used during allreduce to reduce the amount of data sent during each parameter update step. Defaults to not using compression.
  • sparse_as_dense – Treat all sparse gradients as dense tensors. This can help improve performance and memory utilization if the original sparse gradient has high density. Defaults to false.
  • op – The reduction operation to use when combining gradients across different ranks.
  • gradient_predivide_factor – If op == Average, gradient_predivide_factor splits the averaging before and after the sum. Gradients are scaled by 1.0 / gradient_predivide_factor before the sum and gradient_predivide_factor / size after the sum.
  • num_groups – Number of groups to assign gradient allreduce ops to for explicit grouping. Defaults to no explicit groups.
  • groups – How to group the gradient allreduce ops. Accepted values are a non-negative integer or a list of lists of tf.Variable. If groups is a non-negative integer, it is the number of groups to assign gradient allreduce ops to for explicit grouping. If groups is a list of lists of tf.Variable, variables in the same inner list are assigned to the same group, while any parameter that does not appear in any list forms a group by itself. Defaults to None, which means no explicit groups.
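
The scaling described for gradient_predivide_factor can be checked with plain arithmetic. The sketch below is a toy model only: it does not call Horovod, the function name allreduce_average_with_predivide is hypothetical, and a Python list of scalars stands in for the per-rank gradients.

```python
def allreduce_average_with_predivide(per_rank_grads, gradient_predivide_factor=1.0):
    """Toy model of averaging with a split scale factor (no Horovod involved)."""
    size = len(per_rank_grads)
    # Scale each rank's gradient by 1.0 / gradient_predivide_factor before the sum.
    prescaled = [g / gradient_predivide_factor for g in per_rank_grads]
    # The built-in sum stands in for the cross-rank reduction.
    total = sum(prescaled)
    # Scale by gradient_predivide_factor / size after the sum.
    return total * (gradient_predivide_factor / size)


grads = [1.0, 2.0, 3.0, 6.0]  # one scalar gradient per simulated rank
plain_average = allreduce_average_with_predivide(grads)
split_average = allreduce_average_with_predivide(grads, gradient_predivide_factor=2.0)
```

Both calls produce the same average; the factor only changes how much of the division happens before the sum, which can matter for numerical range in reduced-precision arithmetic.
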
horovod.tensorflow.DistributedOptimizer(optimizer, name=None, use_locking=False, device_dense='', device_sparse='', compression=<class 'horovod.tensorflow.compression.NoneCompressor'>, sparse_as_dense=False, backward_passes_per_step=1, op=Average, gradient_predivide_factor=1.0, average_aggregated_gradients=False, num_groups=0, groups=None)

Construct a new DistributedOptimizer, which uses another optimizer under the hood for computing single-process gradient values and applying gradient updates after the gradient values have been combined across all the Horovod ranks.

Parameters:
  • optimizer – Optimizer to use for computing gradients and applying updates.
  • name – Optional name prefix for the operations created when applying gradients. Defaults to “Distributed” followed by the provided optimizer type.
  • use_locking – Whether to use locking when updating variables. See Optimizer.__init__ for more info.
  • device_dense – Device to be used for dense tensors. Uses GPU by default if Horovod was built with HOROVOD_GPU_OPERATIONS.
  • device_sparse – Device to be used for sparse tensors. Uses GPU by default if Horovod was built with HOROVOD_GPU_OPERATIONS.
  • compression – Compression algorithm used during allreduce to reduce the amount of data sent during each parameter update step. Defaults to not using compression.
  • sparse_as_dense – Treat all sparse gradients as dense tensors. This can help improve performance and memory utilization if the original sparse gradient has high density. Defaults to false.
  • backward_passes_per_step – Number of backward passes to perform before calling hvd.allreduce. This allows accumulating updates over multiple mini-batches before reducing and applying them.
  • op – The reduction operation to use when combining gradients across different ranks.
  • gradient_predivide_factor – If op == Average, gradient_predivide_factor splits the averaging before and after the sum. Gradients are scaled by 1.0 / gradient_predivide_factor before the sum and gradient_predivide_factor / size after the sum.
  • average_aggregated_gradients – Whether to average the aggregated gradients that have been accumulated over multiple mini-batches. If true divides gradients updates by backward_passes_per_step. Only applicable for backward_passes_per_step > 1.
  • num_groups – Number of groups to assign gradient allreduce ops to for explicit grouping. Defaults to no explicit groups.
  • groups – How to group the gradient allreduce ops. Accepted values are a non-negative integer or a list of lists of tf.Variable. If groups is a non-negative integer, it is the number of groups to assign gradient allreduce ops to for explicit grouping. If groups is a list of lists of tf.Variable, variables in the same inner list are assigned to the same group, while any parameter that does not appear in any list forms a group by itself. Defaults to None, which means no explicit groups.
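
The interaction between backward_passes_per_step and average_aggregated_gradients can be sketched without Horovod. The helper below is hypothetical and models only the local accumulation on a single worker: it returns the value that would be handed to the allreduce each time a reduction is due.

```python
def accumulate_then_reduce(micro_batch_grads, backward_passes_per_step,
                           average_aggregated_gradients=False):
    """Return the locally aggregated gradient handed to each reduction."""
    aggregated = 0.0
    reduced = []
    for i, grad in enumerate(micro_batch_grads, start=1):
        aggregated += grad  # accumulate over mini-batches
        if i % backward_passes_per_step == 0:
            value = aggregated
            if average_aggregated_gradients:
                # Divide the accumulated update by the number of passes.
                value /= backward_passes_per_step
            reduced.append(value)  # this is where the allreduce would run
            aggregated = 0.0
    return reduced


summed = accumulate_then_reduce([1.0, 2.0, 3.0, 4.0], backward_passes_per_step=2)
averaged = accumulate_then_reduce([1.0, 2.0, 3.0, 4.0], backward_passes_per_step=2,
                                  average_aggregated_gradients=True)
```

With four mini-batches and two passes per step, only two reductions take place instead of four.
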
horovod.tensorflow.allreduce(tensor, average=None, device_dense='', device_sparse='', compression=<class 'horovod.tensorflow.compression.NoneCompressor'>, op=None, prescale_factor=1.0, postscale_factor=1.0, name=None)

Perform an allreduce on a tf.Tensor or tf.IndexedSlices.

This function performs a bandwidth-optimal ring allreduce on the input tensor. If the input is a tf.IndexedSlices, the function instead does an allgather on the values and the indices, effectively doing an allreduce on the represented tensor.

Parameters:
  • tensor – tf.Tensor, tf.Variable, or tf.IndexedSlices to reduce. The shape of the input must be identical across all ranks.
  • average – Warning: deprecated since version 0.19.0. Use op instead. Will be removed in v0.21.0.
  • device_dense – Device to be used for dense tensors. Uses GPU by default if Horovod was built with HOROVOD_GPU_OPERATIONS.
  • device_sparse – Device to be used for sparse tensors. Uses GPU by default if Horovod was built with HOROVOD_GPU_OPERATIONS.
  • compression – Compression algorithm used to reduce the amount of data sent and received by each worker node. Defaults to not using compression.
  • op – The reduction operation to combine tensors across different ranks. Defaults to Average if None is given.
  • prescale_factor – Multiplicative factor to scale tensor before allreduce.
  • postscale_factor – Multiplicative factor to scale tensor after allreduce.
  • name – A name for the allreduce operation.
Returns:

A tensor of the same shape and type as tensor, reduced across all processes according to op.
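
Why an allgather of indices and values amounts to an allreduce of the represented tensor can be seen in a small pure-Python model. Nothing here calls Horovod or TensorFlow: densify is a hypothetical helper, each simulated rank holds one scalar per selected row, and the reduction shown is a sum (an average would additionally divide by the number of ranks).

```python
def densify(indices, values, num_rows):
    """Scatter-add sparse (index, value) pairs into a dense list of rows."""
    dense = [0.0] * num_rows
    for idx, val in zip(indices, values):
        dense[idx] += val  # repeated indices accumulate
    return dense


num_rows = 4
# One (indices, values) pair per simulated rank.
rank_slices = [([0, 2], [1.0, 2.0]), ([2, 3], [10.0, 20.0])]

# Allgather: concatenate indices and values from every rank.
gathered_indices = [i for indices, _ in rank_slices for i in indices]
gathered_values = [v for _, values in rank_slices for v in values]
via_allgather = densify(gathered_indices, gathered_values, num_rows)

# Reference: densify on each rank first, then sum element-wise.
per_rank_dense = [densify(i, v, num_rows) for i, v in rank_slices]
via_dense_sum = [sum(column) for column in zip(*per_rank_dense)]
```

Row 2 appears on both ranks, so its contributions are added together in both paths.
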

horovod.tensorflow.broadcast_global_variables(root_rank)

Broadcasts all global variables from root rank to all other processes.

NOTE: deprecated in TensorFlow 2.0.

Parameters: root_rank – Rank of the process from which global variables will be broadcast to all other processes.
class horovod.tensorflow.elastic.TensorFlowKerasState(model, optimizer=None, backend=None, **kwargs)

State representation of a TensorFlow Keras model and optimizer.

Supports TensorFlow 2 models and optimizers, as well as keras and tf.keras.

Parameters:
  • model – TensorFlow Keras model.
  • optimizer – Optional optimizer, can be compiled into model instead.
  • backend – For TensorFlow v1, backend used by Keras for obtaining the session.
  • kwargs – Additional properties to sync, will be exposed as attributes of the object.
restore()

Restores the last committed state, undoing any uncommitted modifications.

save()

Saves state to host memory.

sync()

Synchronize state across workers.

class horovod.tensorflow.elastic.TensorFlowState(variables=None, session=None, **kwargs)

State representation of a list of TensorFlow variables.

Supports both TensorFlow v1 and v2. For TensorFlow v2, can only be used when eager execution is enabled.

Parameters:
  • variables – List of tf.Variable objects to track (default: tf.global_variables()).
  • session – For TensorFlow v1, session used to materialize variables (default: ops.get_default_session()).
  • kwargs – Additional properties to sync, will be exposed as attributes of the object.
restore()

Restores the last committed state, undoing any uncommitted modifications.

save()

Saves state to host memory.

sync()

Synchronize state across workers.

horovod.tensorflow.elastic.run(func)

Decorator used to run the elastic training process.

The purpose of this decorator is to allow for uninterrupted execution of the wrapped function across multiple workers in parallel, as workers come and go from the system. When a new worker is added, its state needs to be brought to the same point as the other workers, which is done by synchronizing the state object before executing func.

When a worker is added or removed, other workers will raise an exception to bring them back to such a sync point before executing func again. This ensures that workers do not diverge when such reset events occur.

It’s important to note that collective operations (e.g., broadcast, allreduce) cannot be called before the call to the wrapped function. Otherwise, new workers could execute these operations during their initialization while other workers are attempting to sync state, resulting in deadlock.

Parameters: func – A wrapped function taking any number of args or kwargs. The first argument must be a horovod.common.elastic.State object used to synchronize state across workers.
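
The sync, run, and retry cycle described above can be illustrated with a toy decorator. This is a sketch under stated assumptions, not Horovod's implementation: ToyState, WorkersChanged, and toy_elastic_run are hypothetical names, there is only one process, and a scripted iterator stands in for workers joining or leaving.

```python
import copy
import functools


class WorkersChanged(Exception):
    """Hypothetical signal that the worker set changed (not a Horovod class)."""


class ToyState:
    """Hypothetical stand-in for a state object with commit/restore/sync."""

    def __init__(self, **kwargs):
        self.__dict__.update(kwargs)
        self._saved = copy.deepcopy(kwargs)
        self.sync_count = 0

    def commit(self):
        # Remember the current values as the last committed state.
        self._saved = {k: copy.deepcopy(getattr(self, k)) for k in self._saved}

    def restore(self):
        # Undo any uncommitted modifications.
        for key, value in self._saved.items():
            setattr(self, key, copy.deepcopy(value))

    def sync(self):
        # A real implementation would broadcast state across workers here.
        self.sync_count += 1


def toy_elastic_run(func):
    @functools.wraps(func)
    def wrapper(state, *args, **kwargs):
        while True:
            state.sync()  # bring every worker to the same point first
            try:
                return func(state, *args, **kwargs)
            except WorkersChanged:
                state.restore()  # drop uncommitted progress, then retry
    return wrapper


# Scripted reset events: the third processed batch triggers a reset.
events = iter([False, False, True, False, False, False])


@toy_elastic_run
def train(state, total_batches):
    while state.batch < total_batches:
        state.batch += 1
        if next(events):
            raise WorkersChanged()
        state.commit()
    return state.batch


state = ToyState(batch=0)
result = train(state, 4)
```

The interrupted batch is rolled back to the last commit and replayed after a second sync, so training still finishes at batch 4.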

horovod.tensorflow.keras

horovod.tensorflow.keras.DistributedOptimizer(optimizer, name=None, device_dense='', device_sparse='', compression=<class 'horovod.tensorflow.compression.NoneCompressor'>, sparse_as_dense=False, gradient_predivide_factor=1.0, op=Average, backward_passes_per_step=1, average_aggregated_gradients=False)

An optimizer that wraps another keras.optimizers.Optimizer, using an allreduce to average gradient values before applying gradients to model weights.

Parameters:
  • optimizer – Optimizer to use for computing gradients and applying updates.
  • name – Optional name prefix for the operations created when applying gradients. Defaults to “Distributed” followed by the provided optimizer type.
  • device_dense – Device to be used for dense tensors. Uses GPU by default if Horovod was built with HOROVOD_GPU_OPERATIONS.
  • device_sparse – Device to be used for sparse tensors. Uses GPU by default if Horovod was built with HOROVOD_GPU_OPERATIONS.
  • compression – Compression algorithm used to reduce the amount of data sent and received by each worker node. Defaults to not using compression.
  • sparse_as_dense – Treat all sparse gradients as dense tensors. This can help improve performance and memory utilization if the original sparse gradient has high density. Defaults to false.
  • gradient_predivide_factor – gradient_predivide_factor splits the averaging before and after the sum. Gradients are scaled by 1.0 / gradient_predivide_factor before the sum and gradient_predivide_factor / size after the sum.
  • op – The reduction operation to use when combining gradients across different ranks. Defaults to Average.
  • backward_passes_per_step – Number of backward passes to perform before calling hvd.allreduce. This allows accumulating updates over multiple mini-batches before reducing and applying them.
  • average_aggregated_gradients – Whether to average the aggregated gradients that have been accumulated over multiple mini-batches. If true divides gradient updates by backward_passes_per_step. Only applicable for backward_passes_per_step > 1.
horovod.tensorflow.keras.allgather(value, name=None)

Perform an allgather on a tensor-compatible value.

The concatenation is done on the first dimension, so the input values on the different processes must have the same rank and shape, except for the first dimension, which is allowed to be different.

Parameters:
  • value – A tensor-compatible value to gather.
  • name – Optional name prefix for the constants created by this operation.
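
The shape rule for allgather (concatenate along the first dimension, all other dimensions equal) can be modelled with nested lists. The helper toy_allgather is hypothetical and runs in a single process; each list element plays the role of one rank's 2-D value.

```python
def toy_allgather(per_rank_values):
    """Concatenate per-rank 2-D values (lists of rows) along the first dimension."""
    row_widths = {len(row) for value in per_rank_values for row in value}
    if len(row_widths) > 1:
        raise ValueError("all dimensions except the first must match across ranks")
    return [row for value in per_rank_values for row in value]


rank0 = [[1, 2]]                  # shape (1, 2)
rank1 = [[3, 4], [5, 6], [7, 8]]  # shape (3, 2)
gathered = toy_allgather([rank0, rank1])  # shape (4, 2)
```

The ranks contribute one and three rows respectively, and every rank would receive the same four-row result.
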
horovod.tensorflow.keras.allreduce(value, name=None, average=None, prescale_factor=1.0, postscale_factor=1.0, op=None, compression=<class 'horovod.tensorflow.compression.NoneCompressor'>)

Perform an allreduce on a tensor-compatible value.

Parameters:
  • value – A tensor-compatible value to reduce. The shape of the input must be identical across all ranks.
  • name – Optional name for the constants created by this operation.
  • average – Warning: deprecated since version 0.19.0. Use op instead. Will be removed in v0.21.0.
  • prescale_factor – Multiplicative factor to scale tensor before allreduce.
  • postscale_factor – Multiplicative factor to scale tensor after allreduce.
  • op – The reduction operation to combine tensors across different ranks. Defaults to Average if None is given.
  • compression – Compression algorithm used to reduce the amount of data sent and received by each worker node. Defaults to not using compression.
horovod.tensorflow.keras.broadcast(value, root_rank, name=None)

Perform a broadcast on a tensor-compatible value.

Parameters:
  • value – A tensor-compatible value to broadcast. The shape of the input must be identical across all ranks.
  • root_rank – Rank of the process from which the value will be broadcast to all other processes.
  • name – Optional name for the constants created by this operation.
horovod.tensorflow.keras.broadcast_global_variables(root_rank)

Broadcasts all global variables from root rank to all other processes.

Parameters: root_rank – Rank of the process from which global variables will be broadcast to all other processes.
horovod.tensorflow.keras.load_model(filepath, custom_optimizers=None, custom_objects=None, compression=<class 'horovod.tensorflow.compression.NoneCompressor'>)

Loads a saved Keras model with a Horovod DistributedOptimizer.

The DistributedOptimizer will wrap the underlying optimizer used to train the saved model, so that the optimizer state (params and weights) will be picked up for retraining.

By default, all optimizers in the module keras.optimizers will be loaded and wrapped without needing to specify any custom_optimizers or custom_objects.

Parameters:
  • filepath – Either a string path to the saved model, or an h5py.File object from which to load the model.
  • custom_optimizers – Optional list of Optimizer subclasses to support during loading.
  • custom_objects – Optional dictionary mapping names (strings) to custom classes or functions to be considered during deserialization.
  • compression – Compression algorithm used to reduce the amount of data sent and received by each worker node. Defaults to not using compression.
Returns:

A Keras model instance.

Raises:
  • ImportError – If h5py is not available.
  • ValueError – In case of an invalid savefile.
class horovod.tensorflow.keras.callbacks.BroadcastGlobalVariablesCallback(root_rank, device='')

Keras Callback that will broadcast all global variables from root rank to all other processes during initialization.

This is necessary to ensure consistent initialization of all workers when training is started with random weights or restored from a checkpoint.

__init__(root_rank, device='')

Construct a new BroadcastGlobalVariablesCallback that will broadcast all global variables from root rank to all other processes during initialization.

Parameters:
  • root_rank – Rank that will send data, other ranks will receive data.
  • device – Device to be used for broadcasting. Uses GPU by default if Horovod was built with HOROVOD_GPU_OPERATIONS.
class horovod.tensorflow.keras.callbacks.LearningRateScheduleCallback(initial_lr, multiplier, start_epoch=0, end_epoch=None, staircase=True, momentum_correction=True, steps_per_epoch=None)

LearningRateScheduleCallback sets learning rate between epochs start_epoch and end_epoch to be initial_lr * multiplier. multiplier can be a constant or a function f(epoch) = lr’.

If multiplier is a function and staircase=True, learning rate adjustment will happen at the beginning of each epoch and the epoch passed to the multiplier function will be an integer.

If multiplier is a function and staircase=False, learning rate adjustment will happen at the beginning of each batch and the epoch passed to the multiplier function will be a floating-point number: epoch’ = epoch + batch / steps_per_epoch. This functionality is useful for smooth learning rate adjustment schedulers, such as LearningRateWarmupCallback.

initial_lr is the learning rate of the model optimizer at the start of the training.

__init__(initial_lr, multiplier, start_epoch=0, end_epoch=None, staircase=True, momentum_correction=True, steps_per_epoch=None)

Construct a new LearningRateScheduleCallback.

Parameters:
  • initial_lr – Initial learning rate at the start of training.
  • multiplier – A constant multiplier or a function f(epoch) = lr’
  • start_epoch – The first epoch this adjustment will be applied to. Defaults to 0.
  • end_epoch – The epoch this adjustment will stop applying (exclusive end). Defaults to None.
  • staircase – Whether to adjust learning rate at the start of epoch (staircase=True) or at the start of every batch (staircase=False).
  • momentum_correction – Apply momentum correction to optimizers that have momentum. Defaults to True.
  • steps_per_epoch – The callback will attempt to autodetect number of batches per epoch with Keras >= 2.0.0. Provide this value if you have an older version of Keras.
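
How the multiplier, staircase, and epoch range combine can be sketched as a plain function. The name scheduled_lr is hypothetical and the function only mirrors the rules stated above; it does not touch a Keras optimizer and ignores momentum correction.

```python
def scheduled_lr(initial_lr, multiplier, epoch, batch=0, steps_per_epoch=1,
                 start_epoch=0, end_epoch=None, staircase=True):
    """Return the learning rate the schedule would set, or None outside its range."""
    if epoch < start_epoch or (end_epoch is not None and epoch >= end_epoch):
        return None  # outside [start_epoch, end_epoch): learning rate is left alone
    if not callable(multiplier):
        return initial_lr * multiplier  # constant multiplier
    # staircase=True passes an integer epoch; staircase=False a fractional one.
    effective_epoch = epoch if staircase else epoch + batch / steps_per_epoch
    return initial_lr * multiplier(effective_epoch)


def inverse_decay(epoch):
    # Example multiplier function, chosen only for illustration.
    return 1.0 / (1.0 + epoch)


per_epoch = scheduled_lr(0.1, inverse_decay, epoch=1, batch=5, steps_per_epoch=10)
per_batch = scheduled_lr(0.1, inverse_decay, epoch=1, batch=5, steps_per_epoch=10,
                         staircase=False)
constant = scheduled_lr(0.1, 0.1, epoch=30, start_epoch=30, end_epoch=60)
out_of_range = scheduled_lr(0.1, 0.1, epoch=60, start_epoch=30, end_epoch=60)
```

Halfway through epoch 1, the staircase schedule still uses epoch 1, while the per-batch schedule uses epoch 1.5.
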
class horovod.tensorflow.keras.callbacks.LearningRateWarmupCallback(initial_lr, warmup_epochs=5, momentum_correction=True, steps_per_epoch=None, verbose=0)

Implements gradual learning rate warmup:

lr = initial_lr / hvd.size() -> lr = initial_lr

initial_lr is the learning rate of the model optimizer at the start of the training.

This technique was described in the paper “Accurate, Large Minibatch SGD: Training ImageNet in 1 Hour”. See https://arxiv.org/pdf/1706.02677.pdf for details.

Math recap:

\[
\begin{aligned}
epoch &= full\_epochs + \frac{batch}{steps\_per\_epoch} \\
lr'(epoch) &= \frac{lr}{size} \cdot \left(\frac{size - 1}{warmup} \cdot epoch + 1\right) \\
lr'(epoch = 0) &= \frac{lr}{size} \\
lr'(epoch = warmup) &= lr
\end{aligned}
\]
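
The warmup formula can be evaluated directly to confirm its two endpoints. In this sketch warmup_lr is a hypothetical helper, and size is passed as a plain integer in place of hvd.size().

```python
def warmup_lr(initial_lr, size, warmup_epochs, epoch, batch=0, steps_per_epoch=1):
    """Learning rate during warmup, following the math recap term by term."""
    fractional_epoch = epoch + batch / steps_per_epoch
    return initial_lr / size * ((size - 1) / warmup_epochs * fractional_epoch + 1)


start = warmup_lr(0.8, size=8, warmup_epochs=5, epoch=0)  # lr / size
middle = warmup_lr(0.8, size=8, warmup_epochs=5, epoch=2, batch=50,
                   steps_per_epoch=100)                   # halfway through warmup
end = warmup_lr(0.8, size=8, warmup_epochs=5, epoch=5)    # full lr
```

The rate grows linearly from initial_lr / size at epoch 0 to initial_lr at the end of the warmup phase.
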
__init__(initial_lr, warmup_epochs=5, momentum_correction=True, steps_per_epoch=None, verbose=0)

Construct a new LearningRateWarmupCallback that will gradually warm up the learning rate.

Parameters:
  • initial_lr – Initial learning rate at the start of training.
  • warmup_epochs – The number of epochs of the warmup phase. Defaults to 5.
  • momentum_correction – Apply momentum correction to optimizers that have momentum. Defaults to True.
  • steps_per_epoch – The callback will attempt to autodetect number of batches per epoch with Keras >= 2.0.0. Provide this value if you have an older version of Keras.
  • verbose – verbosity mode, 0 or 1.
class horovod.tensorflow.keras.callbacks.MetricAverageCallback(device='')

Keras Callback that will average metrics across all processes at the end of the epoch. Useful in conjunction with ReduceLROnPlateau, TensorBoard and other metrics-based callbacks.

Note: This callback must be added to the callback list before the ReduceLROnPlateau, TensorBoard or other metrics-based callbacks.

__init__(device='')

Construct a new MetricAverageCallback that will average metrics across all processes at the end of the epoch.

Parameters: device – Device to be used for allreduce. Uses GPU by default if Horovod was built with HOROVOD_GPU_OPERATIONS.
class horovod.tensorflow.keras.elastic.KerasState(model, optimizer=None, **kwargs)

State representation of a tf.keras model and optimizer.

Parameters:
  • model – Keras model.
  • optimizer – Optional optimizer, can be compiled into model instead.
  • kwargs – Additional properties to sync, will be exposed as attributes of the object.
class horovod.tensorflow.keras.elastic.CommitStateCallback(state, batches_per_commit=1)

Keras Callback that will commit the state object every batches_per_commit batches at the end of each batch.
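
The commit cadence can be pictured with a simple countdown. The helper commit_points is hypothetical, and the countdown is an assumption about how "every batches_per_commit batches" is counted rather than a copy of the callback's code.

```python
def commit_points(num_batches, batches_per_commit=1):
    """Return the 1-based batch counts after which a commit would happen."""
    commits = []
    batches_remaining = batches_per_commit
    for batch_count in range(1, num_batches + 1):
        batches_remaining -= 1  # one more batch has ended
        if batches_remaining == 0:
            commits.append(batch_count)  # the state would be committed here
            batches_remaining = batches_per_commit
    return commits


every_batch = commit_points(3)
every_fourth = commit_points(10, batches_per_commit=4)
```

A larger batches_per_commit means fewer commits, at the cost of more batches to replay after a reset.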

class horovod.tensorflow.keras.elastic.UpdateBatchStateCallback(state)

Keras Callback that will update the value of state.batch with the current batch number at the end of each batch. Batch will reset to 0 at the end of each epoch.

If steps_per_epoch is set, then this callback will also ensure that the number of steps in the first epoch following a reset is shortened by the number of batches already processed.

class horovod.tensorflow.keras.elastic.UpdateEpochStateCallback(state)

Keras Callback that will update the value of state.epoch with the current epoch number at the end of each epoch.

horovod.tensorflow.keras.elastic.run(func)

Decorator used to run the elastic training process.

The purpose of this decorator is to allow for uninterrupted execution of the wrapped function across multiple workers in parallel, as workers come and go from the system. When a new worker is added, its state needs to be brought to the same point as the other workers, which is done by synchronizing the state object before executing func.

When a worker is added or removed, other workers will raise an exception to bring them back to such a sync point before executing func again. This ensures that workers do not diverge when such reset events occur.

It’s important to note that collective operations (e.g., broadcast, allreduce) cannot be called before the call to the wrapped function. Otherwise, new workers could execute these operations during their initialization while other workers are attempting to sync state, resulting in deadlock.

Parameters: func – A wrapped function taking any number of args or kwargs. The first argument must be a horovod.common.elastic.State object used to synchronize state across workers.

horovod.keras

horovod.keras.DistributedOptimizer(optimizer, name=None, device_dense='', device_sparse='', compression=<class 'horovod.tensorflow.compression.NoneCompressor'>, sparse_as_dense=False, gradient_predivide_factor=1.0, op=Average, num_groups=0, groups=None)

An optimizer that wraps another keras.optimizers.Optimizer, using an allreduce to average gradient values before applying gradients to model weights.

Parameters:
  • optimizer – Optimizer to use for computing gradients and applying updates.
  • name – Optional name prefix for the operations created when applying gradients. Defaults to “Distributed” followed by the provided optimizer type.
  • device_dense – Device to be used for dense tensors. Uses GPU by default if Horovod was built with HOROVOD_GPU_OPERATIONS.
  • device_sparse – Device to be used for sparse tensors. Uses GPU by default if Horovod was built with HOROVOD_GPU_OPERATIONS.
  • compression – Compression algorithm used to reduce the amount of data sent and received by each worker node. Defaults to not using compression.
  • sparse_as_dense – Treat all sparse gradients as dense tensors. This can help improve performance and memory utilization if the original sparse gradient has high density. Defaults to false.
  • gradient_predivide_factor – gradient_predivide_factor splits the averaging before and after the sum. Gradients are scaled by 1.0 / gradient_predivide_factor before the sum and gradient_predivide_factor / size after the sum.
  • op – The reduction operation to use when combining gradients across different ranks. Defaults to Average.
  • num_groups – Number of groups to assign gradient allreduce ops to for explicit grouping. Defaults to no explicit groups.
  • groups – How to group the gradient allreduce ops. Accepted values are a non-negative integer or a list of lists of tf.Variable. If groups is a non-negative integer, it is the number of groups to assign gradient allreduce ops to for explicit grouping. If groups is a list of lists of tf.Variable, variables in the same inner list are assigned to the same group, while any parameter that does not appear in any list forms a group by itself. Defaults to None, which means no explicit groups.
horovod.keras.allgather(value, name=None)

Perform an allgather on a tensor-compatible value.

The concatenation is done on the first dimension, so the input values on the different processes must have the same rank and shape, except for the first dimension, which is allowed to be different.

Parameters:
  • value – A tensor-compatible value to gather.
  • name – Optional name prefix for the constants created by this operation.
horovod.keras.allreduce(value, name=None, average=True, prescale_factor=1.0, postscale_factor=1.0)

Perform an allreduce on a tensor-compatible value.

Parameters:
  • value – A tensor-compatible value to reduce. The shape of the input must be identical across all ranks.
  • name – Optional name for the constants created by this operation.
  • average – If True, computes the average over all ranks. Otherwise, computes the sum over all ranks.
  • prescale_factor – Multiplicative factor to scale tensor before allreduce.
  • postscale_factor – Multiplicative factor to scale tensor after allreduce.
horovod.keras.broadcast(value, root_rank, name=None)

Perform a broadcast on a tensor-compatible value.

Parameters:
  • value – A tensor-compatible value to broadcast. The shape of the input must be identical across all ranks.
  • root_rank – Rank of the process from which the value will be broadcast to all other processes.
  • name – Optional name for the constants created by this operation.
horovod.keras.broadcast_global_variables(root_rank)

Broadcasts all global variables from root rank to all other processes.

Parameters: root_rank – Rank of the process from which global variables will be broadcast to all other processes.
horovod.keras.load_model(filepath, custom_optimizers=None, custom_objects=None, compression=<class 'horovod.tensorflow.compression.NoneCompressor'>)

Loads a saved Keras model with a Horovod DistributedOptimizer.

The DistributedOptimizer will wrap the underlying optimizer used to train the saved model, so that the optimizer state (params and weights) will be picked up for retraining.

By default, all optimizers in the module keras.optimizers will be loaded and wrapped without needing to specify any custom_optimizers or custom_objects.

Parameters:
  • filepath – Either a string path to the saved model, or an h5py.File object from which to load the model.
  • custom_optimizers – Optional list of Optimizer subclasses to support during loading.
  • custom_objects – Optional dictionary mapping names (strings) to custom classes or functions to be considered during deserialization.
  • compression – Compression algorithm used to reduce the amount of data sent and received by each worker node. Defaults to not using compression.
Returns:

A Keras model instance.

Raises:
  • ImportError – If h5py is not available.
  • ValueError – In case of an invalid savefile.
class horovod.keras.callbacks.BroadcastGlobalVariablesCallback(root_rank, device='')

Keras Callback that will broadcast all global variables from root rank to all other processes during initialization.

This is necessary to ensure consistent initialization of all workers when training is started with random weights or restored from a checkpoint.

__init__(root_rank, device='')

Construct a new BroadcastGlobalVariablesCallback that will broadcast all global variables from root rank to all other processes during initialization.

Parameters:
  • root_rank – Rank that will send data, other ranks will receive data.
  • device – Device to be used for broadcasting. Uses GPU by default if Horovod was built with HOROVOD_GPU_OPERATIONS.
class horovod.keras.callbacks.LearningRateScheduleCallback(initial_lr, multiplier, start_epoch=0, end_epoch=None, staircase=True, momentum_correction=True, steps_per_epoch=None)

LearningRateScheduleCallback sets learning rate between epochs start_epoch and end_epoch to be initial_lr * multiplier. multiplier can be a constant or a function f(epoch) = lr’.

If multiplier is a function and staircase=True, learning rate adjustment will happen at the beginning of each epoch and the epoch passed to the multiplier function will be an integer.

If multiplier is a function and staircase=False, learning rate adjustment will happen at the beginning of each batch and the epoch passed to the multiplier function will be a floating-point number: epoch’ = epoch + batch / steps_per_epoch. This functionality is useful for smooth learning rate adjustment schedulers, such as LearningRateWarmupCallback.

initial_lr is the learning rate of the model optimizer at the start of the training.

__init__(initial_lr, multiplier, start_epoch=0, end_epoch=None, staircase=True, momentum_correction=True, steps_per_epoch=None)

Construct a new LearningRateScheduleCallback.

Parameters:
  • initial_lr – Initial learning rate at the start of training.
  • multiplier – A constant multiplier or a function f(epoch) = lr’
  • start_epoch – The first epoch this adjustment will be applied to. Defaults to 0.
  • end_epoch – The epoch this adjustment will stop applying (exclusive end). Defaults to None.
  • staircase – Whether to adjust learning rate at the start of epoch (staircase=True) or at the start of every batch (staircase=False).
  • momentum_correction – Apply momentum correction to optimizers that have momentum. Defaults to True.
  • steps_per_epoch – The callback will attempt to autodetect number of batches per epoch with Keras >= 2.0.0. Provide this value if you have an older version of Keras.
class horovod.keras.callbacks.LearningRateWarmupCallback(initial_lr, warmup_epochs=5, momentum_correction=True, steps_per_epoch=None, verbose=0)

Implements gradual learning rate warmup:

lr = initial_lr / hvd.size() -> lr = initial_lr

initial_lr is the learning rate of the model optimizer at the start of the training.

This technique was described in the paper “Accurate, Large Minibatch SGD: Training ImageNet in 1 Hour”. See https://arxiv.org/pdf/1706.02677.pdf for details.

Math recap:

\[
\begin{aligned}
epoch &= full\_epochs + \frac{batch}{steps\_per\_epoch} \\
lr'(epoch) &= \frac{lr}{size} \cdot \left(\frac{size - 1}{warmup} \cdot epoch + 1\right) \\
lr'(epoch = 0) &= \frac{lr}{size} \\
lr'(epoch = warmup) &= lr
\end{aligned}
\]

__init__(initial_lr, warmup_epochs=5, momentum_correction=True, steps_per_epoch=None, verbose=0)

Construct a new LearningRateWarmupCallback that will gradually warm up the learning rate.

Parameters:
  • initial_lr – Initial learning rate at the start of training.
  • warmup_epochs – The number of epochs of the warmup phase. Defaults to 5.
  • momentum_correction – Apply momentum correction to optimizers that have momentum. Defaults to True.
  • steps_per_epoch – The callback will attempt to autodetect number of batches per epoch with Keras >= 2.0.0. Provide this value if you have an older version of Keras.
  • verbose – verbosity mode, 0 or 1.
class horovod.keras.callbacks.MetricAverageCallback(device='')

Keras Callback that will average metrics across all processes at the end of the epoch. Useful in conjunction with ReduceLROnPlateau, TensorBoard and other metrics-based callbacks.

Note: This callback must be added to the callback list before the ReduceLROnPlateau, TensorBoard or other metrics-based callbacks.

__init__(device='')[source]

Construct a new MetricAverageCallback that will average metrics across all processes at the end of the epoch.

Parameters:device – Device to be used for allreduce. Uses GPU by default if Horovod was built with HOROVOD_GPU_OPERATIONS.
class horovod.keras.elastic.KerasState(model, optimizer=None, **kwargs)[source]

State representation of a keras model and optimizer.

Parameters:
  • model – Keras model.
  • optimizer – Optional optimizer, can be compiled into model instead.
  • kwargs – Additional properties to sync, will be exposed as attributes of the object.
class horovod.keras.elastic.CommitStateCallback(state, batches_per_commit=1)[source]

Keras Callback that will commit the state object every batches_per_commit batches at the end of each batch.

class horovod.keras.elastic.UpdateBatchStateCallback(state)[source]

Keras Callback that will update the value of state.batch with the current batch number at the end of each batch. Batch will reset to 0 at the end of each epoch.

If steps_per_epoch is set, then this callback will also ensure that the number of steps in the first epoch following a reset is shortened by the number of batches already processed.
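The commit cadence of CommitStateCallback and the shortened first epoch described for UpdateBatchStateCallback come down to simple arithmetic. The sketch below models both in plain Python. The helper names commit_batches and steps_after_reset are hypothetical, and counting batches from 1 is an assumption of this sketch rather than something the callbacks guarantee.

```python
def commit_batches(num_batches, batches_per_commit=1):
    """Return the 1-based batch counts after which a commit would happen."""
    return [b for b in range(1, num_batches + 1) if b % batches_per_commit == 0]


def steps_after_reset(steps_per_epoch, batches_done):
    """Steps left in the first epoch after a reset, given batches already processed."""
    return max(steps_per_epoch - batches_done, 0)


# Hypothetical run: 10 batches with a commit every 4 batches.
commits = commit_batches(10, batches_per_commit=4)
# Hypothetical reset after 30 of 100 batches in the current epoch.
remaining = steps_after_reset(100, 30)
```

A larger batches_per_commit means fewer commits, at the cost of redoing more batches when a reset rolls the state back to the last commit.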

class horovod.keras.elastic.UpdateEpochStateCallback(state)[source]

Keras Callback that will update the value of state.epoch with the current epoch number at the end of each epoch.

horovod.keras.elastic.run(func)[source]

Decorator used to run the elastic training process.

The purpose of this decorator is to allow for uninterrupted execution of the wrapped function across multiple workers in parallel, as workers come and go from the system. When a new worker is added, its state needs to be brought to the same point as the other workers, which is done by synchronizing the state object before executing func.

When a worker is added or removed, other workers will raise an exception to bring them back to such a sync point before executing func again. This ensures that workers do not diverge when such reset events occur.

It’s important to note that collective operations (e.g., broadcast, allreduce) cannot be called before the call to the wrapped function. Otherwise, new workers could execute these operations during their initialization while other workers are attempting to sync state, resulting in deadlock.

Parameters:func – a wrapped function taking any number of args or kwargs. The first argument must be a horovod.common.elastic.State object used to synchronize state across workers.

horovod.torch

class horovod.torch.elastic.TorchState(model=None, optimizer=None, **kwargs)[source]

State representation of a PyTorch training process.

Multiple models and optimizers are supported by providing them as kwargs. During initialization, TorchState will assign attributes for every keyword argument, and handle its state synchronization.

Parameters:
  • model – Optional PyTorch model.
  • optimizer – Optional PyTorch optimizer.
  • kwargs – Additional properties to sync, will be exposed as attributes of the object. If a handler exists for the attribute type, it will be used to sync the object, otherwise it will be handled as an ordinary Python object.
restore()[source]

Restores the last committed state, undoing any uncommitted modifications.

save()[source]

Saves state to host memory.

sync()[source]

Synchronize state across workers.

class horovod.torch.elastic.ElasticSampler(dataset, shuffle=True, seed=0)[source]

Sampler that partitions dataset across ranks and repartitions after reset events.

Works similarly to DistributedSampler, but with an optional capability to record which dataset indices have been processed each batch. When tracked by a TorchState object, the sampler will automatically repartition the unprocessed indices among the new set of workers.

In order to use this object successfully it is recommended that the user:

  1. Include this object in the TorchState.
  2. Call record_batch or record_indices after processing a set of samples.
  3. Call set_epoch at the end of each epoch to clear the processed indices.
Parameters:
  • dataset – Dataset used for sampling (assumed to be of constant size).
  • shuffle – If True (default), shuffle the indices.
  • seed – Random seed used to shuffle the sampler when shuffle=True. This number should be identical across all ranks (default: 0).
get_indices(batch_idx, batch_size)[source]

Return list of indices at batch batch_idx with length batch_size.

record_batch(batch_idx, batch_size)[source]

Record indices at batch batch_idx with length batch_size as processed.

record_indices(indices)[source]

Record the given set of indices as processed.

set_epoch(epoch)[source]

Sets the epoch for this sampler.

When shuffle=True, this ensures all replicas use a different random ordering for each epoch.

Will clear and reset the processed_indices for the next epoch. It is important that this is called at the end of the epoch (not the beginning) to ensure that partially completed epochs do not reprocess samples.

Parameters:epoch – Epoch number.
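To make the repartitioning idea concrete, here is a minimal sketch in plain Python of how unprocessed indices could be divided among a new set of workers. The function repartition is hypothetical, shuffling is omitted, and padding by cycling through the remaining indices is an assumption of this sketch; it is not the ElasticSampler implementation.

```python
import itertools
import math


def repartition(dataset_size, processed, rank, num_replicas):
    """Return the indices a given rank would handle after a reset event."""
    # Only indices that have not been recorded as processed are redistributed.
    remaining = [i for i in range(dataset_size) if i not in processed]
    if not remaining:
        return []
    # Every rank gets the same number of samples, so pad by repeating indices.
    per_rank = math.ceil(len(remaining) / num_replicas)
    total = per_rank * num_replicas
    padded = list(itertools.islice(itertools.cycle(remaining), total))
    # Interleave: rank r takes every num_replicas-th index starting at r.
    return padded[rank:total:num_replicas]


# Hypothetical scenario: 10 samples, the first 4 already processed, 4 workers.
parts = [repartition(10, {0, 1, 2, 3}, rank, 4) for rank in range(4)]
```

Because the processed indices are excluded before partitioning, a partially completed epoch resumes without reprocessing samples, which is why set_epoch should only clear them at the end of the epoch.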
horovod.torch.elastic.run(func)[source]

Decorator used to run the elastic training process.

The purpose of this decorator is to allow for uninterrupted execution of the wrapped function across multiple workers in parallel, as workers come and go from the system. When a new worker is added, its state needs to be brought to the same point as the other workers, which is done by synchronizing the state object before executing func.

When a worker is added or removed, other workers will raise an exception to bring them back to such a sync point before executing func again. This ensures that workers do not diverge when such reset events occur.

It’s important to note that collective operations (e.g., broadcast, allreduce) cannot be called before the call to the wrapped function. Otherwise, new workers could execute these operations during their initialization while other workers are attempting to sync state, resulting in deadlock.

Parameters:func – a wrapped function taking any number of args or kwargs. The first argument must be a horovod.common.elastic.State object used to synchronize state across workers.
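The control flow described for the decorator (sync, run, and return to the sync point when membership changes) can be modeled without Horovod. The sketch below is a simplified illustration only: elastic_run, WorkersChanged and CounterState are hypothetical names, and the real decorator handles more cases than this loop does.

```python
import functools


class WorkersChanged(Exception):
    """Hypothetical signal that a worker joined or left (not a Horovod class)."""


def elastic_run(func):
    """Simplified model of an elastic decorator: sync, run, retry on reset."""
    @functools.wraps(func)
    def wrapper(state, *args, **kwargs):
        while True:
            # Bring every worker, including new ones, to the same point first.
            state.sync()
            try:
                return func(state, *args, **kwargs)
            except WorkersChanged:
                # Drop uncommitted changes, then loop back to the sync point.
                state.restore()
    return wrapper


class CounterState:
    """Hypothetical state object exposing save, restore and sync."""
    def __init__(self):
        self.step = 0
        self._saved = 0
        self.syncs = 0

    def save(self):
        self._saved = self.step

    def restore(self):
        self.step = self._saved

    def sync(self):
        self.syncs += 1


@elastic_run
def train(state, total_steps, fail_at):
    while state.step < total_steps:
        state.step += 1
        if state.step in fail_at:
            fail_at.discard(state.step)  # simulate a single reset event
            raise WorkersChanged()
        if state.step % 2 == 0:
            state.save()
    return state.step


state = CounterState()
final_step = train(state, 6, {3})
```

In this run the simulated reset at step 3 rolls the state back to the save made at step 2, the state is synced a second time, and training then completes.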

horovod.mxnet

horovod.mxnet.broadcast_parameters(params, root_rank=0, prefix=None)[source]

Broadcasts the parameters from root rank to all other processes. Typical usage is to broadcast the Module.get_params() or the Block.collect_params().

Parameters:
  • params – One of the following: a dict of parameters to broadcast, or a ParameterDict to broadcast.
  • root_rank – The rank of the process from which parameters will be broadcast to all other processes.
  • prefix – The prefix of the parameters to broadcast. If multiple broadcast_parameters are called in the same program, they must be specified by different prefixes to avoid tensor name collision.

horovod.spark

horovod.spark.keras

class horovod.spark.keras.KerasEstimator[source]

Bases: horovod.spark.common.estimator.HorovodEstimator, horovod.spark.keras.estimator.KerasEstimatorParamsReadable, horovod.spark.keras.estimator.KerasEstimatorParamsWritable

Spark Estimator for fitting Keras models to a DataFrame.

Supports standalone keras and tf.keras, and TensorFlow 1.X and 2.X.

Parameters:
  • num_proc – Number of Horovod processes. Defaults to spark.default.parallelism.
  • model – Keras model to train.
  • backend – Optional Backend object for running distributed training function. Defaults to SparkBackend with num_proc worker processes. Cannot be specified if num_proc is also provided.
  • store – Store object that abstracts reading and writing of intermediate data and run results.
  • custom_objects – Optional dictionary mapping names (strings) to custom classes or functions to be considered during serialization/deserialization.
  • optimizer – Keras optimizer to be converted into a hvd.DistributedOptimizer for training.
  • loss – Keras loss or list of losses.
  • loss_weights – Optional list of float weight values to assign each loss.
  • sample_weight_col – Optional column indicating the weight of each sample.
  • gradient_compression – Gradient compression used by hvd.DistributedOptimizer.
  • metrics – Optional metrics to record.
  • feature_cols – Column names used as feature inputs to the model. Must be a list with each feature mapping to a sequential argument in the model’s forward() function.
  • label_cols – Column names used as labels. Must be a list with one label for each output of the model.
  • validation – Optional validation column name (string) where every row in the column is either 1/True or 0/False, or validation split (float) giving percent of data to be randomly selected for validation.
  • callbacks – Keras callbacks.
  • batch_size – Number of rows from the DataFrame per batch.
  • val_batch_size – Number of rows from the DataFrame per batch for validation, if not set, will use batch_size.
  • epochs – Number of epochs to train.
  • verbose – Verbosity level [0, 2] (default: 1).
  • shuffle_buffer_size – Optional size of in-memory shuffle buffer in rows. Allocating a larger buffer size increases randomness of shuffling at the cost of more host memory. Defaults to estimating with an assumption of 4GB of memory per host.
  • partitions_per_process – Number of Parquet partitions to assign per worker process from num_proc (default: 10).
  • run_id – Optional unique ID for this run for organization in the Store. Will be automatically assigned if not provided.
  • train_steps_per_epoch – Number of steps to train each epoch. Useful for testing that model trains successfully. Defaults to training the entire dataset each epoch.
  • validation_steps_per_epoch – Number of validation steps to perform each epoch.
  • transformation_fn – Optional function that takes a row as its parameter and returns a modified row that is then fed into the train or validation step. This transformation is applied after batching. See Petastorm [TransformSpec](https://github.com/uber/petastorm/blob/master/petastorm/transform.py) for more details. Note that this function constructs another function which should perform the transformation.
  • train_reader_num_workers – This parameter specifies the number of parallel processes that read the training data from data store and apply data transformations to it. Increasing this number will generally increase the reading rate but will also increase the memory footprint. More processes are particularly useful if the bandwidth to the data store is not high enough, or users need to apply transformation such as decompression or data augmentation on raw data.
  • val_reader_num_workers – Similar to the train_reader_num_workers.
class horovod.spark.keras.KerasModel[source]

Bases: horovod.spark.common.estimator.HorovodModel, horovod.spark.keras.estimator.KerasEstimatorParamsReadable, horovod.spark.keras.estimator.KerasEstimatorParamsWritable

Spark Transformer wrapping a Keras model, used for making predictions on a DataFrame.

Retrieve the underlying Keras model by calling keras_model.getModel().

Parameters:
  • history – List of metrics, one entry per epoch during training.
  • model – Trained Keras model.
  • feature_columns – List of feature column names.
  • label_columns – List of label column names.
  • custom_objects – Keras custom objects.
  • run_id – ID of the run used to train the model.

horovod.spark.torch

class horovod.spark.torch.TorchEstimator[source]

Bases: horovod.spark.common.estimator.HorovodEstimator, horovod.spark.torch.estimator.TorchEstimatorParamsWritable, horovod.spark.torch.estimator.TorchEstimatorParamsReadable

Spark Estimator for fitting PyTorch models to a DataFrame.

Parameters:
  • num_proc – Number of Horovod processes. Defaults to spark.default.parallelism.
  • model – PyTorch model to train.
  • backend – Optional Backend object for running distributed training function. Defaults to SparkBackend with num_proc worker processes. Cannot be specified if num_proc is also provided.
  • store – Store object that abstracts reading and writing of intermediate data and run results.
  • optimizer – PyTorch optimizer to be converted into a hvd.DistributedOptimizer for training.
  • loss – PyTorch loss or list of losses.
  • loss_constructors – Optional functions that generate losses.
  • metrics – Optional metrics to record.
  • loss_weights – Optional list of float weight values to assign each loss.
  • sample_weight_col – Optional column indicating the weight of each sample.
  • gradient_compression – Gradient compression used by hvd.DistributedOptimizer.
  • feature_cols – Column names used as feature inputs to the model. Must be a list with each feature mapping to a sequential argument in the model’s forward() function.
  • input_shapes – List of shapes for each input tensor to the model.
  • validation – Optional validation column name (string) where every row in the column is either 1/True or 0/False, or validation split (float) giving percent of data to be randomly selected for validation.
  • label_cols – Column names used as labels. Must be a list with one label for each output of the model.
  • batch_size – Number of rows from the DataFrame per batch.
  • val_batch_size – Number of rows from the DataFrame per batch for validation, if not set, will use batch_size.
  • epochs – Number of epochs to train.
  • verbose – Verbosity level [0, 2] (default: 1).
  • shuffle_buffer_size – Optional size of in-memory shuffle buffer in rows. Allocating a larger buffer size increases randomness of shuffling at the cost of more host memory. Defaults to estimating with an assumption of 4GB of memory per host.
  • partitions_per_process – Number of Parquet partitions to assign per worker process from num_proc (default: 10).
  • run_id – Optional unique ID for this run for organization in the Store. Will be automatically assigned if not provided.
  • train_minibatch_fn – Optional custom function to execute within the training loop. Defaults to standard gradient descent process.
  • train_steps_per_epoch – Number of steps to train each epoch. Useful for testing that model trains successfully. Defaults to training the entire dataset each epoch.
  • validation_steps_per_epoch – Number of validation steps to perform each epoch.
  • transformation_fn – Optional function that takes a row as its parameter and returns a modified row that is then fed into the train or validation step. This transformation is applied after batching. See Petastorm [TransformSpec](https://github.com/uber/petastorm/blob/master/petastorm/transform.py) for more details. Note that this function constructs another function which should perform the transformation.
  • train_reader_num_workers – This parameter specifies the number of parallel processes that read the training data from data store and apply data transformations to it. Increasing this number will generally increase the reading rate but will also increase the memory footprint. More processes are particularly useful if the bandwidth to the data store is not high enough, or users need to apply transformation such as decompression or data augmentation on raw data.
  • val_reader_num_workers – Similar to the train_reader_num_workers.
class horovod.spark.torch.TorchModel[source]

Bases: horovod.spark.common.estimator.HorovodModel, horovod.spark.torch.estimator.TorchEstimatorParamsWritable, horovod.spark.torch.estimator.TorchEstimatorParamsReadable

Spark Transformer wrapping a PyTorch model, used for making predictions on a DataFrame.

Retrieve the underlying PyTorch model by calling torch_model.getModel().

Parameters:
  • history – List of metrics, one entry per epoch during training.
  • model – Trained PyTorch model.
  • feature_columns – List of feature column names.
  • label_columns – List of label column names.
  • optimizer – PyTorch optimizer used during training, containing updated state.
  • run_id – ID of the run used to train the model.
  • loss – PyTorch loss(es).
  • loss_constructors – PyTorch loss constructors.

horovod.spark.common

class horovod.spark.common.estimator.HorovodEstimator[source]
fit(df, params=None)[source]

Fits the model to the DataFrame.

Parameters:
  • df – Input dataset, which is an instance of pyspark.sql.DataFrame.
  • params – An optional param map that overrides embedded params.
Returns:

HorovodModel transformer wrapping the trained model.

fit_on_parquet(params=None)[source]

Trains the model on a saved Parquet file at store.get_train_path().

Parameters:params – An optional param map that overrides embedded params.
Returns:Trained HorovodModel transformer of the appropriate subclass wrapping the trained model.
class horovod.spark.common.estimator.HorovodModel[source]
transform(df, params=None)[source]

Transforms the input dataset with prediction columns representing model predictions.

Prediction column names default to <label_column>__output. Override column names by calling transformer.setOutputCols(col_names).

Parameters:
  • df – Input dataset, which is an instance of pyspark.sql.DataFrame.
  • params – An optional param map that overrides embedded params.
Returns:

Transformed dataset.
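The default naming rule for prediction columns is easy to state in code. This is a one-function sketch in plain Python; default_output_cols is a hypothetical helper used only to illustrate the <label_column>__output convention described above.

```python
def default_output_cols(label_cols):
    """Derive default prediction column names from the label column names."""
    return [f"{col}__output" for col in label_cols]


# Hypothetical label columns for a two-output model.
output_cols = default_output_cols(["price", "category"])
```

Calling transformer.setOutputCols(col_names) replaces these defaults with names of your choosing.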

class horovod.spark.common.backend.Backend[source]

Bases: object

Interface for remote execution of the distributed training function.

A custom backend can be used in cases where the training environment running Horovod is different from the Spark application running the HorovodEstimator.

num_processes()[source]

Returns the number of processes to use for training.

run(fn, args=(), kwargs={}, env=None)[source]

Executes the training fn and returns results from each worker in a list (ordered by ascending rank).

Parameters:
  • fn – Function to run.
  • args – Arguments to pass to fn.
  • kwargs – Keyword arguments to pass to fn.
  • env – Environment dictionary to use in Horovod run. Defaults to os.environ.
Returns:

List of results returned by running fn on each rank.
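As an illustration of the interface, the sketch below defines a stand-in class with the same two methods, running fn once per rank inside the current process. SequentialBackend is hypothetical and deliberately does not subclass the Horovod Backend, so it runs without Horovod or Spark installed. The env argument is accepted only for interface parity and is ignored here.

```python
class SequentialBackend:
    """Hypothetical stand-in exposing num_processes() and run() like Backend."""

    def __init__(self, num_proc):
        self._num_proc = num_proc

    def num_processes(self):
        """Number of processes to use for training."""
        return self._num_proc

    def run(self, fn, args=(), kwargs=None, env=None):
        """Call fn once per rank and return results ordered by ascending rank."""
        kwargs = kwargs or {}
        results = []
        for _rank in range(self._num_proc):
            # A real backend would launch one remote process per rank instead.
            results.append(fn(*args, **kwargs))
        return results


def add(x, y=0):
    return x + y


backend = SequentialBackend(num_proc=3)
results = backend.run(add, args=(1,), kwargs={"y": 2})
```

A real custom backend would subclass horovod.spark.common.backend.Backend and dispatch fn to the remote training environment.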

class horovod.spark.common.backend.SparkBackend(num_proc=None, env=None, **kwargs)[source]

Bases: horovod.spark.common.backend.Backend

Uses horovod.spark.run to execute the distributed training fn.

num_processes()[source]

Returns the number of processes to use for training.

run(fn, args=(), kwargs={}, env=None)[source]

Executes the training fn and returns results from each worker in a list (ordered by ascending rank).

Parameters:
  • fn – Function to run.
  • args – Arguments to pass to fn.
  • kwargs – Keyword arguments to pass to fn.
  • env – Environment dictionary to use in Horovod run. Defaults to os.environ.
Returns:

List of results returned by running fn on each rank.

class horovod.spark.common.store.DBFSLocalStore(prefix_path, *args, **kwargs)[source]

Bases: horovod.spark.common.store.LocalStore

Uses Databricks File System (DBFS) local file APIs as a store of intermediate data and training artifacts.

Initialized from a prefix_path that starts with /dbfs/…, file:///dbfs/… or dbfs:/…, see https://docs.databricks.com/data/databricks-file-system.html#local-file-apis.

get_checkpoint_filename()[source]

Returns the basename of the saved checkpoint file.

static normalize_path(path)[source]

Normalize the path to the form /dbfs/…
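The three accepted prefix forms all map onto the same /dbfs/… local path. The following is a minimal sketch of that mapping in plain Python. The function normalize_dbfs_path is a hypothetical illustration and may differ in detail from the library's static method.

```python
def normalize_dbfs_path(path):
    """Map dbfs:/..., file:///dbfs/... and /dbfs/... onto the /dbfs/... form."""
    if path.startswith("dbfs:/"):
        # Replace the dbfs:/ scheme with the local /dbfs/ mount point.
        return "/dbfs/" + path[len("dbfs:/"):].lstrip("/")
    if path.startswith("file:///dbfs/"):
        # Strip the file:// scheme and keep the absolute local path.
        return path[len("file://"):]
    # Already in local form (or not a DBFS path): return unchanged.
    return path


# Hypothetical paths covering the three accepted forms.
normalized = [
    normalize_dbfs_path("dbfs:/horovod/runs"),
    normalize_dbfs_path("file:///dbfs/horovod/runs"),
    normalize_dbfs_path("/dbfs/horovod/runs"),
]
```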

read_serialized_keras_model(ckpt_path, model, custom_objects)[source]

Returns serialized keras model. The parameter model is for providing the model structure when the checkpoint file only contains model weights.

class horovod.spark.common.store.FilesystemStore(prefix_path, train_path=None, val_path=None, test_path=None, runs_path=None, save_runs=True)[source]

Bases: horovod.spark.common.store.Store

Abstract class for stores that use a filesystem for underlying storage.

exists(path)[source]

Returns True if the path exists in the store.

get_checkpoint_filename()[source]

Returns the basename of the saved checkpoint file.

get_checkpoint_path(run_id)[source]

Returns the path to the checkpoint file for the given run.

get_logs_path(run_id)[source]

Returns the path to the log directory for the given run.

get_logs_subdir()[source]

Returns the subdirectory name for the logs directory.

get_parquet_dataset(path)[source]

Returns a pyarrow.parquet.ParquetDataset from the path.

get_run_path(run_id)[source]

Returns the path to the run with the given ID.

get_runs_path()[source]

Returns the base path for all runs.

get_test_data_path(idx=None)[source]

Returns the path to the test dataset.

get_train_data_path(idx=None)[source]

Returns the path to the training dataset.

get_val_data_path(idx=None)[source]

Returns the path to the validation dataset.

is_parquet_dataset(path)[source]

Returns True if the path is the root of a Parquet dataset.

read(path)[source]

Returns the contents of the path as bytes.

read_serialized_keras_model(ckpt_path, model, custom_objects)[source]

Reads the checkpoint file of the keras model into model bytes and returns the base 64 encoded model bytes.

Parameters:
  • ckpt_path – Path to the checkpoint file, as a string.
  • model – A keras model. This parameter will be used in DBFSLocalStore.read_serialized_keras_model() when the ckpt_path only contains model weights.
  • custom_objects – This parameter will be used in DBFSLocalStore.read_serialized_keras_model() when loading the keras model.
Returns:

The base 64 encoded model bytes of the checkpoint model.

saving_runs()[source]

Returns True if run output should be saved during training.

class horovod.spark.common.store.HDFSStore(prefix_path, host=None, port=None, user=None, kerb_ticket=None, driver='libhdfs', extra_conf=None, temp_dir=None, *args, **kwargs)[source]

Bases: horovod.spark.common.store.FilesystemStore

Uses HDFS as a store of intermediate data and training artifacts.

Initialized from a prefix_path that can take one of the following forms:

  1. “hdfs://namenode01:8020/user/test/horovod”
  2. “hdfs:///user/test/horovod”
  3. “/user/test/horovod”

The full path (including prefix, host, and port) will be used for all reads and writes to HDFS through Spark. If host and port are not provided, they will be omitted. If the hdfs:// prefix is not provided (case 3), it will be prepended to the full path regardless.

The localized path (without prefix, host, and port) will be used for interaction with PyArrow. Parsed host and port information will be used to initialize PyArrow HadoopFilesystem if they are not provided through the host and port arguments to this initializer. These parameters will default to “default” and 0 if neither the path URL nor the arguments provide this information.
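The handling of the three prefix_path forms can be sketched with the standard library URL parser. The helper parse_hdfs_prefix below is hypothetical and only mirrors the rules described above (explicit arguments first, then the URL, then the defaults); it is not how HDFSStore itself parses paths.

```python
from urllib.parse import urlparse


def parse_hdfs_prefix(prefix_path, host=None, port=None):
    """Return (full_path, localized_path, host, port) for a prefix_path."""
    parsed = urlparse(prefix_path)
    # The localized path drops the prefix, host and port.
    localized_path = parsed.path
    # Explicit arguments win, then values parsed from the URL, then defaults.
    host = host or parsed.hostname or "default"
    port = port or parsed.port or 0
    # Case 3 has no scheme, so the hdfs:// prefix is prepended.
    full_path = prefix_path if parsed.scheme else "hdfs://" + prefix_path
    return full_path, localized_path, host, port


case1 = parse_hdfs_prefix("hdfs://namenode01:8020/user/test/horovod")
case2 = parse_hdfs_prefix("hdfs:///user/test/horovod")
case3 = parse_hdfs_prefix("/user/test/horovod")
```

Cases 2 and 3 produce the same result: the full path keeps the hdfs:// prefix with host and port omitted, while host and port fall back to the defaults.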

sync_fn(run_id)[source]

Returns a function that synchronises given path recursively into run path for run_id.

class horovod.spark.common.store.LocalStore(prefix_path, *args, **kwargs)[source]

Bases: horovod.spark.common.store.FilesystemStore

Uses the local filesystem as a store of intermediate data and training artifacts.

sync_fn(run_id)[source]

Returns a function that synchronises given path recursively into run path for run_id.

class horovod.spark.common.store.Store[source]

Bases: object

Storage layer for intermediate files (materialized DataFrames) and training artifacts (checkpoints, logs).

Store provides an abstraction over a filesystem (e.g., local vs HDFS) or blob storage database. It provides the basic semantics for reading and writing objects, and how to access objects with certain definitions.

The store exposes a generic interface that is not coupled to a specific DataFrame, model, or runtime. Every run of an Estimator should result in a separate run directory containing checkpoints and logs, and every variation in dataset should produce a separate intermediate data path.

In order to allow for caching but to prevent overuse of disk space on intermediate data, intermediate datasets are named in a deterministic sequence. When a dataset is done being used for training, the intermediate files can be reclaimed to free up disk space, but will not be automatically removed so that they can be reused as needed. This is to support both parallel training processes using the same store on multiple DataFrames, as well as iterative training using the same DataFrame on different model variations.

exists(path)[source]

Returns True if the path exists in the store.

get_checkpoint_filename()[source]

Returns the basename of the saved checkpoint file.

get_checkpoint_path(run_id)[source]

Returns the path to the checkpoint file for the given run.

get_logs_path(run_id)[source]

Returns the path to the log directory for the given run.

get_logs_subdir()[source]

Returns the subdirectory name for the logs directory.

get_parquet_dataset(path)[source]

Returns a pyarrow.parquet.ParquetDataset from the path.

get_run_path(run_id)[source]

Returns the path to the run with the given ID.

get_runs_path()[source]

Returns the base path for all runs.

get_test_data_path(idx=None)[source]

Returns the path to the test dataset.

get_train_data_path(idx=None)[source]

Returns the path to the training dataset.

get_val_data_path(idx=None)[source]

Returns the path to the validation dataset.

is_parquet_dataset(path)[source]

Returns True if the path is the root of a Parquet dataset.

read(path)[source]

Returns the contents of the path as bytes.

saving_runs()[source]

Returns True if run output should be saved during training.

sync_fn(run_id)[source]

Returns a function that synchronises given path recursively into run path for run_id.

to_remote(run_id, dataset_idx)[source]

Returns a view of the store that can execute in a remote environment without Horovod dependencies.

horovod.ray

class horovod.ray.RayExecutor(settings, num_hosts: int = 1, num_slots: int = 1, cpus_per_slot: int = 1, use_gpu: bool = False, gpus_per_slot: Optional[int] = None)[source]

Job class for Horovod + Ray integration.

Parameters:
  • settings (horovod.Settings) – Configuration for job setup. You can use a standard Horovod Settings object or create one directly from RayExecutor.create_settings.
  • num_hosts (int) – Number of machines to execute the job on.
  • num_slots (int) – Number of workers to be placed on each machine.
  • cpus_per_slot (int) – Number of CPU resources to allocate to each worker.
  • use_gpu (bool) – Whether to use GPU for allocation. TODO: this can be removed.
  • gpus_per_slot (int) – Number of GPU resources to allocate to each worker.
classmethod create_settings(timeout_s, ssh_identity_file=None, ssh_str=None)[source]

Create a MiniSettings object.

Parameters:
  • timeout_s (int) – Timeout parameter for Gloo rendezvous.
  • ssh_identity_file (str) – Path to the identity file to ssh into different hosts on the cluster.
  • ssh_str (str) – CAUTION WHEN USING THIS. Private key file contents. Writes the private key to ssh_identity_file.
Returns:

MiniSettings object.

execute(fn: Callable[[executable_cls], Any]) → List[Any][source]

Executes the provided function on all workers.

Parameters:fn – Target function to be invoked on every object.
Returns:Deserialized return values from the target function.
execute_single(fn: Callable[[executable_cls], Any]) → List[Any][source]

Executes the provided function on the rank 0 worker (chief).

Parameters:fn – Target function to be invoked on the chief object.
Returns:Deserialized return values from the target function.
run(fn: Callable[[Any], Any], args: Optional[List[T]] = None, kwargs: Optional[Dict[KT, VT]] = None) → List[Any][source]

Executes the provided function on all workers.

Parameters:
  • fn – Target function that can be executed with arbitrary args and keyword arguments.
  • args – List of arguments to be passed into the target function.
  • kwargs – Dictionary of keyword arguments to be passed into the target function.
Returns:

Deserialized return values from the target function.

run_remote(fn: Callable[[Any], Any], args: Optional[List[T]] = None, kwargs: Optional[Dict[KT, VT]] = None) → List[Any][source]

Executes the provided function on all workers.

Parameters:
  • fn – Target function that can be executed with arbitrary args and keyword arguments.
  • args – List of arguments to be passed into the target function.
  • kwargs – Dictionary of keyword arguments to be passed into the target function.
Returns:

List of ObjectRefs that you can run ray.get on to retrieve values.

Return type:

list

shutdown()[source]

Destroys the provided workers.

start(executable_cls: type = None, executable_args: Optional[List[T]] = None, executable_kwargs: Optional[Dict[KT, VT]] = None, extra_env_vars: Optional[Dict[KT, VT]] = None)[source]

Starts the workers and colocates them on all machines.

We implement a node grouping because it seems like our implementation doesn’t quite work for imbalanced nodes. Also, colocation performance is typically much better than non-colocated workers.

Parameters:
  • executable_cls (type) – The class that will be created within an actor (BaseHorovodWorker). This will allow Horovod to establish its connections and set env vars.
  • executable_args (List) – Arguments to be passed into the worker class upon initialization.
  • executable_kwargs (Dict) – Keyword arguments to be passed into the worker class upon initialization.
  • extra_env_vars (Dict) – Environment variables to be set on the actors (worker processes) before initialization.
class horovod.ray.ElasticRayExecutor(settings: horovod.runner.elastic.settings.ElasticSettings, use_gpu: bool = False, cpus_per_slot: int = 1, gpus_per_slot: Optional[int] = None, env_vars: dict = None, override_discovery=True)[source]

Executor for elastic jobs using Ray.

Leverages the Ray global state to detect available hosts and slots. Assumes that the entire Ray cluster is available for the Executor to use.

Parameters:
  • settings – Configuration for the elastic job setup. You can use a standard Horovod ElasticSettings object or create one directly from ElasticRayExecutor.create_settings.
  • use_gpu (bool) – Whether to use GPU for allocation.
  • cpus_per_slot (int) – Number of CPU resources to allocate to each worker.
  • gpus_per_slot (int) – Number of GPU resources to allocate to each worker.
  • env_vars (Dict) – Environment variables to be set on the actors (worker processes) before initialization.
  • override_discovery (bool) – Whether the ElasticRayExecutor should automatically provide a discovery mechanism for ElasticSettings.

Example:

import ray
from horovod.ray import ElasticRayExecutor

ray.init(address="auto")
settings = ElasticRayExecutor.create_settings(verbose=True)
executor = ElasticRayExecutor(
    settings, use_gpu=True, cpus_per_slot=2)
executor.start()
# train_fn is the user-defined elastic training function.
executor.run(train_fn)
static create_settings(min_np: int = 1, max_np: int = None, reset_limit: int = None, elastic_timeout: int = 600, timeout_s: int = 30, ssh_identity_file: str = None, nics: str = None, **kwargs)[source]

Returns a Settings object for ElasticRayExecutor.

Note that the discovery property will be set at runtime.

Parameters:
  • min_np (int) – Minimum number of processes running for training to continue. If number of available processes dips below this threshold, then training will wait for more instances to become available.
  • max_np (int) – Maximum number of training processes, beyond which no additional processes will be created. If not specified, then will be unbounded.
  • reset_limit (int) – Maximum number of times that the training job can scale up or down the number of workers after which the job is terminated.
  • elastic_timeout (int) – Timeout for elastic initialisation after re-scaling the cluster. The default value is 600 seconds. Alternatively, the environment variable HOROVOD_ELASTIC_TIMEOUT can also be used.
  • timeout_s (int) – Horovod performs all the checks and starts the processes before the specified timeout. The default value is 30 seconds.
  • ssh_identity_file (str) – File on the driver from which the identity (private key) is read.
  • nics (set) – Network interfaces that can be used for communication.
run(worker_fn: Callable, callbacks: Optional[List[Callable]] = None) → List[Any][source]

Executes the provided function on all workers.

Parameters:
  • worker_fn – Target elastic function that can be executed.
  • callbacks – List of callables. Each callback must either be a callable function or a class that implements __call__. Every callback will be invoked on every value logged by the rank 0 worker.
Returns:

List of return values from every completed worker.
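
The two accepted callback forms can be sketched as follows. This is a minimal illustration that does not import Horovod: print_value, CollectValues, and dispatch are hypothetical names, and dispatch merely stands in for the executor handing each value logged by the rank 0 worker to every callback.

```python
from typing import Any, Callable, List


def print_value(value: Any) -> None:
    """A plain function is a valid callback."""
    print(f"rank 0 logged: {value}")


class CollectValues:
    """A class-based callback: instances are callable through __call__."""

    def __init__(self) -> None:
        self.values: List[Any] = []

    def __call__(self, value: Any) -> None:
        self.values.append(value)


def dispatch(callbacks: List[Callable[[Any], None]], value: Any) -> None:
    """Hypothetical stand-in for the executor: pass one value to every callback."""
    for callback in callbacks:
        callback(value)


collector = CollectValues()
callbacks = [print_value, collector]
for logged in ({"epoch": 0, "loss": 0.9}, {"epoch": 1, "loss": 0.7}):
    dispatch(callbacks, logged)
```

With a real executor, the same list would be passed as executor.run(train_fn, callbacks=callbacks). How the worker logs values is not covered in this section, so the sketch feeds values in by hand.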

start()[source]

Starts the Horovod driver and services.

horovod.run

Launch a Horovod job to run the specified process function and get the return value.

Parameters:
  • func – The function to run in the Horovod job processes. Its return value is collected as the return value of the corresponding Horovod process. The function must be compatible with pickle.
  • args – Arguments to pass to func.
  • kwargs – Keyword arguments to pass to func.
  • np – Number of Horovod processes.
  • min_np – Minimum number of processes running for training to continue. If the number of available processes dips below this threshold, training will wait for more instances to become available. Defaults to np.
  • max_np – Maximum number of training processes, beyond which no additional processes will be created. If not specified, the number of processes is unbounded.
  • slots – Number of slots for processes per host. Normally 1 slot per GPU per host. If slots are provided by the output of the host discovery script, that value overrides this parameter.
  • reset_limit – Maximum number of times that the training job can scale up or down the number of workers, after which the job is terminated. A reset event occurs when workers are added to or removed from the job after the initial registration. A reset_limit of 0 means the job cannot change membership after its initial set of workers; a reset_limit of 1 means it can resize at most once, and so on.
  • hosts – List of host names and the number of available slots for running processes on each, of the form <hostname>:<slots> (e.g. host1:2,host2:4,host3:1, indicating that 2 processes can run on host1, 4 on host2, and 1 on host3). If not specified, defaults to localhost:<np>.
  • hostfile – Path to a host file containing the list of host names and the number of available slots. Each line of the file must be of the form <hostname> slots=<slots>.
  • start_timeout – Horovodrun has to perform all the checks and start the processes before the specified timeout. The default value is 30 seconds. Alternatively, the environment variable HOROVOD_START_TIMEOUT can be used to specify the initialization timeout.
  • ssh_port – SSH port on all the hosts.
  • ssh_identity_file – SSH identity (private key) file.
  • disable_cache – If this flag is not set, horovodrun performs the initialization checks only once every 60 minutes, provided the checks pass. Otherwise, all the checks run every time horovodrun is called.
  • output_filename – For Gloo, writes stdout / stderr of all processes to a filename of the form <output_filename>/rank.<rank>/<stdout | stderr>. The <rank> is padded with 0 characters to ensure lexicographical order. For MPI, delegates its behavior to mpirun.
  • verbose – If this flag is set, extra messages will be printed.
  • use_gloo – Run Horovod using the Gloo controller. This is the default if Horovod was not built with MPI support.
  • use_mpi – Run Horovod using the MPI controller. This is the default if Horovod was built with MPI support.
  • mpi_args – Extra arguments for the MPI controller. Only used when use_mpi is True.
  • network_interface – Comma-separated network interfaces to use for communication, for example eth0,eth1. If not specified, Horovod finds the NICs common to all the workers and uses those.
Returns:

A list containing the values returned by all Horovod processes. The index in the list corresponds to the rank of each Horovod process.
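
The two host formats described above, the hosts string and the host file line, can be illustrated with a short parsing sketch. The helpers parse_hosts and parse_hostfile_line are hypothetical and are not Horovod's own parser; they only show how the documented <hostname>:<slots> and <hostname> slots=<slots> forms map to a slot count per host.

```python
from typing import Dict, Tuple


def parse_hosts(hosts: str) -> Dict[str, int]:
    """Parse '<hostname>:<slots>,...' into {hostname: slots} (hypothetical helper)."""
    result: Dict[str, int] = {}
    for entry in hosts.split(","):
        # Split on the last colon so the slot count is always the final field.
        hostname, slots = entry.strip().rsplit(":", 1)
        result[hostname] = int(slots)
    return result


def parse_hostfile_line(line: str) -> Tuple[str, int]:
    """Parse one '<hostname> slots=<slots>' line (hypothetical helper)."""
    hostname, slots_field = line.split()
    key, value = slots_field.split("=")
    if key != "slots":
        raise ValueError(f"expected 'slots=<n>', got {slots_field!r}")
    return hostname, int(value)


slots_by_host = parse_hosts("host1:2,host2:4,host3:1")
print(slots_by_host)
print(sum(slots_by_host.values()))
print(parse_hostfile_line("host1 slots=2"))
```

For the example string from the parameter description, the slot counts add up to 7, which is the largest number of processes that host list can accommodate.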