API¶
horovod.tensorflow¶
-
class
horovod.tensorflow.
Compression
[source]¶ Optional gradient compression algorithm used during allreduce.
-
none
¶ Do not compress the gradients. This is the default.
alias of
NoneCompressor
-
fp16
¶ Compress all floating point gradients to 16-bit.
alias of
FP16Compressor
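As a rough illustration of what 16-bit compression does to gradient values, the sketch below round-trips Python floats through IEEE half precision using only the standard library. The helper names `fp16_compress` and `fp16_decompress` are hypothetical and not part of Horovod; the real FP16Compressor operates on framework tensors rather than Python lists.

```python
import struct

def fp16_compress(values):
    """Pack a list of floats into half-precision bytes (2 bytes per value)."""
    return struct.pack(f"{len(values)}e", *values)

def fp16_decompress(payload):
    """Unpack half-precision bytes back into a list of Python floats."""
    count = len(payload) // 2
    return list(struct.unpack(f"{count}e", payload))

gradients = [0.5, 0.1, -2.0]
payload = fp16_compress(gradients)
restored = fp16_decompress(payload)

print(len(payload))  # 6 bytes on the wire, versus 12 for three float32 values
print(restored)      # 0.5 and -2.0 survive exactly; 0.1 is only approximated
```

The payload is half the size of a float32 encoding, at the cost of a small rounding error on values that half precision cannot represent exactly.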
-
-
horovod.tensorflow.
allgather
(tensor, name=None)[source]¶ An op which concatenates the input tensor with the same input tensor on all other Horovod processes.
The concatenation is done on the first dimension, so the input tensors on the different processes must have the same rank and shape, except for the first dimension, which is allowed to be different.
- Returns
A tensor of the same type as tensor, concatenated on dimension zero across all processes. The shape is identical to the input shape, except for the first dimension, which may be greater and is the sum of all first dimensions of the tensors in different Horovod processes.
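The shape rule for allgather can be checked with a small single-process simulation. This sketch uses nested Python lists in place of tensors, and `simulate_allgather` is a hypothetical helper, not a Horovod function.

```python
def simulate_allgather(per_rank_tensors):
    """Concatenate 2-D nested-list tensors from every rank along dimension zero.

    per_rank_tensors[i] is the tensor (a list of rows) held by rank i.
    """
    widths = {len(row) for tensor in per_rank_tensors for row in tensor}
    if len(widths) > 1:
        raise ValueError("all dimensions except the first must match")
    gathered = []
    for tensor in per_rank_tensors:
        gathered.extend(tensor)
    return gathered

rank0 = [[1, 2], [3, 4]]             # shape (2, 2)
rank1 = [[5, 6]]                     # shape (1, 2)
rank2 = [[7, 8], [9, 10], [11, 12]]  # shape (3, 2)

result = simulate_allgather([rank0, rank1, rank2])
print(len(result), len(result[0]))   # 6 2
```

The first dimension of the result is 2 + 1 + 3 = 6, the sum of the first dimensions on each simulated rank, while the second dimension is unchanged.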
-
horovod.tensorflow.
broadcast
(tensor, root_rank, name=None)[source]¶ An op which broadcasts the input tensor on root rank to the same input tensor on all other Horovod processes.
The broadcast operation is keyed by the name of the op. The tensor type and shape must be the same on all Horovod processes for a given name. The broadcast will not start until all processes are ready to send and receive the tensor.
- Returns
A tensor of the same shape and type as tensor, with the value broadcasted from root rank.
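To make the effect of a broadcast concrete, the following single-process sketch gives each simulated rank a differently seeded value of the same shape and then overwrites all of them with the root's copy. `simulate_broadcast` is a hypothetical helper for illustration only.

```python
import random

def simulate_broadcast(per_rank_values, root_rank):
    """Return the values every rank holds after a broadcast from root_rank."""
    root_value = per_rank_values[root_rank]
    return [list(root_value) for _ in per_rank_values]

# Each simulated rank initializes same-shaped weights with a different seed.
per_rank_weights = [
    [random.Random(seed).random() for _ in range(3)] for seed in range(4)
]

synced = simulate_broadcast(per_rank_weights, root_rank=0)
print(all(w == per_rank_weights[0] for w in synced))  # True
```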
-
horovod.tensorflow.
init
(comm=None)¶ A function that initializes Horovod.
- Parameters
comm – List specifying ranks for the communicator, relative to the MPI_COMM_WORLD communicator OR the MPI communicator to use. Given communicator will be duplicated. If None, Horovod will use MPI_COMM_WORLD Communicator.
-
horovod.tensorflow.
shutdown
()¶ A function that shuts Horovod down.
-
horovod.tensorflow.
size
()¶ A function that returns the number of Horovod processes.
- Returns
An integer scalar containing the number of Horovod processes.
-
horovod.tensorflow.
local_size
()¶ A function that returns the number of Horovod processes within the node the current process is running on.
- Returns
An integer scalar containing the number of local Horovod processes.
-
horovod.tensorflow.
rank
()¶ A function that returns the Horovod rank of the calling process.
- Returns
An integer scalar with the Horovod rank of the calling process.
-
horovod.tensorflow.
local_rank
()¶ A function that returns the local Horovod rank of the calling process, within the node that it is running on. For example, if there are seven processes running on a node, their local ranks will be zero through six, inclusive.
- Returns
An integer scalar with the local Horovod rank of the calling process.
-
horovod.tensorflow.
is_homogeneous
()¶ Returns True if the cluster is homogeneous.
- Returns
A boolean value indicating whether every node in the cluster has the same number of ranks.
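The relationship between size, local_size, rank, local_rank, and is_homogeneous can be sketched without a cluster. The helpers below are hypothetical, and the sketch assumes ranks are numbered contiguously node by node, which is an assumption made for illustration rather than a documented guarantee.

```python
def simulate_layout(ranks_per_node):
    """Map each global rank to (node index, local_rank, local_size).

    Assumes ranks are numbered contiguously node by node (sketch assumption).
    """
    layout = {}
    rank = 0
    for node, local_size in enumerate(ranks_per_node):
        for local_rank in range(local_size):
            layout[rank] = (node, local_rank, local_size)
            rank += 1
    return layout

def simulated_is_homogeneous(ranks_per_node):
    """True when every node runs the same number of ranks."""
    return len(set(ranks_per_node)) <= 1

layout = simulate_layout([4, 4])
print(len(layout))  # 8, the value size() would report in this layout
print(layout[5])    # (1, 1, 4)
print(simulated_is_homogeneous([4, 4]), simulated_is_homogeneous([4, 2]))  # True False
```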
-
horovod.tensorflow.
mpi_threads_supported
()¶ A function that returns a flag indicating whether MPI multi-threading is supported.
If MPI multi-threading is supported, users may mix and match Horovod usage with other MPI libraries, such as mpi4py.
- Returns
A boolean value indicating whether MPI multi-threading is supported.
-
horovod.tensorflow.
mpi_enabled
()¶ Returns True if MPI mode is currently enabled at runtime.
If MPI is enabled, users can use it for controller or data transfer operations.
- Returns
A boolean value indicating whether MPI is enabled.
-
horovod.tensorflow.
mpi_built
()¶ Returns True if Horovod was compiled with MPI support.
- Returns
A boolean value indicating whether MPI support was compiled.
-
horovod.tensorflow.
gloo_enabled
()¶ Returns True if Gloo mode is currently enabled at runtime.
If Gloo is enabled, users can use it for controller or data transfer operations.
- Returns
A boolean value indicating whether Gloo is enabled.
-
horovod.tensorflow.
gloo_built
()¶ Returns True if Horovod was compiled with Gloo support.
- Returns
A boolean value indicating whether Gloo support was compiled.
-
horovod.tensorflow.
nccl_built
()¶ Returns True if Horovod was compiled with NCCL support.
- Returns
A boolean value indicating whether NCCL support was compiled.
-
horovod.tensorflow.
ddl_built
()¶ Returns True if Horovod was compiled with DDL support.
- Returns
A boolean value indicating whether DDL support was compiled.
-
horovod.tensorflow.
ccl_built
()¶ Returns True if Horovod was compiled with oneCCL support.
- Returns
A boolean value indicating whether oneCCL support was compiled.
-
horovod.tensorflow.
allreduce
(tensor, average=None, device_dense='', device_sparse='', compression=<class 'horovod.tensorflow.compression.NoneCompressor'>, op=None)[source]¶ Perform an allreduce on a tf.Tensor or tf.IndexedSlices.
This function performs a bandwidth-optimal ring allreduce on the input tensor. If the input is a tf.IndexedSlices, the function instead does an allgather on the values and the indices, effectively doing an allreduce on the represented tensor.
- Parameters
tensor – tf.Tensor, tf.Variable, or tf.IndexedSlices to reduce. The shape of the input must be identical across all ranks.
average –
Warning
Deprecated since version 0.19.0.
Use op instead. Will be removed in v0.21.0.
device_dense – Device to be used for dense tensors. Uses GPU by default if Horovod was built with HOROVOD_GPU_ALLREDUCE.
device_sparse – Device to be used for sparse tensors. Uses GPU by default if Horovod was built with HOROVOD_GPU_ALLGATHER.
compression – Compression algorithm used to reduce the amount of data sent and received by each worker node. Defaults to not using compression.
op – The reduction operation to combine tensors across different ranks. Defaults to Average if None is given.
- Returns
A tensor of the same shape and type as tensor, reduced across all processes with the selected op.
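The two paths described above, a dense reduction and an allgather of indices and values for sparse input, can be compared in a single-process sketch. Both helpers are hypothetical, plain lists stand in for tensors, and the sparse helper only sums (no averaging) to keep the example short.

```python
def simulate_allreduce(per_rank_tensors, average=True):
    """Element-wise sum (or average) of equally shaped 1-D tensors."""
    size = len(per_rank_tensors)
    summed = [sum(column) for column in zip(*per_rank_tensors)]
    return [value / size for value in summed] if average else summed

def simulate_sparse_allreduce(per_rank_slices, dense_length):
    """Allgather (indices, values) pairs, then densify by summing duplicates."""
    gathered_indices, gathered_values = [], []
    for indices, values in per_rank_slices:
        gathered_indices.extend(indices)
        gathered_values.extend(values)
    dense = [0.0] * dense_length
    for index, value in zip(gathered_indices, gathered_values):
        dense[index] += value
    return dense

dense_inputs = [[1.0, 2.0, 3.0], [3.0, 4.0, 5.0]]
print(simulate_allreduce(dense_inputs, average=False))  # [4.0, 6.0, 8.0]
print(simulate_allreduce(dense_inputs, average=True))   # [2.0, 3.0, 4.0]

# Rank 0 touched rows 0 and 2; rank 1 touched rows 2 and 3.
sparse_inputs = [([0, 2], [1.0, 1.0]), ([2, 3], [2.0, 2.0])]
print(simulate_sparse_allreduce(sparse_inputs, dense_length=4))  # [1.0, 0.0, 3.0, 2.0]
```

Row 2 appears on both simulated ranks, so its gathered values add up once the slices are densified, which is why gathering indices and values is equivalent to reducing the represented tensor.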
-
horovod.tensorflow.
broadcast_variables
(variables, root_rank)[source]¶ Broadcasts variables from root rank to all other processes.
- Parameters
variables – variables for broadcast
root_rank – rank of the process from which global variables will be broadcasted to all other processes.
-
horovod.tensorflow.
broadcast_global_variables
(root_rank)[source]¶ Broadcasts all global variables from root rank to all other processes.
NOTE: deprecated in TensorFlow 2.0.
- Parameters
root_rank – rank of the process from which global variables will be broadcasted to all other processes.
-
class
horovod.tensorflow.
BroadcastGlobalVariablesHook
(root_rank, device='')[source]¶ SessionRunHook that will broadcast all global variables from root rank to all other processes during initialization.
This is necessary to ensure consistent initialization of all workers when training is started with random weights or restored from a checkpoint.
NOTE: deprecated in TensorFlow 2.0.
-
horovod.tensorflow.
DistributedOptimizer
(optimizer, name=None, use_locking=False, device_dense='', device_sparse='', compression=<class 'horovod.tensorflow.compression.NoneCompressor'>, sparse_as_dense=False, backward_passes_per_step=1, op=Average)[source]¶ Construct a new DistributedOptimizer, which uses another optimizer under the hood for computing single-process gradient values and applying gradient updates after the gradient values have been combined across all the Horovod ranks.
- Parameters
optimizer – Optimizer to use for computing gradients and applying updates.
name – Optional name prefix for the operations created when applying gradients. Defaults to “Distributed” followed by the provided optimizer type.
use_locking – Whether to use locking when updating variables. See Optimizer.__init__ for more info.
device_dense – Device to be used for dense tensors. Uses GPU by default if Horovod was built with HOROVOD_GPU_ALLREDUCE.
device_sparse – Device to be used for sparse tensors. Uses GPU by default if Horovod was built with HOROVOD_GPU_ALLGATHER.
compression – Compression algorithm used during allreduce to reduce the amount of data sent during each parameter update step. Defaults to not using compression.
sparse_as_dense – Treat all sparse gradients as dense tensors. This can help improve performance and memory utilization if the original sparse gradient has high density. Defaults to false.
backward_passes_per_step – Number of backward passes to perform before calling hvd.allreduce. This allows accumulating updates over multiple mini-batches before reducing and applying them.
op – The reduction operation to use when combining gradients across different ranks.
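The effect of backward_passes_per_step on communication can be sketched as local accumulation followed by a single reduction. The helper below is hypothetical, uses scalar gradients, and does not rescale the accumulated gradient by the number of passes, which is an assumption of this sketch rather than a statement about Horovod's behavior.

```python
def simulate_accumulation(per_rank_gradients, backward_passes_per_step):
    """Accumulate gradients locally, then average across ranks once per step.

    per_rank_gradients[r][p] is the scalar gradient rank r computes on pass p.
    Returns one averaged value per simulated allreduce call.
    """
    num_ranks = len(per_rank_gradients)
    num_passes = len(per_rank_gradients[0])
    accumulated = [0.0] * num_ranks
    reduced = []
    for p in range(num_passes):
        for r in range(num_ranks):
            accumulated[r] += per_rank_gradients[r][p]
        if (p + 1) % backward_passes_per_step == 0:
            reduced.append(sum(accumulated) / num_ranks)  # simulated average
            accumulated = [0.0] * num_ranks
    return reduced

grads = [[1.0, 2.0, 3.0, 4.0],  # rank 0
         [3.0, 2.0, 1.0, 0.0]]  # rank 1
print(simulate_accumulation(grads, backward_passes_per_step=1))  # [2.0, 2.0, 2.0, 2.0]
print(simulate_accumulation(grads, backward_passes_per_step=2))  # [4.0, 4.0]
```

With two passes per step the simulation performs half as many reductions for the same number of mini-batches.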
-
horovod.tensorflow.
DistributedGradientTape
(gradtape, device_dense='', device_sparse='', compression=<class 'horovod.tensorflow.compression.NoneCompressor'>, sparse_as_dense=False, op=Average)[source]¶ A tape that wraps another tf.GradientTape, using an allreduce to combine gradient values before applying gradients to model weights.
- Parameters
gradtape – GradientTape to use for computing gradients and applying updates.
device_dense – Device to be used for dense tensors. Uses GPU by default if Horovod was built with HOROVOD_GPU_ALLREDUCE.
device_sparse – Device to be used for sparse tensors. Uses GPU by default if Horovod was built with HOROVOD_GPU_ALLGATHER.
compression – Compression algorithm used during allreduce to reduce the amount of data sent during each parameter update step. Defaults to not using compression.
sparse_as_dense – Treat all sparse gradients as dense tensors. This can help improve performance and memory utilization if the original sparse gradient has high density. Defaults to false.
op – The reduction operation to use when combining gradients across different ranks.
horovod.tensorflow.keras¶
-
horovod.tensorflow.keras.
init
(comm=None)¶ A function that initializes Horovod.
- Parameters
comm – List specifying ranks for the communicator, relative to the MPI_COMM_WORLD communicator OR the MPI communicator to use. Given communicator will be duplicated. If None, Horovod will use MPI_COMM_WORLD Communicator.
-
horovod.tensorflow.keras.
shutdown
()¶ A function that shuts Horovod down.
-
horovod.tensorflow.keras.
size
()¶ A function that returns the number of Horovod processes.
- Returns
An integer scalar containing the number of Horovod processes.
-
horovod.tensorflow.keras.
local_size
()¶ A function that returns the number of Horovod processes within the node the current process is running on.
- Returns
An integer scalar containing the number of local Horovod processes.
-
horovod.tensorflow.keras.
rank
()¶ A function that returns the Horovod rank of the calling process.
- Returns
An integer scalar with the Horovod rank of the calling process.
-
horovod.tensorflow.keras.
local_rank
()¶ A function that returns the local Horovod rank of the calling process, within the node that it is running on. For example, if there are seven processes running on a node, their local ranks will be zero through six, inclusive.
- Returns
An integer scalar with the local Horovod rank of the calling process.
-
horovod.tensorflow.keras.
mpi_threads_supported
()¶ A function that returns a flag indicating whether MPI multi-threading is supported.
If MPI multi-threading is supported, users may mix and match Horovod usage with other MPI libraries, such as mpi4py.
- Returns
A boolean value indicating whether MPI multi-threading is supported.
-
horovod.tensorflow.keras.
mpi_enabled
()¶ Returns True if MPI mode is currently enabled at runtime.
If MPI is enabled, users can use it for controller or data transfer operations.
- Returns
A boolean value indicating whether MPI is enabled.
-
horovod.tensorflow.keras.
mpi_built
()¶ Returns True if Horovod was compiled with MPI support.
- Returns
A boolean value indicating whether MPI support was compiled.
-
horovod.tensorflow.keras.
gloo_enabled
()¶ Returns True if Gloo mode is currently enabled at runtime.
If Gloo is enabled, users can use it for controller or data transfer operations.
- Returns
A boolean value indicating whether Gloo is enabled.
-
horovod.tensorflow.keras.
gloo_built
()¶ Returns True if Horovod was compiled with Gloo support.
- Returns
A boolean value indicating whether Gloo support was compiled.
-
horovod.tensorflow.keras.
nccl_built
()¶ Returns True if Horovod was compiled with NCCL support.
- Returns
A boolean value indicating whether NCCL support was compiled.
-
horovod.tensorflow.keras.
ddl_built
()¶ Returns True if Horovod was compiled with DDL support.
- Returns
A boolean value indicating whether DDL support was compiled.
-
horovod.tensorflow.keras.
ccl_built
()¶ Returns True if Horovod was compiled with oneCCL support.
- Returns
A boolean value indicating whether oneCCL support was compiled.
-
class
horovod.tensorflow.keras.
Compression
[source]¶ Optional gradient compression algorithm used during allreduce.
-
none
¶ Do not compress the gradients. This is the default.
alias of
NoneCompressor
-
fp16
¶ Compress all floating point gradients to 16-bit.
alias of
FP16Compressor
-
-
horovod.tensorflow.keras.
DistributedOptimizer
(optimizer, name=None, device_dense='', device_sparse='', compression=<class 'horovod.tensorflow.compression.NoneCompressor'>, sparse_as_dense=False)[source]¶ An optimizer that wraps another keras.optimizers.Optimizer, using an allreduce to average gradient values before applying gradients to model weights.
- Parameters
optimizer – Optimizer to use for computing gradients and applying updates.
name – Optional name prefix for the operations created when applying gradients. Defaults to “Distributed” followed by the provided optimizer type.
device_dense – Device to be used for dense tensors. Uses GPU by default if Horovod was built with HOROVOD_GPU_ALLREDUCE.
device_sparse – Device to be used for sparse tensors. Uses GPU by default if Horovod was built with HOROVOD_GPU_ALLGATHER.
compression – Compression algorithm used to reduce the amount of data sent and received by each worker node. Defaults to not using compression.
sparse_as_dense – Treat all sparse gradients as dense tensors. This can help improve performance and memory utilization if the original sparse gradient has high density. Defaults to false.
-
horovod.tensorflow.keras.
broadcast_global_variables
(root_rank)[source]¶ Broadcasts all global variables from root rank to all other processes.
- Parameters
root_rank – Rank of the process from which global variables will be broadcasted to all other processes.
-
horovod.tensorflow.keras.
allreduce
(value, name=None, average=True)[source]¶ Perform an allreduce on a tensor-compatible value.
- Parameters
value – A tensor-compatible value to reduce. The shape of the input must be identical across all ranks.
name – Optional name for the constants created by this operation.
average – If True, computes the average over all ranks. Otherwise, computes the sum over all ranks.
-
horovod.tensorflow.keras.
allgather
(value, name=None)[source]¶ Perform an allgather on a tensor-compatible value.
The concatenation is done on the first dimension, so the input values on the different processes must have the same rank and shape, except for the first dimension, which is allowed to be different.
- Parameters
value – A tensor-compatible value to gather.
name – Optional name prefix for the constants created by this operation.
-
horovod.tensorflow.keras.
broadcast
(value, root_rank, name=None)[source]¶ Perform a broadcast on a tensor-compatible value.
- Parameters
value – A tensor-compatible value to broadcast. The shape of the input must be identical across all ranks.
root_rank – Rank of the process from which the value will be broadcast to all other processes.
name – Optional name for the constants created by this operation.
-
horovod.tensorflow.keras.
load_model
(filepath, custom_optimizers=None, custom_objects=None, compression=<class 'horovod.tensorflow.compression.NoneCompressor'>)[source]¶ Loads a saved Keras model with a Horovod DistributedOptimizer.
The DistributedOptimizer will wrap the underlying optimizer used to train the saved model, so that the optimizer state (params and weights) will be picked up for retraining.
By default, all optimizers in the module keras.optimizers will be loaded and wrapped without needing to specify any custom_optimizers or custom_objects.
- Parameters
filepath – One of the following: - string, path to the saved model, or - h5py.File object from which to load the model
custom_optimizers – Optional list of Optimizer subclasses to support during loading.
custom_objects – Optional dictionary mapping names (strings) to custom classes or functions to be considered during deserialization.
compression – Compression algorithm used to reduce the amount of data sent and received by each worker node. Defaults to not using compression.
- Returns
A Keras model instance.
- Raises
ImportError – If h5py is not available.
ValueError – In case of an invalid savefile.
-
class
horovod.tensorflow.keras.callbacks.
BroadcastGlobalVariablesCallback
(root_rank, device='')[source]¶ Keras Callback that will broadcast all global variables from root rank to all other processes during initialization.
This is necessary to ensure consistent initialization of all workers when training is started with random weights or restored from a checkpoint.
-
__init__
(root_rank, device='')[source]¶ Construct a new BroadcastGlobalVariablesCallback that will broadcast all global variables from root rank to all other processes during initialization.
- Parameters
root_rank – Rank that will send data, other ranks will receive data.
device – Device to be used for broadcasting. Uses GPU by default if Horovod was built with HOROVOD_GPU_BROADCAST.
-
-
class
horovod.tensorflow.keras.callbacks.
MetricAverageCallback
(device='')[source]¶ Keras Callback that will average metrics across all processes at the end of the epoch. Useful in conjunction with ReduceLROnPlateau, TensorBoard and other metrics-based callbacks.
Note: This callback must be added to the callback list before the ReduceLROnPlateau, TensorBoard or other metrics-based callbacks.
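The averaging step can be pictured as follows. This is a single-process sketch in which `average_metrics` is a hypothetical helper and the per-rank logs are made-up numbers; the real callback averages the metrics across Horovod processes at the end of each epoch.

```python
def average_metrics(per_rank_logs):
    """Average every metric found in the per-rank logs dictionaries."""
    size = len(per_rank_logs)
    return {
        name: sum(logs[name] for logs in per_rank_logs) / size
        for name in per_rank_logs[0]
    }

per_rank_logs = [
    {"loss": 0.5, "accuracy": 0.75},  # metrics seen by simulated rank 0
    {"loss": 1.5, "accuracy": 0.25},  # metrics seen by simulated rank 1
]
averaged = average_metrics(per_rank_logs)
print(averaged)  # {'loss': 1.0, 'accuracy': 0.5}
```

A metrics-based callback that runs before the averaging would act on one rank's local values (0.5 or 1.5 here) instead of the global 1.0, which is why the ordering in the callback list matters.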
-
class
horovod.tensorflow.keras.callbacks.
LearningRateScheduleCallback
(multiplier, start_epoch=0, end_epoch=None, staircase=True, momentum_correction=True, steps_per_epoch=None, initial_lr=None)[source]¶ LearningRateScheduleCallback sets learning rate between epochs start_epoch and end_epoch to be initial_lr * multiplier. multiplier can be a constant or a function f(epoch) = lr’.
If multiplier is a function and staircase=True, learning rate adjustment will happen at the beginning of each epoch and the epoch passed to the multiplier function will be an integer.
If multiplier is a function and staircase=False, learning rate adjustment will happen at the beginning of each batch and the epoch passed to the multiplier function will be a floating number: epoch’ = epoch + batch / steps_per_epoch. This functionality is useful for smooth learning rate adjustment schedulers, such as LearningRateWarmupCallback.
initial_lr is the learning rate of the model optimizer at the start of the training.
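The staircase behaviour described above can be sketched as a pure function. `scheduled_lr` and the `decay` schedule are hypothetical names used for illustration; the sketch mirrors the description in this section and is not the callback's actual implementation.

```python
def scheduled_lr(initial_lr, multiplier, epoch, batch=0, steps_per_epoch=1,
                 start_epoch=0, end_epoch=None, staircase=True):
    """Return the learning rate for an epoch/batch, or None when the
    schedule does not apply (outside start_epoch..end_epoch, end exclusive)."""
    if epoch < start_epoch or (end_epoch is not None and epoch >= end_epoch):
        return None
    # With staircase=False the multiplier sees a fractional epoch.
    position = epoch if staircase else epoch + batch / steps_per_epoch
    factor = multiplier(position) if callable(multiplier) else multiplier
    return initial_lr * factor

def decay(epoch):
    """Hypothetical schedule: halve the learning rate every epoch."""
    return 0.5 ** epoch

print(scheduled_lr(1.0, decay, epoch=2))  # 0.25
print(scheduled_lr(1.0, decay, epoch=2, batch=5,
                   steps_per_epoch=10, staircase=False))  # uses epoch 2.5
print(scheduled_lr(1.0, 0.1, epoch=10, start_epoch=5, end_epoch=10))  # None
```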
-
__init__
(multiplier, start_epoch=0, end_epoch=None, staircase=True, momentum_correction=True, steps_per_epoch=None, initial_lr=None)[source]¶ Construct a new LearningRateScheduleCallback.
- Parameters
multiplier – A constant multiplier or a function f(epoch) = lr’
start_epoch – The first epoch this adjustment will be applied to. Defaults to 0.
end_epoch – The epoch this adjustment will stop applying (exclusive end). Defaults to None.
staircase – Whether to adjust learning rate at the start of epoch (staircase=True) or at the start of every batch (staircase=False).
momentum_correction – Apply momentum correction to optimizers that have momentum. Defaults to True.
steps_per_epoch – The callback will attempt to autodetect number of batches per epoch with Keras >= 2.0.0. Provide this value if you have an older version of Keras.
initial_lr –
Initial learning rate at the start of training.
Warning
Will be required in v0.21.0.
-
-
class
horovod.tensorflow.keras.callbacks.
LearningRateWarmupCallback
(warmup_epochs=5, momentum_correction=True, steps_per_epoch=None, verbose=0, initial_lr=None)[source]¶ Implements gradual learning rate warmup:
lr = initial_lr / hvd.size() ---> lr = initial_lr
initial_lr is the learning rate of the model optimizer at the start of the training.
This technique was described in the paper “Accurate, Large Minibatch SGD: Training ImageNet in 1 Hour”. See https://arxiv.org/pdf/1706.02677.pdf for details.
Math recap:
\[ \begin{aligned} epoch &= full\_epochs + \frac{batch}{steps\_per\_epoch} \\ lr'(epoch) &= \frac{lr}{size} \left( \frac{size - 1}{warmup} \cdot epoch + 1 \right) \\ lr'(epoch = 0) &= \frac{lr}{size} \\ lr'(epoch = warmup) &= lr \end{aligned} \]
-
__init__
(warmup_epochs=5, momentum_correction=True, steps_per_epoch=None, verbose=0, initial_lr=None)[source]¶ Construct a new LearningRateWarmupCallback that will gradually warm up the learning rate.
- Parameters
warmup_epochs – The number of epochs of the warmup phase. Defaults to 5.
momentum_correction – Apply momentum correction to optimizers that have momentum. Defaults to True.
steps_per_epoch – The callback will attempt to autodetect number of batches per epoch with Keras >= 2.0.0. Provide this value if you have an older version of Keras.
verbose – verbosity mode, 0 or 1.
initial_lr –
Initial learning rate at the start of training.
Warning
Will be required in v0.21.0.
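The warmup formula from the math recap can be evaluated directly. `warmup_lr` is a hypothetical helper written for this sketch, with `size` standing in for the number of Horovod processes and `warmup_epochs` for the warmup length.

```python
def warmup_lr(initial_lr, size, warmup_epochs, epoch, batch=0, steps_per_epoch=1):
    """Learning rate during warmup, following the math recap formula."""
    fractional_epoch = epoch + batch / steps_per_epoch
    return initial_lr / size * ((size - 1) / warmup_epochs * fractional_epoch + 1)

start = warmup_lr(0.8, size=8, warmup_epochs=5, epoch=0)
middle = warmup_lr(0.8, size=8, warmup_epochs=5, epoch=2, batch=50, steps_per_epoch=100)
end = warmup_lr(0.8, size=8, warmup_epochs=5, epoch=5)

print(start)   # initial_lr / size at the first step
print(middle)  # linearly interpolated value halfway through warmup
print(end)     # back to initial_lr once warmup_epochs have elapsed
```

The rate grows linearly in the fractional epoch, so with staircase-free updates every batch nudges it slightly closer to initial_lr.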
-
horovod.keras¶
-
horovod.keras.
init
(comm=None)¶ A function that initializes Horovod.
- Parameters
comm – List specifying ranks for the communicator, relative to the MPI_COMM_WORLD communicator OR the MPI communicator to use. Given communicator will be duplicated. If None, Horovod will use MPI_COMM_WORLD Communicator.
-
horovod.keras.
shutdown
()¶ A function that shuts Horovod down.
-
horovod.keras.
size
()¶ A function that returns the number of Horovod processes.
- Returns
An integer scalar containing the number of Horovod processes.
-
horovod.keras.
local_size
()¶ A function that returns the number of Horovod processes within the node the current process is running on.
- Returns
An integer scalar containing the number of local Horovod processes.
-
horovod.keras.
rank
()¶ A function that returns the Horovod rank of the calling process.
- Returns
An integer scalar with the Horovod rank of the calling process.
-
horovod.keras.
local_rank
()¶ A function that returns the local Horovod rank of the calling process, within the node that it is running on. For example, if there are seven processes running on a node, their local ranks will be zero through six, inclusive.
- Returns
An integer scalar with the local Horovod rank of the calling process.
-
horovod.keras.
mpi_threads_supported
()¶ A function that returns a flag indicating whether MPI multi-threading is supported.
If MPI multi-threading is supported, users may mix and match Horovod usage with other MPI libraries, such as mpi4py.
- Returns
A boolean value indicating whether MPI multi-threading is supported.
-
horovod.keras.
mpi_enabled
()¶ Returns True if MPI mode is currently enabled at runtime.
If MPI is enabled, users can use it for controller or data transfer operations.
- Returns
A boolean value indicating whether MPI is enabled.
-
horovod.keras.
mpi_built
()¶ Returns True if Horovod was compiled with MPI support.
- Returns
A boolean value indicating whether MPI support was compiled.
-
horovod.keras.
gloo_enabled
()¶ Returns True if Gloo mode is currently enabled at runtime.
If Gloo is enabled, users can use it for controller or data transfer operations.
- Returns
A boolean value indicating whether Gloo is enabled.
-
horovod.keras.
gloo_built
()¶ Returns True if Horovod was compiled with Gloo support.
- Returns
A boolean value indicating whether Gloo support was compiled.
-
horovod.keras.
nccl_built
()¶ Returns True if Horovod was compiled with NCCL support.
- Returns
A boolean value indicating whether NCCL support was compiled.
-
horovod.keras.
ddl_built
()¶ Returns True if Horovod was compiled with DDL support.
- Returns
A boolean value indicating whether DDL support was compiled.
-
horovod.keras.
ccl_built
()¶ Returns True if Horovod was compiled with oneCCL support.
- Returns
A boolean value indicating whether oneCCL support was compiled.
-
class
horovod.keras.
Compression
[source]¶ Optional gradient compression algorithm used during allreduce.
-
none
¶ Do not compress the gradients. This is the default.
alias of
NoneCompressor
-
fp16
¶ Compress all floating point gradients to 16-bit.
alias of
FP16Compressor
-
-
horovod.keras.
DistributedOptimizer
(optimizer, name=None, device_dense='', device_sparse='', compression=<class 'horovod.tensorflow.compression.NoneCompressor'>, sparse_as_dense=False)[source]¶ An optimizer that wraps another keras.optimizers.Optimizer, using an allreduce to average gradient values before applying gradients to model weights.
- Parameters
optimizer – Optimizer to use for computing gradients and applying updates.
name – Optional name prefix for the operations created when applying gradients. Defaults to “Distributed” followed by the provided optimizer type.
device_dense – Device to be used for dense tensors. Uses GPU by default if Horovod was built with HOROVOD_GPU_ALLREDUCE.
device_sparse – Device to be used for sparse tensors. Uses GPU by default if Horovod was built with HOROVOD_GPU_ALLGATHER.
compression – Compression algorithm used to reduce the amount of data sent and received by each worker node. Defaults to not using compression.
sparse_as_dense – Treat all sparse gradients as dense tensors. This can help improve performance and memory utilization if the original sparse gradient has high density. Defaults to false.
-
horovod.keras.
broadcast_global_variables
(root_rank)[source]¶ Broadcasts all global variables from root rank to all other processes.
- Parameters
root_rank – Rank of the process from which global variables will be broadcasted to all other processes.
-
horovod.keras.
allreduce
(value, name=None, average=True)[source]¶ Perform an allreduce on a tensor-compatible value.
- Parameters
value – A tensor-compatible value to reduce. The shape of the input must be identical across all ranks.
name – Optional name for the constants created by this operation.
average – If True, computes the average over all ranks. Otherwise, computes the sum over all ranks.
-
horovod.keras.
allgather
(value, name=None)[source]¶ Perform an allgather on a tensor-compatible value.
The concatenation is done on the first dimension, so the input values on the different processes must have the same rank and shape, except for the first dimension, which is allowed to be different.
- Parameters
value – A tensor-compatible value to gather.
name – Optional name prefix for the constants created by this operation.
-
horovod.keras.
broadcast
(value, root_rank, name=None)[source]¶ Perform a broadcast on a tensor-compatible value.
- Parameters
value – A tensor-compatible value to broadcast. The shape of the input must be identical across all ranks.
root_rank – Rank of the process from which the value will be broadcast to all other processes.
name – Optional name for the constants created by this operation.
-
horovod.keras.
load_model
(filepath, custom_optimizers=None, custom_objects=None, compression=<class 'horovod.tensorflow.compression.NoneCompressor'>)[source]¶ Loads a saved Keras model with a Horovod DistributedOptimizer.
The DistributedOptimizer will wrap the underlying optimizer used to train the saved model, so that the optimizer state (params and weights) will be picked up for retraining.
By default, all optimizers in the module keras.optimizers will be loaded and wrapped without needing to specify any custom_optimizers or custom_objects.
- Parameters
filepath – One of the following: - string, path to the saved model, or - h5py.File object from which to load the model
custom_optimizers – Optional list of Optimizer subclasses to support during loading.
custom_objects – Optional dictionary mapping names (strings) to custom classes or functions to be considered during deserialization.
compression – Compression algorithm used to reduce the amount of data sent and received by each worker node. Defaults to not using compression.
- Returns
A Keras model instance.
- Raises
ImportError – If h5py is not available.
ValueError – In case of an invalid savefile.
-
class
horovod.keras.callbacks.
BroadcastGlobalVariablesCallback
(root_rank, device='')[source]¶ Keras Callback that will broadcast all global variables from root rank to all other processes during initialization.
This is necessary to ensure consistent initialization of all workers when training is started with random weights or restored from a checkpoint.
-
__init__
(root_rank, device='')[source]¶ Construct a new BroadcastGlobalVariablesCallback that will broadcast all global variables from root rank to all other processes during initialization.
- Parameters
root_rank – Rank that will send data, other ranks will receive data.
device – Device to be used for broadcasting. Uses GPU by default if Horovod was built with HOROVOD_GPU_BROADCAST.
-
-
class
horovod.keras.callbacks.
MetricAverageCallback
(device='')[source]¶ Keras Callback that will average metrics across all processes at the end of the epoch. Useful in conjunction with ReduceLROnPlateau, TensorBoard and other metrics-based callbacks.
Note: This callback must be added to the callback list before the ReduceLROnPlateau, TensorBoard or other metrics-based callbacks.
-
class
horovod.keras.callbacks.
LearningRateScheduleCallback
(multiplier, start_epoch=0, end_epoch=None, staircase=True, momentum_correction=True, steps_per_epoch=None, initial_lr=None)[source]¶ LearningRateScheduleCallback sets learning rate between epochs start_epoch and end_epoch to be initial_lr * multiplier. multiplier can be a constant or a function f(epoch) = lr’.
If multiplier is a function and staircase=True, learning rate adjustment will happen at the beginning of each epoch and the epoch passed to the multiplier function will be an integer.
If multiplier is a function and staircase=False, learning rate adjustment will happen at the beginning of each batch and the epoch passed to the multiplier function will be a floating number: epoch’ = epoch + batch / steps_per_epoch. This functionality is useful for smooth learning rate adjustment schedulers, such as LearningRateWarmupCallback.
initial_lr is the learning rate of the model optimizer at the start of the training.
-
__init__
(multiplier, start_epoch=0, end_epoch=None, staircase=True, momentum_correction=True, steps_per_epoch=None, initial_lr=None)[source]¶ Construct a new LearningRateScheduleCallback.
- Parameters
multiplier – A constant multiplier or a function f(epoch) = lr’
start_epoch – The first epoch this adjustment will be applied to. Defaults to 0.
end_epoch – The epoch this adjustment will stop applying (exclusive end). Defaults to None.
staircase – Whether to adjust learning rate at the start of epoch (staircase=True) or at the start of every batch (staircase=False).
momentum_correction – Apply momentum correction to optimizers that have momentum. Defaults to True.
steps_per_epoch – The callback will attempt to autodetect number of batches per epoch with Keras >= 2.0.0. Provide this value if you have an older version of Keras.
initial_lr –
Initial learning rate at the start of training.
Warning
Will be required in v0.21.0.
-
-
class
horovod.keras.callbacks.
LearningRateWarmupCallback
(warmup_epochs=5, momentum_correction=True, steps_per_epoch=None, verbose=0, initial_lr=None)[source]¶ Implements gradual learning rate warmup:
lr = initial_lr / hvd.size() ---> lr = initial_lr
initial_lr is the learning rate of the model optimizer at the start of the training.
This technique was described in the paper “Accurate, Large Minibatch SGD: Training ImageNet in 1 Hour”. See https://arxiv.org/pdf/1706.02677.pdf for details.
Math recap:
\[ \begin{aligned} epoch &= full\_epochs + \frac{batch}{steps\_per\_epoch} \\ lr'(epoch) &= \frac{lr}{size} \left( \frac{size - 1}{warmup} \cdot epoch + 1 \right) \\ lr'(epoch = 0) &= \frac{lr}{size} \\ lr'(epoch = warmup) &= lr \end{aligned} \]
-
__init__
(warmup_epochs=5, momentum_correction=True, steps_per_epoch=None, verbose=0, initial_lr=None)[source]¶ Construct a new LearningRateWarmupCallback that will gradually warm up the learning rate.
- Parameters
warmup_epochs – The number of epochs of the warmup phase. Defaults to 5.
momentum_correction – Apply momentum correction to optimizers that have momentum. Defaults to True.
steps_per_epoch – The callback will attempt to autodetect number of batches per epoch with Keras >= 2.0.0. Provide this value if you have an older version of Keras.
verbose – verbosity mode, 0 or 1.
initial_lr –
Initial learning rate at the start of training.
Warning
Will be required in v0.21.0.
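The warmup formula from the math recap can be evaluated directly. The sketch below is a plain-Python transcription of that formula, not the callback's code; `warmup_lr` is a hypothetical helper and the worker count, warmup length and batch count are made-up values.

```python
import math


def warmup_lr(initial_lr, size, warmup_epochs, full_epochs, batch, steps_per_epoch):
    """Evaluate lr'(epoch) from the math recap for a given batch position."""
    # Fractional epoch: completed epochs plus progress through the current one.
    epoch = full_epochs + batch / steps_per_epoch
    return initial_lr / size * ((size - 1) / warmup_epochs * epoch + 1)


# Hypothetical setup: 4 workers, 5 warmup epochs, 100 batches per epoch.
start = warmup_lr(0.1, 4, 5, full_epochs=0, batch=0, steps_per_epoch=100)
middle = warmup_lr(0.1, 4, 5, full_epochs=2, batch=50, steps_per_epoch=100)
end = warmup_lr(0.1, 4, 5, full_epochs=5, batch=0, steps_per_epoch=100)
```

The rate starts at initial_lr / size and reaches initial_lr once the fractional epoch equals warmup_epochs, matching the two boundary cases in the recap.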
-
horovod.torch¶
-
class
horovod.torch.
Compression
[source]¶ Optional gradient compression algorithm used during allreduce.
-
none
¶ Do not compress the gradients.
alias of
NoneCompressor
-
fp16
¶ Compress all floating point gradients to 16-bit.
alias of
FP16Compressor
-
-
horovod.torch.
allreduce
(tensor, average=None, name=None, compression=<class 'horovod.torch.compression.NoneCompressor'>, op=None)[source]¶ A function that performs averaging or summation of the input tensor over all the Horovod processes. The input tensor is not modified.
The reduction operation is keyed by the name. If name is not provided, an incremented auto-generated name is used. The tensor type and shape must be the same on all Horovod processes for a given name. The reduction will not start until all processes are ready to send and receive the tensor.
This acts as a thin wrapper around an autograd function. If your input tensor requires gradients, then calling this function will allow gradients to be computed and backpropagated.
- Parameters
tensor – A tensor to reduce.
average –
Warning
Deprecated since version 0.19.0.
Use op instead. Will be removed in v0.21.0.
name – A name of the reduction operation.
compression – Compression algorithm used during allreduce to reduce the amount of data sent during each parameter update step. Defaults to not using compression.
op – The reduction operation to combine tensors across different ranks. Defaults to Average if None is given.
- Returns
A tensor of the same shape and type as tensor, averaged or summed across all processes.
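The reduction semantics can be illustrated without a cluster. The following is a single-process simulation, not a call into Horovod: `simulate_allreduce` is a hypothetical helper in which each inner list stands for the tensor held by one rank.

```python
def simulate_allreduce(per_rank_tensors, average=True):
    """Combine one flat tensor per rank element-wise, by average or sum."""
    size = len(per_rank_tensors)
    length = len(per_rank_tensors[0])
    # The shape must be the same on all ranks for a given name.
    if any(len(t) != length for t in per_rank_tensors):
        raise ValueError("tensor shape must match on all ranks")
    summed = [sum(values) for values in zip(*per_rank_tensors)]
    if average:
        return [v / size for v in summed]
    return summed


rank_tensors = [[1.0, 2.0], [3.0, 4.0]]  # rank 0 and rank 1
averaged = simulate_allreduce(rank_tensors, average=True)
summed = simulate_allreduce(rank_tensors, average=False)
```

Every rank receives the same result, and the inputs in `rank_tensors` are left unchanged, mirroring the non-in-place variant.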
-
horovod.torch.
allreduce_async
(tensor, average=None, name=None, op=None)[source]¶ A function that performs asynchronous averaging or summation of the input tensor over all the Horovod processes. The input tensor is not modified.
The reduction operation is keyed by the name. If name is not provided, an incremented auto-generated name is used. The tensor type and shape must be the same on all Horovod processes for a given name. The reduction will not start until all processes are ready to send and receive the tensor.
- Parameters
tensor – A tensor to reduce.
average –
Warning
Deprecated since version 0.19.0.
Use op instead. Will be removed in v0.21.0.
name – A name of the reduction operation.
op – The reduction operation to combine tensors across different ranks. Defaults to Average if None is given.
- Returns
A handle to the allreduce operation that can be used with poll() or synchronize().
-
horovod.torch.
allreduce_
(tensor, average=None, name=None, op=None)[source]¶ A function that performs in-place averaging or summation of the input tensor over all the Horovod processes.
The reduction operation is keyed by the name. If name is not provided, an incremented auto-generated name is used. The tensor type and shape must be the same on all Horovod processes for a given name. The reduction will not start until all processes are ready to send and receive the tensor.
- Parameters
tensor – A tensor to reduce.
average –
Warning
Deprecated since version 0.19.0.
Use op instead. Will be removed in v0.21.0.
name – A name of the reduction operation.
op – The reduction operation to combine tensors across different ranks. Defaults to Average if None is given.
- Returns
A tensor of the same shape and type as tensor, averaged or summed across all processes.
-
horovod.torch.
allreduce_async_
(tensor, average=None, name=None, op=None)[source]¶ A function that performs asynchronous in-place averaging or summation of the input tensor over all the Horovod processes.
The reduction operation is keyed by the name. If name is not provided, an incremented auto-generated name is used. The tensor type and shape must be the same on all Horovod processes for a given name. The reduction will not start until all processes are ready to send and receive the tensor.
- Parameters
tensor – A tensor to reduce.
average –
Warning
Deprecated since version 0.19.0.
Use op instead. Will be removed in v0.21.0.
name – A name of the reduction operation.
op – The reduction operation to combine tensors across different ranks. Defaults to Average if None is given.
- Returns
A handle to the allreduce operation that can be used with poll() or synchronize().
-
horovod.torch.
allgather
(tensor, name=None)[source]¶ A function that concatenates the input tensor with the same input tensor on all other Horovod processes. The input tensor is not modified.
The concatenation is done on the first dimension, so the input tensors on the different processes must have the same rank and shape, except for the first dimension, which is allowed to be different.
This acts as a thin wrapper around an autograd function. If your input tensor requires gradients, then calling this function will allow gradients to be computed and backpropagated.
- Parameters
tensor – A tensor to allgather.
name – A name of the allgather operation.
- Returns
A tensor of the same type as tensor, concatenated on dimension zero across all processes. The shape is identical to the input shape, except for the first dimension, which may be greater and is the sum of all first dimensions of the tensors in different Horovod processes.
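The shape rule for allgather can be shown with nested lists. This is a single-process simulation rather than Horovod itself: `simulate_allgather` is a hypothetical helper, each rank's tensor is a list of rows, and concatenating in rank order is an assumption of the sketch.

```python
def simulate_allgather(per_rank_tensors):
    """Concatenate 2-D tensors (lists of rows) along the first dimension."""
    row_lengths = {len(row) for tensor in per_rank_tensors for row in tensor}
    # All dimensions except the first must agree across ranks.
    if len(row_lengths) > 1:
        raise ValueError("trailing dimensions must match on all ranks")
    gathered = []
    for tensor in per_rank_tensors:  # assumed: rank order
        gathered.extend(tensor)
    return gathered


rank0 = [[1, 2]]            # shape (1, 2)
rank1 = [[3, 4], [5, 6]]    # shape (2, 2)
result = simulate_allgather([rank0, rank1])
```

The first dimension of the result is the sum of the first dimensions on each rank (here 1 + 2 = 3), while the second dimension is unchanged.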
-
horovod.torch.
allgather_async
(tensor, name=None)[source]¶ A function that asynchronously concatenates the input tensor with the same input tensor on all other Horovod processes. The input tensor is not modified.
The concatenation is done on the first dimension, so the input tensors on the different processes must have the same rank and shape, except for the first dimension, which is allowed to be different.
- Parameters
tensor – A tensor to allgather.
name – A name of the allgather operation.
- Returns
A handle to the allgather operation that can be used with poll() or synchronize().
-
horovod.torch.
broadcast
(tensor, root_rank, name=None)[source]¶ A function that broadcasts the input tensor on root rank to the same input tensor on all other Horovod processes. The input tensor is not modified.
The broadcast operation is keyed by the name. If name is not provided, an incremented auto-generated name is used. The tensor type and shape must be the same on all Horovod processes for a given name. The broadcast will not start until all processes are ready to send and receive the tensor.
This acts as a thin wrapper around an autograd function. If your input tensor requires gradients, then calling this function will allow gradients to be computed and backpropagated.
- Parameters
tensor – A tensor to broadcast.
root_rank – The rank to broadcast the value from.
name – A name of the broadcast operation.
- Returns
A tensor of the same shape and type as tensor, with the value broadcasted from root rank.
-
horovod.torch.
broadcast_async
(tensor, root_rank, name=None)[source]¶ A function that asynchronously broadcasts the input tensor on root rank to the same input tensor on all other Horovod processes. The input tensor is not modified.
The broadcast operation is keyed by the name. If name is not provided, an incremented auto-generated name is used. The tensor type and shape must be the same on all Horovod processes for a given name. The broadcast will not start until all processes are ready to send and receive the tensor.
- Parameters
tensor – A tensor to broadcast.
root_rank – The rank to broadcast the value from.
name – A name of the broadcast operation.
- Returns
A handle to the broadcast operation that can be used with poll() or synchronize().
-
horovod.torch.
broadcast_
(tensor, root_rank, name=None)[source]¶ A function that broadcasts the input tensor on root rank to the same input tensor on all other Horovod processes. The operation is performed in-place.
The broadcast operation is keyed by the name. If name is not provided, an incremented auto-generated name is used. The tensor type and shape must be the same on all Horovod processes for a given name. The broadcast will not start until all processes are ready to send and receive the tensor.
- Parameters
tensor – A tensor to broadcast.
root_rank – The rank to broadcast the value from.
name – A name of the broadcast operation.
- Returns
A tensor of the same shape and type as tensor, with the value broadcasted from root rank.
-
horovod.torch.
broadcast_async_
(tensor, root_rank, name=None)[source]¶ A function that asynchronously broadcasts the input tensor on root rank to the same input tensor on all other Horovod processes. The operation is performed in-place.
The broadcast operation is keyed by the name. If name is not provided, an incremented auto-generated name is used. The tensor type and shape must be the same on all Horovod processes for a given name. The broadcast will not start until all processes are ready to send and receive the tensor.
- Parameters
tensor – A tensor to broadcast.
root_rank – The rank to broadcast the value from.
name – A name of the broadcast operation.
- Returns
A handle to the broadcast operation that can be used with poll() or synchronize().
-
horovod.torch.
join
(device=-1)[source]¶ A function that indicates that the rank finished processing data.
All ranks that did not call join() continue to process allreduce operations. This function blocks the Python thread until all ranks join.
- Parameters
device – An id of the device to create temporary zero tensors (default -1, CPU)
- Returns
Id of the rank that joined last.
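One way to picture join() is a run where ranks hold different numbers of batches. The sketch below is a simplified single-process model, not Horovod's implementation: `simulate_join` is a hypothetical helper, gradients are scalars, only summation is modelled, and a rank that has run out of data is assumed to contribute zeros to the remaining allreduce steps.

```python
def simulate_join(per_rank_batches):
    """Sum per-step values across ranks with uneven batch counts.

    Returns the per-step sums and the rank that finished (joined) last.
    """
    num_steps = max(len(batches) for batches in per_rank_batches)
    sums = []
    for step in range(num_steps):
        total = 0.0
        for batches in per_rank_batches:
            # A rank that already joined contributes a zero tensor.
            total += batches[step] if step < len(batches) else 0.0
        sums.append(total)
    # The rank with the most batches joins last (no ties in this example).
    last_joined = max(range(len(per_rank_batches)),
                      key=lambda rank: len(per_rank_batches[rank]))
    return sums, last_joined


batches = [[1.0, 1.0, 1.0], [2.0], [3.0, 3.0]]  # ranks 0, 1, 2
step_sums, last_rank = simulate_join(batches)
```

Rank 1 runs out of data first, yet ranks 0 and 2 can still complete their remaining reductions.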
-
horovod.torch.
poll
(handle)[source]¶ Polls an allreduce, allgather or broadcast handle to determine whether the underlying asynchronous operation has completed. After poll() returns True, synchronize() will return without blocking.
- Parameters
handle – A handle returned by an allreduce, allgather or broadcast asynchronous operation.
- Returns
A flag indicating whether the operation has completed.
-
horovod.torch.
synchronize
(handle)[source]¶ Blocks until an asynchronous allreduce, allgather or broadcast operation has completed, then returns the result of the operation.
- Parameters
handle – A handle returned by an allreduce, allgather or broadcast asynchronous operation.
- Returns
An output tensor of the operation.
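The handle pattern shared by the asynchronous functions resembles a future. The sketch below is an analogy built on the standard library, not Horovod code: `fake_allreduce_async`, `poll_handle` and `synchronize_handle` are hypothetical stand-ins for an asynchronous operation, poll() and synchronize().

```python
import time
from concurrent.futures import ThreadPoolExecutor

_executor = ThreadPoolExecutor(max_workers=1)


def fake_allreduce_async(values):
    """Start background work and return a handle (a Future) immediately."""
    def work():
        time.sleep(0.05)  # pretend the communication takes a while
        return sum(values)
    return _executor.submit(work)


def poll_handle(handle):
    """Non-blocking check, analogous to poll()."""
    return handle.done()


def synchronize_handle(handle):
    """Blocking wait that returns the result, analogous to synchronize()."""
    return handle.result()


handle = fake_allreduce_async([1, 2, 3])
while not poll_handle(handle):
    time.sleep(0.01)  # other useful work could run here
result = synchronize_handle(handle)  # returns at once: poll was already True
_executor.shutdown()
```

Polling lets the caller overlap computation with communication, and the final synchronize call retrieves the output.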
-
horovod.torch.
init
(comm=None)¶ A function that initializes Horovod.
- Parameters
comm – List specifying ranks for the communicator, relative to the MPI_COMM_WORLD communicator OR the MPI communicator to use. Given communicator will be duplicated. If None, Horovod will use MPI_COMM_WORLD Communicator.
-
horovod.torch.
shutdown
()¶ A function that shuts Horovod down.
-
horovod.torch.
size
()¶ A function that returns the number of Horovod processes.
- Returns
An integer scalar containing the number of Horovod processes.
-
horovod.torch.
local_size
()¶ A function that returns the number of Horovod processes within the node the current process is running on.
- Returns
An integer scalar containing the number of local Horovod processes.
-
horovod.torch.
rank
()¶ A function that returns the Horovod rank of the calling process.
- Returns
An integer scalar with the Horovod rank of the calling process.
-
horovod.torch.
local_rank
()¶ A function that returns the local Horovod rank of the calling process, within the node that it is running on. For example, if there are seven processes running on a node, their local ranks will be zero through six, inclusive.
- Returns
An integer scalar with the local Horovod rank of the calling process.
-
horovod.torch.
mpi_threads_supported
()¶ A function that returns a flag indicating whether MPI multi-threading is supported.
If MPI multi-threading is supported, users may mix and match Horovod usage with other MPI libraries, such as mpi4py.
- Returns
A boolean value indicating whether MPI multi-threading is supported.
-
horovod.torch.
mpi_enabled
()¶ Returns True if MPI mode is currently enabled at runtime.
If MPI is enabled, users can use it for controller or data transfer operations.
- Returns
A boolean value indicating whether MPI is enabled.
-
horovod.torch.
mpi_built
()¶ Returns True if Horovod was compiled with MPI support.
- Returns
A boolean value indicating whether MPI support was compiled.
-
horovod.torch.
gloo_enabled
()¶ Returns True if Gloo mode is currently enabled at runtime.
If Gloo is enabled, users can use it for controller or data transfer operations.
- Returns
A boolean value indicating whether Gloo is enabled.
-
horovod.torch.
gloo_built
()¶ Returns True if Horovod was compiled with Gloo support.
- Returns
A boolean value indicating whether Gloo support was compiled.
-
horovod.torch.
nccl_built
()¶ Returns True if Horovod was compiled with NCCL support.
- Returns
A boolean value indicating whether NCCL support was compiled.
-
horovod.torch.
ddl_built
()¶ Returns True if Horovod was compiled with DDL support.
- Returns
A boolean value indicating whether DDL support was compiled.
-
horovod.torch.
ccl_built
()¶ Returns True if Horovod was compiled with oneCCL support.
- Returns
A boolean value indicating whether oneCCL support was compiled.
-
class
horovod.torch.
SyncBatchNorm
(num_features, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)[source]¶ Applies synchronous version of N-dimensional BatchNorm.
In this version, normalization parameters are synchronized across workers during forward pass. This is very useful in situations where each GPU can fit a very small number of examples.
See https://pytorch.org/docs/stable/nn.html#batchnorm2d for more details about BatchNorm.
- Parameters
num_features – number of channels C from the shape (N, C, …)
eps – a value added to the denominator for numerical stability. Default: 1e-5
momentum – the value used for the running_mean and running_var computation. Can be set to None for cumulative moving average (i.e. simple average). Default: 0.1
affine – a boolean value that when set to True, this module has learnable affine parameters. Default: True
track_running_stats – a boolean value that when set to True, this module tracks the running mean and variance, and when set to False, this module does not track such statistics and always uses batch statistics in both training and eval modes. Default: True
Note
Only GPU input tensors are supported in the training mode.
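To see why synchronizing statistics helps when each worker holds only a few examples, consider a single channel. This sketch is a plain-Python illustration under simplifying assumptions (one channel, biased variance, statistics combined via sums); it is not the module's implementation, and `local_stats` and `synchronized_mean_var` are hypothetical helpers.

```python
import math


def local_stats(values):
    """Per-worker count, sum and sum of squares for one channel."""
    return len(values), sum(values), sum(v * v for v in values)


def synchronized_mean_var(per_worker_values):
    """Combine per-worker statistics as a sum-allreduce would."""
    count = total = total_sq = 0.0
    for values in per_worker_values:
        n, s, sq = local_stats(values)
        count += n
        total += s
        total_sq += sq
    mean = total / count
    var = total_sq / count - mean * mean  # biased (population) variance
    return mean, var


workers = [[1.0, 2.0], [3.0, 4.0]]  # two workers, two examples each
mean, var = synchronized_mean_var(workers)
local_means = [sum(w) / len(w) for w in workers]
```

Each worker's local mean (1.5 and 3.5) differs from the global mean, whereas the synchronized statistics equal those of the combined batch.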
-
horovod.torch.
DistributedOptimizer
(optimizer, named_parameters=None, compression=<class 'horovod.torch.compression.NoneCompressor'>, backward_passes_per_step=1, op=Average)[source]¶ An optimizer that wraps another torch.optim.Optimizer, using an allreduce to combine gradient values before applying gradients to model weights.
Allreduce operations are executed after each gradient is computed by loss.backward() in parallel with each other. The step() method ensures that all allreduce operations are finished before applying gradients to the model.
DistributedOptimizer exposes the synchronize() method, which forces allreduce operations to finish before continuing the execution. It’s useful in conjunction with gradient clipping, or other operations that modify gradients in place before step() is executed. Make sure to use optimizer.skip_synchronize() if you’re calling synchronize() in your code.
Example of gradient clipping:
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.synchronize()
torch.nn.utils.clip_grad_norm_(model.parameters(), args.clip)
with optimizer.skip_synchronize():
    optimizer.step()
- Parameters
optimizer – Optimizer to use for computing gradients and applying updates.
named_parameters – A mapping between parameter names and values. Used for naming of allreduce operations. Typically just model.named_parameters().
compression – Compression algorithm used during allreduce to reduce the amount of data sent during each parameter update step. Defaults to not using compression.
backward_passes_per_step – Number of expected backward passes to perform before calling step()/synchronize(). This allows accumulating gradients over multiple mini-batches before reducing and applying them.
op – The reduction operation to use when combining gradients across different ranks.
-
horovod.torch.
broadcast_parameters
(params, root_rank)[source]¶ Broadcasts the parameters from root rank to all other processes. Typical usage is to broadcast the model.state_dict(), model.named_parameters(), or model.parameters().
- Parameters
params – One of the following: - list of parameters to broadcast - dict of parameters to broadcast
root_rank – The rank of the process from which parameters will be broadcasted to all other processes.
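The effect of a parameter broadcast is that every process ends up with the root's values. The following single-process simulation illustrates this with plain dicts. `simulate_broadcast_parameters` is a hypothetical helper, and modelling the update as an in-place overwrite is an assumption of the sketch.

```python
import copy


def simulate_broadcast_parameters(per_rank_params, root_rank=0):
    """Overwrite every rank's parameter dict with the root rank's values."""
    root = per_rank_params[root_rank]
    for rank, params in enumerate(per_rank_params):
        if rank == root_rank:
            continue
        for name in params:
            # Copy so that ranks do not share the same underlying list.
            params[name] = copy.deepcopy(root[name])


# Two ranks that initialised the same model with different random weights.
rank_params = [
    {"weight": [0.1, 0.2], "bias": [0.0]},
    {"weight": [0.9, 0.8], "bias": [0.5]},
]
simulate_broadcast_parameters(rank_params, root_rank=0)
```

After the call both ranks start training from identical weights, which is the usual reason for broadcasting right after model construction or checkpoint loading.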
-
horovod.torch.
broadcast_optimizer_state
(optimizer, root_rank)[source]¶ Broadcasts an optimizer state from root rank to all other processes.
- Parameters
optimizer – An optimizer.
root_rank – The rank of the process from which the optimizer will be broadcasted to all other processes.
-
horovod.torch.
broadcast_object
(obj, root_rank, name=None)[source]¶ Serializes and broadcasts an object from root rank to all other processes. Typical usage is to broadcast the optimizer.state_dict(), for example:
state_dict = broadcast_object(optimizer.state_dict(), 0)
if hvd.rank() > 0:
    optimizer.load_state_dict(state_dict)
- Parameters
obj – An object capable of being serialized without losing any context.
root_rank – The rank of the process from which parameters will be broadcasted to all other processes.
name – Optional name to use during broadcast, will default to the class type.
- Returns
The object that was broadcast from the root_rank.
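The serialize-then-broadcast idea can be sketched with the standard library. This is a single-process illustration, not Horovod's code: `simulate_broadcast_object` is a hypothetical helper, and using pickle as the serialization format is an assumption made for the example.

```python
import pickle


def simulate_broadcast_object(per_rank_objects, root_rank=0):
    """Serialize the root's object once and deserialize it on every rank."""
    payload = pickle.dumps(per_rank_objects[root_rank])  # bytes sent by root
    return [pickle.loads(payload) for _ in per_rank_objects]


# Hypothetical optimizer-like state held by three ranks.
states = [
    {"lr": 0.01, "step": 100},
    {"lr": 0.01, "step": 0},
    {"lr": 0.01, "step": 0},
]
received = simulate_broadcast_object(states, root_rank=0)
```

Each rank receives an independent copy equal to the root's object, which is why the object must survive serialization without losing context.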
horovod.mxnet¶
-
horovod.mxnet.
allgather
(tensor, name=None, priority=0)[source]¶ A function that concatenates the input tensor with the same input tensor on all other Horovod processes. The input tensor is not modified.
The concatenation is done on the first dimension, so the input tensors on the different processes must have the same rank and shape, except for the first dimension, which is allowed to be different.
This acts as a thin wrapper around an autograd function. If your input tensor requires gradients, then calling this function will allow gradients to be computed and backpropagated.
- Parameters
tensor – A tensor to allgather.
name – A name of the allgather operation.
priority – The priority of this operation. Higher priority operations are likely to be executed before other operations.
- Returns
A tensor of the same type as tensor, concatenated on dimension zero across all processes. The shape is identical to the input shape, except for the first dimension, which may be greater and is the sum of all first dimensions of the tensors in different Horovod processes.
-
horovod.mxnet.
allreduce
(tensor, average=True, name=None, priority=0)[source]¶ A function that performs averaging or summation of the input tensor over all the Horovod processes. The input tensor is not modified.
The reduction operation is keyed by the name. If name is not provided, an incremented auto-generated name is used. The tensor type and shape must be the same on all Horovod processes for a given name. The reduction will not start until all processes are ready to send and receive the tensor.
This acts as a thin wrapper around an autograd function. If your input tensor requires gradients, then calling this function will allow gradients to be computed and backpropagated.
- Parameters
tensor – A tensor to average or sum.
average – A flag indicating whether to compute average or summation, defaults to average.
name – A name of the reduction operation.
priority – The priority of this operation. Higher priority operations are likely to be executed before other operations.
- Returns
A tensor of the same shape and type as tensor, averaged or summed across all processes.
-
horovod.mxnet.
allreduce_
(tensor, average=True, name=None, priority=0)[source]¶ A function that performs in-place averaging or summation of the input tensor over all the Horovod processes.
The reduction operation is keyed by the name. If name is not provided, an incremented auto-generated name is used. The tensor type and shape must be the same on all Horovod processes for a given name. The reduction will not start until all processes are ready to send and receive the tensor.
- Parameters
tensor – A tensor to average or sum.
average – A flag indicating whether to compute average or summation, defaults to average.
name – A name of the reduction operation.
priority – The priority of this operation. Higher priority operations are likely to be executed before other operations.
- Returns
A tensor of the same shape and type as tensor, averaged or summed across all processes.
-
horovod.mxnet.
broadcast
(tensor, root_rank, name=None, priority=0)[source]¶ A function that broadcasts the input tensor on root rank to the same input tensor on all other Horovod processes. The input tensor is not modified.
The broadcast operation is keyed by the name. If name is not provided, an incremented auto-generated name is used. The tensor type and shape must be the same on all Horovod processes for a given name. The broadcast will not start until all processes are ready to send and receive the tensor.
This acts as a thin wrapper around an autograd function. If your input tensor requires gradients, then calling this function will allow gradients to be computed and backpropagated.
- Parameters
tensor – A tensor to broadcast.
root_rank – The rank to broadcast the value from.
name – A name of the broadcast operation.
priority – The priority of this operation. Higher priority operations are likely to be executed before other operations.
- Returns
A tensor of the same shape and type as tensor, with the value broadcasted from root rank.
-
horovod.mxnet.
broadcast_
(tensor, root_rank, name=None, priority=0)[source]¶ A function that broadcasts the input tensor on root rank to the same input tensor on all other Horovod processes. The operation is performed in-place.
The broadcast operation is keyed by the name. If name is not provided, an incremented auto-generated name is used. The tensor type and shape must be the same on all Horovod processes for a given name. The broadcast will not start until all processes are ready to send and receive the tensor.
- Parameters
tensor – A tensor to broadcast.
root_rank – The rank to broadcast the value from.
name – A name of the broadcast operation.
priority – The priority of this operation. Higher priority operations are likely to be executed before other operations.
- Returns
A tensor of the same shape and type as tensor, with the value broadcasted from root rank.
-
horovod.mxnet.
init
(comm=None)¶ A function that initializes Horovod.
- Parameters
comm – List specifying ranks for the communicator, relative to the MPI_COMM_WORLD communicator OR the MPI communicator to use. Given communicator will be duplicated. If None, Horovod will use MPI_COMM_WORLD Communicator.
-
horovod.mxnet.
shutdown
()¶ A function that shuts Horovod down.
-
horovod.mxnet.
size
()¶ A function that returns the number of Horovod processes.
- Returns
An integer scalar containing the number of Horovod processes.
-
horovod.mxnet.
local_size
()¶ A function that returns the number of Horovod processes within the node the current process is running on.
- Returns
An integer scalar containing the number of local Horovod processes.
-
horovod.mxnet.
rank
()¶ A function that returns the Horovod rank of the calling process.
- Returns
An integer scalar with the Horovod rank of the calling process.
-
horovod.mxnet.
local_rank
()¶ A function that returns the local Horovod rank of the calling process, within the node that it is running on. For example, if there are seven processes running on a node, their local ranks will be zero through six, inclusive.
- Returns
An integer scalar with the local Horovod rank of the calling process.
-
horovod.mxnet.
mpi_threads_supported
()¶ A function that returns a flag indicating whether MPI multi-threading is supported.
If MPI multi-threading is supported, users may mix and match Horovod usage with other MPI libraries, such as mpi4py.
- Returns
A boolean value indicating whether MPI multi-threading is supported.
-
horovod.mxnet.
mpi_enabled
()¶ Returns True if MPI mode is currently enabled at runtime.
If MPI is enabled, users can use it for controller or data transfer operations.
- Returns
A boolean value indicating whether MPI is enabled.
-
horovod.mxnet.
mpi_built
()¶ Returns True if Horovod was compiled with MPI support.
- Returns
A boolean value indicating whether MPI support was compiled.
-
horovod.mxnet.
gloo_enabled
()¶ Returns True if Gloo mode is currently enabled at runtime.
If Gloo is enabled, users can use it for controller or data transfer operations.
- Returns
A boolean value indicating whether Gloo is enabled.
-
horovod.mxnet.
gloo_built
()¶ Returns True if Horovod was compiled with Gloo support.
- Returns
A boolean value indicating whether Gloo support was compiled.
-
horovod.mxnet.
nccl_built
()¶ Returns True if Horovod was compiled with NCCL support.
- Returns
A boolean value indicating whether NCCL support was compiled.
-
horovod.mxnet.
ddl_built
()¶ Returns True if Horovod was compiled with DDL support.
- Returns
A boolean value indicating whether DDL support was compiled.
-
horovod.mxnet.
ccl_built
()¶ Returns True if Horovod was compiled with oneCCL support.
- Returns
A boolean value indicating whether oneCCL support was compiled.
-
horovod.mxnet.
broadcast_parameters
(params, root_rank=0)[source]¶ Broadcasts the parameters from root rank to all other processes. Typical usage is to broadcast the Module.get_params() or the Block.collect_params().
- Parameters
params – One of the following: - dict of parameters to broadcast - ParameterDict to broadcast
root_rank – The rank of the process from which parameters will be broadcasted to all other processes.
horovod.spark¶
-
horovod.spark.
run
(fn, args=(), kwargs={}, num_proc=None, start_timeout=None, use_mpi=None, use_gloo=None, extra_mpi_args=None, env=None, stdout=None, stderr=None, verbose=1, nics=None)[source]¶ Runs Horovod in Spark. Runs num_proc processes executing fn using the same number of Spark tasks.
- Parameters
fn – Function to run.
args – Arguments to pass to fn.
kwargs – Keyword arguments to pass to fn.
num_proc – Number of Horovod processes. Defaults to spark.default.parallelism.
start_timeout – Timeout for Spark tasks to spawn, register and start running the code, in seconds. If not set, falls back to HOROVOD_SPARK_START_TIMEOUT environment variable value. If it is not set as well, defaults to 600 seconds.
extra_mpi_args – Extra arguments for mpi_run. Defaults to no extra args.
env – Environment dictionary to use in Horovod run.
stdout – Horovod stdout is redirected to this stream. Defaults to sys.stdout.
stderr – Horovod stderr is redirected to this stream. Defaults to sys.stderr.
verbose – Debug output verbosity (0-2). Defaults to 1.
nics – List of NICs for tcp network communication.
- Returns
List of results returned by running fn on each rank.
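The fallback order described for start_timeout can be written as a small resolver. This is a sketch of the documented precedence only, not the library's code: `resolve_start_timeout` is a hypothetical helper, and parsing the environment value as an integer is an assumption.

```python
import os

DEFAULT_START_TIMEOUT = 600  # seconds, per the parameter description


def resolve_start_timeout(start_timeout=None, environ=None):
    """Pick the timeout: explicit argument, then environment, then default."""
    environ = os.environ if environ is None else environ
    if start_timeout is not None:
        return start_timeout
    value = environ.get("HOROVOD_SPARK_START_TIMEOUT")
    if value is not None:
        return int(value)  # assumed: the variable holds whole seconds
    return DEFAULT_START_TIMEOUT


explicit = resolve_start_timeout(30, environ={})
from_env = resolve_start_timeout(None, environ={"HOROVOD_SPARK_START_TIMEOUT": "120"})
fallback = resolve_start_timeout(None, environ={})
```

An explicit argument always wins, so a per-call timeout can override a cluster-wide environment setting.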
horovod.spark.keras¶
-
class
horovod.spark.keras.
KerasEstimator
[source]¶ Bases: horovod.spark.common.estimator.HorovodEstimator, horovod.spark.keras.estimator.KerasEstimatorParamsReadable, horovod.spark.keras.estimator.KerasEstimatorParamsWritable
Spark Estimator for fitting Keras models to a DataFrame.
Supports standalone keras and tf.keras, and TensorFlow 1.X and 2.X.
- Parameters
num_proc – Number of Horovod processes. Defaults to spark.default.parallelism.
model – Keras model to train.
backend – Optional Backend object for running distributed training function. Defaults to SparkBackend with num_proc worker processes. Cannot be specified if num_proc is also provided.
store – Store object that abstracts reading and writing of intermediate data and run results.
custom_objects – Optional dictionary mapping names (strings) to custom classes or functions to be considered during serialization/deserialization.
optimizer – Keras optimizer to be converted into a hvd.DistributedOptimizer for training.
loss – Keras loss or list of losses.
loss_weights – Optional list of float weight values to assign each loss.
sample_weight_col – Optional column indicating the weight of each sample.
gradient_compression – Gradient compression used by hvd.DistributedOptimizer.
metrics – Optional metrics to record.
feature_cols – Column names used as feature inputs to the model. Must be a list with each feature mapping to a sequential argument in the model’s forward() function.
label_cols – Column names used as labels. Must be a list with one label for each output of the model.
validation – Optional validation column name (string) where every row in the column is either 1/True or 0/False, or validation split (float) giving percent of data to be randomly selected for validation.
callbacks – Keras callbacks.
batch_size – Number of rows from the DataFrame per batch.
epochs – Number of epochs to train.
verbose – Verbosity level [0, 2] (default: 1).
shuffle_buffer_size – Optional size of in-memory shuffle buffer in rows. Allocating a larger buffer size increases randomness of shuffling at the cost of more host memory. Defaults to estimating with an assumption of 4GB of memory per host.
partitions_per_process – Number of Parquet partitions to assign per worker process from num_proc (default: 10).
run_id – Optional unique ID for this run for organization in the Store. Will be automatically assigned if not provided.
train_steps_per_epoch – Number of steps to train each epoch. Useful for testing that model trains successfully. Defaults to training the entire dataset each epoch.
validation_steps_per_epoch – Number of validation steps to perform each epoch.
transformation_fn – Optional function that takes a row as its parameter and returns a modified row that is then fed into the train or validation step. This transformation is applied after batching. See Petastorm [TransformSpec](https://github.com/uber/petastorm/blob/master/petastorm/transform.py) for more details. Note that this function constructs another function which should perform the transformation.
train_reader_num_workers – This parameter specifies the number of parallel processes that read the training data from data store and apply data transformations to it. Increasing this number will generally increase the reading rate but will also increase the memory footprint. More processes are particularly useful if the bandwidth to the data store is not high enough, or users need to apply transformation such as decompression or data augmentation on raw data.
val_reader_num_workers – Similar to the train_reader_num_workers.
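The two forms accepted by the validation parameter can be illustrated on plain Python rows. This sketch is not the estimator's implementation: `split_validation` is a hypothetical helper, rows are dicts rather than DataFrame rows, the float is treated as a fraction between 0 and 1, and the seeded per-row random draw is an assumption.

```python
import random


def split_validation(rows, validation, seed=0):
    """Split rows into (train, val) by column name or by random fraction."""
    if isinstance(validation, str):
        # Column mode: rows whose flag is 1/True go to validation.
        train = [r for r in rows if not r[validation]]
        val = [r for r in rows if r[validation]]
        return train, val
    if isinstance(validation, float):
        # Fraction mode: each row is selected for validation at random.
        rng = random.Random(seed)
        train, val = [], []
        for r in rows:
            (val if rng.random() < validation else train).append(r)
        return train, val
    return list(rows), []  # no validation requested


rows = [{"x": i, "is_val": i % 4 == 0} for i in range(8)]
train_by_col, val_by_col = split_validation(rows, "is_val")
train_by_frac, val_by_frac = split_validation(rows, 0.25, seed=1)
```

Column mode gives a reproducible split chosen by the user, while fraction mode only fixes the expected share of validation rows.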
-
class
horovod.spark.keras.
KerasModel
[source]¶ Bases: horovod.spark.common.estimator.HorovodModel, horovod.spark.keras.estimator.KerasEstimatorParamsReadable, horovod.spark.keras.estimator.KerasEstimatorParamsWritable
Spark Transformer wrapping a Keras model, used for making predictions on a DataFrame.
Retrieve the underlying Keras model by calling keras_model.getModel().
- Parameters
history – List of metrics, one entry per epoch during training.
model – Trained Keras model.
feature_columns – List of feature column names.
label_columns – List of label column names.
custom_objects – Keras custom objects.
run_id – ID of the run used to train the model.
horovod.spark.torch¶
-
class
horovod.spark.torch.
TorchEstimator
[source]¶ Bases: horovod.spark.common.estimator.HorovodEstimator, horovod.spark.torch.estimator.TorchEstimatorParamsWritable, horovod.spark.torch.estimator.TorchEstimatorParamsReadable
Spark Estimator for fitting PyTorch models to a DataFrame.
- Parameters
num_proc – Number of Horovod processes. Defaults to spark.default.parallelism.
model – PyTorch model to train.
backend – Optional Backend object for running distributed training function. Defaults to SparkBackend with num_proc worker processes. Cannot be specified if num_proc is also provided.
store – Store object that abstracts reading and writing of intermediate data and run results.
optimizer – PyTorch optimizer to be converted into a hvd.DistributedOptimizer for training.
loss – PyTorch loss or list of losses.
loss_constructors – Optional functions that generate losses.
metrics – Optional metrics to record.
loss_weights – Optional list of float weight values to assign each loss.
sample_weight_col – Optional column indicating the weight of each sample.
gradient_compression – Gradient compression used by hvd.DistributedOptimizer.
feature_cols – Column names used as feature inputs to the model. Must be a list with each feature mapping to a sequential argument in the model’s forward() function.
input_shapes – List of shapes for each input tensor to the model.
validation – Optional validation column name (string) where every row in the column is either 1/True or 0/False, or validation split (float) giving the fraction of the data to be randomly selected for validation.
label_cols – Column names used as labels. Must be a list with one label for each output of the model.
batch_size – Number of rows from the DataFrame per batch.
epochs – Number of epochs to train.
verbose – Verbosity level [0, 2] (default: 1).
shuffle_buffer_size – Optional size of in-memory shuffle buffer in rows. Allocating a larger buffer size increases randomness of shuffling at the cost of more host memory. Defaults to estimating with an assumption of 4GB of memory per host.
partitions_per_process – Number of Parquet partitions to assign per worker process from num_proc (default: 10).
run_id – Optional unique ID for this run for organization in the Store. Will be automatically assigned if not provided.
train_minibatch_fn – Optional custom function to execute within the training loop. Defaults to standard gradient descent process.
train_steps_per_epoch – Number of steps to train each epoch. Useful for testing that model trains successfully. Defaults to training the entire dataset each epoch.
validation_steps_per_epoch – Number of validation steps to perform each epoch.
transformation_fn – Optional function that takes a row as its parameter and returns a modified row that is then fed into the train or validation step. This transformation is applied after batching. See Petastorm [TransformSpec](https://github.com/uber/petastorm/blob/master/petastorm/transform.py) for more details. Note that this function constructs another function which should perform the transformation.
train_reader_num_workers – Number of parallel processes that read the training data from the data store and apply data transformations to it. Increasing this number generally increases the reading rate but also increases the memory footprint. More processes are particularly useful if the bandwidth to the data store is not high enough, or if users need to apply transformations such as decompression or data augmentation to the raw data.
val_reader_num_workers – Number of parallel processes that read the validation data; otherwise similar to train_reader_num_workers.
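The validation parameter above accepts either a column name or a float split. The following is a minimal sketch of how a caller might tell the two forms apart before passing the value to the estimator. The helper name describe_validation and the [0, 1) range check are assumptions made for illustration; this is not Horovod's own validation code.

```python
def describe_validation(validation):
    """Classify a `validation` argument (hypothetical helper, not part of Horovod).

    Returns ("none", None), ("column", name), or ("split", fraction).
    """
    if validation is None:
        return ("none", None)
    if isinstance(validation, str):
        # A string names a column whose rows are 1/True or 0/False.
        return ("column", validation)
    if isinstance(validation, float):
        # Assumption: a split is a fraction in [0, 1).
        if not 0.0 <= validation < 1.0:
            raise ValueError("validation split must be in [0, 1), got %r" % validation)
        return ("split", validation)
    raise TypeError("validation must be a column name (str) or a split (float)")


by_column = describe_validation("is_val")
by_split = describe_validation(0.2)
```

Checking the form up front gives an early, readable error before any distributed work starts.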
-
class
horovod.spark.torch.
TorchModel
[source]¶ Bases:
horovod.spark.common.estimator.HorovodModel
,horovod.spark.torch.estimator.TorchEstimatorParamsWritable
,horovod.spark.torch.estimator.TorchEstimatorParamsReadable
Spark Transformer wrapping a PyTorch model, used for making predictions on a DataFrame.
Retrieve the underlying PyTorch model by calling torch_model.getModel().
- Parameters
history – List of metrics, one entry per epoch during training.
model – Trained PyTorch model.
feature_columns – List of feature column names.
label_columns – List of label column names.
optimizer – PyTorch optimizer used during training, containing updated state.
run_id – ID of the run used to train the model.
loss – PyTorch loss(es).
loss_constructors – PyTorch loss constructors.
horovod.spark.common¶
-
class
horovod.spark.common.estimator.
HorovodEstimator
[source]¶
-
class
horovod.spark.common.estimator.
HorovodModel
[source]¶ -
transform
(df, params=None)[source]¶ Transforms the input dataset with prediction columns representing model predictions.
Prediction column names default to <label_column>__output. Override column names by calling transformer.setOutputCols(col_names).
- Parameters
df – Input dataset, which is an instance of pyspark.sql.DataFrame.
params – An optional param map that overrides embedded params.
- Returns
Transformed dataset.
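As a small illustration of the default naming rule described for transform, the sketch below derives prediction column names from label column names. The helper default_output_cols is hypothetical and only mirrors the documented <label_column>__output pattern; it does not call into Horovod or Spark.

```python
def default_output_cols(label_columns):
    """Return the documented default prediction column names.

    Hypothetical helper: applies the `<label_column>__output` pattern
    to each label column name.
    """
    return ["{}__output".format(col) for col in label_columns]


output_cols = default_output_cols(["label", "score"])
```

To use different names, the documentation above points to transformer.setOutputCols(col_names).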
-
-
class
horovod.spark.common.backend.
Backend
[source]¶ Bases:
object
Interface for remote execution of the distributed training function.
A custom backend can be used in cases where the training environment running Horovod is different from the Spark application running the HorovodEstimator.
-
run
(fn, args=(), kwargs={}, env=None)[source]¶ Executes the training fn and returns results from each worker in a list (ordered by ascending rank).
- Parameters
fn – Function to run.
args – Arguments to pass to fn.
kwargs – Keyword arguments to pass to fn.
env – Environment dictionary to use in Horovod run. Defaults to os.environ.
- Returns
List of results returned by running fn on each rank.
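To make the contract of run concrete, here is a minimal sketch of a custom backend that runs fn once per rank in the current process and returns the results ordered by ascending rank. The Backend base class below is a local stand-in that mirrors the documented signature, not an import from Horovod. SequentialBackend and the SKETCH_RANK environment variable are hypothetical names used only for this illustration.

```python
import os


class Backend(object):
    """Local stand-in mirroring the documented interface (not Horovod's class)."""

    def run(self, fn, args=(), kwargs=None, env=None):
        raise NotImplementedError()


class SequentialBackend(Backend):
    """Hypothetical backend: runs fn once per rank, one after another."""

    def __init__(self, num_proc):
        self._num_proc = num_proc

    def run(self, fn, args=(), kwargs=None, env=None):
        kwargs = {} if kwargs is None else kwargs
        # Default to the current environment, as the documented `env` does.
        base_env = dict(os.environ if env is None else env)
        results = []
        for rank in range(self._num_proc):
            saved = dict(os.environ)
            try:
                os.environ.clear()
                os.environ.update(base_env)
                # Assumption: expose the rank through a made-up variable.
                os.environ["SKETCH_RANK"] = str(rank)
                results.append(fn(*args, **kwargs))
            finally:
                # Restore the caller's environment after each rank.
                os.environ.clear()
                os.environ.update(saved)
        return results


def train(scale):
    return int(os.environ["SKETCH_RANK"]) * scale


results = SequentialBackend(num_proc=3).run(train, args=(10,))
```

The sketch uses kwargs=None instead of the documented kwargs={} default to avoid sharing a mutable default between calls; the behavior for callers is the same.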
-
-
class
horovod.spark.common.backend.
SparkBackend
(num_proc=None, env=None, **kwargs)[source]¶ Bases:
horovod.spark.common.backend.Backend
Uses horovod.spark.run to execute the distributed training fn.
-
run
(fn, args=(), kwargs={}, env=None)[source]¶ Executes the training fn and returns results from each worker in a list (ordered by ascending rank).
- Parameters
fn – Function to run.
args – Arguments to pass to fn.
kwargs – Keyword arguments to pass to fn.
env – Environment dictionary to use in Horovod run. Defaults to os.environ.
- Returns
List of results returned by running fn on each rank.
-
-
class
horovod.spark.common.store.
Store
[source]¶ Bases:
object
Storage layer for intermediate files (materialized DataFrames) and training artifacts (checkpoints, logs).
Store provides an abstraction over a filesystem (e.g., local vs HDFS) or blob storage database. It provides the basic semantics for reading and writing objects, and how to access objects with certain definitions.
The store exposes a generic interface that is not coupled to a specific DataFrame, model, or runtime. Every run of an Estimator should result in a separate run directory containing checkpoints and logs, and every variation in dataset should produce a separate intermediate data path.
In order to allow for caching but to prevent overuse of disk space on intermediate data, intermediate datasets are named in a deterministic sequence. When a dataset is done being used for training, the intermediate files can be reclaimed to free up disk space, but will not be automatically removed so that they can be reused as needed. This is to support both parallel training processes using the same store on multiple DataFrames, as well as iterative training using the same DataFrame on different model variations.
-
class
horovod.spark.common.store.
FilesystemStore
(prefix_path, train_path=None, val_path=None, test_path=None, runs_path=None, save_runs=True)[source]¶ Bases:
horovod.spark.common.store.Store
Abstract class for stores that use a filesystem for underlying storage.
-
class
horovod.spark.common.store.
LocalStore
(prefix_path, *args, **kwargs)[source]¶ Bases:
horovod.spark.common.store.FilesystemStore
Uses the local filesystem as a store of intermediate data and training artifacts.
-
class
horovod.spark.common.store.
HDFSStore
(prefix_path, host=None, port=None, user=None, kerb_ticket=None, driver='libhdfs', extra_conf=None, temp_dir=None, *args, **kwargs)[source]¶ Bases:
horovod.spark.common.store.FilesystemStore
Uses HDFS as a store of intermediate data and training artifacts.
Initialized from a prefix_path that can take one of the following forms:
“hdfs://namenode01:8020/user/test/horovod”
“hdfs:///user/test/horovod”
“/user/test/horovod”
The full path (including prefix, host, and port) will be used for all reads and writes to HDFS through Spark. If host and port are not provided, they will be omitted. If the hdfs:// prefix is not provided (the third form above), it will be prepended to the full path regardless.
The localized path (without prefix, host, and port) will be used for interaction with PyArrow. Host and port information parsed from the path will be used to initialize the PyArrow HadoopFilesystem if it is not provided through the host and port arguments to this initializer. If neither the path URL nor the arguments provide this information, host and port default to default and 0, respectively.
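The path handling described above can be sketched with the standard library. The function split_hdfs_path below is a hypothetical helper, not HDFSStore's actual implementation. It shows how the three prefix_path forms could yield a full path for Spark, a localized path for PyArrow, and a host and port that fall back to default and 0.

```python
from urllib.parse import urlparse


def split_hdfs_path(prefix_path, host=None, port=None):
    """Hypothetical helper: returns (full_path, local_path, host, port)."""
    parsed = urlparse(prefix_path)
    # Localized path: no prefix, host, or port.
    local_path = parsed.path
    # Full path: always carries the hdfs:// prefix; host:port only if in the URL.
    full_path = "hdfs://" + parsed.netloc + parsed.path
    # Explicit arguments win, then the URL, then the documented defaults.
    if host is None:
        host = parsed.hostname if parsed.hostname is not None else "default"
    if port is None:
        port = parsed.port if parsed.port is not None else 0
    return full_path, local_path, host, port


with_host = split_hdfs_path("hdfs://namenode01:8020/user/test/horovod")
no_host = split_hdfs_path("hdfs:///user/test/horovod")
bare = split_hdfs_path("/user/test/horovod")
```

With these inputs, the second and third forms produce the same full path, which matches the statement that the prefix is added even when it is missing from prefix_path.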
horovod.run¶
-
horovod.run.
run
(func, args=(), kwargs=None, np=1, hosts=None, hostfile=None, start_timeout=None, ssh_port=None, disable_cache=None, output_filename=None, verbose=None, use_gloo=None, use_mpi=None, mpi_args=None, network_interface=None)[source]¶ Launch a Horovod job to run the specified process function and get the return value.
- Parameters
func – The function to be run in Horovod job processes. The function return value will be collected as the corresponding Horovod process return value. This function must be compatible with pickle.
args – Arguments to pass to func.
kwargs – Keyword arguments to pass to func.
np – Number of Horovod processes.
hosts – List of host names and the number of available slots for running processes on each, of the form: <hostname>:<slots> (e.g.: host1:2,host2:4,host3:1 indicating 2 processes can run on host1, 4 on host2, and 1 on host3). If not specified, defaults to using localhost:<np>
hostfile – Path to a host file containing the list of host names and the number of available slots. Each line of the file must be of the form: <hostname> slots=<slots>
start_timeout – Horovodrun has to perform all the checks and start the processes before the specified timeout. The default value is 30 seconds. Alternatively, the environment variable HOROVOD_START_TIMEOUT can be used to specify the initialization timeout.
ssh_port – SSH port on all the hosts.
disable_cache – If this flag is not set, horovodrun will perform the initialization checks only once every 60 minutes, provided the checks pass. If it is set, all the checks will run every time horovodrun is called.
output_filename – For Gloo, writes stdout / stderr of all processes to a filename of the form <output_filename>/rank.<rank>/<stdout | stderr>. The <rank> will be padded with 0 characters to ensure lexicographical order. For MPI, delegates its behavior to mpirun.
verbose – If this flag is set, extra messages will be printed.
use_gloo – Run Horovod using the Gloo controller. This will be the default if Horovod was not built with MPI support.
use_mpi – Run Horovod using the MPI controller. This will be the default if Horovod was built with MPI support.
mpi_args – Extra arguments for the MPI controller. This is only used when use_mpi is True.
network_interface – Comma-separated network interfaces to use for communication, for example eth0,eth1. If not specified, Horovod will find the common NICs among all the workers and use those.
- Returns
A list containing the values returned by all Horovod processes. The index of each value in the list corresponds to the rank of the Horovod process that returned it.
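The hosts and hostfile formats described above carry the same information in two notations. The sketch below, using the hypothetical helpers parse_hosts and to_hostfile_lines, converts a hosts string of the form <hostname>:<slots> into the per-line <hostname> slots=<slots> form. It is an illustration of the two formats only and does not reproduce horovodrun's own parsing.

```python
def parse_hosts(hosts):
    """Parse 'host1:2,host2:4' into an ordered {hostname: slots} dict."""
    slots = {}
    for entry in hosts.split(","):
        # Split on the last colon so the slot count is always the final field.
        name, _, count = entry.strip().rpartition(":")
        slots[name] = int(count)
    return slots


def to_hostfile_lines(slots):
    """Render one '<hostname> slots=<slots>' line per host."""
    return ["{} slots={}".format(name, count) for name, count in slots.items()]


slots = parse_hosts("host1:2,host2:4,host3:1")
hostfile_lines = to_hostfile_lines(slots)
total_slots = sum(slots.values())
```

The total number of slots is an upper bound on the np value that these hosts can accommodate.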