API¶
horovod.tensorflow¶
-
class
horovod.tensorflow.
Compression
[source]¶ Optional gradient compression algorithm used during allreduce.
-
none
¶ Do not compress the gradients. This is the default.
alias of
horovod.tensorflow.compression.NoneCompressor
-
fp16
¶ Compress all floating point gradients to 16-bit.
alias of
horovod.tensorflow.compression.FP16Compressor
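The effect of 16-bit compression can be illustrated without Horovod or TensorFlow. The sketch below is a standard-library stand-in, not the FP16Compressor implementation: it assumes that compression amounts to converting values to IEEE 754 half precision before transfer and converting them back afterwards. The helper names compress_fp16 and decompress_fp16 are hypothetical.

```python
import struct

def compress_fp16(values):
    """Pack Python floats as little-endian IEEE 754 half precision (2 bytes each)."""
    return struct.pack(f"<{len(values)}e", *values)

def decompress_fp16(payload):
    """Unpack half-precision bytes back into Python floats."""
    count = len(payload) // 2
    return list(struct.unpack(f"<{count}e", payload))

gradients = [0.5, -1.25, 3.0]          # exactly representable in half precision
payload = compress_fp16(gradients)
restored = decompress_fp16(payload)

# The same values packed as 32-bit floats take twice as many bytes.
fp32_payload = struct.pack(f"<{len(gradients)}f", *gradients)
print(len(payload), len(fp32_payload), restored)

# Values that are not representable in 16 bits lose precision on the round trip.
lossy = decompress_fp16(compress_fp16([0.1]))[0]
print(lossy)
```

The halved payload is the benefit; the rounding of values such as 0.1 is the cost to weigh when choosing between the two options.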
-
-
horovod.tensorflow.
allgather_object
(obj, session=None, name=None, process_set=<horovod.common.process_sets.ProcessSet object>)[source]¶ Serializes and allgathers an object from all other processes.
- Parameters
obj – An object capable of being serialized without losing any context.
session – Session for TensorFlow v1 compatibility.
name – Optional name to use during allgather, will default to the class type.
process_set – Process set object to limit this operation to a subset of Horovod processes. Default is the global process set.
- Returns
The list of objects that were allgathered across all ranks.
-
horovod.tensorflow.
broadcast_object
(obj, root_rank=0, session=None, name=None, process_set=<horovod.common.process_sets.ProcessSet object>)[source]¶ Serializes and broadcasts an object from root rank to all other processes in a process set (defaults to all Horovod processes).
- Parameters
obj – An object capable of being serialized without losing any context.
root_rank – The rank of the process from which the object will be broadcast to all other processes.
session – Session for TensorFlow v1 compatibility.
name – Optional name to use during broadcast, will default to the class type.
process_set – Process set object to limit this operation to a subset of Horovod processes. Default is the global process set.
- Returns
The object that was broadcast from the root_rank.
-
horovod.tensorflow.
broadcast_variables
(variables, root_rank, process_set=<horovod.common.process_sets.ProcessSet object>, inplace=False)[source]¶ Broadcasts variables from root rank to all other processes in a process set (defaults to all Horovod processes).
Optionally, the broadcast may be performed in-place, which avoids temporary memory allocations and fragmentation. This is only supported with TensorFlow 2.6 or later. Reference variables (legacy support in TF 2) must all be of the same data type. There is no such restriction for resource variables (default in TF 2).
- Parameters
variables – variables for broadcast
root_rank – rank of the process from which global variables will be broadcasted to all other processes.
process_set – Process set object to limit this operation to a subset of Horovod processes. Default is the global process set.
inplace – whether to perform in-place broadcasts
-
horovod.tensorflow.
allgather
(tensor, name=None, ignore_name_scope=False, process_set=<horovod.common.process_sets.ProcessSet object>)[source]¶ An op which concatenates the input tensor with the same input tensor on all other Horovod processes.
The concatenation is done on the first dimension, so the input tensors on the different processes must have the same rank and shape, except for the first dimension, which is allowed to be different.
- Returns
A tensor of the same type as tensor, concatenated on dimension zero across all processes. The shape is identical to the input shape, except for the first dimension, which may be greater and is the sum of all first dimensions of the tensors in different Horovod processes.
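The shape rule above can be checked with a small pure-Python simulation. This is only a sketch of the semantics, with nested lists standing in for tensors and a list index standing in for the rank; simulate_allgather is a hypothetical helper, not a Horovod function.

```python
def simulate_allgather(per_rank_tensors):
    """Concatenate per-rank tensors (lists of rows) along the first dimension.

    Returns one result per rank; every rank receives the same concatenation.
    """
    row_lengths = {len(row) for tensor in per_rank_tensors for row in tensor}
    if len(row_lengths) > 1:
        raise ValueError("all dimensions except the first must match")
    gathered = [row for tensor in per_rank_tensors for row in tensor]
    return [list(gathered) for _ in per_rank_tensors]

rank0 = [[1, 2], [3, 4]]               # shape (2, 2)
rank1 = [[5, 6]]                       # shape (1, 2)
rank2 = [[7, 8], [9, 10], [11, 12]]    # shape (3, 2)

results = simulate_allgather([rank0, rank1, rank2])
print(len(results[0]))                 # first dimension is 2 + 1 + 3
```

Each simulated rank ends up with a (6, 2) result, the sum of the first dimensions of the three inputs.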
-
horovod.tensorflow.
grouped_allgather
(tensors, name=None, ignore_name_scope=False, process_set=<horovod.common.process_sets.ProcessSet object>)[source]¶ An op which concatenates each input tensor in tensors with the corresponding input tensor on all other Horovod processes.
The concatenation is done on the first dimension, so the input tensors on the different processes must have the same rank and shape, except for the first dimension, which is allowed to be different.
- Returns
A list of tensors of the same rank and type as those in tensors, each concatenated on dimension zero across all processes. For each returned tensor the shape is identical to the corresponding input shape, except for the first dimension, which may be greater and is the sum of all first dimensions of the tensors in different Horovod processes.
-
horovod.tensorflow.
broadcast
(tensor, root_rank, name=None, ignore_name_scope=False, process_set=<horovod.common.process_sets.ProcessSet object>)[source]¶ An op which broadcasts the input tensor on root rank to the same input tensor on all other Horovod processes.
The broadcast operation is keyed by the name of the op. The tensor type and shape must be the same on all Horovod processes for a given name. The broadcast will not start until all processes are ready to send and receive the tensor.
- Returns
A tensor of the same shape and type as tensor, with the value broadcasted from root rank.
-
horovod.tensorflow.
broadcast_
(variables, root_rank, name=None, process_set=<horovod.common.process_sets.ProcessSet object>)[source]¶ An op which broadcasts the input variables from the root rank to the same input variables on all other Horovod processes. The operation is performed in-place.
The broadcast operation is keyed by the name of the op combined with the names of the variables. The variable type and shape must be the same on all Horovod processes for any given name. The broadcast will not start until all processes are ready to send and receive all variables. In each process all variables need to be located on the same device (CPU or GPU).
Note: This is only supported with TensorFlow 2.6 or later.
- Returns
The tensor values of the updated variables as broadcasted from root rank.
-
horovod.tensorflow.
alltoall
(tensor, splits=None, name=None, ignore_name_scope=False, process_set=<horovod.common.process_sets.ProcessSet object>)[source]¶ An op that scatters slices of the input tensor to all other Horovod processes and returns a tensor of gathered slices from all other Horovod processes.
The slicing is done on the first dimension, so the input tensors on the different processes must have the same rank and shape, except for the first dimension, which is allowed to be different.
- Parameters
tensor – A tensor to distribute with alltoall.
splits – A tensor of integers in rank order describing how many elements in tensor to send to each worker. Splitting is applied along the first dimension of tensor. If splits is not provided, the first dimension is split equally by the number of Horovod processes.
name – A name of the alltoall operation.
ignore_name_scope – If True, ignores any outer name scope applied by TensorFlow in the name used by the Horovod operation.
- Returns
A tensor of the same type as tensor, concatenated on dimension zero across all processes. The shape is identical to the input shape, except for the first dimension, which may be greater and is the sum of all first dimensions of the gathered tensor slices from different Horovod processes.
If splits has been provided: A tensor of integers in rank order describing how many elements in the output tensor have been received from each worker.
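The role of splits is easiest to see in a simulation. The following sketch models each process as a list index and each tensor as a flat list; simulate_alltoall is a hypothetical helper written for illustration and makes no claim about how Horovod implements the operation.

```python
def simulate_alltoall(per_rank_tensors, per_rank_splits=None):
    """Scatter slices of each rank's tensor to every rank and gather what arrives.

    Returns (outputs, received_splits), both indexed by destination rank.
    """
    size = len(per_rank_tensors)
    if per_rank_splits is None:
        # Without splits, the first dimension is divided equally among processes.
        per_rank_splits = []
        for tensor in per_rank_tensors:
            if len(tensor) % size:
                raise ValueError("first dimension must be divisible by the number of processes")
            per_rank_splits.append([len(tensor) // size] * size)

    # outbox[src][dst] holds the slice that src sends to dst.
    outbox = []
    for tensor, splits in zip(per_rank_tensors, per_rank_splits):
        if sum(splits) != len(tensor):
            raise ValueError("splits must add up to the first dimension")
        chunks, start = [], 0
        for count in splits:
            chunks.append(tensor[start:start + count])
            start += count
        outbox.append(chunks)

    outputs, received_splits = [], []
    for dst in range(size):
        gathered = []
        for src in range(size):
            gathered.extend(outbox[src][dst])
        outputs.append(gathered)
        received_splits.append([len(outbox[src][dst]) for src in range(size)])
    return outputs, received_splits

tensors = [[0, 1, 2], [10, 11, 12, 13]]
splits = [[1, 2], [3, 1]]              # rank 0 sends 1 + 2 rows, rank 1 sends 3 + 1 rows
outputs, received = simulate_alltoall(tensors, splits)
print(outputs)
print(received)
```

In this run rank 0 receives one element from itself and three from rank 1, while rank 1 receives two elements from rank 0 and one from itself, which is the information the second return value describes.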
-
horovod.tensorflow.
shutdown
()¶ A function that shuts Horovod down.
-
horovod.tensorflow.
is_initialized
()¶ Returns True if Horovod is initialized.
-
horovod.tensorflow.
start_timeline
(file_path, mark_cycles=False)¶ Creates a timeline file at file_path and begins recording.
- Parameters
file_path – String path to the timeline file.
mark_cycles – Boolean indicating that cycles should be marked on the timeline (default: False).
Raises a ValueError if Horovod is not initialized.
-
horovod.tensorflow.
stop_timeline
()¶ Stops the active timeline recording and closes the file.
Raises a ValueError if Horovod is not initialized.
-
horovod.tensorflow.
size
()¶ A function that returns the number of Horovod processes.
- Returns
An integer scalar containing the number of Horovod processes.
-
horovod.tensorflow.
local_size
()¶ A function that returns the number of Horovod processes within the node the current process is running on.
- Returns
An integer scalar containing the number of local Horovod processes.
-
horovod.tensorflow.
cross_size
()¶ A function that returns the number of nodes for the local rank of the current Horovod process. For example, if there are 2 nodes in the job: one running 2 processes and the other running 1 process, then the first process on each node will have cross size 2, and the second process on the first node will have cross size 1.
- Returns
An integer scalar containing the number of cross Horovod processes.
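The two-node example above can be reproduced with a few lines of plain Python. The function below is a hypothetical helper that takes the number of processes on each node and derives, for every local rank, how many nodes run a process with that local rank.

```python
def cross_sizes(processes_per_node):
    """Map each local rank to the number of nodes hosting a process with that local rank."""
    largest = max(processes_per_node, default=0)
    return {
        local_rank: sum(1 for count in processes_per_node if count > local_rank)
        for local_rank in range(largest)
    }

layout = [2, 1]                        # node 0 runs 2 processes, node 1 runs 1
sizes = cross_sizes(layout)
print(sizes)

# A cluster is homogeneous when every node runs the same number of processes.
homogeneous = len(set(layout)) == 1
print(homogeneous)
```

Local rank 0 exists on both nodes, so its cross size is 2; local rank 1 exists only on the first node, so its cross size is 1.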
-
horovod.tensorflow.
rank
()¶ A function that returns the Horovod rank of the calling process.
- Returns
An integer scalar with the Horovod rank of the calling process.
-
horovod.tensorflow.
local_rank
()¶ A function that returns the local Horovod rank of the calling process, within the node that it is running on. For example, if there are seven processes running on a node, their local ranks will be zero through six, inclusive.
- Returns
An integer scalar with the local Horovod rank of the calling process.
-
horovod.tensorflow.
cross_rank
()¶ A function that returns the cross Horovod rank of the calling process, across the nodes in the job. The cross rank of a process corresponds to the rank of the node it is running on. For example, if there are 7 nodes in a job, the cross ranks will be zero through six, inclusive.
- Returns
An integer scalar with the cross Horovod rank of the calling process.
-
horovod.tensorflow.
is_homogeneous
()¶ Returns True if the cluster is homogeneous.
- Returns
A boolean value indicating whether every node in the cluster has the same number of ranks.
-
horovod.tensorflow.
rank_op
(name=None)[source]¶ An op that returns the Horovod rank of the calling process.
This operation determines the return value at the graph execution time, rather than at the graph construction time, and so allows for a graph to be constructed in a different environment than where it will be executed.
- Returns
An integer scalar with the Horovod rank of the calling process.
-
horovod.tensorflow.
local_rank_op
(name=None)[source]¶ An op that returns the local Horovod rank of the calling process, within the node that it is running on. For example, if there are seven processes running on a node, their local ranks will be zero through six, inclusive.
This operation determines the return value at the graph execution time, rather than at the graph construction time, and so allows for a graph to be constructed in a different environment than where it will be executed.
- Returns
An integer scalar with the local Horovod rank of the calling process.
-
horovod.tensorflow.
size_op
(process_set_id=0, name=None)[source]¶ An op that returns the number of Horovod processes.
This operation determines the return value at the graph execution time, rather than at the graph construction time, and so allows for a graph to be constructed in a different environment than where it will be executed.
- Returns
An integer scalar containing the number of Horovod processes.
-
horovod.tensorflow.
local_size_op
(name=None)[source]¶ An op that returns the number of Horovod processes within the node the current process is running on.
This operation determines the return value at the graph execution time, rather than at the graph construction time, and so allows for a graph to be constructed in a different environment than where it will be executed.
- Returns
An integer scalar containing the number of local Horovod processes.
-
horovod.tensorflow.
process_set_included_op
(process_set_id=0, name=None)[source]¶ An op that returns 0 or 1 depending on whether the current process is included in the specified process set, or an error code: HOROVOD_PROCESS_SET_ERROR_INIT if Horovod is not initialized, HOROVOD_PROCESS_SET_ERROR_UNKNOWN_SET if the process set is unknown.
This operation determines the return value at the graph execution time, rather than at the graph construction time, and so allows for a graph to be constructed in a different environment than where it will be executed.
- Returns
An integer scalar with value 0, 1, or an error code.
-
horovod.tensorflow.
mpi_threads_supported
()¶ A function that returns a flag indicating whether MPI multi-threading is supported.
If MPI multi-threading is supported, users may mix and match Horovod usage with other MPI libraries, such as mpi4py.
- Returns
A boolean value indicating whether MPI multi-threading is supported.
-
horovod.tensorflow.
mpi_enabled
()¶ Returns True if MPI mode is currently enabled at runtime.
If MPI is enabled, users can use it for controller or data transfer operations.
- Returns
A boolean value indicating whether MPI is enabled.
-
horovod.tensorflow.
mpi_built
()¶ Returns True if Horovod was compiled with MPI support.
- Returns
A boolean value indicating whether MPI support was compiled.
-
horovod.tensorflow.
gloo_enabled
()¶ Returns True if Gloo mode is currently enabled at runtime.
If Gloo is enabled, users can use it for controller or data transfer operations.
- Returns
A boolean value indicating whether Gloo is enabled.
-
horovod.tensorflow.
gloo_built
()¶ Returns True if Horovod was compiled with Gloo support.
- Returns
A boolean value indicating whether Gloo support was compiled.
-
horovod.tensorflow.
nccl_built
()¶ Function to check if Horovod was compiled with NCCL support.
- Returns
An integer value indicating whether NCCL support was compiled. If NCCL support was compiled, returns NCCL_VERSION_CODE. Otherwise, returns 0.
-
horovod.tensorflow.
ddl_built
()¶ Returns True if Horovod was compiled with DDL support.
- Returns
A boolean value indicating whether DDL support was compiled.
-
horovod.tensorflow.
ccl_built
()¶ Returns True if Horovod was compiled with oneCCL support.
- Returns
A boolean value indicating whether oneCCL support was compiled.
-
horovod.tensorflow.
cuda_built
()¶ Returns True if Horovod was compiled with CUDA support.
- Returns
A boolean value indicating whether CUDA support was compiled.
-
horovod.tensorflow.
rocm_built
()¶ Returns True if Horovod was compiled with ROCm support.
- Returns
A boolean value indicating whether ROCm support was compiled.
-
class
horovod.tensorflow.
ProcessSet
(ranks_or_comm: Union[Sequence[int], horovod.common.process_sets.MPI.Comm])[source]¶ Representation of a set of Horovod processes that will run collective operations together.
Initialize a ProcessSet with a list of process ranks or an MPI communicator. Then pass this instance to hvd.init() or hvd.add_process_set(). If a valid process set has been initialized, process_set_id will be set to a numeric value.
-
rank
() → Optional[int][source]¶ Return rank relative to this process set or None if not initialized.
This is useful, e.g., to process the result of hvd.allgather().
Please note that, even with process sets, Horovod operations like hvd.broadcast() are not parameterized by this relative rank, but by the global rank as obtained from hvd.rank().
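The difference between the global rank and the rank relative to a process set can be sketched in plain Python. The helper below is hypothetical and rests on two assumptions that the text above does not state: that relative ranks follow the ascending order of the global ranks in the set, and that a process outside the set has no relative rank (modelled here as None).

```python
def relative_rank(process_set_ranks, global_rank):
    """Position of global_rank within the ordered process set, or None if it is not a member."""
    ordered = sorted(process_set_ranks)
    return ordered.index(global_rank) if global_rank in ordered else None

members = [5, 0, 2]                    # global ranks that form the process set

# Hypothetical per-member pieces of an allgather result, ordered by relative rank.
gathered = ["from-rank-0", "from-rank-2", "from-rank-5"]

own_piece = gathered[relative_rank(members, 2)]
print(relative_rank(members, 5), own_piece)
```

This is the kind of lookup the relative rank is useful for: it indexes into a result gathered over the process set, whereas an argument such as root_rank still refers to the global rank.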
-
-
horovod.tensorflow.
add_process_set
(process_set: Union[horovod.common.process_sets.ProcessSet, Sequence[int]]) → horovod.common.process_sets.ProcessSet[source]¶ Add a new process_set after Horovod initialization and return it.
Requires running with HOROVOD_DYNAMIC_PROCESS_SETS=1. No process set containing the same ranks may exist already. The returned process set will be fully initialized.
-
horovod.tensorflow.
remove_process_set
(process_set: horovod.common.process_sets.ProcessSet) → bool[source]¶ Attempt to remove process set and return whether this attempt is successful.
Requires running with HOROVOD_DYNAMIC_PROCESS_SETS=1. If removal is successful, we will invalidate the process_set object.
-
horovod.tensorflow.
join
()[source]¶ An op to indicate that the rank finished processing data.
All ranks that did not call join() continue to process allreduce operations. This op is not done before all ranks have joined.
- Returns
An integer scalar containing the last rank that joined.
-
class
horovod.tensorflow.
LocalGradientAggregationHelper
(backward_passes_per_step, allreduce_func, sparse_as_dense, average_aggregated_gradients, rank, optimizer_type, process_set=<horovod.common.process_sets.ProcessSet object>, scale_local_gradients=True, name='')[source]¶ LocalGradientAggregationHelper aggregates gradient updates locally, and communicates the updates across machines only once every backward_passes_per_step. Only supports graph mode execution.
-
register_local_var
(var)[source]¶ Registers a source/variable as worker local. Horovod will not perform any global operations on gradients corresponding to these sources and will instead return the local gradient.
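The idea of aggregating locally and communicating once every backward_passes_per_step can be sketched with scalars. The code below is a simplified model rather than the helper's implementation: aggregate_locally and fake_allreduce are hypothetical names, gradients are plain floats, and the optional averaging mirrors the average_aggregated_gradients argument.

```python
def aggregate_locally(local_gradients, backward_passes_per_step, allreduce,
                      average_aggregated_gradients=False):
    """Accumulate gradients locally and call allreduce once per group of passes."""
    accumulated = 0.0
    communicated = []
    for pass_number, gradient in enumerate(local_gradients, start=1):
        accumulated += gradient
        if pass_number % backward_passes_per_step == 0:
            if average_aggregated_gradients:
                accumulated /= backward_passes_per_step
            communicated.append(allreduce(accumulated))
            accumulated = 0.0          # start a fresh accumulation window
    return communicated

calls = []

def fake_allreduce(value):
    """Stand-in for a collective: records the call and returns the value unchanged."""
    calls.append(value)
    return value

summed = aggregate_locally([1.0, 2.0, 3.0, 4.0], 2, fake_allreduce)
print(summed, len(calls))

averaged = aggregate_locally([1.0, 2.0, 3.0, 4.0], 2, fake_allreduce,
                             average_aggregated_gradients=True)
print(averaged)
```

Four backward passes with backward_passes_per_step=2 trigger only two communication calls, which is the saving the helper is designed to provide.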
-
-
horovod.tensorflow.
allreduce
(tensor, average=None, device_dense='', device_sparse='', compression=<class 'horovod.tensorflow.compression.NoneCompressor'>, op=None, prescale_factor=1.0, postscale_factor=1.0, name=None, process_set=<horovod.common.process_sets.ProcessSet object>, ignore_name_scope=False)[source]¶ Perform an allreduce on a tf.Tensor or tf.IndexedSlices.
This function performs a bandwidth-optimal ring allreduce on the input tensor. If the input is a tf.IndexedSlices, the function instead does an allgather on the values and the indices, effectively doing an allreduce on the represented tensor.
- Parameters
tensor – tf.Tensor, tf.Variable, or tf.IndexedSlices to reduce. The shape of the input must be identical across all ranks.
average –
Warning
Deprecated since version 0.19.0.
Use op instead. Will be removed in v1.0.
device_dense – Device to be used for dense tensors. Uses GPU by default if Horovod was built with HOROVOD_GPU_OPERATIONS.
device_sparse – Device to be used for sparse tensors. Uses GPU by default if Horovod was built with HOROVOD_GPU_OPERATIONS.
compression – Compression algorithm used to reduce the amount of data sent and received by each worker node. Defaults to not using compression.
op – The reduction operation to combine tensors across different ranks. Supported op values are Sum, Average, Min, Max, and Product. Defaults to Average if None is given.
prescale_factor – Multiplicative factor to scale tensor before allreduce.
postscale_factor – Multiplicative factor to scale tensor after allreduce.
process_set – Process set object to limit this operation to a subset of Horovod processes. Default is the global process set.
name – A name of the allreduce operation
ignore_name_scope – If True, ignores any outer name scope applied by TensorFlow in the name used by the Horovod operation.
- Returns
A tensor of the same shape and type as tensor, reduced across all processes according to op.
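How op, prescale_factor and postscale_factor interact can be sketched with flat lists in place of tensors. The simulation below is illustrative only: simulate_allreduce is a hypothetical helper, the op names are plain strings rather than Horovod constants, and the scaling order (prescale each input, reduce, then postscale the result) is taken from the parameter descriptions above.

```python
import math

REDUCERS = {
    "Sum": sum,
    "Average": lambda values: sum(values) / len(values),
    "Min": min,
    "Max": max,
    "Product": math.prod,
}

def simulate_allreduce(per_rank_tensors, op="Average",
                       prescale_factor=1.0, postscale_factor=1.0):
    """Reduce equally shaped per-rank tensors element by element."""
    if len({len(tensor) for tensor in per_rank_tensors}) != 1:
        raise ValueError("input shape must be identical across all ranks")
    reduce_column = REDUCERS[op]
    scaled = [[prescale_factor * x for x in tensor] for tensor in per_rank_tensors]
    # zip(*scaled) groups the values that sit at the same position on every rank.
    return [postscale_factor * reduce_column(column) for column in zip(*scaled)]

inputs = [[1.0, 2.0], [3.0, 6.0]]      # one tensor per simulated rank
print(simulate_allreduce(inputs, op="Sum"))
print(simulate_allreduce(inputs, op="Average"))
print(simulate_allreduce(inputs, op="Sum", prescale_factor=0.5))
```

With two ranks, summing with a prescale factor of 0.5 gives the same values as averaging, which shows how the scale factors can be used to split a division around the reduction.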
-
horovod.tensorflow.
reducescatter
(tensor, device_dense='', compression=<class 'horovod.tensorflow.compression.NoneCompressor'>, op=Average, name=None, process_set=<horovod.common.process_sets.ProcessSet object>, ignore_name_scope=False, prescale_factor=1.0, postscale_factor=1.0)[source]¶ Perform a reducescatter on a tf.Tensor.
This function performs a bandwidth-optimal reduce and scatter on the input tensor.
- Parameters
tensor – tf.Tensor or tf.Variable to reduce. The shape of the input must be identical across all ranks.
device_dense – Device to be used for dense tensors. Uses GPU by default if Horovod was built with HOROVOD_GPU_REDUCESCATTER.
compression – Compression algorithm used to reduce the amount of data sent and received by each worker node. Defaults to not using compression.
op – The reduction operation to combine tensors across different ranks. Defaults to Average.
process_set – Process set object to limit this operation to a subset of Horovod processes. Default is the global process set.
name – A name of the reduce_scatter operation
ignore_name_scope – If True, ignores any outer name scope applied by TensorFlow in the name used by the Horovod operation.
prescale_factor – Multiplicative factor to scale tensor before reducescatter.
postscale_factor – Multiplicative factor to scale tensor after reducescatter.
- Returns
A tensor of the same rank and type as tensor, reduced across all processes according to op. The shape is identical to the input shape, except for the first dimension, which will be divided across the different Horovod processes.
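The reduce-then-scatter behaviour can be sketched with nested lists. The simulation below is an illustration under a simplifying assumption: the first dimension is required to divide evenly by the number of processes, because the text above does not say how an uneven remainder is distributed. simulate_reducescatter is a hypothetical helper and the op names are plain strings.

```python
def simulate_reducescatter(per_rank_tensors, op="Average"):
    """Reduce per-rank tensors (lists of rows) and hand each rank one slice of the rows."""
    size = len(per_rank_tensors)
    rows = len(per_rank_tensors[0])
    if any(len(tensor) != rows for tensor in per_rank_tensors):
        raise ValueError("input shape must be identical across all ranks")
    if rows % size:
        raise ValueError("this sketch assumes the first dimension divides evenly")

    reduced = []
    for same_row_on_each_rank in zip(*per_rank_tensors):
        row = [sum(column) for column in zip(*same_row_on_each_rank)]
        if op == "Average":
            row = [value / size for value in row]
        reduced.append(row)

    chunk = rows // size
    return [reduced[rank * chunk:(rank + 1) * chunk] for rank in range(size)]

rank0 = [[1], [2], [3], [4]]           # shape (4, 1)
rank1 = [[10], [20], [30], [40]]       # shape (4, 1)

summed = simulate_reducescatter([rank0, rank1], op="Sum")
averaged = simulate_reducescatter([rank0, rank1], op="Average")
print(summed)
print(averaged)
```

Each simulated rank receives a (2, 1) slice of the reduced (4, 1) tensor, so the first dimension is divided across the processes while the remaining dimensions are unchanged.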
-
horovod.tensorflow.
grouped_allreduce
(tensors, average=None, device_dense='', device_sparse='', compression=<class 'horovod.tensorflow.compression.NoneCompressor'>, op=None, prescale_factor=1.0, postscale_factor=1.0, process_set=<horovod.common.process_sets.ProcessSet object>, ignore_name_scope=False, name=None)[source]¶ Perform grouped allreduces on a sequence of tf.Tensor or tf.IndexedSlices.
- Parameters
tensors – Sequence of tf.Tensor, tf.Variable, or tf.IndexedSlices to reduce. The tensor type and shape must be the same on all Horovod processes for tensors sharing positions in tensors.
average –
Warning
Deprecated since version 0.19.0.
Use op instead. Will be removed in v1.0.
device_dense – Device to be used for dense tensors. Uses GPU by default if Horovod was built with HOROVOD_GPU_OPERATIONS.
device_sparse – Device to be used for sparse tensors. Uses GPU by default if Horovod was built with HOROVOD_GPU_OPERATIONS.
compression – Compression algorithm used to reduce the amount of data sent and received by each worker node. Defaults to not using compression.
op – The reduction operation to combine tensors across different ranks. Supported op values are Sum, Average, Min, Max, and Product. Defaults to Average if None is given.
prescale_factor – Multiplicative factor to scale tensors before allreduce.
postscale_factor – Multiplicative factor to scale tensors after allreduce.
process_set – Process set object to limit this operation to a subset of Horovod processes. Default is the global process set.
name – A name of the grouped allreduce operation
ignore_name_scope – If True, ignores any outer name scope applied by TensorFlow in the name used by the Horovod operation.
- Returns
A list of tensors of the same shape and type as those in tensors, reduced across all processes.
-
horovod.tensorflow.
grouped_reducescatter
(tensors, device_dense='', compression=<class 'horovod.tensorflow.compression.NoneCompressor'>, op=Average, process_set=<horovod.common.process_sets.ProcessSet object>, prescale_factor=1.0, postscale_factor=1.0)[source]¶ Perform grouped reducescatters on a sequence of tf.Tensor.
- Parameters
tensors – Sequence of tf.Tensor or tf.Variable to reduce. The shape must be the same on all Horovod processes for inputs sharing positions in tensors.
device_dense – Device to be used for dense tensors. Uses GPU by default if Horovod was built with HOROVOD_GPU_OPERATIONS.
device_sparse – Device to be used for sparse tensors. Uses GPU by default if Horovod was built with HOROVOD_GPU_OPERATIONS.
compression – Compression algorithm used to reduce the amount of data sent and received by each worker node. Defaults to not using compression.
op – The reduction operation to combine tensors across different ranks. Defaults to Average if None is given.
process_set – Process set object to limit this operation to a subset of Horovod processes. Default is the global process set.
prescale_factor – Multiplicative factor to scale tensors before reducescatter.
postscale_factor – Multiplicative factor to scale tensors after reducescatter.
- Returns
A list of tensors of the same rank and type as those in tensors, reduced across all processes. For each returned tensor the shape is identical to the corresponding input shape, except for the first dimension, which will be divided across the different Horovod processes.
-
horovod.tensorflow.
broadcast_global_variables
(root_rank)[source]¶ Broadcasts all global variables from root rank to all other processes.
NOTE: deprecated in TensorFlow 2.0.
- Parameters
root_rank – rank of the process from which global variables will be broadcasted to all other processes.
-
class
horovod.tensorflow.
BroadcastGlobalVariablesHook
(*args, **kw)[source]¶ SessionRunHook that will broadcast all global variables from root rank to all other processes during initialization.
This is necessary to ensure consistent initialization of all workers when training is started with random weights or restored from a checkpoint.
NOTE: deprecated in TensorFlow 2.0.
-
horovod.tensorflow.
DistributedOptimizer
(optimizer, name=None, use_locking=False, device_dense='', device_sparse='', compression=<class 'horovod.tensorflow.compression.NoneCompressor'>, sparse_as_dense=False, backward_passes_per_step=1, op=Average, gradient_predivide_factor=1.0, average_aggregated_gradients=False, num_groups=0, groups=None, process_set=<horovod.common.process_sets.ProcessSet object>, scale_local_gradients=True)[source]¶ Construct a new DistributedOptimizer, which uses another optimizer under the hood for computing single-process gradient values and applying gradient updates after the gradient values have been combined across all the Horovod ranks.
- Parameters
optimizer – Optimizer to use for computing gradients and applying updates.
name – Optional name prefix for the operations created when applying gradients. Defaults to “Distributed” followed by the provided optimizer type.
use_locking – Whether to use locking when updating variables. See Optimizer.__init__ for more info.
device_dense – Device to be used for dense tensors. Uses GPU by default if Horovod was built with HOROVOD_GPU_OPERATIONS.
device_sparse – Device to be used for sparse tensors. Uses GPU by default if Horovod was built with HOROVOD_GPU_OPERATIONS.
compression – Compression algorithm used during allreduce to reduce the amount of data sent during each parameter update step. Defaults to not using compression.
sparse_as_dense – Treat all sparse gradients as dense tensors. This can help improve performance and memory utilization if the original sparse gradient has high density. Defaults to false.
backward_passes_per_step – Number of backward passes to perform before calling hvd.allreduce. This allows accumulating updates over multiple mini-batches before reducing and applying them.
op – The reduction operation to use when combining gradients across different ranks.
gradient_predivide_factor – If op == Average, gradient_predivide_factor splits the averaging before and after the sum. Gradients are scaled by 1.0 / gradient_predivide_factor before the sum and gradient_predivide_factor / size after the sum.
average_aggregated_gradients – Whether to average the aggregated gradients that have been accumulated over multiple mini-batches. If true, divides gradient updates by backward_passes_per_step. Only applicable for backward_passes_per_step > 1.
num_groups – Number of groups to assign gradient allreduce ops to for explicit grouping. Defaults to no explicit groups.
groups – The parameter to group the gradient allreduce ops. Accepted values are a non-negative integer or a list of lists of tf.Variable. If groups is a non-negative integer, it is the number of groups to assign gradient allreduce ops to for explicit grouping. If groups is a list of lists of tf.Variable, variables in the same inner list will be assigned to the same group, while any parameter that does not appear in any list will form a group by itself. Defaults to None, which means no explicit groups.
process_set – Gradients will only be reduced over Horovod processes belonging to this process set. Defaults to the global process set.
scale_local_gradients – Whether to scale the gradients of local variables. Default is set to True.
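The arithmetic behind gradient_predivide_factor can be checked with scalars. The sketch below applies the two scale factors exactly as described in the parameter list (1.0 / gradient_predivide_factor before the sum, gradient_predivide_factor / size after it); the function name and the sample gradients are hypothetical.

```python
import math

def average_with_predivide(per_rank_gradients, gradient_predivide_factor=1.0):
    """Average gradients by splitting the division before and after the sum."""
    size = len(per_rank_gradients)
    prescale = 1.0 / gradient_predivide_factor
    postscale = gradient_predivide_factor / size
    return postscale * sum(prescale * gradient for gradient in per_rank_gradients)

gradients = [2.0, 4.0, 6.0, 8.0]       # one scalar gradient per simulated rank

plain = average_with_predivide(gradients)
split = average_with_predivide(gradients, gradient_predivide_factor=2.0)
full_pre = average_with_predivide(gradients, gradient_predivide_factor=4.0)
print(plain, split, full_pre)
```

All three calls produce the same average. The factor only changes how much of the division happens before the sum, which affects the magnitude of the intermediate values rather than the final result.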
-
horovod.tensorflow.
DistributedGradientTape
(gradtape, device_dense='', device_sparse='', compression=<class 'horovod.tensorflow.compression.NoneCompressor'>, sparse_as_dense=False, op=Average, gradient_predivide_factor=1.0, num_groups=0, groups=None, process_set=<horovod.common.process_sets.ProcessSet object>, scale_local_gradients=True)[source]¶ A tape that wraps another tf.GradientTape, using an allreduce to combine gradient values before applying gradients to model weights.
- Parameters
gradtape – GradientTape to use for computing gradients and applying updates.
device_dense – Device to be used for dense tensors. Uses GPU by default if Horovod was built with HOROVOD_GPU_OPERATIONS.
device_sparse – Device to be used for sparse tensors. Uses GPU by default if Horovod was built with HOROVOD_GPU_OPERATIONS.
compression – Compression algorithm used during allreduce to reduce the amount of data sent during each parameter update step. Defaults to not using compression.
sparse_as_dense – Treat all sparse gradients as dense tensors. This can help improve performance and memory utilization if the original sparse gradient has high density. Defaults to false.
op – The reduction operation to use when combining gradients across different ranks.
gradient_predivide_factor – If op == Average, gradient_predivide_factor splits the averaging before and after the sum. Gradients are scaled by 1.0 / gradient_predivide_factor before the sum and gradient_predivide_factor / size after the sum.
num_groups – Number of groups to assign gradient allreduce ops to for explicit grouping. Defaults to no explicit groups.
groups – The parameter to group the gradient allreduce ops. Accepted values are a non-negative integer or a list of lists of tf.Variable. If groups is a non-negative integer, it is the number of groups to assign gradient allreduce ops to for explicit grouping. If groups is a list of lists of tf.Variable, variables in the same inner list will be assigned to the same group, while any parameter that does not appear in any list will form a group by itself. Defaults to None, which means no explicit groups.
process_set – Gradients will only be reduced over Horovod processes belonging to this process set. Defaults to the global process set.
scale_local_gradients – Whether to scale the gradients of local variables. Default is set to True.
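The list-of-lists form of groups can be illustrated with variable names in place of tf.Variable objects. The helper below is a hypothetical sketch of the grouping rule described above (explicit inner lists stay together, every other variable forms its own group). It does not cover the integer form, because the text does not say how variables are distributed over a given number of groups.

```python
def build_groups(variables, groups):
    """Return explicit groups followed by singleton groups for ungrouped variables."""
    explicit = [list(group) for group in groups]
    assigned = {name for group in groups for name in group}
    singletons = [[name] for name in variables if name not in assigned]
    return explicit + singletons

variables = ["w1", "b1", "w2", "b2", "embedding"]
groups = [["w1", "b1"], ["w2", "b2"]]

resolved = build_groups(variables, groups)
print(resolved)
```

Here the two layers are reduced as two groups, and the embedding variable, which appears in no inner list, ends up in a group of its own.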
-
horovod.tensorflow.
PartialDistributedGradientTape
(gradtape, device_dense='', device_sparse='', compression=<class 'horovod.tensorflow.compression.NoneCompressor'>, sparse_as_dense=False, op=Average, gradient_predivide_factor=1.0, num_groups=0, groups=None, process_set=<horovod.common.process_sets.ProcessSet object>, local_layers=None, scale_local_gradients=True)[source]¶ A tape that wraps another tf.GradientTape, using an allreduce to combine gradient values before applying gradients to model weights, similar to DistributedGradientTape except that it skips allreducing gradients of the local layers passed in the local_layers parameter.
- Parameters
gradtape – GradientTape to use for computing gradients and applying updates.
local_layers – A collection of local layers of type tf.keras.layers.Layer whose gradients need not be synced across ranks and are instead kept and applied locally. If not provided, PartialDistributedGradientTape behaves identically to DistributedGradientTape.
The rest of the arguments are similar to those of DistributedGradientTape.
-
class
horovod.tensorflow.elastic.
TensorFlowKerasState
(model, optimizer=None, backend=None, **kwargs)[source]¶ State representation of a TensorFlow Keras model and optimizer.
Supports TensorFlow 2 models and optimizers, as well as keras and tf.keras.
- Parameters
model – TensorFlow Keras model.
optimizer – Optional optimizer, can be compiled into model instead.
backend – For TensorFlow v1, backend used by Keras for obtaining the session.
kwargs – Additional properties to sync, will be exposed as attributes of the object.
-
class
horovod.tensorflow.elastic.
TensorFlowState
(variables=None, session=None, **kwargs)[source]¶ State representation of a list of TensorFlow variables.
Supports both TensorFlow v1 and v2. For TensorFlow v2, can only be used when eager execution is enabled.
- Parameters
variables – List of tf.Variable objects to track (default: tf.global_variables()).
session – For TensorFlow v1, session used to materialize variables (default: ops.get_default_session()).
kwargs – Additional properties to sync, will be exposed as attributes of the object.
-
horovod.tensorflow.elastic.
run
(func)[source]¶ Decorator used to run the elastic training process.
The purpose of this decorator is to allow for uninterrupted execution of the wrapped function across multiple workers in parallel, as workers come and go from the system. When a new worker is added, its state needs to be brought to the same point as the other workers, which is done by synchronizing the state object before executing func.
When a worker is added or removed, other workers will raise an exception to bring them back to such a sync point before executing func again. This ensures that workers do not diverge when such reset events occur.
It’s important to note that collective operations (e.g., broadcast, allreduce) cannot be called before the call to the wrapped function. Otherwise, new workers could execute these operations during their initialization while other workers are attempting to sync state, resulting in deadlock.
- Parameters
func – a wrapped function taking any number of args or kwargs. The first argument must be a horovod.common.elastic.State object used to synchronize state across workers.
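The sync-and-retry behaviour described above can be sketched as a toy decorator. This is not Horovod's implementation: `WorkersChanged`, `ToyState`, and this `run` are hypothetical stand-ins, and no real synchronization takes place.

```python
import functools


class WorkersChanged(Exception):
    """Hypothetical stand-in for the exception raised when workers join or leave."""


class ToyState:
    """Hypothetical state object; the real one is a horovod.common.elastic.State."""

    def __init__(self):
        self.epoch = 0
        self.sync_calls = 0

    def sync(self):
        # In a real job this would bring every worker to the same values.
        self.sync_calls += 1


def run(func):
    """Toy decorator: sync state, run func, and retry after a reset event."""

    @functools.wraps(func)
    def wrapper(state, *args, **kwargs):
        while True:
            state.sync()  # sync point reached before every execution of func
            try:
                return func(state, *args, **kwargs)
            except WorkersChanged:
                continue  # go back to the sync point and run func again

    return wrapper


@run
def train(state, num_epochs, fail_at):
    while state.epoch < num_epochs:
        if state.epoch in fail_at:
            fail_at.discard(state.epoch)  # simulate a single reset at this epoch
            raise WorkersChanged()
        state.epoch += 1
    return state.epoch


state = ToyState()
final_epoch = train(state, num_epochs=3, fail_at={1})
print(final_epoch, state.sync_calls)
```

Because progress lives in the state object rather than in local variables, the second execution of `train` resumes from the epoch already reached instead of starting over.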
horovod.tensorflow.keras¶
-
horovod.tensorflow.keras.
shutdown
()¶ A function that shuts Horovod down.
-
horovod.tensorflow.keras.
is_initialized
()¶ Returns True if Horovod is initialized.
-
horovod.tensorflow.keras.
start_timeline
(file_path, mark_cycles=False)¶ Creates a timeline file at file_path and begins recording.
- Parameters
file_path – String path to the timeline file.
mark_cycles – Boolean indicating that cycles should be marked on the timeline (default: False).
Raises a ValueError if Horovod is not initialized.
-
horovod.tensorflow.keras.
stop_timeline
()¶ Stops the active timeline recording and closes the file.
Raises a ValueError if Horovod is not initialized.
-
horovod.tensorflow.keras.
size
()¶ A function that returns the number of Horovod processes.
- Returns
An integer scalar containing the number of Horovod processes.
-
horovod.tensorflow.keras.
local_size
()¶ A function that returns the number of Horovod processes within the node the current process is running on.
- Returns
An integer scalar containing the number of local Horovod processes.
-
horovod.tensorflow.keras.
cross_size
()¶ A function that returns the number of nodes for the local rank of the current Horovod process. For example, if there are 2 nodes in the job: one running 2 processes and the other running 1 process, then the first process on each node will have cross size 2, and the second process on the first node will have cross size 1.
- Returns
An integer scalar containing the number of cross Horovod processes.
-
horovod.tensorflow.keras.
rank
()¶ A function that returns the Horovod rank of the calling process.
- Returns
An integer scalar with the Horovod rank of the calling process.
-
horovod.tensorflow.keras.
local_rank
()¶ A function that returns the local Horovod rank of the calling process, within the node that it is running on. For example, if there are seven processes running on a node, their local ranks will be zero through six, inclusive.
- Returns
An integer scalar with the local Horovod rank of the calling process.
-
horovod.tensorflow.keras.
cross_rank
()¶ A function that returns the cross Horovod rank of the calling process, across the nodes in the job. The cross rank of a process corresponds to the rank of the node it is running on. For example, if there are 7 nodes in a job, the cross ranks will be zero through six, inclusive.
- Returns
An integer scalar with the cross Horovod rank of the calling process.
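How the rank and size functions relate can be illustrated without running Horovod. The sketch below derives all six values from a list of per-node process counts. `describe_processes` is a hypothetical helper, and it assumes ranks are assigned node by node, which the text above does not state.

```python
def describe_processes(procs_per_node):
    """Return rank, local_rank, cross_rank and the matching sizes per process.

    procs_per_node: list with the number of processes on each node.
    Assumes ranks are assigned node by node (an assumption of this sketch).
    """
    size = sum(procs_per_node)
    processes = []
    rank = 0
    for node, count in enumerate(procs_per_node):
        for local_rank in range(count):
            # Nodes that also host a process with this local rank.
            peers = [n for n, c in enumerate(procs_per_node) if c > local_rank]
            processes.append({
                "rank": rank,
                "size": size,
                "local_rank": local_rank,
                "local_size": count,
                "cross_rank": peers.index(node),
                "cross_size": len(peers),
            })
            rank += 1
    return processes


# Two nodes: the first runs 2 processes, the second runs 1.
layout = describe_processes([2, 1])
for proc in layout:
    print(proc)
```

For this layout the first process on each node reports a cross size of 2 and the second process on the first node reports 1, matching the cross_size example above.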
-
horovod.tensorflow.keras.
mpi_threads_supported
()¶ A function that returns a flag indicating whether MPI multi-threading is supported.
If MPI multi-threading is supported, users may mix and match Horovod usage with other MPI libraries, such as mpi4py.
- Returns
A boolean value indicating whether MPI multi-threading is supported.
-
horovod.tensorflow.keras.
mpi_enabled
()¶ Returns True if MPI mode is currently enabled at runtime.
If MPI is enabled, users can use it for controller or data transfer operations.
- Returns
A boolean value indicating whether MPI is enabled.
-
horovod.tensorflow.keras.
mpi_built
()¶ Returns True if Horovod was compiled with MPI support.
- Returns
A boolean value indicating whether MPI support was compiled.
-
horovod.tensorflow.keras.
gloo_enabled
()¶ Returns True if Gloo mode is currently enabled at runtime.
If Gloo is enabled, users can use it for controller or data transfer operations.
- Returns
A boolean value indicating whether Gloo is enabled.
-
horovod.tensorflow.keras.
gloo_built
()¶ Returns True if Horovod was compiled with Gloo support.
- Returns
A boolean value indicating whether Gloo support was compiled.
-
class
horovod.tensorflow.keras.
ProcessSet
(ranks_or_comm: Union[Sequence[int], horovod.common.process_sets.MPI.Comm])[source]¶ Representation of a set of Horovod processes that will run collective operations together.
Initialize a ProcessSet with a list of process ranks or an MPI communicator. Then pass this instance to hvd.init() or hvd.add_process_set(). If a valid process set has been initialized, process_set_id will be set to a numeric value.
-
rank
() → Optional[int][source]¶ Return rank relative to this process set or None if not initialized.
This is useful, e.g., to process the result of hvd.allgather().
Please note that, even with process sets, Horovod operations like hvd.broadcast() are not parameterized by this relative rank, but by the global rank as obtained from hvd.rank().
-
-
horovod.tensorflow.keras.
add_process_set
(process_set: Union[horovod.common.process_sets.ProcessSet, Sequence[int]]) → horovod.common.process_sets.ProcessSet[source]¶ Add a new process_set after Horovod initialization and return it.
Requires running with HOROVOD_DYNAMIC_PROCESS_SETS=1. No process set containing the same ranks may exist already. The returned process set will be fully initialized.
-
horovod.tensorflow.keras.
remove_process_set
(process_set: horovod.common.process_sets.ProcessSet) → bool[source]¶ Attempt to remove process set and return whether this attempt is successful.
Requires running with HOROVOD_DYNAMIC_PROCESS_SETS=1. If removal is successful, we will invalidate the process_set object.
-
horovod.tensorflow.keras.
nccl_built
()¶ Function to check if Horovod was compiled with NCCL support.
- Returns
An integer value indicating whether NCCL support was compiled. If NCCL support was compiled, returns NCCL_VERSION_CODE. Otherwise, returns 0.
-
horovod.tensorflow.keras.
ddl_built
()¶ Returns True if Horovod was compiled with DDL support.
- Returns
A boolean value indicating whether DDL support was compiled.
-
horovod.tensorflow.keras.
ccl_built
()¶ Returns True if Horovod was compiled with oneCCL support.
- Returns
A boolean value indicating whether oneCCL support was compiled.
-
horovod.tensorflow.keras.
cuda_built
()¶ Returns True if Horovod was compiled with CUDA support.
- Returns
A boolean value indicating whether CUDA support was compiled.
-
horovod.tensorflow.keras.
rocm_built
()¶ Returns True if Horovod was compiled with ROCm support.
- Returns
A boolean value indicating whether ROCm support was compiled.
-
class
horovod.tensorflow.keras.
Compression
[source]¶ Optional gradient compression algorithm used during allreduce.
-
none
¶ Do not compress the gradients. This is the default.
alias of
horovod.tensorflow.compression.NoneCompressor
-
fp16
¶ Compress all floating point gradients to 16-bit.
alias of
horovod.tensorflow.compression.FP16Compressor
-
-
horovod.tensorflow.keras.
DistributedOptimizer
(optimizer, name=None, device_dense='', device_sparse='', compression=<class 'horovod.tensorflow.compression.NoneCompressor'>, sparse_as_dense=False, gradient_predivide_factor=1.0, op=Average, backward_passes_per_step=1, average_aggregated_gradients=False, num_groups=0, groups=None, process_set=<horovod.common.process_sets.ProcessSet object>, scale_local_gradients=True)[source]¶ An optimizer that wraps another keras.optimizers.Optimizer, using an allreduce to average gradient values before applying gradients to model weights.
- Parameters
optimizer – Optimizer to use for computing gradients and applying updates.
name – Optional name prefix for the operations created when applying gradients. Defaults to “Distributed” followed by the provided optimizer type.
device_dense – Device to be used for dense tensors. Uses GPU by default if Horovod was built with HOROVOD_GPU_OPERATIONS.
device_sparse – Device to be used for sparse tensors. Uses GPU by default if Horovod was built with HOROVOD_GPU_OPERATIONS.
compression – Compression algorithm used to reduce the amount of data sent and received by each worker node. Defaults to not using compression.
sparse_as_dense – Treat all sparse gradients as dense tensors. This can help improve performance and memory utilization if the original sparse gradient has high density. Defaults to false.
gradient_predivide_factor – gradient_predivide_factor splits the averaging before and after the sum. Gradients are scaled by 1.0 / gradient_predivide_factor before the sum and gradient_predivide_factor / size after the sum.
op – The reduction operation to use when combining gradients across different ranks. Defaults to Average.
backward_passes_per_step – Number of backward passes to perform before calling hvd.allreduce. This allows accumulating updates over multiple mini-batches before reducing and applying them.
average_aggregated_gradients – Whether to average the aggregated gradients that have been accumulated over multiple mini-batches. If true divides gradient updates by backward_passes_per_step. Only applicable for backward_passes_per_step > 1.
num_groups – Number of groups to assign gradient allreduce ops to for explicit grouping. Defaults to no explicit groups.
groups – The parameter used to group the gradient allreduce ops. Accepted values are a non-negative integer or a list of lists of tf.Variable. If groups is a non-negative integer, it is the number of groups to assign gradient allreduce ops to for explicit grouping. If groups is a list of lists of tf.Variable, variables in the same inner list will be assigned to the same group, while a parameter that does not appear in any list will form a group by itself. Defaults to None, which means no explicit groups.
process_set – Gradients will only be reduced over Horovod processes belonging to this process set. Defaults to the global process set.
scale_local_gradients – Whether to scale the gradients of local variables. Default is set to True.
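The interaction between backward_passes_per_step and average_aggregated_gradients can be sketched in plain Python. The `aggregate` helper and the gradient values are hypothetical. It only models local accumulation on one worker, and each yielded value stands for what would then be passed to the allreduce.

```python
def aggregate(local_grads, backward_passes_per_step, average_aggregated_gradients):
    """Yield one aggregated gradient per group of backward_passes_per_step passes."""
    accumulated = 0.0
    for step, grad in enumerate(local_grads, start=1):
        accumulated += grad
        if step % backward_passes_per_step == 0:
            if average_aggregated_gradients:
                # Divide the accumulated update by the number of passes.
                yield accumulated / backward_passes_per_step
            else:
                yield accumulated
            accumulated = 0.0


grads = [1.0, 3.0, 2.0, 6.0]  # gradients from four mini-batches on one worker
summed = list(aggregate(grads, 2, average_aggregated_gradients=False))
averaged = list(aggregate(grads, 2, average_aggregated_gradients=True))
print(summed, averaged)
```

With two passes per step, four mini-batches produce two reductions instead of four, so communication happens half as often.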
-
horovod.tensorflow.keras.
broadcast_global_variables
(root_rank)[source]¶ Broadcasts all global variables from root rank to all other processes.
- Parameters
root_rank – Rank of the process from which global variables will be broadcasted to all other processes.
-
horovod.tensorflow.keras.
allreduce
(value, name=None, average=None, prescale_factor=1.0, postscale_factor=1.0, op=None, compression=<class 'horovod.tensorflow.compression.NoneCompressor'>)[source]¶ Perform an allreduce on a tensor-compatible value.
- Parameters
value – A tensor-compatible value to reduce. The shape of the input must be identical across all ranks.
name – Optional name for the constants created by this operation.
average –
Warning
Deprecated since version 0.19.0.
Use op instead. Will be removed in v0.21.0.
prescale_factor – Multiplicative factor to scale tensor before allreduce.
postscale_factor – Multiplicative factor to scale tensor after allreduce.
op – The reduction operation to combine tensors across different ranks. Defaults to Average if None is given.
compression – Compression algorithm used to reduce the amount of data sent and received by each worker node. Defaults to not using compression.
-
horovod.tensorflow.keras.
allgather
(value, name=None)[source]¶ Perform an allgather on a tensor-compatible value.
The concatenation is done on the first dimension, so the input values on the different processes must have the same rank and shape, except for the first dimension, which is allowed to be different.
- Parameters
value – A tensor-compatible value to gather.
name – Optional name prefix for the constants created by this operation.
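The concatenation rule can be shown with nested lists standing in for tensors. This is a sketch of the semantics only: this `allgather` is a hypothetical pure-Python function, not the Horovod call, and it handles 2-D values only.

```python
def allgather(per_rank_values):
    """Concatenate 2-D values (lists of rows) along the first dimension."""
    row_lengths = {len(row) for value in per_rank_values for row in value}
    if len(row_lengths) > 1:
        raise ValueError("all dimensions except the first must match")
    gathered = []
    for value in per_rank_values:  # concatenated in rank order
        gathered.extend(value)
    return gathered


rank0 = [[1, 2]]                  # shape (1, 2)
rank1 = [[3, 4], [5, 6], [7, 8]]  # shape (3, 2)
gathered = allgather([rank0, rank1])
print(gathered)
```

The inputs differ only in their first dimension (1 and 3 rows), and the gathered value has 4 rows.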
-
horovod.tensorflow.keras.
broadcast
(value, root_rank, name=None)[source]¶ Perform a broadcast on a tensor-compatible value.
- Parameters
value – A tensor-compatible value to broadcast. The shape of the input must be identical across all ranks.
root_rank – Rank of the process from which the value will be broadcast to all other processes.
name – Optional name for the constants created by this operation.
-
horovod.tensorflow.keras.
reducescatter
(value, name=None, op=Average)[source]¶ Perform a reducescatter on a tensor-compatible value.
- Parameters
value – A tensor-compatible value to reduce and scatter. The shape of the input must be identical across all ranks.
name – Optional name for the constants created by this operation.
op – The reduction operation to combine tensors across different ranks. Defaults to Average.
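A reducescatter can be thought of as an element-wise reduction followed by handing each rank one slice of the result. The sketch below uses flat lists, assumes the slices are contiguous and taken in rank order, and only handles inputs whose length divides evenly by the number of ranks. These are assumptions of the sketch, and `reducescatter` here is a hypothetical pure-Python function.

```python
def reducescatter(per_rank_values, average=True):
    """Reduce 1-D values element-wise, then give each rank one contiguous slice."""
    size = len(per_rank_values)
    length = len(per_rank_values[0])
    if any(len(value) != length for value in per_rank_values):
        raise ValueError("input shape must be identical across all ranks")
    if length % size != 0:
        raise ValueError("this sketch only handles evenly divisible inputs")
    reduced = [sum(column) for column in zip(*per_rank_values)]
    if average:
        reduced = [total / size for total in reduced]
    chunk = length // size
    return [reduced[r * chunk:(r + 1) * chunk] for r in range(size)]


inputs = [
    [1.0, 2.0, 3.0, 4.0],  # rank 0
    [3.0, 4.0, 5.0, 6.0],  # rank 1
]
outputs = reducescatter(inputs)
print(outputs)
```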
-
horovod.tensorflow.keras.
load_model
(filepath, custom_optimizers=None, custom_objects=None, compression=<class 'horovod.tensorflow.compression.NoneCompressor'>, legacy_opts=False)[source]¶ Loads a saved Keras model with a Horovod DistributedOptimizer.
The DistributedOptimizer will wrap the underlying optimizer used to train the saved model, so that the optimizer state (params and weights) will be picked up for retraining.
By default, all optimizers in the module keras.optimizers will be loaded and wrapped without needing to specify any custom_optimizers or custom_objects.
- Parameters
filepath – Either a string path to the saved model or an h5py.File object from which to load the model.
custom_optimizers – Optional list of Optimizer subclasses to support during loading.
custom_objects – Optional dictionary mapping names (strings) to custom classes or functions to be considered during deserialization.
compression – Compression algorithm used to reduce the amount of data sent and received by each worker node. Defaults to not using compression.
legacy_opts – If True, model uses tf.keras.optimizers.legacy.* optimizers
- Returns
A Keras model instance.
- Raises
ImportError – If h5py is not available.
ValueError – In case of an invalid savefile.
-
class
horovod.tensorflow.keras.callbacks.
BroadcastGlobalVariablesCallback
(*args, **kw)[source]¶ Keras Callback that will broadcast all global variables from root rank to all other processes during initialization.
This is necessary to ensure consistent initialization of all workers when training is started with random weights or restored from a checkpoint.
-
__init__
(root_rank, device='', local_variables=None)[source]¶ Construct a new BroadcastGlobalVariablesCallback that will broadcast all global variables from root rank to all other processes during initialization.
- Parameters
root_rank – Rank that will send data, other ranks will receive data.
device – Device to be used for broadcasting. Uses GPU by default if Horovod was built with HOROVOD_GPU_OPERATIONS.
local_variables – A collection of variables that do not need to be broadcast.
-
-
class
horovod.tensorflow.keras.callbacks.
MetricAverageCallback
(*args, **kw)[source]¶ Keras Callback that will average metrics across all processes at the end of the epoch. Useful in conjunction with ReduceLROnPlateau, TensorBoard and other metrics-based callbacks.
Note: This callback must be added to the callback list before the ReduceLROnPlateau, TensorBoard or other metrics-based callbacks.
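The averaging step itself is simple, as the following sketch shows. `average_metrics` and the log values are hypothetical and no Horovod call is made. Callbacks that run after the averaging see the same value on every process, which is why the ordering in the note above matters.

```python
def average_metrics(per_rank_logs):
    """Average each metric across ranks, as done at the end of an epoch."""
    size = len(per_rank_logs)
    return {
        name: sum(logs[name] for logs in per_rank_logs) / size
        for name in per_rank_logs[0]
    }


logs = [
    {"loss": 0.50, "accuracy": 0.80},  # rank 0
    {"loss": 0.70, "accuracy": 0.70},  # rank 1
]
averaged = average_metrics(logs)
print(averaged)
```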
-
class
horovod.tensorflow.keras.callbacks.
LearningRateScheduleCallback
(*args, **kw)[source]¶ LearningRateScheduleCallback sets learning rate between epochs start_epoch and end_epoch to be initial_lr * multiplier. multiplier can be a constant or a function f(epoch) = lr’.
If multiplier is a function and staircase=True, learning rate adjustment will happen at the beginning of each epoch and the epoch passed to the multiplier function will be an integer.
If multiplier is a function and staircase=False, learning rate adjustment will happen at the beginning of each batch and the epoch passed to the multiplier function will be a floating number: epoch’ = epoch + batch / steps_per_epoch. This functionality is useful for smooth learning rate adjustment schedulers, such as LearningRateWarmupCallback.
initial_lr is the learning rate of the model optimizer at the start of the training.
-
__init__
(initial_lr, multiplier, start_epoch=0, end_epoch=None, staircase=True, momentum_correction=True, steps_per_epoch=None)[source]¶ Construct a new LearningRateScheduleCallback.
- Parameters
initial_lr – Initial learning rate at the start of training.
multiplier – A constant multiplier or a function f(epoch) = lr’
start_epoch – The first epoch this adjustment will be applied to. Defaults to 0.
end_epoch – The epoch this adjustment will stop applying (exclusive end). Defaults to None.
staircase – Whether to adjust learning rate at the start of epoch (staircase=True) or at the start of every batch (staircase=False).
momentum_correction – Apply momentum correction to optimizers that have momentum. Defaults to True.
steps_per_epoch – The callback will attempt to autodetect number of batches per epoch with Keras >= 2.0.0. Provide this value if you have an older version of Keras.
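The difference between staircase=True and staircase=False comes down to which epoch value is passed to the multiplier. The sketch below reproduces that arithmetic only. `scheduled_lr` and `halve_each_epoch` are hypothetical names, and momentum correction is not modeled.

```python
def scheduled_lr(initial_lr, multiplier, epoch, batch, steps_per_epoch,
                 start_epoch=0, end_epoch=None, staircase=True):
    """Return the learning rate the schedule would set, or None if inactive."""
    if epoch < start_epoch or (end_epoch is not None and epoch >= end_epoch):
        return None  # outside [start_epoch, end_epoch): leave the rate unchanged
    if not callable(multiplier):
        return initial_lr * multiplier
    if staircase:
        return initial_lr * multiplier(epoch)  # integer epoch, once per epoch
    # Fractional epoch, recomputed at the start of every batch.
    return initial_lr * multiplier(epoch + batch / steps_per_epoch)


def halve_each_epoch(epoch):
    """Hypothetical multiplier f(epoch): halve the rate every epoch."""
    return 0.5 ** epoch


step_lr = scheduled_lr(0.1, halve_each_epoch, epoch=1, batch=5, steps_per_epoch=10)
smooth_lr = scheduled_lr(0.1, halve_each_epoch, epoch=1, batch=5,
                         steps_per_epoch=10, staircase=False)
print(step_lr, smooth_lr)
```

Halfway through epoch 1, the staircase schedule still uses f(1), while the smooth schedule uses f(1.5) and therefore yields a lower rate.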
-
-
class
horovod.tensorflow.keras.callbacks.
LearningRateWarmupCallback
(*args, **kw)[source]¶ Implements gradual learning rate warmup:
lr = initial_lr / hvd.size() ---> lr = initial_lr
initial_lr is the learning rate of the model optimizer at the start of the training.
This technique was described in the paper “Accurate, Large Minibatch SGD: Training ImageNet in 1 Hour”. See https://arxiv.org/pdf/1706.02677.pdf for details.
Math recap:
\[
\begin{aligned}
epoch &= full\_epochs + \frac{batch}{steps\_per\_epoch} \\
lr'(epoch) &= \frac{lr}{size} \cdot \left(\frac{size - 1}{warmup} \cdot epoch + 1\right) \\
lr'(epoch = 0) &= \frac{lr}{size} \\
lr'(epoch = warmup) &= lr
\end{aligned}
\]
-
__init__
(initial_lr, warmup_epochs=5, momentum_correction=True, steps_per_epoch=None, verbose=0)[source]¶ Construct a new LearningRateWarmupCallback that will gradually warm up the learning rate.
- Parameters
initial_lr – Initial learning rate at the start of training.
warmup_epochs – The number of epochs of the warmup phase. Defaults to 5.
momentum_correction – Apply momentum correction to optimizers that have momentum. Defaults to True.
steps_per_epoch – The callback will attempt to autodetect number of batches per epoch with Keras >= 2.0.0. Provide this value if you have an older version of Keras.
verbose – verbosity mode, 0 or 1.
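The warmup formula from the math recap can be checked numerically. `warmup_lr` is a hypothetical helper that evaluates the formula directly, and the learning rate, size, and warmup values are made up for illustration.

```python
def warmup_lr(initial_lr, size, warmup_epochs, epoch, batch=0, steps_per_epoch=1):
    """Learning rate during warmup, following the formula in the math recap."""
    fractional_epoch = epoch + batch / steps_per_epoch
    return initial_lr / size * ((size - 1) / warmup_epochs * fractional_epoch + 1)


lr, size, warmup = 0.8, 8, 5
start = warmup_lr(lr, size, warmup, epoch=0)
halfway = warmup_lr(lr, size, warmup, epoch=2, batch=50, steps_per_epoch=100)
end = warmup_lr(lr, size, warmup, epoch=5)
print(start, halfway, end)
```

The rate starts at lr / size, grows linearly with the fractional epoch, and reaches lr once the warmup epochs have elapsed.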
-
-
class
horovod.tensorflow.keras.elastic.
KerasState
(model, optimizer=None, **kwargs)[source]¶ State representation of a tf.keras model and optimizer.
- Parameters
model – Keras model.
optimizer – Optional optimizer, can be compiled into model instead.
kwargs – Additional properties to sync, will be exposed as attributes of the object.
-
class
horovod.tensorflow.keras.elastic.
CommitStateCallback
(*args, **kw)[source]¶ Keras Callback that will commit the state object every batches_per_commit batches at the end of each batch.
-
class
horovod.tensorflow.keras.elastic.
UpdateBatchStateCallback
(*args, **kw)[source]¶ Keras Callback that will update the value of state.batch with the current batch number at the end of each batch. Batch will reset to 0 at the end of each epoch.
If steps_per_epoch is set, then this callback will also ensure that the number of steps in the first epoch following a reset is shortened by the number of batches already processed.
-
class
horovod.tensorflow.keras.elastic.
UpdateEpochStateCallback
(*args, **kw)[source]¶ Keras Callback that will update the value of state.epoch with the current epoch number at the end of each epoch.
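How the three elastic callbacks cooperate can be sketched with a toy training loop. `ToyState` and `train` are hypothetical. The loop counts batches from 1 and commits whenever the batch count is a multiple of batches_per_commit, which is an assumption about the exact commit timing.

```python
class ToyState:
    """Hypothetical stand-in for the elastic state object."""

    def __init__(self):
        self.epoch = 0
        self.batch = 0
        self.commits = 0

    def commit(self):
        self.commits += 1


def train(state, epochs, steps_per_epoch, batches_per_commit):
    """Run a toy loop that updates the state as the three callbacks describe."""
    steps_run = []
    while state.epoch < epochs:
        # After a reset, skip the batches that were already processed.
        remaining = steps_per_epoch - state.batch
        steps_run.append(remaining)
        for _ in range(remaining):
            state.batch += 1  # role of UpdateBatchStateCallback
            if state.batch % batches_per_commit == 0:
                state.commit()  # role of CommitStateCallback
        state.batch = 0  # batch resets at the end of each epoch
        state.epoch += 1  # role of UpdateEpochStateCallback
    return steps_run


state = ToyState()
state.epoch, state.batch = 1, 6  # pretend a reset restored this committed state
steps_run = train(state, epochs=3, steps_per_epoch=10, batches_per_commit=2)
print(steps_run, state.epoch, state.batch, state.commits)
```

The first epoch after the simulated reset runs only the 4 batches that were still outstanding, and the following epoch runs the full 10.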
-
horovod.tensorflow.keras.elastic.
run
(func)[source]¶ Decorator used to run the elastic training process.
The purpose of this decorator is to allow for uninterrupted execution of the wrapped function across multiple workers in parallel, as workers come and go from the system. When a new worker is added, its state needs to be brought to the same point as the other workers, which is done by synchronizing the state object before executing func.
When a worker is added or removed, other workers will raise an exception to bring them back to such a sync point before executing func again. This ensures that workers do not diverge when such reset events occur.
It’s important to note that collective operations (e.g., broadcast, allreduce) cannot be called before the call to the wrapped function. Otherwise, new workers could execute these operations during their initialization while other workers are attempting to sync state, resulting in deadlock.
- Parameters
func – a wrapped function taking any number of args or kwargs. The first argument must be a horovod.common.elastic.State object used to synchronize state across workers.
horovod.keras¶
-
horovod.keras.
shutdown
()¶ A function that shuts Horovod down.
-
horovod.keras.
size
()¶ A function that returns the number of Horovod processes.
- Returns
An integer scalar containing the number of Horovod processes.
-
horovod.keras.
local_size
()¶ A function that returns the number of Horovod processes within the node the current process is running on.
- Returns
An integer scalar containing the number of local Horovod processes.
-
horovod.keras.
cross_size
()¶ A function that returns the number of nodes for the local rank of the current Horovod process. For example, if there are 2 nodes in the job: one running 2 processes and the other running 1 process, then the first process on each node will have cross size 2, and the second process on the first node will have cross size 1.
- Returns
An integer scalar containing the number of cross Horovod processes.
-
horovod.keras.
rank
()¶ A function that returns the Horovod rank of the calling process.
- Returns
An integer scalar with the Horovod rank of the calling process.
-
horovod.keras.
local_rank
()¶ A function that returns the local Horovod rank of the calling process, within the node that it is running on. For example, if there are seven processes running on a node, their local ranks will be zero through six, inclusive.
- Returns
An integer scalar with the local Horovod rank of the calling process.
-
horovod.keras.
cross_rank
()¶ A function that returns the cross Horovod rank of the calling process, across the nodes in the job. The cross rank of a process corresponds to the rank of the node it is running on. For example, if there are 7 nodes in a job, the cross ranks will be zero through six, inclusive.
- Returns
An integer scalar with the cross Horovod rank of the calling process.
-
horovod.keras.
is_initialized
()¶ Returns True if Horovod is initialized.
-
horovod.keras.
start_timeline
(file_path, mark_cycles=False)¶ Creates a timeline file at file_path and begins recording.
- Parameters
file_path – String path to the timeline file.
mark_cycles – Boolean indicating that cycles should be marked on the timeline (default: False).
Raises a ValueError if Horovod is not initialized.
-
horovod.keras.
stop_timeline
()¶ Stops the active timeline recording and closes the file.
Raises a ValueError if Horovod is not initialized.
-
horovod.keras.
mpi_threads_supported
()¶ A function that returns a flag indicating whether MPI multi-threading is supported.
If MPI multi-threading is supported, users may mix and match Horovod usage with other MPI libraries, such as mpi4py.
- Returns
A boolean value indicating whether MPI multi-threading is supported.
-
horovod.keras.
mpi_enabled
()¶ Returns True if MPI mode is currently enabled at runtime.
If MPI is enabled, users can use it for controller or data transfer operations.
- Returns
A boolean value indicating whether MPI is enabled.
-
horovod.keras.
mpi_built
()¶ Returns True if Horovod was compiled with MPI support.
- Returns
A boolean value indicating whether MPI support was compiled.
-
horovod.keras.
gloo_enabled
()¶ Returns True if Gloo mode is currently enabled at runtime.
If Gloo is enabled, users can use it for controller or data transfer operations.
- Returns
A boolean value indicating whether Gloo is enabled.
-
horovod.keras.
gloo_built
()¶ Returns True if Horovod was compiled with Gloo support.
- Returns
A boolean value indicating whether Gloo support was compiled.
-
horovod.keras.
nccl_built
()¶ Function to check if Horovod was compiled with NCCL support.
- Returns
An integer value indicating whether NCCL support was compiled. If NCCL support was compiled, returns NCCL_VERSION_CODE. Otherwise, returns 0.
-
horovod.keras.
ddl_built
()¶ Returns True if Horovod was compiled with DDL support.
- Returns
A boolean value indicating whether DDL support was compiled.
-
horovod.keras.
ccl_built
()¶ Returns True if Horovod was compiled with oneCCL support.
- Returns
A boolean value indicating whether oneCCL support was compiled.
-
horovod.keras.
cuda_built
()¶ Returns True if Horovod was compiled with CUDA support.
- Returns
A boolean value indicating whether CUDA support was compiled.
-
horovod.keras.
rocm_built
()¶ Returns True if Horovod was compiled with ROCm support.
- Returns
A boolean value indicating whether ROCm support was compiled.
-
class
horovod.keras.
Compression
[source]¶ Optional gradient compression algorithm used during allreduce.
-
none
¶ Do not compress the gradients. This is the default.
alias of
horovod.tensorflow.compression.NoneCompressor
-
fp16
¶ Compress all floating point gradients to 16-bit.
alias of
horovod.tensorflow.compression.FP16Compressor
-
-
horovod.keras.
DistributedOptimizer
(optimizer, name=None, device_dense='', device_sparse='', compression=<class 'horovod.tensorflow.compression.NoneCompressor'>, sparse_as_dense=False, gradient_predivide_factor=1.0, op=Average, num_groups=0, groups=None)[source]¶ An optimizer that wraps another keras.optimizers.Optimizer, using an allreduce to average gradient values before applying gradients to model weights.
- Parameters
optimizer – Optimizer to use for computing gradients and applying updates.
name – Optional name prefix for the operations created when applying gradients. Defaults to “Distributed” followed by the provided optimizer type.
device_dense – Device to be used for dense tensors. Uses GPU by default if Horovod was built with HOROVOD_GPU_OPERATIONS.
device_sparse – Device to be used for sparse tensors. Uses GPU by default if Horovod was built with HOROVOD_GPU_OPERATIONS.
compression – Compression algorithm used to reduce the amount of data sent and received by each worker node. Defaults to not using compression.
sparse_as_dense – Treat all sparse gradients as dense tensors. This can help improve performance and memory utilization if the original sparse gradient has high density. Defaults to false.
gradient_predivide_factor – gradient_predivide_factor splits the averaging before and after the sum. Gradients are scaled by 1.0 / gradient_predivide_factor before the sum and gradient_predivide_factor / size after the sum.
op – The reduction operation to use when combining gradients across different ranks. Defaults to Average.
num_groups – Number of groups to assign gradient allreduce ops to for explicit grouping. Defaults to no explicit groups.
groups – The parameter used to group the gradient allreduce ops. Accepted values are a non-negative integer or a list of lists of tf.Variable. If groups is a non-negative integer, it is the number of groups to assign gradient allreduce ops to for explicit grouping. If groups is a list of lists of tf.Variable, variables in the same inner list will be assigned to the same group, while a parameter that does not appear in any list will form a group by itself. Defaults to None, which means no explicit groups.
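The effect of gradient_predivide_factor described in the parameter list can be checked with a few lines of arithmetic. `allreduce_average` is a hypothetical pure-Python function that follows the stated scaling (1.0 / gradient_predivide_factor before the sum, gradient_predivide_factor / size after it) on made-up per-rank values.

```python
def allreduce_average(per_rank_grads, gradient_predivide_factor=1.0):
    """Average gradients, splitting the division before and after the sum."""
    size = len(per_rank_grads)
    # Scale each rank's gradient before the sum.
    prescaled = [g / gradient_predivide_factor for g in per_rank_grads]
    total = sum(prescaled)
    # Apply the remaining part of the averaging after the sum.
    return total * gradient_predivide_factor / size


grads = [2.0, 4.0, 6.0, 8.0]  # one gradient value per rank
default = allreduce_average(grads)
predivided = allreduce_average(grads, gradient_predivide_factor=4.0)
print(default, predivided)
```

Both calls produce the same average. The factor only changes how large the intermediate sum becomes, which can matter when gradients are summed in a reduced-precision format.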
-
horovod.keras.
PartialDistributedOptimizer
(optimizer, name=None, device_dense='', device_sparse='', compression=<class 'horovod.tensorflow.compression.NoneCompressor'>, sparse_as_dense=False, gradient_predivide_factor=1.0, op=Average, groups=None, process_set=<horovod.common.process_sets.ProcessSet object>, local_layers=None, scale_local_gradients=True)[source]¶ An optimizer that wraps another keras.optimizers.Optimizer, using an allreduce to average gradient values before applying gradients to model weights.
- Parameters
optimizer – Optimizer to use for computing gradients and applying updates.
process_set – Gradients will only be reduced over Horovod processes belonging to this process set. Defaults to the global process set.
local_layers – A collection of tf.keras.layers.Layer objects whose gradients do not need to be synced across ranks and are instead kept and applied locally. If not provided, PartialDistributedOptimizer behaves identically to DistributedOptimizer.
scale_local_gradients – Whether to scale the gradients of local variables. Default is set to True.
The rest of the arguments are similar to those of DistributedOptimizer.
-
horovod.keras.
broadcast_global_variables
(root_rank)[source]¶ Broadcasts all global variables from root rank to all other processes.
- Parameters
root_rank – Rank of the process from which global variables will be broadcasted to all other processes.
-
horovod.keras.
allreduce
(value, name=None, average=True, prescale_factor=1.0, postscale_factor=1.0, op=None, compression=<class 'horovod.tensorflow.compression.NoneCompressor'>)[source]¶ Perform an allreduce on a tensor-compatible value.
- Parameters
value – A tensor-compatible value to reduce. The shape of the input must be identical across all ranks.
name – Optional name for the constants created by this operation.
average – If True, computes the average over all ranks. Otherwise, computes the sum over all ranks.
prescale_factor – Multiplicative factor to scale tensor before allreduce.
postscale_factor – Multiplicative factor to scale tensor after allreduce.
op – The reduction operation to combine tensors across different ranks.
compression – Gradient compression algorithm to be used during allreduce. Defaults to Compression.none.
-
horovod.keras.
allgather
(value, name=None)[source]¶ Perform an allgather on a tensor-compatible value.
The concatenation is done on the first dimension, so the input values on the different processes must have the same rank and shape, except for the first dimension, which is allowed to be different.
- Parameters
value – A tensor-compatible value to gather.
name – Optional name prefix for the constants created by this operation.
-
horovod.keras.
broadcast
(value, root_rank, name=None)[source]¶ Perform a broadcast on a tensor-compatible value.
- Parameters
value – A tensor-compatible value to broadcast. The shape of the input must be identical across all ranks.
root_rank – Rank of the process from which the value will be broadcast to all other processes.
name – Optional name for the constants created by this operation.
-
horovod.keras.
reducescatter
(value, name=None, op=Average)[source]¶ Perform a reducescatter on a tensor-compatible value.
- Parameters
value – A tensor-compatible value to reduce and scatter. The shape of the input must be identical across all ranks.
name – Optional name for the constants created by this operation.
op – The reduction operation to combine tensors across different ranks. Defaults to Average.
-
horovod.keras.
load_model
(filepath, custom_optimizers=None, custom_objects=None, compression=<class 'horovod.tensorflow.compression.NoneCompressor'>, legacy_opts=False)[source]¶ Loads a saved Keras model with a Horovod DistributedOptimizer.
The DistributedOptimizer will wrap the underlying optimizer used to train the saved model, so that the optimizer state (params and weights) will be picked up for retraining.
By default, all optimizers in the module keras.optimizers will be loaded and wrapped without needing to specify any custom_optimizers or custom_objects.
- Parameters
filepath – One of the following: - string, path to the saved model, or - h5py.File object from which to load the model
custom_optimizers – Optional list of Optimizer subclasses to support during loading.
custom_objects – Optional dictionary mapping names (strings) to custom classes or functions to be considered during deserialization.
compression – Compression algorithm used to reduce the amount of data sent and received by each worker node. Defaults to not using compression.
legacy_opts – If True, model uses tf.keras.optimizers.legacy.* optimizers
- Returns
A Keras model instance.
- Raises
ImportError – If h5py is not available.
ValueError – In case of an invalid savefile.
-
class
horovod.keras.callbacks.
BroadcastGlobalVariablesCallback
(*args, **kw)[source]¶ Keras Callback that will broadcast all global variables from root rank to all other processes during initialization.
This is necessary to ensure consistent initialization of all workers when training is started with random weights or restored from a checkpoint.
-
__init__
(root_rank, device='', local_variables=None)[source]¶ Construct a new BroadcastGlobalVariablesCallback that will broadcast all global variables from root rank to all other processes during initialization.
- Parameters
root_rank – Rank that will send data, other ranks will receive data.
device – Device to be used for broadcasting. Uses GPU by default if Horovod was built with HOROVOD_GPU_OPERATIONS.
local_variables – A collection of variables that should not be broadcast.
-
-
class
horovod.keras.callbacks.
MetricAverageCallback
(*args, **kw)[source]¶ Keras Callback that will average metrics across all processes at the end of the epoch. Useful in conjunction with ReduceLROnPlateau, TensorBoard and other metrics-based callbacks.
Note: This callback must be added to the callback list before the ReduceLROnPlateau, TensorBoard or other metrics-based callbacks.
-
class
horovod.keras.callbacks.
LearningRateScheduleCallback
(*args, **kw)[source]¶ LearningRateScheduleCallback sets learning rate between epochs start_epoch and end_epoch to be initial_lr * multiplier. multiplier can be a constant or a function f(epoch) = lr’.
If multiplier is a function and staircase=True, learning rate adjustment will happen at the beginning of each epoch and the epoch passed to the multiplier function will be an integer.
If multiplier is a function and staircase=False, learning rate adjustment will happen at the beginning of each batch and the epoch passed to the multiplier function will be a floating number: epoch’ = epoch + batch / steps_per_epoch. This functionality is useful for smooth learning rate adjustment schedulers, such as LearningRateWarmupCallback.
initial_lr is the learning rate of the model optimizer at the start of the training.
-
__init__
(initial_lr, multiplier, start_epoch=0, end_epoch=None, staircase=True, momentum_correction=True, steps_per_epoch=None)[source]¶ Construct a new LearningRateScheduleCallback.
- Parameters
initial_lr – Initial learning rate at the start of training.
multiplier – A constant multiplier or a function f(epoch) = lr’
start_epoch – The first epoch this adjustment will be applied to. Defaults to 0.
end_epoch – The epoch this adjustment will stop applying (exclusive end). Defaults to None.
staircase – Whether to adjust learning rate at the start of epoch (staircase=True) or at the start of every batch (staircase=False).
momentum_correction – Apply momentum correction to optimizers that have momentum. Defaults to True.
steps_per_epoch – The callback will attempt to autodetect number of batches per epoch with Keras >= 2.0.0. Provide this value if you have an older version of Keras.
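The scheduling rule described above can be sketched in plain Python. This only illustrates the arithmetic and is not Horovod's implementation; the helper name `scheduled_lr` and its argument layout are hypothetical.

```python
def scheduled_lr(initial_lr, multiplier, epoch, batch=0,
                 steps_per_epoch=None, staircase=True,
                 start_epoch=0, end_epoch=None):
    """Return the learning rate the schedule would set, or None if inactive.

    Hypothetical helper that mirrors the documented rule only.
    """
    # Outside [start_epoch, end_epoch) the schedule does not apply.
    if epoch < start_epoch or (end_epoch is not None and epoch >= end_epoch):
        return None
    if not callable(multiplier):
        # A constant multiplier scales initial_lr directly.
        return initial_lr * multiplier
    if staircase:
        # Integer epoch, adjusted once at the start of the epoch.
        return initial_lr * multiplier(epoch)
    if not steps_per_epoch:
        raise ValueError("steps_per_epoch is required when staircase=False")
    # Fractional epoch, adjusted at the start of every batch.
    return initial_lr * multiplier(epoch + batch / steps_per_epoch)


def halve_every_epoch(epoch):
    """Example multiplier function: halves the rate each epoch."""
    return 0.5 ** epoch


print(scheduled_lr(0.1, halve_every_epoch, epoch=2))
print(scheduled_lr(0.1, halve_every_epoch, epoch=2, batch=50,
                   steps_per_epoch=100, staircase=False))
```

With `staircase=False` the multiplier sees epoch 2.5 halfway through the third epoch, which is what makes smooth schedules possible.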
-
-
class
horovod.keras.callbacks.
LearningRateWarmupCallback
(*args, **kw)[source]¶ Implements gradual learning rate warmup:
lr = initial_lr / hvd.size() —> lr = initial_lr
initial_lr is the learning rate of the model optimizer at the start of the training.
This technique was described in the paper “Accurate, Large Minibatch SGD: Training ImageNet in 1 Hour”. See https://arxiv.org/pdf/1706.02677.pdf for details.
Math recap:
\[
\begin{aligned}
epoch &= full\_epochs + \frac{batch}{steps\_per\_epoch} \\
lr'(epoch) &= \frac{lr}{size} \cdot \left(\frac{size - 1}{warmup} \cdot epoch + 1\right) \\
lr'(epoch = 0) &= \frac{lr}{size} \\
lr'(epoch = warmup) &= lr
\end{aligned}
\]
-
__init__
(initial_lr, warmup_epochs=5, momentum_correction=True, steps_per_epoch=None, verbose=0)[source]¶ Construct a new LearningRateWarmupCallback that will gradually warm up the learning rate.
- Parameters
initial_lr – Initial learning rate at the start of training.
warmup_epochs – The number of epochs of the warmup phase. Defaults to 5.
momentum_correction – Apply momentum correction to optimizers that have momentum. Defaults to True.
steps_per_epoch – The callback will attempt to autodetect number of batches per epoch with Keras >= 2.0.0. Provide this value if you have an older version of Keras.
verbose – verbosity mode, 0 or 1.
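As a quick check of the warmup formula, the sketch below evaluates it directly. The function names are hypothetical and the code is not taken from Horovod; it only restates the math recap in Python.

```python
def fractional_epoch(full_epochs, batch, steps_per_epoch):
    """epoch = full_epochs + batch / steps_per_epoch"""
    return full_epochs + batch / steps_per_epoch


def warmup_lr(initial_lr, size, warmup_epochs, epoch):
    """lr'(epoch) = lr / size * ((size - 1) / warmup * epoch + 1)"""
    return initial_lr / size * ((size - 1) / warmup_epochs * epoch + 1)


# Example values (assumed): 4 workers, target rate 0.4, 5 warmup epochs.
for epoch in range(6):
    print(epoch, warmup_lr(0.4, size=4, warmup_epochs=5, epoch=epoch))
```

The rate starts at `initial_lr / size` and grows linearly until it reaches `initial_lr` at the end of the warmup phase.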
-
-
class
horovod.keras.elastic.
KerasState
(model, optimizer=None, **kwargs)[source]¶ State representation of a keras model and optimizer.
- Parameters
model – Keras model.
optimizer – Optional optimizer, can be compiled into model instead.
kwargs – Additional properties to sync, will be exposed as attributes of the object.
-
class
horovod.keras.elastic.
CommitStateCallback
(*args, **kw)[source]¶ Keras Callback that will commit the state object every batches_per_commit batches at the end of each batch.
-
class
horovod.keras.elastic.
UpdateBatchStateCallback
(*args, **kw)[source]¶ Keras Callback that will update the value of state.batch with the current batch number at the end of each batch. Batch will reset to 0 at the end of each epoch.
If steps_per_epoch is set, then this callback will also ensure that the number of steps in the first epoch following a reset is shortened by the number of batches already processed.
-
class
horovod.keras.elastic.
UpdateEpochStateCallback
(*args, **kw)[source]¶ Keras Callback that will update the value of state.epoch with the current epoch number at the end of each epoch.
-
horovod.keras.elastic.
run
(func)[source]¶ Decorator used to run the elastic training process.
The purpose of this decorator is to allow for uninterrupted execution of the wrapped function across multiple workers in parallel, as workers come and go from the system. When a new worker is added, its state needs to be brought to the same point as the other workers, which is done by synchronizing the state object before executing func.
When a worker is added or removed, other workers will raise an exception to bring them back to such a sync point before executing func again. This ensures that workers do not diverge when such reset events occur.
It’s important to note that collective operations (e.g., broadcast, allreduce) cannot be called before the wrapped function is invoked. Otherwise, new workers could execute these operations during their initialization while other workers are attempting to sync state, resulting in deadlock.
- Parameters
func – a wrapped function taking any number of args or kwargs. The first argument must be a horovod.common.elastic.State object used to synchronize state across workers.
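The sync-then-retry behaviour described above can be pictured with a small pure-Python sketch. It is a simplification for illustration, not Horovod's implementation: `elastic_run_sketch`, `WorkerSetChanged`, and `ToyState` are hypothetical names, and the real decorator handles more cases (for example restoring committed state).

```python
import functools


class WorkerSetChanged(Exception):
    """Hypothetical stand-in for the exception raised on a reset event."""


def elastic_run_sketch(func):
    """Simplified sketch of an elastic retry loop (not Horovod's code)."""
    @functools.wraps(func)
    def wrapper(state, *args, **kwargs):
        while True:
            # Bring every worker to the same point before (re)entering func.
            state.sync()
            try:
                return func(state, *args, **kwargs)
            except WorkerSetChanged:
                # A worker joined or left: go back to the sync point.
                continue
    return wrapper


class ToyState:
    """Hypothetical state object that only counts synchronizations."""

    def __init__(self):
        self.syncs = 0
        self.epoch = 0

    def sync(self):
        self.syncs += 1


@elastic_run_sketch
def train(state):
    # Simulate one reset event during the first attempt.
    if state.syncs == 1:
        raise WorkerSetChanged()
    state.epoch += 1
    return state.epoch


state = ToyState()
result = train(state)
print(result, state.syncs)
```

Because synchronization happens inside the wrapper, any collective call placed before `train(state)` would run on a new worker while the existing workers are waiting in `sync()`.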
horovod.torch¶
-
class
horovod.torch.
Compression
[source]¶ Optional gradient compression algorithm used during allreduce.
-
none
¶ Do not compress the gradients. This is the default.
alias of
horovod.torch.compression.NoneCompressor
-
fp16
¶ Compress all floating point gradients to 16-bit.
horovod.torch.compression.FP16Compressor
-
-
horovod.torch.
allgather_object
(obj, name=None)[source]¶ Serializes and allgathers an object from all other processes.
- Parameters
obj – An object capable of being serialized without losing any context.
name – Optional name to use during allgather, will default to the class type.
- Returns
The list of objects that were allgathered across all ranks.
-
horovod.torch.
broadcast_object
(obj, root_rank=0, name=None, process_set=<horovod.common.process_sets.ProcessSet object>)[source]¶ Serializes and broadcasts an object from root rank to all other processes. Typical usage is to broadcast the optimizer.state_dict(), for example:
    state_dict = broadcast_object(optimizer.state_dict(), 0)
    if hvd.rank() > 0:
        optimizer.load_state_dict(state_dict)
- Parameters
obj – An object capable of being serialized without losing any context.
root_rank – The rank of the process from which parameters will be broadcasted to all other processes.
name – Optional name to use during broadcast, will default to the class type.
process_set – Process set object to limit this operation to a subset of Horovod processes. Default is the global process set.
- Returns
The object that was broadcast from the root_rank.
-
horovod.torch.
broadcast_optimizer_state
(optimizer, root_rank, model=None)[source]¶ Broadcasts an optimizer state from root rank to all other processes.
- Parameters
optimizer – An optimizer.
root_rank – The rank of the process from which the optimizer will be broadcasted to all other processes.
model – (Optional) model, used to identify sparse parameters.
-
horovod.torch.
broadcast_parameters
(params, root_rank)[source]¶ Broadcasts the parameters from root rank to all other processes. Typical usage is to broadcast the model.state_dict(), model.named_parameters(), or model.parameters().
- Parameters
params – One of the following: - list of parameters to broadcast - dict of parameters to broadcast
root_rank – The rank of the process from which parameters will be broadcasted to all other processes.
-
horovod.torch.
allreduce
(tensor, average=None, name=None, compression=<class 'horovod.torch.compression.NoneCompressor'>, op=None, prescale_factor=1.0, postscale_factor=1.0, process_set=<horovod.common.process_sets.ProcessSet object>)[source]¶ A function that performs averaging or summation of the input tensor over all the Horovod processes. The input tensor is not modified.
The reduction operation is keyed by the name. If name is not provided, an incremented auto-generated name is used. The tensor type and shape must be the same on all Horovod processes for a given name. The reduction will not start until all processes are ready to send and receive the tensor.
This acts as a thin wrapper around an autograd function. If your input tensor requires gradients, then calling this function will allow gradients to be computed and backpropagated.
- Parameters
tensor – A tensor to reduce.
average –
Warning
Deprecated since version 0.19.0.
Use op instead. Will be removed in v1.0.
name – A name of the reduction operation.
compression – Compression algorithm used during allreduce to reduce the amount of data sent during each parameter update step. Defaults to not using compression.
op – The reduction operation to combine tensors across different ranks. Supported op values are Sum, Average, Min, Max, and Product. Defaults to Average if None is given.
prescale_factor – Multiplicative factor to scale tensor before allreduce.
postscale_factor – Multiplicative factor to scale tensor after allreduce.
process_set – Process set object to limit this operation to a subset of Horovod processes. Default is the global process set.
- Returns
A tensor of the same shape and type as tensor, averaged or summed across all processes.
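To make the reduction semantics concrete, here is a single-process simulation in plain Python. It is not Horovod code: `simulate_allreduce` is a hypothetical helper, each "rank" is just a list, and applying `prescale_factor` before the reduction and `postscale_factor` after it is an assumption read off the parameter descriptions above.

```python
def simulate_allreduce(per_rank, op="average",
                       prescale_factor=1.0, postscale_factor=1.0):
    """Reduce equally shaped 1-D lists, one list per simulated rank.

    Returns the single result that every rank would receive.
    """
    size = len(per_rank)
    # Scale every input before the reduction.
    scaled = [[prescale_factor * x for x in tensor] for tensor in per_rank]
    # Combine the elements that share a position across ranks.
    summed = [sum(column) for column in zip(*scaled)]
    if op == "average":
        reduced = [s / size for s in summed]
    elif op == "sum":
        reduced = summed
    else:
        raise ValueError("this sketch only covers 'average' and 'sum'")
    # Scale the reduced values after the reduction.
    return [postscale_factor * r for r in reduced]


rank0 = [1.0, 2.0]
rank1 = [3.0, 4.0]
print(simulate_allreduce([rank0, rank1]))
print(simulate_allreduce([rank0, rank1], op="sum", prescale_factor=0.5))
```

Both calls print the same values here, because a sum with inputs prescaled by `1 / size` equals an average.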
-
horovod.torch.
allreduce_async
(tensor, average=None, name=None, op=None, prescale_factor=1.0, postscale_factor=1.0, process_set=<horovod.common.process_sets.ProcessSet object>)[source]¶ A function that performs asynchronous averaging or summation of the input tensor over all the Horovod processes. The input tensor is not modified.
The reduction operation is keyed by the name. If name is not provided, an incremented auto-generated name is used. The tensor type and shape must be the same on all Horovod processes for a given name. The reduction will not start until all processes are ready to send and receive the tensor.
- Parameters
tensor – A tensor to reduce.
average –
Warning
Deprecated since version 0.19.0.
Use op instead. Will be removed in v1.0.
name – A name of the reduction operation.
op – The reduction operation to combine tensors across different ranks. Supported op values are Sum, Average, Min, Max, and Product. Defaults to Average if None is given.
prescale_factor – Multiplicative factor to scale tensor before allreduce.
postscale_factor – Multiplicative factor to scale tensor after allreduce.
process_set – Process set object to limit this operation to a subset of Horovod processes. Default is the global process set.
- Returns
A handle to the allreduce operation that can be used with poll() or synchronize().
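A typical reason to use the asynchronous variant is to overlap communication with other work. The sketch below assumes `hvd` is the `horovod.torch` module after `hvd.init()` has been called; the wrapper function, its arguments, and the operation name `"example.tensor"` are hypothetical.

```python
def overlap_allreduce(hvd, tensor, do_other_work):
    """Start an allreduce, run unrelated work, then wait for the result.

    `hvd` is expected to be the horovod.torch module (passed in so that
    this sketch does not import Horovod itself).
    """
    # Returns immediately with a handle; the reduction runs in the background.
    handle = hvd.allreduce_async(tensor, name="example.tensor")
    # Work that does not depend on the reduced tensor can proceed meanwhile.
    do_other_work()
    # poll() is a non-blocking completion check, useful for diagnostics.
    finished_early = hvd.poll(handle)
    # synchronize() blocks until the operation completes and returns the output.
    result = hvd.synchronize(handle)
    return result, finished_early
```

In a real job this would be called as `overlap_allreduce(hvd, grad, some_function)` from a script launched with `horovodrun`.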
-
horovod.torch.
allreduce_
(tensor, average=None, name=None, op=None, prescale_factor=1.0, postscale_factor=1.0, process_set=<horovod.common.process_sets.ProcessSet object>)[source]¶ A function that performs in-place averaging or summation of the input tensor over all the Horovod processes.
The reduction operation is keyed by the name. If name is not provided, an incremented auto-generated name is used. The tensor type and shape must be the same on all Horovod processes for a given name. The reduction will not start until all processes are ready to send and receive the tensor.
- Parameters
tensor – A tensor to reduce.
average –
Warning
Deprecated since version 0.19.0.
Use op instead. Will be removed in v1.0.
name – A name of the reduction operation.
op – The reduction operation to combine tensors across different ranks. Supported op values are Sum, Average, Min, Max, and Product. Defaults to Average if None is given.
prescale_factor – Multiplicative factor to scale tensor before allreduce.
postscale_factor – Multiplicative factor to scale tensor after allreduce.
process_set – Process set object to limit this operation to a subset of Horovod processes. Default is the global process set.
- Returns
A tensor of the same shape and type as tensor, averaged or summed across all processes.
-
horovod.torch.
allreduce_async_
(tensor, average=None, name=None, op=None, prescale_factor=1.0, postscale_factor=1.0, process_set=<horovod.common.process_sets.ProcessSet object>)[source]¶ A function that performs asynchronous in-place averaging or summation of the input tensor over all the Horovod processes.
The reduction operation is keyed by the name. If name is not provided, an incremented auto-generated name is used. The tensor type and shape must be the same on all Horovod processes for a given name. The reduction will not start until all processes are ready to send and receive the tensor.
- Parameters
tensor – A tensor to reduce.
average –
Warning
Deprecated since version 0.19.0.
Use op instead. Will be removed in v1.0.
name – A name of the reduction operation.
op – The reduction operation to combine tensors across different ranks. Supported op values are Sum, Average, Min, Max, and Product. Defaults to Average if None is given.
prescale_factor – Multiplicative factor to scale tensor before allreduce.
postscale_factor – Multiplicative factor to scale tensor after allreduce.
process_set – Process set object to limit this operation to a subset of Horovod processes. Default is the global process set.
- Returns
A handle to the allreduce operation that can be used with poll() or synchronize().
-
horovod.torch.
grouped_allreduce
(tensors, average=None, name=None, compression=<class 'horovod.torch.compression.NoneCompressor'>, op=None, prescale_factor=1.0, postscale_factor=1.0, process_set=<horovod.common.process_sets.ProcessSet object>)[source]¶ A function that performs averaging or summation of the input tensor list over all the Horovod processes. The input tensors are not modified.
The reduction operations are keyed by the base name. If a base name is not provided, an incremented auto-generated base name is used. Reductions are performed across tensors in the same list position. The tensor type and shape must be the same on all Horovod processes for tensors sharing positions in the input tensor list. The reduction will not start until all processes are ready to send and receive the tensors.
This acts as a thin wrapper around an autograd function. If your input tensors require gradients, then calling this function will allow gradients to be computed and backpropagated.
- Parameters
tensors – A list of tensors to reduce.
average –
Warning
Deprecated since version 0.19.0.
Use op instead. Will be removed in v1.0.
name – A base name to use for the group reduction operation.
compression – Compression algorithm used during allreduce to reduce the amount of data sent during each parameter update step. Defaults to not using compression.
op – The reduction operation to combine tensors across different ranks. Supported op values are Sum, Average, Min, Max, and Product. Defaults to Average if None is given.
prescale_factor – Multiplicative factor to scale tensor before allreduce.
postscale_factor – Multiplicative factor to scale tensor after allreduce.
process_set – Process set object to limit this operation to a subset of Horovod processes. Default is the global process set.
- Returns
A list containing tensors of the same shape and type as in tensors, averaged or summed across all processes.
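The "same list position" rule can be illustrated with a single-process simulation. This is not Horovod code; `simulate_grouped_allreduce` is a hypothetical helper that averages plain Python lists standing in for tensors.

```python
def simulate_grouped_allreduce(per_rank_lists):
    """Average tensors that share a list position across simulated ranks.

    per_rank_lists[r][i] is the i-th tensor (a 1-D list) held by rank r.
    """
    size = len(per_rank_lists)
    results = []
    # zip(*...) groups the tensors at position 0, position 1, ... across ranks.
    for same_position in zip(*per_rank_lists):
        results.append([sum(column) / size for column in zip(*same_position)])
    return results


rank0 = [[1.0, 2.0], [10.0]]
rank1 = [[3.0, 4.0], [30.0]]
print(simulate_grouped_allreduce([rank0, rank1]))  # [[2.0, 3.0], [20.0]]
```

Tensors at different positions may have different shapes, as long as each position matches across ranks.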
-
horovod.torch.
grouped_allreduce_async
(tensors, average=None, name=None, op=None, prescale_factor=1.0, postscale_factor=1.0, process_set=<horovod.common.process_sets.ProcessSet object>)[source]¶ A function that performs asynchronous averaging or summation of the input tensor list over all the Horovod processes. The input tensors are not modified.
The reduction operations are keyed by the base name. If a base name is not provided, an incremented auto-generated base name is used. Reductions are performed across tensors in the same list position. The tensor type and shape must be the same on all Horovod processes for tensors sharing positions in the input tensor list. The reduction will not start until all processes are ready to send and receive the tensors.
- Parameters
tensors – A list of tensors to reduce.
average –
Warning
Deprecated since version 0.19.0.
Use op instead. Will be removed in v1.0.
name – A base name to use for the group reduction operation.
op – The reduction operation to combine tensors across different ranks. Supported op values are Sum, Average, Min, Max, and Product. Defaults to Average if None is given.
prescale_factor – Multiplicative factor to scale tensor before allreduce.
postscale_factor – Multiplicative factor to scale tensor after allreduce.
process_set – Process set object to limit this operation to a subset of Horovod processes. Default is the global process set.
- Returns
A handle to the group allreduce operation that can be used with poll() or synchronize().
-
horovod.torch.
grouped_allreduce_
(tensors, average=None, name=None, op=None, prescale_factor=1.0, postscale_factor=1.0, process_set=<horovod.common.process_sets.ProcessSet object>)[source]¶ A function that performs in-place averaging or summation of the input tensors over all the Horovod processes.
The reduction operations are keyed by the base name. If a base name is not provided, an incremented auto-generated base name is used. Reductions are performed across tensors in the same list position. The tensor type and shape must be the same on all Horovod processes for tensors sharing positions in the input tensor list. The reduction will not start until all processes are ready to send and receive the tensors.
- Parameters
tensors – A list of tensors to reduce.
average –
Warning
Deprecated since version 0.19.0.
Use op instead. Will be removed in v1.0.
name – A base name to use for the group reduction operation.
op – The reduction operation to combine tensors across different ranks. Supported op values are Sum, Average, Min, Max, and Product. Defaults to Average if None is given.
prescale_factor – Multiplicative factor to scale tensor before allreduce.
postscale_factor – Multiplicative factor to scale tensor after allreduce.
process_set – Process set object to limit this operation to a subset of Horovod processes. Default is the global process set.
- Returns
A list containing tensors of the same shape and type as in tensors, averaged or summed across all processes.
-
horovod.torch.
grouped_allreduce_async_
(tensors, average=None, name=None, op=None, prescale_factor=1.0, postscale_factor=1.0, process_set=<horovod.common.process_sets.ProcessSet object>)[source]¶ A function that performs asynchronous in-place averaging or summation of the input tensors over all the Horovod processes.
The reduction operations are keyed by the base name. If a base name is not provided, an incremented auto-generated base name is used. Reductions are performed across tensors in the same list position. The tensor type and shape must be the same on all Horovod processes for tensors sharing positions in the input tensor list. The reduction will not start until all processes are ready to send and receive the tensors.
- Parameters
tensors – A list of tensors to reduce.
average –
Warning
Deprecated since version 0.19.0.
Use op instead. Will be removed in v1.0.
name – A base name to use for the group reduction operation.
op – The reduction operation to combine tensors across different ranks. Supported op values are Sum, Average, Min, Max, and Product. Defaults to Average if None is given.
prescale_factor – Multiplicative factor to scale tensor before allreduce.
postscale_factor – Multiplicative factor to scale tensor after allreduce.
process_set – Process set object to limit this operation to a subset of Horovod processes. Default is the global process set.
- Returns
A handle to the group allreduce operation that can be used with poll() or synchronize().
-
horovod.torch.
allgather
(tensor, name=None, process_set=<horovod.common.process_sets.ProcessSet object>)[source]¶ A function that concatenates the input tensor with the same input tensor on all other Horovod processes. The input tensor is not modified.
The concatenation is done on the first dimension, so the corresponding input tensors on the different processes must have the same rank and shape, except for the first dimension, which is allowed to be different.
This acts as a thin wrapper around an autograd function. If your input tensor requires gradients, then calling this function will allow gradients to be computed and backpropagated.
- Parameters
tensor – A tensor to allgather.
name – A name of the allgather operation.
process_set – Process set object to limit this operation to a subset of Horovod processes. Default is the global process set.
- Returns
A tensor of the same type as tensor, concatenated on dimension zero across all processes. The shape is identical to the input shape, except for the first dimension, which may be greater and is the sum of all first dimensions of the tensors in different Horovod processes.
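The shape rule for allgather can be shown with a small simulation, where nested lists stand in for 2-D tensors and the outer list index plays the role of the rank. `simulate_allgather` is a hypothetical helper, not part of Horovod.

```python
def simulate_allgather(per_rank):
    """Concatenate 2-D lists (rows x columns) from each rank along dimension 0."""
    widths = {len(row) for tensor in per_rank for row in tensor}
    if len(widths) > 1:
        raise ValueError("all dimensions except the first must match")
    gathered = []
    for tensor in per_rank:  # concatenation follows rank order
        gathered.extend(tensor)
    return gathered


rank0 = [[1, 2]]            # shape (1, 2)
rank1 = [[3, 4], [5, 6]]    # shape (2, 2)
out = simulate_allgather([rank0, rank1])
print(out)       # [[1, 2], [3, 4], [5, 6]]
print(len(out))  # 3
```

The output's first dimension (3) is the sum of the per-rank first dimensions (1 + 2), while the second dimension is unchanged.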
-
horovod.torch.
allgather_async
(tensor, name=None, process_set=<horovod.common.process_sets.ProcessSet object>)[source]¶ A function that asynchronously concatenates the input tensor with the same input tensor on all other Horovod processes. The input tensor is not modified.
The concatenation is done on the first dimension, so the input tensors on the different processes must have the same rank and shape, except for the first dimension, which is allowed to be different.
- Parameters
tensor – A tensor to allgather.
name – A name of the allgather operation.
process_set – Process set object to limit this operation to a subset of Horovod processes. Default is the global process set.
- Returns
A handle to the allgather operation that can be used with poll() or synchronize().
-
horovod.torch.
grouped_allgather
(tensors, name=None, process_set=<horovod.common.process_sets.ProcessSet object>)[source]¶ A function that concatenates each input tensor with the corresponding input tensor on all other Horovod processes for a list of input tensors. The input tensors are not modified.
The concatenation is done on the first dimension, so the corresponding input tensors on the different processes must have the same rank and shape, except for the first dimension, which is allowed to be different.
- Parameters
tensors – A list of tensors to allgather.
name – A base name to use for the group allgather operation.
process_set – Process set object to limit this operation to a subset of Horovod processes. Default is the global process set.
- Returns
A list containing tensors of the same type as in tensors. Each tensor is concatenated on dimension zero across all processes. Its shape is identical to the corresponding input shape, except for the first dimension, which may be greater and is the sum of all first dimensions of the corresponding tensor in different Horovod processes.
-
horovod.torch.
grouped_allgather_async
(tensors, name=None, process_set=<horovod.common.process_sets.ProcessSet object>)[source]¶ A function that asynchronously concatenates each input tensor with the corresponding input tensor on all other Horovod processes for a list of input tensors. The input tensors are not modified.
The concatenation is done on the first dimension, so the input tensors on the different processes must have the same rank and shape, except for the first dimension, which is allowed to be different.
- Parameters
tensors – A list of tensors to allgather.
name – A base name to use for the group allgather operation.
process_set – Process set object to limit this operation to a subset of Horovod processes. Default is the global process set.
- Returns
A handle to the group allgather operation that can be used with poll() or synchronize().
-
horovod.torch.
broadcast
(tensor, root_rank, name=None, process_set=<horovod.common.process_sets.ProcessSet object>)[source]¶ A function that broadcasts the input tensor on root rank to the same input tensor on all other Horovod processes. The input tensor is not modified.
The broadcast operation is keyed by the name. If name is not provided, an incremented auto-generated name is used. The tensor type and shape must be the same on all Horovod processes for a given name. The broadcast will not start until all processes are ready to send and receive the tensor.
This acts as a thin wrapper around an autograd function. If your input tensor requires gradients, then calling this function will allow gradients to be computed and backpropagated.
- Parameters
tensor – A tensor to broadcast.
root_rank – The rank to broadcast the value from.
name – A name of the broadcast operation.
process_set – Process set object to limit this operation to a subset of Horovod processes. Default is the global process set.
- Returns
A tensor of the same shape and type as tensor, with the value broadcasted from root rank.
-
horovod.torch.
broadcast_async
(tensor, root_rank, name=None, process_set=<horovod.common.process_sets.ProcessSet object>)[source]¶ A function that asynchronously broadcasts the input tensor on root rank to the same input tensor on all other Horovod processes. The input tensor is not modified.
The broadcast operation is keyed by the name. If name is not provided, an incremented auto-generated name is used. The tensor type and shape must be the same on all Horovod processes for a given name. The broadcast will not start until all processes are ready to send and receive the tensor.
- Parameters
tensor – A tensor to broadcast.
root_rank – The rank to broadcast the value from.
name – A name of the broadcast operation.
process_set – Process set object to limit this operation to a subset of Horovod processes. Default is the global process set.
- Returns
A handle to the broadcast operation that can be used with poll() or synchronize().
-
horovod.torch.
broadcast_
(tensor, root_rank, name=None, process_set=<horovod.common.process_sets.ProcessSet object>)[source]¶ A function that broadcasts the input tensor on root rank to the same input tensor on all other Horovod processes. The operation is performed in-place.
The broadcast operation is keyed by the name. If name is not provided, an incremented auto-generated name is used. The tensor type and shape must be the same on all Horovod processes for a given name. The broadcast will not start until all processes are ready to send and receive the tensor.
- Parameters
tensor – A tensor to broadcast.
root_rank – The rank to broadcast the value from.
name – A name of the broadcast operation.
process_set – Process set object to limit this operation to a subset of Horovod processes. Default is the global process set.
- Returns
A tensor of the same shape and type as tensor, with the value broadcasted from root rank.
-
horovod.torch.
broadcast_async_
(tensor, root_rank, name=None, process_set=<horovod.common.process_sets.ProcessSet object>)[source]¶ A function that asynchronously broadcasts the input tensor on root rank to the same input tensor on all other Horovod processes. The operation is performed in-place.
The broadcast operation is keyed by the name. If name is not provided, an incremented auto-generated name is used. The tensor type and shape must be the same on all Horovod processes for a given name. The broadcast will not start until all processes are ready to send and receive the tensor.
- Parameters
tensor – A tensor to broadcast.
root_rank – The rank to broadcast the value from.
name – A name of the broadcast operation.
process_set – Process set object to limit this operation to a subset of Horovod processes. Default is the global process set.
- Returns
A handle to the broadcast operation that can be used with poll() or synchronize().
-
horovod.torch.
alltoall
(tensor, splits=None, name=None, process_set=<horovod.common.process_sets.ProcessSet object>)[source]¶ A function that scatters slices of the input tensor to all other Horovod processes and returns a tensor of gathered slices from all other Horovod processes. The input tensor is not modified.
The slicing is done on the first dimension, so the input tensors on the different processes must have the same rank and shape, except for the first dimension, which is allowed to be different.
This acts as a thin wrapper around an autograd function. If your input tensor requires gradients, then calling this function will allow gradients to be computed and backpropagated.
- Parameters
tensor – A tensor to distribute with alltoall.
splits – A tensor of integers in rank order describing how many elements in tensor to send to each worker. Splitting is applied along the first dimension of tensor. If splits is not provided, the first dimension is split equally by the number of Horovod processes.
name – A name of the alltoall operation.
process_set – Process set object to limit this operation to a subset of Horovod processes. Default is the global process set.
- Returns
A tensor containing the gathered tensor data from all workers.
If splits has been provided: A tensor of integers in rank order describing how many elements in the output tensor have been received from each worker.
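The role of `splits` is easiest to see in a single-process simulation. The helper below is hypothetical and works on 1-D Python lists; it is meant to show which slices go where, not how Horovod implements the exchange.

```python
def simulate_alltoall(per_rank, per_rank_splits=None):
    """Return (outputs, received_splits), each indexed by simulated rank.

    per_rank[r] is rank r's 1-D input; per_rank_splits[r][d] is how many
    leading-dimension elements rank r sends to rank d.
    """
    size = len(per_rank)
    if per_rank_splits is None:
        # Without splits, the first dimension is divided equally.
        per_rank_splits = [[len(tensor) // size] * size for tensor in per_rank]
    outputs = [[] for _ in range(size)]
    received = [[] for _ in range(size)]
    for tensor, splits in zip(per_rank, per_rank_splits):  # senders in rank order
        offset = 0
        for dst, count in enumerate(splits):
            # The next `count` elements of this sender go to rank `dst`.
            outputs[dst].extend(tensor[offset:offset + count])
            received[dst].append(count)
            offset += count
    return outputs, received


inputs = [[0, 1, 2], [10, 11, 12, 13]]
splits = [[1, 2], [3, 1]]
outputs, received = simulate_alltoall(inputs, splits)
print(outputs)   # [[0, 10, 11, 12], [1, 2, 13]]
print(received)  # [[1, 3], [2, 1]]
```

Each rank's `received` entry corresponds to the second return value described above: how many elements of its output came from each sender.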
-
horovod.torch.
alltoall_async
(tensor, splits=None, name=None, process_set=<horovod.common.process_sets.ProcessSet object>)[source]¶ A function that scatters slices of the input tensor to all other Horovod processes and returns a tensor of gathered slices from all other Horovod processes. The input tensor is not modified.
The slicing is done on the first dimension, so the input tensors on the different processes must have the same rank and shape, except for the first dimension, which is allowed to be different.
- Parameters
tensor – A tensor to distribute with alltoall.
splits – A tensor of integers in rank order describing how many elements in tensor to send to each worker. Splitting is applied along the first dimension of tensor. If splits is not provided, the first dimension is split equally by the number of Horovod processes.
name – A name of the alltoall operation.
process_set – Process set object to limit this operation to a subset of Horovod processes. Default is the global process set.
- Returns
A handle to the alltoall operation that can be used with poll() or synchronize().
-
horovod.torch.
reducescatter
(tensor, name=None, compression=<class 'horovod.torch.compression.NoneCompressor'>, op=Average, process_set=<horovod.common.process_sets.ProcessSet object>, prescale_factor=1.0, postscale_factor=1.0)[source]¶ A function that performs reduction of the input tensor over all the Horovod processes, then scatters the results across all Horovod processes. The input tensor is not modified.
The reduction operation is keyed by the name. If name is not provided, an incremented auto-generated name is used. The tensor type and shape must be the same on all Horovod processes for a given name. The reduction will not start until all processes are ready to send and receive the tensor.
- Parameters
tensor – A tensor to average/sum and scatter.
name – A name of the reduction operation.
compression – Compression algorithm used during reducescatter to reduce the amount of data sent during each parameter update step. Defaults to not using compression.
op – The reduction operation to combine tensors across different ranks. Defaults to Average.
process_set – Process set object to limit this operation to a subset of Horovod processes. Default is the global process set.
prescale_factor – Multiplicative factor to scale tensor before reducescatter.
postscale_factor – Multiplicative factor to scale tensor after reducescatter.
- Returns
A tensor of the same rank and type as tensor across all processes. The shape is identical to the input shape, except for the first dimension, which will be divided across the different Horovod processes.
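The relationship between input and output shapes can be sketched with a small simulation. This is an illustration only, not Horovod code: `simulate_reducescatter` is a hypothetical helper, tensors are modeled as 1-D lists, and the first dimension is assumed to divide evenly by the number of processes (the uneven case is not covered here).

```python
def simulate_reducescatter(inputs, average=True):
    """Reduce element-wise across ranks, then give each rank one slice."""
    size = len(inputs)
    rows = len(inputs[0])
    assert all(len(t) == rows for t in inputs), "shapes must match on all ranks"
    assert rows % size == 0, "sketch assumes an evenly divisible first dimension"

    # Reduction step: element-wise sum over all ranks.
    reduced = [sum(values) for values in zip(*inputs)]
    if average:
        reduced = [v / size for v in reduced]

    # Scatter step: rank r receives the r-th contiguous slice.
    chunk = rows // size
    return [reduced[r * chunk:(r + 1) * chunk] for r in range(size)]


inputs = [[1.0, 2.0, 3.0, 4.0], [3.0, 4.0, 5.0, 6.0]]
averaged = simulate_reducescatter(inputs)
summed = simulate_reducescatter(inputs, average=False)
print(averaged)
print(summed)
```

With two processes and four input elements, each process ends up with two elements of the reduced result.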
-
horovod.torch.
reducescatter_async
(tensor, name=None, op=Average, process_set=<horovod.common.process_sets.ProcessSet object>, prescale_factor=1.0, postscale_factor=1.0)[source]¶ A function that performs asynchronous reduction of the input tensor over all the Horovod processes, then scatters the results across all Horovod processes. The input tensor is not modified.
The reduction operation is keyed by the name. If name is not provided, an incremented auto-generated name is used. The tensor type and shape must be the same on all Horovod processes for a given name. The reduction will not start until all processes are ready to send and receive the tensor.
The input tensors on the different processes must have the same rank and shape. The output tensor will be the same rank on all processes, but the first dimension may be different.
- Parameters
tensor – A tensor to average or sum.
name – A name of the reduction operation.
op – The reduction operation to combine tensors across different ranks. Defaults to Average.
process_set – Process set object to limit this operation to a subset of Horovod processes. Default is the global process set.
prescale_factor – Multiplicative factor to scale tensor before reducescatter.
postscale_factor – Multiplicative factor to scale tensor after reducescatter.
- Returns
A handle to the reducescatter operation that can be used with poll() or synchronize().
-
horovod.torch.
grouped_reducescatter
(tensors, name=None, compression=<class 'horovod.torch.compression.NoneCompressor'>, op=Average, process_set=<horovod.common.process_sets.ProcessSet object>, prescale_factor=1.0, postscale_factor=1.0)[source]¶ A function that performs reduction of a list of input tensors over all the Horovod processes, then scatters the results across all Horovod processes. The input tensors are not modified.
The reduction operation is keyed by the name. If name is not provided, an incremented auto-generated name is used. The tensor type and shape must be the same on all Horovod processes for a given name. The reduction will not start until all processes are ready to send and receive the tensor.
- Parameters
tensors – A list of tensors to average or sum.
name – A base name to use for the group reduction operation.
compression – Compression algorithm used during reducescatter to reduce the amount of data sent during each parameter update step. Defaults to not using compression.
op – The reduction operation to combine tensors across different ranks. Defaults to Average.
process_set – Process set object to limit this operation to a subset of Horovod processes. Default is the global process set.
prescale_factor – Multiplicative factor to scale tensors before reducescatter.
postscale_factor – Multiplicative factor to scale tensors after reducescatter.
- Returns
A list containing tensors of the same rank and type as in tensors. For each tensor the shape is identical to the input shape, except for the first dimension, which will be divided across the different Horovod processes.
-
horovod.torch.
grouped_reducescatter_async
(tensors, name=None, op=Average, process_set=<horovod.common.process_sets.ProcessSet object>, prescale_factor=1.0, postscale_factor=1.0)[source]¶ A function that performs asynchronous reduction of a list of input tensors over all the Horovod processes, then scatters the results across all Horovod processes. The input tensors are not modified.
The reduction operation is keyed by the name. If name is not provided, an incremented auto-generated name is used. For each of the input tensors, the type and shape must be the same on all Horovod processes for a given name. The reduction will not start until all processes are ready to send and receive the tensors.
The input tensor at any given position in the list tensors must have the same rank and shape on the different processes. The corresponding output tensor will be the same rank on all processes, but the first dimension may be different.
- Parameters
tensors – A list of tensors to average or sum.
name – A base name to use for the group reduction operation.
op – The reduction operation to combine tensors across different ranks. Defaults to Average.
process_set – Process set object to limit this operation to a subset of Horovod processes. Default is the global process set.
prescale_factor – Multiplicative factor to scale tensors before reducescatter.
postscale_factor – Multiplicative factor to scale tensors after reducescatter.
- Returns
A handle to the group reducescatter operation that can be used with poll() or synchronize().
-
horovod.torch.
join
(device=-1) → int[source]¶ A function that indicates that the rank finished processing data.
All ranks that did not call join() continue to process allreduce operations. This function blocks the Python thread until all ranks join.
- Parameters
device – An id of the device on which to create temporary zero tensors (default -1, CPU).
- Returns
Id of the rank that joined last.
-
horovod.torch.
barrier
(process_set=<horovod.common.process_sets.ProcessSet object>)[source]¶ A function that acts as a simple synchronization point for the ranks in the given process set (defaults to the global process set). Ranks that reach this function call will stall until all other ranks have reached it.
- Parameters
process_set – Process set object to limit this operation to a subset of Horovod processes. Default is the global process set.
-
horovod.torch.
poll
(handle)[source]¶ Polls an allreduce, allgather, alltoall, broadcast, or reducescatter handle to determine whether the underlying asynchronous operation has completed. After poll() returns True, synchronize() will return without blocking.
- Parameters
handle – A handle returned by an allreduce, allgather, alltoall, broadcast, or reducescatter asynchronous operation.
- Returns
A flag indicating whether the operation has completed.
-
horovod.torch.
synchronize
(handle)[source]¶ Synchronizes an asynchronous allreduce, allgather, alltoall, broadcast, or reducescatter operation until it’s completed. Returns the result of the operation.
- Parameters
handle – A handle returned by an allreduce, allgather, alltoall, broadcast, or reducescatter asynchronous operation.
- Returns
A single output tensor of the operation or a tuple of multiple output tensors.
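The poll-then-synchronize pattern resembles working with a future. The sketch below is an analogy built on Python's standard `concurrent.futures`, not Horovod code: `fake_collective` is a hypothetical stand-in for an asynchronous collective, `Future.done()` plays the role of poll(), and `Future.result()` plays the role of synchronize().

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_collective(values):
    """Hypothetical stand-in for an asynchronous collective operation."""
    time.sleep(0.05)
    return sum(values)


other_work_steps = 0
with ThreadPoolExecutor(max_workers=1) as pool:
    # Comparable to calling an *_async function and receiving a handle.
    handle = pool.submit(fake_collective, [1, 2, 3])

    # Comparable to poll(handle): overlap other work until the operation is done.
    while not handle.done():
        other_work_steps += 1
        time.sleep(0.01)

    # Comparable to synchronize(handle): returns immediately once done() is True.
    result = handle.result()

print(result)
```

Polling is optional. Calling the blocking step directly is enough when there is no other work to overlap with the communication.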
-
horovod.torch.
is_initialized
()¶ Returns True if Horovod is initialized
-
horovod.torch.
start_timeline
(file_path, mark_cycles=False)¶ Creates a timeline file at file_path and begins recording.
- Parameters
file_path – String path to the timeline file.
mark_cycles – Boolean indicating that cycles should be marked on the timeline (default: False).
Raises a ValueError if Horovod is not initialized.
-
horovod.torch.
stop_timeline
()¶ Stops the active timeline recording and closes the file.
Raises a ValueError if Horovod is not initialized.
-
horovod.torch.
size
()¶ A function that returns the number of Horovod processes.
- Returns
An integer scalar containing the number of Horovod processes.
-
horovod.torch.
local_size
()¶ A function that returns the number of Horovod processes within the node the current process is running on.
- Returns
An integer scalar containing the number of local Horovod processes.
-
horovod.torch.
cross_size
()¶ A function that returns the number of nodes for the local rank of the current Horovod process. For example, if there are 2 nodes in the job: one running 2 processes and the other running 1 process, then the first process on each node will have cross size 2, and the second process on the first node will have cross size 1.
- Returns
An integer scalar containing the number of cross Horovod processes.
-
horovod.torch.
rank
()¶ A function that returns the Horovod rank of the calling process.
- Returns
An integer scalar with the Horovod rank of the calling process.
-
horovod.torch.
local_rank
()¶ A function that returns the local Horovod rank of the calling process, within the node that it is running on. For example, if there are seven processes running on a node, their local ranks will be zero through six, inclusive.
- Returns
An integer scalar with the local Horovod rank of the calling process.
-
horovod.torch.
cross_rank
()¶ A function that returns the cross Horovod rank of the calling process, across the nodes in the job. The cross rank of a process corresponds to the rank of the node it is running on. For example, if there are 7 nodes in a job, the cross ranks will be zero through six, inclusive.
- Returns
An integer scalar with the cross Horovod rank of the calling process.
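The relationship between rank, local rank, cross rank, and cross size can be worked through for the two-node example given under cross_size. The following is a pure-Python sketch, not Horovod code: `layout` is a hypothetical helper, global ranks are assumed to be assigned node by node, and the cross rank is assumed to be the position of a process among the processes that share its local rank.

```python
def layout(processes_per_node):
    """Return one dict per process describing its place in the job."""
    infos = []
    rank = 0
    for node_index, count in enumerate(processes_per_node):
        for local_rank in range(count):
            infos.append({
                "rank": rank,
                "node": node_index,
                "local_rank": local_rank,
                "local_size": count,
            })
            rank += 1

    for info in infos:
        # Peers are the processes on any node that share this local rank.
        peers = [p for p in infos if p["local_rank"] == info["local_rank"]]
        info["cross_size"] = len(peers)
        info["cross_rank"] = [p["node"] for p in peers].index(info["node"])
    return infos


# Two nodes: the first runs 2 processes, the second runs 1 process.
infos = layout([2, 1])
for info in infos:
    print(info)
```

The first process on each node reports cross size 2, while the second process on the first node has no peer on the other node and reports cross size 1.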
-
horovod.torch.
mpi_threads_supported
()¶ A function that returns a flag indicating whether MPI multi-threading is supported.
If MPI multi-threading is supported, users may mix and match Horovod usage with other MPI libraries, such as mpi4py.
- Returns
A boolean value indicating whether MPI multi-threading is supported.
-
horovod.torch.
mpi_enabled
()¶ Returns True if MPI mode is currently enabled at runtime.
If MPI is enabled, users can use it for controller or data transfer operations.
- Returns
A boolean value indicating whether MPI is enabled.
-
horovod.torch.
mpi_built
()¶ Returns True if Horovod was compiled with MPI support.
- Returns
A boolean value indicating whether MPI support was compiled.
-
horovod.torch.
gloo_enabled
()¶ Returns True if Gloo mode is currently enabled at runtime.
If Gloo is enabled, users can use it for controller or data transfer operations.
- Returns
A boolean value indicating whether Gloo is enabled.
-
horovod.torch.
gloo_built
()¶ Returns True if Horovod was compiled with Gloo support.
- Returns
A boolean value indicating whether Gloo support was compiled.
-
horovod.torch.
nccl_built
()¶ Function to check if Horovod was compiled with NCCL support.
- Returns
An integer value indicating whether NCCL support was compiled. If NCCL support was compiled, returns NCCL_VERSION_CODE. Otherwise, returns 0.
-
horovod.torch.
ddl_built
()¶ Returns True if Horovod was compiled with DDL support.
- Returns
A boolean value indicating whether DDL support was compiled.
-
horovod.torch.
ccl_built
()¶ Returns True if Horovod was compiled with oneCCL support.
- Returns
A boolean value indicating whether oneCCL support was compiled.
-
horovod.torch.
cuda_built
()¶ Returns True if Horovod was compiled with CUDA support.
- Returns
A boolean value indicating whether CUDA support was compiled.
-
horovod.torch.
rocm_built
()¶ Returns True if Horovod was compiled with ROCm support.
- Returns
A boolean value indicating whether ROCm support was compiled.
-
class
horovod.torch.
ProcessSet
(ranks_or_comm: Union[Sequence[int], horovod.common.process_sets.MPI.Comm])[source]¶ Representation of a set of Horovod processes that will run collective operations together.
Initialize a ProcessSet with a list of process ranks or an MPI communicator. Then pass this instance to hvd.init() or hvd.add_process_set(). If a valid process set has been initialized, process_set_id will be set to a numeric value.
-
rank
() → Optional[int][source]¶ Return rank relative to this process set or None if not initialized.
This is useful, e.g., to process the result of hvd.allgather().
Please note that, even with process sets, Horovod operations like hvd.broadcast() are not parameterized by this relative rank, but by the global rank as obtained from hvd.rank().
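The difference between the relative rank and the global rank can be shown with plain Python. This is a sketch under stated assumptions, not Horovod code: `relative_rank` is a hypothetical helper, relative ranks are assumed to follow ascending global rank order, and allgather results are assumed to be ordered by relative rank.

```python
def relative_rank(global_rank, process_set_ranks):
    """Position of a member process within its process set (assumed ordering)."""
    ordered = sorted(process_set_ranks)
    return ordered.index(global_rank)


process_set_ranks = [1, 3, 5]  # global ranks that belong to the set

# One value contributed per member, in the assumed gathered order.
gathered = ["from rank 1", "from rank 3", "from rank 5"]

# Global rank 3 finds its own contribution through its relative rank.
position = relative_rank(3, process_set_ranks)
own_item = gathered[position]

# A broadcast from the same process would still be addressed by global rank.
broadcast_root = 3

print(position, own_item, broadcast_root)
```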
-
-
horovod.torch.
add_process_set
(process_set: Union[horovod.common.process_sets.ProcessSet, Sequence[int]]) → horovod.common.process_sets.ProcessSet[source]¶ Add a new process_set after Horovod initialization and return it.
Requires running with HOROVOD_DYNAMIC_PROCESS_SETS=1. No process set containing the same ranks may exist already. The returned process set will be fully initialized.
-
horovod.torch.
remove_process_set
(process_set: horovod.common.process_sets.ProcessSet) → bool[source]¶ Attempt to remove process set and return whether this attempt is successful.
Requires running with HOROVOD_DYNAMIC_PROCESS_SETS=1. If removal is successful, we will invalidate the process_set object.
-
exception
horovod.torch.
HorovodInternalError
[source]¶ Internal error raised when a Horovod collective operation (e.g., allreduce) fails.
This is handled in elastic mode as a recoverable error, and will result in a reset event.
-
horovod.torch.
DistributedOptimizer
(optimizer, named_parameters=None, compression=<class 'horovod.torch.compression.NoneCompressor'>, backward_passes_per_step=1, op=Average, gradient_predivide_factor=1.0, num_groups=0, groups=None, sparse_as_dense=False, process_set=<horovod.common.process_sets.ProcessSet object>)[source]¶ An optimizer that wraps another torch.optim.Optimizer, using an allreduce to combine gradient values before applying gradients to model weights.
Allreduce operations are executed after each gradient is computed by loss.backward() in parallel with each other. The step() method ensures that all allreduce operations are finished before applying gradients to the model.
DistributedOptimizer exposes the synchronize() method, which forces allreduce operations to finish before continuing the execution. It’s useful in conjunction with gradient clipping, or other operations that modify gradients in place before step() is executed. Make sure to use optimizer.skip_synchronize() if you’re calling synchronize() in your code.
Example of gradient clipping:
    output = model(data)
    loss = F.nll_loss(output, target)
    loss.backward()
    optimizer.synchronize()
    torch.nn.utils.clip_grad_norm_(model.parameters(), args.clip)
    with optimizer.skip_synchronize():
        optimizer.step()
- Parameters
optimizer – Optimizer to use for computing gradients and applying updates.
named_parameters – A mapping between parameter names and values. Used for naming of allreduce operations. Typically just model.named_parameters().
compression – Compression algorithm used during allreduce to reduce the amount of data sent during each parameter update step. Defaults to not using compression.
backward_passes_per_step – Number of expected backward passes to perform before calling step()/synchronize(). This allows accumulating gradients over multiple mini-batches before reducing and applying them.
op – The reduction operation to use when combining gradients across different ranks.
gradient_predivide_factor – If op == Average, gradient_predivide_factor splits the averaging before and after the sum. Gradients are scaled by 1.0 / gradient_predivide_factor before the sum and gradient_predivide_factor / size after the sum.
num_groups – Number of groups to assign gradient allreduce ops to for explicit grouping. Defaults to no explicit groups.
groups – The parameter to group the gradient allreduce ops. Accepted values are a non-negative integer or a list of lists of torch.Tensor. If groups is a non-negative integer, it is the number of groups to assign gradient allreduce ops to for explicit grouping. If groups is a list of lists of torch.Tensor, tensors in the same inner list will be assigned to the same group, while a parameter that does not appear in any list will form a group by itself. Defaults to None, which means no explicit groups.
sparse_as_dense – If set True, convert all sparse gradients to dense and perform allreduce, then convert back to sparse before applying the update.
process_set – Gradients will only be reduced over Horovod processes belonging to this process set. Defaults to the global process set.
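The arithmetic behind gradient_predivide_factor can be checked with scalars. The following is a pure-Python sketch of the scaling described above, not Horovod code: `average_with_predivide` is a hypothetical helper and each list entry stands for the gradient value held by one rank.

```python
import math


def average_with_predivide(grads, predivide_factor):
    """Average gradients, splitting the division before and after the sum."""
    size = len(grads)
    # Before the sum: scale by 1.0 / gradient_predivide_factor.
    prescaled = [g * (1.0 / predivide_factor) for g in grads]
    total = sum(prescaled)
    # After the sum: scale by gradient_predivide_factor / size.
    return total * (predivide_factor / size)


grads = [2.0, 4.0, 6.0, 8.0]        # one gradient value per rank
plain_average = sum(grads) / len(grads)
split_by_2 = average_with_predivide(grads, 2.0)
split_by_size = average_with_predivide(grads, float(len(grads)))

print(plain_average, split_by_2, split_by_size)
```

The result matches the plain average for any factor. The factor only changes how much of the division happens before the sum, which keeps intermediate values smaller when gradients are summed in a low-precision format.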
-
class
horovod.torch.
SyncBatchNorm
(*args, **kw)[source]¶ Applies synchronous version of N-dimensional BatchNorm.
In this version, normalization parameters are synchronized across workers during forward pass. This is very useful in situations where each GPU can fit a very small number of examples.
See https://pytorch.org/docs/stable/nn.html#batchnorm2d for more details about BatchNorm.
- Parameters
num_features – number of channels C from the shape (N, C, …)
eps – a value added to the denominator for numerical stability. Default: 1e-5
momentum – the value used for the running_mean and running_var computation. Can be set to None for cumulative moving average (i.e. simple average). Default: 0.1
affine – a boolean value that when set to True, this module has learnable affine parameters. Default: True
track_running_stats – a boolean value that when set to True, this module tracks the running mean and variance, and when set to False, this module does not track such statistics and always uses batch statistics in both training and eval modes. Default: True
Note
Only GPU input tensors are supported in the training mode.
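The benefit of synchronizing statistics can be seen by comparing per-worker statistics with statistics over the combined batch. The sketch below is pure Python and illustrates the idea only. It does not describe how Horovod exchanges statistics internally: `batch_stats` and `synchronized_stats` are hypothetical helpers, and the combination through summed counts, sums, and sums of squares is one common approach assumed for this example.

```python
def batch_stats(values):
    """Mean and biased variance of one worker's local batch."""
    n = len(values)
    mean = sum(values) / n
    var = sum((v - mean) ** 2 for v in values) / n
    return mean, var


def synchronized_stats(per_worker_values):
    """Mean and biased variance over all workers' samples combined."""
    # Each worker contributes three numbers, which are summed across
    # workers in the same way an allreduce with Sum would combine them.
    count = sum(len(v) for v in per_worker_values)
    total = sum(sum(v) for v in per_worker_values)
    total_sq = sum(sum(x * x for x in v) for v in per_worker_values)
    mean = total / count
    var = total_sq / count - mean * mean
    return mean, var


workers = [[1.0, 3.0], [5.0, 7.0]]  # two workers with two samples each
local = [batch_stats(v) for v in workers]
combined = synchronized_stats(workers)
print(local)
print(combined)
```

Each worker alone sees a variance of 1.0 around a different mean, while the combined batch has a shared mean and a larger variance. With very small per-GPU batches, the local estimates are noisy, which is the situation the synchronized version targets.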
-
class
horovod.torch.elastic.
TorchState
(model=None, optimizer=None, **kwargs)[source]¶ State representation of a PyTorch training process.
Multiple models and optimizers are supported by providing them as kwargs. During initialization, TorchState will assign attributes for every keyword argument, and handle its state synchronization.
- Parameters
model – Optional PyTorch model.
optimizer – Optional PyTorch optimizer.
kwargs – Attributes to sync, which will be exposed as attributes of the object. If a handler exists for the attribute type, it will be used to sync the object, otherwise it will be handled as an ordinary Python object.
-
class
horovod.torch.elastic.
ElasticSampler
(*args, **kw)[source]¶ Sampler that partitions dataset across ranks and repartitions after reset events.
Works similarly to DistributedSampler, but with an optional capability to record which dataset indices have been processed in each batch. When tracked by a TorchState object, the sampler will automatically repartition the unprocessed indices among the new set of workers.
In order to use this object successfully it is recommended that the user:
Include this object in the TorchState.
Call record_batch after processing a set of samples.
Call set_epoch at the end of each epoch to clear the processed indices.
- Parameters
dataset – Dataset used for sampling (assumed to be of constant size).
shuffle – If True (default), shuffle the indices.
seed – Random seed used to shuffle the sampler when shuffle=True. This number should be identical across all ranks (default: 0).
-
set_epoch
(epoch)[source]¶ Sets the epoch for this sampler.
When shuffle=True, this ensures all replicas use a different random ordering for each epoch.
Will clear and reset the processed_indices for the next epoch. It is important that this is called at the end of the epoch (not the beginning) to ensure that partially completed epochs do not reprocess samples.
- Parameters
epoch – Epoch number.
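The repartitioning behavior described for ElasticSampler can be sketched in plain Python. This is a simplified illustration, not the sampler's implementation: `partition` is a hypothetical helper, shuffling is left out, indices are dealt out round-robin, and no padding is applied when the remaining indices do not divide evenly.

```python
def partition(indices, num_replicas):
    """Deal indices out to workers round-robin (hypothetical helper)."""
    return [indices[rank::num_replicas] for rank in range(num_replicas)]


dataset_size = 8
all_indices = list(range(dataset_size))

# The epoch starts with 2 workers.
before = partition(all_indices, 2)

# Each worker records its first batch of 2 samples as processed.
processed = set(before[0][:2]) | set(before[1][:2])

# A reset event adds a worker: only the unprocessed indices are repartitioned.
remaining = [i for i in all_indices if i not in processed]
after = partition(remaining, 3)

print(before)
print(sorted(processed))
print(after)
```

Clearing the processed set only at the end of the epoch is what keeps already-seen samples out of the new partitions after a reset.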
-
horovod.torch.elastic.
run
(func)[source]¶ Decorator used to run the elastic training process.
The purpose of this decorator is to allow for uninterrupted execution of the wrapped function across multiple workers in parallel, as workers come and go from the system. When a new worker is added, its state needs to be brought to the same point as the other workers, which is done by synchronizing the state object before executing func.
When a worker is added or removed, other workers will raise an exception to bring them back to such a sync point before executing func again. This ensures that workers do not diverge when such reset events occur.
It’s important to note that collective operations (e.g., broadcast, allreduce) cannot be called before the call to the wrapped function. Otherwise, new workers could execute these operations during their initialization while other workers are attempting to sync state, resulting in deadlock.
- Parameters
func – a wrapped function taking any number of args or kwargs. The first argument must be a horovod.common.elastic.State object used to synchronize state across workers.
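The sync-then-retry behavior of the decorator can be illustrated with a toy version. The sketch below is pure Python and is not Horovod's implementation: `ToyState`, `WorkersChanged`, and `toy_elastic_run` are hypothetical names, and the state's commit, restore, and sync methods are simplified stand-ins that act on a local dictionary only.

```python
import functools


class WorkersChanged(Exception):
    """Hypothetical stand-in for the exception raised on a reset event."""


class ToyState:
    """Toy state object holding values plus the last committed copy."""

    def __init__(self, **values):
        self.values = dict(values)
        self._committed = dict(values)
        self.sync_calls = 0

    def commit(self):
        self._committed = dict(self.values)

    def restore(self):
        self.values = dict(self._committed)

    def sync(self):
        # A real implementation would synchronize state across workers here.
        self.sync_calls += 1


def toy_elastic_run(func):
    @functools.wraps(func)
    def wrapper(state, *args, **kwargs):
        while True:
            state.sync()  # bring all workers to the same point first
            try:
                return func(state, *args, **kwargs)
            except WorkersChanged:
                state.restore()  # roll back to the last committed state
    return wrapper


events = iter([True, False])  # simulate one reset event at step 3


@toy_elastic_run
def train(state, total_steps):
    while state.values["step"] < total_steps:
        state.values["step"] += 1
        if state.values["step"] == 3 and next(events):
            raise WorkersChanged()
        state.commit()
    return state.values["step"]


state = ToyState(step=0)
final_step = train(state, 5)
print(final_step, state.sync_calls)
```

The interrupted step is repeated from the last committed state, and the state is synchronized once per attempt before the wrapped function runs.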
horovod.mxnet¶
-
class
horovod.mxnet.
Compression
[source]¶ Optional gradient compression algorithm used during allreduce.
-
none
¶ Do not compress the gradients.
alias of
horovod.mxnet.compression.NoneCompressor
-
fp16
¶ Compress all floating point gradients to 16-bit.
alias of
horovod.mxnet.compression.FP16Compressor
-
-
horovod.mxnet.
allgather_object
(obj, name=None)[source]¶ Serializes and allgathers an object from all other processes.
- Parameters
obj – An object capable of being serialized without losing any context.
name – Optional name to use during allgather, will default to the class type.
- Returns
The list of objects that were allgathered across all ranks.
-
horovod.mxnet.
broadcast_object
(obj, root_rank=0, name=None)[source]¶ Serializes and broadcasts an object from root rank to all other processes.
- Parameters
obj – An object capable of being serialized without losing any context.
root_rank – The rank of the process from which parameters will be broadcasted to all other processes.
name – Optional name to use during broadcast, will default to the class type.
- Returns
The object that was broadcast from the root_rank.
-
horovod.mxnet.
allgather
(tensor, name=None, priority=0, process_set=<horovod.common.process_sets.ProcessSet object>)[source]¶ A function that concatenates the input tensor with the same input tensor on all other Horovod processes. The input tensor is not modified.
The concatenation is done on the first dimension, so the input tensors on the different processes must have the same rank and shape, except for the first dimension, which is allowed to be different.
- Parameters
tensor – A tensor to allgather.
name – A name of the allgather operation.
priority – The priority of this operation. Higher priority operations are likely to be executed before other operations.
process_set – Process set object to limit this operation to a subset of Horovod processes. Default is the global process set.
- Returns
A tensor of the same type as tensor, concatenated on dimension zero across all processes. The shape is identical to the input shape, except for the first dimension, which may be greater and is the sum of all first dimensions of the tensors in different Horovod processes.
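The shape rule for allgather can be illustrated with nested lists. The following is a pure-Python simulation, not Horovod code: `simulate_allgather` is a hypothetical helper and each tensor is modeled as a list of rows along the first dimension.

```python
def simulate_allgather(inputs):
    """Concatenate every rank's rows in rank order; all ranks get a copy."""
    gathered = [row for rows in inputs for row in rows]
    return [list(gathered) for _ in inputs]


# Three ranks with shapes (1, 2), (2, 2), and (1, 2).
inputs = [
    [[1, 2]],
    [[3, 4], [5, 6]],
    [[7, 8]],
]
outputs = simulate_allgather(inputs)
print(outputs[0])       # the same (4, 2) result on every rank
print(len(outputs[0]))  # first dimension: 1 + 2 + 1
```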
-
horovod.mxnet.
grouped_allgather
(tensors, name=None, priority=0, process_set=<horovod.common.process_sets.ProcessSet object>)[source]¶ A function that concatenates each input tensor with the corresponding input tensor on all other Horovod processes for a list of input tensors. The input tensors are not modified.
The concatenation is done on the first dimension, so the corresponding input tensors on the different processes must have the same rank and shape, except for the first dimension, which is allowed to be different.
- Parameters
tensors – A list of tensors to allgather.
name – A base name to use for the group allgather operation.
priority – The priority of this operation. Higher priority operations are likely to be executed before other operations.
process_set – Process set object to limit this operation to a subset of Horovod processes. Default is the global process set.
- Returns
A list containing tensors of the same type as in tensors. Each tensor is concatenated on dimension zero across all processes. Its shape is identical to the corresponding input shape, except for the first dimension, which may be greater and is the sum of all first dimensions of the corresponding tensor in different Horovod processes.
-
horovod.mxnet.
allreduce
(tensor, average=None, name=None, priority=0, prescale_factor=1.0, postscale_factor=1.0, process_set=<horovod.common.process_sets.ProcessSet object>, op=None)[source]¶ A function that performs averaging or summation of the input tensor over all the Horovod processes. The input tensor is not modified.
The reduction operation is keyed by the name. If name is not provided, an incremented auto-generated name is used. The tensor type and shape must be the same on all Horovod processes for a given name. The reduction will not start until all processes are ready to send and receive the tensor.
This acts as a thin wrapper around an autograd function. If your input tensor requires gradients, then calling this function allows gradients to be computed and backpropagated.
- Parameters
tensor – A tensor to average or sum.
average –
Warning
Deprecated since version 0.24.0.
Use op instead. Will be removed in v1.0.
op – The reduction operation to combine tensors across different ranks. Supported op values are Sum, Average, Min, Max, and Product. Defaults to Average if None is given.
name – A name of the reduction operation.
priority – The priority of this operation. Higher priority operations are likely to be executed before other operations.
prescale_factor – Multiplicative factor to scale tensor before allreduce
postscale_factor – Multiplicative factor to scale tensor after allreduce
process_set – Process set object to limit this operation to a subset of Horovod processes. Default is the global process set.
- Returns
A tensor of the same shape and type as tensor, averaged or summed across all processes.
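The effect of op, prescale_factor, and postscale_factor can be shown with a small simulation. This is a pure-Python sketch, not Horovod code: `simulate_allreduce` is a hypothetical helper, tensors are modeled as 1-D lists, the op is given as a string for illustration, and only Sum and Average are covered.

```python
def simulate_allreduce(inputs, op="average",
                       prescale_factor=1.0, postscale_factor=1.0):
    """Scale, reduce element-wise across ranks, then scale again."""
    size = len(inputs)
    scaled = [[v * prescale_factor for v in tensor] for tensor in inputs]
    reduced = [sum(values) for values in zip(*scaled)]
    if op == "average":
        reduced = [v / size for v in reduced]
    elif op != "sum":
        raise ValueError("this sketch only covers 'sum' and 'average'")
    return [v * postscale_factor for v in reduced]


inputs = [[1.0, 2.0], [3.0, 6.0]]  # one tensor per rank
averaged = simulate_allreduce(inputs)
summed = simulate_allreduce(inputs, op="sum")
prescaled_sum = simulate_allreduce(inputs, op="sum", prescale_factor=0.5)
print(averaged, summed, prescaled_sum)
```

With two ranks, a Sum with prescale_factor=0.5 gives the same values as Average, which shows how the scale factors and the reduction op interact.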
-
horovod.mxnet.
allreduce_
(tensor, average=None, name=None, priority=0, prescale_factor=1.0, postscale_factor=1.0, process_set=<horovod.common.process_sets.ProcessSet object>, op=None)[source]¶ A function that performs in-place averaging or summation of the input tensor over all the Horovod processes.
The reduction operation is keyed by the name. If name is not provided, an incremented auto-generated name is used. The tensor type and shape must be the same on all Horovod processes for a given name. The reduction will not start until all processes are ready to send and receive the tensor.
- Parameters
tensor – A tensor to average or sum.
average –
Warning
Deprecated since version 0.24.0.
Use op instead. Will be removed in v1.0.
op – The reduction operation to combine tensors across different ranks. Supported op values are Sum, Average, Min, Max, and Product. Defaults to Average if None is given.
name – A name of the reduction operation.
priority – The priority of this operation. Higher priority operations are likely to be executed before other operations.
prescale_factor – Multiplicative factor to scale tensor before allreduce
postscale_factor – Multiplicative factor to scale tensor after allreduce
process_set – Process set object to limit this operation to a subset of Horovod processes. Default is the global process set.
- Returns
A tensor of the same shape and type as tensor, averaged or summed across all processes.
-
horovod.mxnet.
grouped_allreduce
(tensors, average=None, name=None, priority=0, prescale_factor=1.0, postscale_factor=1.0, process_set=<horovod.common.process_sets.ProcessSet object>, op=None)[source]¶ A function that performs averaging or summation of the input tensors over all the Horovod processes. The input tensors are not modified.
The reduction operations are keyed by the base name. If a base name is not provided, an incremented auto-generated base name is used. Reductions are performed across tensors in the same list position. The tensor type and shape must be the same on all Horovod processes for tensors sharing positions in the input tensor list. The reduction will not start until all processes are ready to send and receive the tensors.
- Parameters
tensors – A list of tensors to average or sum.
average –
Warning
Deprecated since version 0.24.0.
Use op instead. Will be removed in v1.0.
op – The reduction operation to combine tensors across different ranks. Supported op values are Sum, Average, Min, Max, and Product. Defaults to Average if None is given.
name – A base name to use for the group reduction operation
priority – The priority of this operation. Higher priority operations are likely to be executed before other operations.
prescale_factor – Multiplicative factor to scale tensor before allreduce
postscale_factor – Multiplicative factor to scale tensor after allreduce
process_set – Process set object to limit this operation to a subset of Horovod processes. Default is the global process set.
- Returns
A list containing tensors of the same shape and type as in tensors, averaged or summed across all processes.
-
horovod.mxnet.
grouped_allreduce_
(tensors, average=None, name=None, priority=0, prescale_factor=1.0, postscale_factor=1.0, process_set=<horovod.common.process_sets.ProcessSet object>, op=None)[source]¶ A function that performs in-place averaging or summation of the input tensors over all the Horovod processes.
The reduction operations are keyed by the base name. If a base name is not provided, an incremented auto-generated base name is used. Reductions are performed across tensors in the same list position. The tensor type and shape must be the same on all Horovod processes for tensors sharing positions in the input tensor list. The reduction will not start until all processes are ready to send and receive the tensors.
- Parameters
tensors – A list of tensors to average or sum.
average –
Warning
Deprecated since version 0.24.0.
Use op instead. Will be removed in v1.0.
op – The reduction operation to combine tensors across different ranks. Supported op values are Sum, Average, Min, Max, and Product. Defaults to Average if None is given.
name – A base name to use for the group reduction operation
priority – The priority of this operation. Higher priority operations are likely to be executed before other operations.
prescale_factor – Multiplicative factor to scale tensor before allreduce
postscale_factor – Multiplicative factor to scale tensor after allreduce
process_set – Process set object to limit this operation to a subset of Horovod processes. Default is the global process set.
- Returns
A list containing tensors of the same shape and type as in tensors, averaged or summed across all processes.
-
horovod.mxnet.
alltoall
(tensor, splits=None, name=None, priority=0, process_set=<horovod.common.process_sets.ProcessSet object>)[source]¶ A function that scatters slices of the input tensor to all other Horovod processes and returns a tensor of gathered slices from all other Horovod processes. The input tensor is not modified.
The slicing is done on the first dimension, so the input tensors on the different processes must have the same rank and shape, except for the first dimension, which is allowed to be different.
- Parameters
tensor – A tensor to distribute with alltoall.
splits – A tensor of integers in rank order describing how many elements in tensor to send to each worker. Splitting is applied along the first dimension of tensor. If splits is not provided, the first dimension is split equally by the number of Horovod processes.
name – A name of the alltoall operation.
priority – The priority of this operation. Higher priority operations are likely to be executed before other operations.
process_set – Process set object to limit this operation to a subset of Horovod processes. Default is the global process set.
- Returns
A tensor containing the gathered tensor data from all workers.
If splits has been provided: A tensor of integers in rank order describing how many elements in the output tensor have been received from each worker.
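The effect of splits is easiest to see on a small example. The sketch below simulates alltoall in pure Python and does not call Horovod: `simulate_alltoall` is a hypothetical helper, `inputs[r]` is the list of first-dimension rows held by rank r, and `splits[r][d]` is the number of rows rank r sends to rank d.

```python
def simulate_alltoall(inputs, splits=None):
    """Return (outputs, received_splits) for simulated ranks.

    outputs[d] holds the rows gathered on rank d, in rank order of the senders.
    received_splits[d][s] is how many rows rank d received from rank s.
    """
    size = len(inputs)
    if splits is None:
        # Default: split the first dimension equally among all processes.
        splits = [[len(rows) // size] * size for rows in inputs]
    outputs = [[] for _ in range(size)]
    received_splits = [[] for _ in range(size)]
    for src in range(size):
        offset = 0
        for dst in range(size):
            count = splits[src][dst]
            outputs[dst].extend(inputs[src][offset:offset + count])
            received_splits[dst].append(count)
            offset += count
    return outputs, received_splits


inputs = [["a0", "a1", "a2"], ["b0", "b1", "b2"]]
outputs, received = simulate_alltoall(inputs, splits=[[1, 2], [2, 1]])
print(outputs)   # [['a0', 'b0', 'b1'], ['a1', 'a2', 'b2']]
print(received)  # [[1, 2], [2, 1]]
```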
-
horovod.mxnet.
broadcast
(tensor, root_rank, name=None, priority=0, process_set=<horovod.common.process_sets.ProcessSet object>)[source]¶ A function that broadcasts the input tensor on root rank to the same input tensor on all other Horovod processes. The input tensor is not modified.
The broadcast operation is keyed by the name. If name is not provided, an incremented auto-generated name is used. The tensor type and shape must be the same on all Horovod processes for a given name. The broadcast will not start until all processes are ready to send and receive the tensor.
This acts as a thin wrapper around an autograd function. If your input tensor requires gradients, then calling this function will allow gradients to be computed and backpropagated.
- Parameters
tensor – A tensor to broadcast.
root_rank – The rank to broadcast the value from.
name – A name of the broadcast operation.
priority – The priority of this operation. Higher priority operations are likely to be executed before other operations.
process_set – Process set object to limit this operation to a subset of Horovod processes. Default is the global process set.
- Returns
A tensor of the same shape and type as tensor, with the value broadcasted from root rank.
-
horovod.mxnet.
broadcast_
(tensor, root_rank, name=None, priority=0, process_set=<horovod.common.process_sets.ProcessSet object>)[source]¶ A function that broadcasts the input tensor on root rank to the same input tensor on all other Horovod processes. The operation is performed in-place.
The broadcast operation is keyed by the name. If name is not provided, an incremented auto-generated name is used. The tensor type and shape must be the same on all Horovod processes for a given name. The broadcast will not start until all processes are ready to send and receive the tensor.
- Parameters
tensor – A tensor to broadcast.
root_rank – The rank to broadcast the value from.
name – A name of the broadcast operation.
priority – The priority of this operation. Higher priority operations are likely to be executed before other operations.
process_set – Process set object to limit this operation to a subset of Horovod processes. Default is the global process set.
- Returns
A tensor of the same shape and type as tensor, with the value broadcasted from root rank.
-
horovod.mxnet.
reducescatter
(tensor, op=Average, name=None, priority=0, process_set=<horovod.common.process_sets.ProcessSet object>, prescale_factor=1.0, postscale_factor=1.0)[source]¶ A function that performs asynchronous averaging or summation of the input tensor over all the Horovod processes, then scatters the results across all Horovod processes. The input tensor is not modified.
The reduction operation is keyed by the name. If name is not provided, an incremented auto-generated name is used. The tensor type and shape must be the same on all Horovod processes for a given name. The reduction will not start until all processes are ready to send and receive the tensor.
This acts as a thin wrapper around an autograd function. If your input tensor requires gradients, then calling this function will allow gradients to be computed and backpropagated.
- Parameters
tensor – A tensor to average/sum and scatter.
op – The reduction operation to combine tensors across different ranks. Can be Average (default) or Sum.
name – A name of the reduction operation.
priority – The priority of this operation. Higher priority operations are likely to be executed before other operations.
process_set – Process set object to limit this operation to a subset of Horovod processes. Default is the global process set.
prescale_factor – Multiplicative factor to scale tensor before reducescatter.
postscale_factor – Multiplicative factor to scale tensor after reducescatter.
- Returns
A tensor of the same rank and type as tensor across all processes. The shape is identical to the input shape except for the first dimension, which will be divided across the different Horovod processes.
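The relationship between the input and output shapes can be sketched with a pure-Python simulation. This is not Horovod code: `simulate_reducescatter` is a hypothetical helper, nested lists stand in for tensors, and the sketch assumes the first dimension divides evenly by the number of processes.

```python
def simulate_reducescatter(per_rank_rows, op="average"):
    """per_rank_rows[r] is the input on rank r: a list of rows, each a list of floats.

    Returns one output per rank: the slice of the reduced tensor that rank keeps.
    """
    if op not in ("average", "sum"):
        raise ValueError(f"unsupported op in this sketch: {op}")
    size = len(per_rank_rows)
    num_rows = len(per_rank_rows[0])
    if num_rows % size != 0:
        raise ValueError("this sketch assumes an evenly divisible first dimension")
    # Step 1: reduce element-wise across ranks.
    reduced = []
    for row_idx in range(num_rows):
        rows = [rank_rows[row_idx] for rank_rows in per_rank_rows]
        total = [sum(values) for values in zip(*rows)]
        reduced.append(total if op == "sum" else [v / size for v in total])
    # Step 2: scatter contiguous slices of the first dimension, one per rank.
    chunk = num_rows // size
    return [reduced[r * chunk:(r + 1) * chunk] for r in range(size)]


rank0 = [[1.0], [2.0], [3.0], [4.0]]
rank1 = [[10.0], [20.0], [30.0], [40.0]]
scattered = simulate_reducescatter([rank0, rank1], op="sum")
print(scattered)  # [[[11.0], [22.0]], [[33.0], [44.0]]]
```

Each simulated rank starts with a 4x1 input and ends with a 2x1 output: the first dimension is divided across the two processes, and the remaining dimensions are unchanged.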
-
horovod.mxnet.
grouped_reducescatter
(tensors, op=Average, name=None, priority=0, process_set=<horovod.common.process_sets.ProcessSet object>, prescale_factor=1.0, postscale_factor=1.0)[source]¶ A function that performs reduction of a list of input tensors over all the Horovod processes, then scatters the results across all Horovod processes. The input tensors are not modified.
The reduction operation is keyed by the name. If name is not provided, an incremented auto-generated name is used. The tensor type and shape must be the same on all Horovod processes for a given name. The reduction will not start until all processes are ready to send and receive the tensor.
This acts as a thin wrapper around an autograd function. If your input tensor requires gradients, then calling this function will allow gradients to be computed and backpropagated.
- Parameters
tensors – A list of tensors to average or sum.
op – The reduction operation to combine tensors across different ranks. Can be Average (default) or Sum.
name – A base name to use for the group reduction operation.
priority – The priority of this operation. Higher priority operations are likely to be executed before other operations.
process_set – Process set object to limit this operation to a subset of Horovod processes. Default is the global process set.
prescale_factor – Multiplicative factor to scale tensors before reducescatter.
postscale_factor – Multiplicative factor to scale tensors after reducescatter.
- Returns
A list containing tensors of the same rank and type as in tensors. For each tensor the shape is identical to the input shape, except for the first dimension, which will be divided across the different Horovod processes.
-
horovod.mxnet.
shutdown
()¶ A function that shuts Horovod down.
-
horovod.mxnet.
is_initialized
()¶ Returns True if Horovod is initialized.
-
horovod.mxnet.
start_timeline
(file_path, mark_cycles=False)¶ Creates a timeline file at file_path and begins recording.
- Parameters
file_path – String path to the timeline file.
mark_cycles – Boolean indicating that cycles should be marked on the timeline (default: False).
Raises a ValueError if Horovod is not initialized.
-
horovod.mxnet.
stop_timeline
()¶ Stops the active timeline recording and closes the file.
Raises a ValueError if Horovod is not initialized.
-
horovod.mxnet.
size
()¶ A function that returns the number of Horovod processes.
- Returns
An integer scalar containing the number of Horovod processes.
-
horovod.mxnet.
local_size
()¶ A function that returns the number of Horovod processes within the node the current process is running on.
- Returns
An integer scalar containing the number of local Horovod processes.
-
horovod.mxnet.
cross_size
()¶ A function that returns the number of nodes for the local rank of the current Horovod process. For example, if there are 2 nodes in the job: one running 2 processes and the other running 1 process, then the first process on each node will have cross size 2, and the second process on the first node will have cross size 1.
- Returns
An integer scalar containing the number of cross Horovod processes.
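The example in the description can be reproduced with a few lines of plain Python. `cross_sizes` is a hypothetical helper written for this illustration; it does not call Horovod and only models the counting rule stated above.

```python
def cross_sizes(procs_per_node):
    """Return {(node, local_rank): cross_size} for a job layout.

    procs_per_node[n] is the number of processes running on node n.
    """
    result = {}
    for node, count in enumerate(procs_per_node):
        for local_rank in range(count):
            # Count the nodes that also run a process with this local rank.
            result[(node, local_rank)] = sum(
                1 for c in procs_per_node if c > local_rank
            )
    return result


layout = cross_sizes([2, 1])
print(layout)  # {(0, 0): 2, (0, 1): 1, (1, 0): 2}
```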
-
horovod.mxnet.
rank
()¶ A function that returns the Horovod rank of the calling process.
- Returns
An integer scalar with the Horovod rank of the calling process.
-
horovod.mxnet.
local_rank
()¶ A function that returns the local Horovod rank of the calling process, within the node that it is running on. For example, if there are seven processes running on a node, their local ranks will be zero through six, inclusive.
- Returns
An integer scalar with the local Horovod rank of the calling process.
-
horovod.mxnet.
cross_rank
()¶ A function that returns the cross Horovod rank of the calling process, across the nodes in the job. The cross rank of a process corresponds to the rank of the node it is running on. For example, if there are 7 nodes in a job, the cross ranks will be zero through six, inclusive.
- Returns
An integer scalar with the cross Horovod rank of the calling process.
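How rank, local rank, and cross rank relate can be shown with simple arithmetic. The sketch below is an illustration only: `describe_rank` is a hypothetical helper, and it assumes that ranks are assigned node by node and that every node runs the same number of processes, which a real job need not satisfy.

```python
def describe_rank(rank, procs_per_node):
    """Derive local and cross rank from a global rank under the stated assumptions."""
    return {
        "rank": rank,
        "local_rank": rank % procs_per_node,   # position within the node
        "cross_rank": rank // procs_per_node,  # index of the node
    }


# Two nodes with four processes each.
table = [describe_rank(r, procs_per_node=4) for r in range(8)]
print(table[5])  # {'rank': 5, 'local_rank': 1, 'cross_rank': 1}
```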
-
horovod.mxnet.
mpi_threads_supported
()¶ A function that returns a flag indicating whether MPI multi-threading is supported.
If MPI multi-threading is supported, users may mix and match Horovod usage with other MPI libraries, such as mpi4py.
- Returns
A boolean value indicating whether MPI multi-threading is supported.
-
horovod.mxnet.
mpi_enabled
()¶ Returns True if MPI mode is currently enabled at runtime.
If MPI is enabled, users can use it for controller or data transfer operations.
- Returns
A boolean value indicating whether MPI is enabled.
-
horovod.mxnet.
mpi_built
()¶ Returns True if Horovod was compiled with MPI support.
- Returns
A boolean value indicating whether MPI support was compiled.
-
horovod.mxnet.
gloo_enabled
()¶ Returns True if Gloo mode is currently enabled at runtime.
If Gloo is enabled, users can use it for controller or data transfer operations.
- Returns
A boolean value indicating whether Gloo is enabled.
-
horovod.mxnet.
gloo_built
()¶ Returns True if Horovod was compiled with Gloo support.
- Returns
A boolean value indicating whether Gloo support was compiled.
-
horovod.mxnet.
nccl_built
()¶ Function to check if Horovod was compiled with NCCL support.
- Returns
An integer value indicating whether NCCL support was compiled. If NCCL support was compiled, returns NCCL_VERSION_CODE. Otherwise, returns 0.
-
horovod.mxnet.
ddl_built
()¶ Returns True if Horovod was compiled with DDL support.
- Returns
A boolean value indicating whether DDL support was compiled.
-
horovod.mxnet.
ccl_built
()¶ Returns True if Horovod was compiled with oneCCL support.
- Returns
A boolean value indicating whether oneCCL support was compiled.
-
horovod.mxnet.
cuda_built
()¶ Returns True if Horovod was compiled with CUDA support.
- Returns
A boolean value indicating whether CUDA support was compiled.
-
horovod.mxnet.
rocm_built
()¶ Returns True if Horovod was compiled with ROCm support.
- Returns
A boolean value indicating whether ROCm support was compiled.
-
class
horovod.mxnet.
ProcessSet
(ranks_or_comm: Union[Sequence[int], horovod.common.process_sets.MPI.Comm])[source]¶ Representation of a set of Horovod processes that will run collective operations together.
Initialize a ProcessSet with a list of process ranks or an MPI communicator. Then pass this instance to hvd.init() or hvd.add_process_set(). If a valid process set has been initialized, process_set_id will be set to a numeric value.
-
rank
() → Optional[int][source]¶ Return rank relative to this process set or None if not initialized.
This is useful, e.g., to process the result of hvd.allgather().
Please note that, even with process sets, Horovod operations like hvd.broadcast() are not parameterized by this relative rank, but by the global rank as obtained from hvd.rank().
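The difference between a relative rank and a global rank can be made concrete with a small pure-Python sketch. `relative_ranks` is a hypothetical helper, and the sketch assumes that members of a process set are ordered by ascending global rank; it does not call Horovod.

```python
def relative_ranks(process_set_ranks):
    """Map each global rank in the set to its position within the set."""
    return {
        global_rank: position
        for position, global_rank in enumerate(sorted(process_set_ranks))
    }


mapping = relative_ranks([4, 1, 6])
print(mapping)  # {1: 0, 4: 1, 6: 2}

# A gather restricted to this set yields one entry per member. The process
# with global rank 4 finds its own entry through its relative rank.
gathered = ["from-rank-1", "from-rank-4", "from-rank-6"]
mine = gathered[mapping[4]]
print(mine)  # from-rank-4
```

A root_rank argument, by contrast, would still be given as a global rank such as 4, not as the relative rank 1.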
-
-
horovod.mxnet.
add_process_set
(process_set: Union[horovod.common.process_sets.ProcessSet, Sequence[int]]) → horovod.common.process_sets.ProcessSet[source]¶ Add a new process_set after Horovod initialization and return it.
Requires running with HOROVOD_DYNAMIC_PROCESS_SETS=1. No process set containing the same ranks may exist already. The returned process set will be fully initialized.
-
horovod.mxnet.
remove_process_set
(process_set: horovod.common.process_sets.ProcessSet) → bool[source]¶ Attempt to remove process set and return whether this attempt is successful.
Requires running with HOROVOD_DYNAMIC_PROCESS_SETS=1. If removal is successful, we will invalidate the process_set object.
-
horovod.mxnet.
broadcast_parameters
(params, root_rank=0, prefix=None)[source]¶ Broadcasts the parameters from root rank to all other processes. Typical usage is to broadcast the Module.get_params() or the Block.collect_params().
- Parameters
params – The parameters to broadcast, given either as a dict of parameters or as a ParameterDict.
root_rank – The rank of the process from which parameters will be broadcasted to all other processes.
prefix – The prefix of the parameters to broadcast. If multiple broadcast_parameters are called in the same program, they must be specified by different prefixes to avoid tensor name collision.
horovod.spark¶
-
horovod.spark.
run
(fn, args=(), kwargs={}, num_proc=None, start_timeout=None, use_mpi=None, use_gloo=None, extra_mpi_args=None, env=None, stdout=None, stderr=None, verbose=1, nics=None, prefix_output_with_timestamp=False, executable=None)[source]¶ Runs Horovod on Spark. Runs num_proc processes executing fn using the same number of Spark tasks.
- Parameters
fn – Function to run.
args – Arguments to pass to fn.
kwargs – Keyword arguments to pass to fn.
num_proc – Number of Horovod processes. Defaults to spark.default.parallelism.
start_timeout – Timeout for Spark tasks to spawn, register and start running the code, in seconds. If not set, falls back to HOROVOD_SPARK_START_TIMEOUT environment variable value. If it is not set as well, defaults to 600 seconds.
extra_mpi_args – Extra arguments for mpi_run. Defaults to no extra args.
env – Environment dictionary to use in Horovod run.
stdout – Horovod stdout is redirected to this stream. Defaults to sys.stdout when used with MPI.
stderr – Horovod stderr is redirected to this stream. Defaults to sys.stderr when used with MPI.
verbose – Debug output verbosity (0-2). Defaults to 1.
nics – List of NICs for tcp network communication.
prefix_output_with_timestamp – Whether to prefix stdout/stderr output forwarded to the driver with a timestamp.
executable – Optional executable to run when launching the workers. Defaults to sys.executable.
- Returns
List of results returned by running fn on each rank.
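The fallback order documented for start_timeout (explicit argument, then the HOROVOD_SPARK_START_TIMEOUT environment variable, then 600 seconds) can be sketched as follows. `resolve_start_timeout` is a hypothetical helper written for this illustration and is not part of Horovod.

```python
import os


def resolve_start_timeout(start_timeout=None, environ=None):
    """Resolve the start timeout in seconds using the documented fallback order."""
    environ = os.environ if environ is None else environ
    if start_timeout is not None:
        # An explicit argument takes precedence.
        return int(start_timeout)
    # Otherwise use the environment variable, then the documented default.
    return int(environ.get("HOROVOD_SPARK_START_TIMEOUT", 600))


explicit = resolve_start_timeout(30, environ={})
from_env = resolve_start_timeout(environ={"HOROVOD_SPARK_START_TIMEOUT": "120"})
default = resolve_start_timeout(environ={})
print(explicit, from_env, default)  # 30 120 600
```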
-
horovod.spark.
run_elastic
(fn, args=(), kwargs={}, num_proc=None, min_num_proc=None, max_num_proc=None, start_timeout=None, elastic_timeout=None, reset_limit=None, env=None, stdout=None, stderr=None, verbose=1, nics=None, prefix_output_with_timestamp=False, min_np=None, max_np=None)[source]¶ Runs Elastic Horovod on Spark. Runs num_proc processes executing fn using the same number of Spark tasks.
- Parameters
fn – Function to run.
args – Arguments to pass to fn.
kwargs – Keyword arguments to pass to fn.
num_proc – Number of Horovod processes. Defaults to spark.default.parallelism.
min_num_proc – Minimum number of processes running for training to continue. If number of available processes dips below this threshold, then training will wait for more instances to become available.
max_num_proc – Maximum number of training processes, beyond which no additional processes will be created. If not specified, then will be unbounded.
start_timeout – Timeout for Spark tasks to spawn, register and start running the code, in seconds. If not set, falls back to HOROVOD_SPARK_START_TIMEOUT environment variable value. If it is not set as well, defaults to 600 seconds.
elastic_timeout – Timeout for elastic initialisation after re-scaling the cluster. If not set, falls back to HOROVOD_ELASTIC_TIMEOUT environment variable value. If it is not set as well, defaults to 600 seconds.
reset_limit – Maximum number of resets after which the job is terminated.
env – Environment dictionary to use in Horovod run. Defaults to os.environ.
stdout – Horovod stdout is redirected to this stream.
stderr – Horovod stderr is redirected to this stream.
verbose – Debug output verbosity (0-2). Defaults to 1.
nics – List of NICs for tcp network communication.
prefix_output_with_timestamp – Whether to prefix stdout/stderr output forwarded to the driver with a timestamp.
- Returns
List of results returned by running fn on each rank.
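The roles of min_num_proc and max_num_proc can be summarised as a small decision function. This is a sketch of the documented behaviour only: `plan_workers` is a hypothetical helper and says nothing about how the elastic driver is actually implemented.

```python
def plan_workers(available, min_num_proc, max_num_proc=None):
    """Return how many processes to train with, or None if training must wait."""
    if available < min_num_proc:
        # Below the threshold: wait for more instances to become available.
        return None
    if max_num_proc is None:
        # No upper bound was specified.
        return available
    return min(available, max_num_proc)


print(plan_workers(1, min_num_proc=2, max_num_proc=4))  # None
print(plan_workers(3, min_num_proc=2, max_num_proc=4))  # 3
print(plan_workers(8, min_num_proc=2, max_num_proc=4))  # 4
print(plan_workers(8, min_num_proc=2))                  # 8
```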
horovod.spark.keras¶
-
class
horovod.spark.keras.
KerasEstimator
(*args, **kwargs)[source]¶ Bases:
horovod.spark.common.estimator.HorovodEstimator, horovod.spark.keras.estimator.KerasEstimatorParamsReadable, horovod.spark.keras.estimator.KerasEstimatorParamsWritable
Spark Estimator for fitting Keras models to a DataFrame.
Supports standalone keras and tf.keras, and TensorFlow 1.X and 2.X.
- Parameters
num_proc – Number of Horovod processes. Defaults to spark.default.parallelism.
data_module – (Optional) DataModule class used for training and validation. If not set, defaults to the PetastormDataModule.
model – Keras model to train.
backend – Optional Backend object for running distributed training function. Defaults to SparkBackend with num_proc worker processes. Cannot be specified if num_proc is also provided.
store – Store object that abstracts reading and writing of intermediate data and run results.
custom_objects – Optional dictionary mapping names (strings) to custom classes or functions to be considered during serialization/deserialization.
optimizer – Keras optimizer to be converted into a hvd.DistributedOptimizer for training.
loss – Keras loss or list of losses.
loss_weights – Optional list of float weight values to assign each loss.
sample_weight_col – Optional column indicating the weight of each sample.
gradient_compression – Gradient compression used by hvd.DistributedOptimizer.
metrics – Optional metrics to record.
feature_cols – Column names used as feature inputs to the model. Must be a list with each feature mapping to a sequential argument in the model’s forward() function.
label_cols – Column names used as labels. Must be a list with one label for each output of the model.
validation – Optional validation column name (string) where every row in the column is either 1/True or 0/False, or validation split (float) giving percent of data to be randomly selected for validation.
callbacks – Keras callbacks.
batch_size – Number of rows from the DataFrame per batch.
val_batch_size – Number of rows from the DataFrame per batch for validation. If not set, batch_size is used.
epochs – Number of epochs to train.
verbose – Verbosity level [0, 2] (default: 1).
random_seed – Optional random seed to use for Tensorflow. Default: None.
shuffle_buffer_size – (Deprecated) Optional size of in-memory shuffle buffer in rows (on training data). Allocating a larger buffer size increases randomness of shuffling at the cost of more host memory. Defaults to estimating with an assumption of 4GB of memory per host. Setting shuffle_buffer_size=0 turns off shuffling.
shuffle – (Optional) Whether to shuffle training samples or not. Defaults to True.
partitions_per_process – Number of Parquet partitions to assign per worker process from num_proc (default: 10).
run_id – Optional unique ID for this run for organization in the Store. Will be automatically assigned if not provided.
train_steps_per_epoch – Number of steps to train each epoch. Useful for testing that model trains successfully. Defaults to training the entire dataset each epoch.
validation_steps_per_epoch – Number of validation steps to perform each epoch.
transformation_fn – Optional function that takes a row as its parameter and returns a modified row that is then fed into the train or validation step. This transformation is applied after batching. See Petastorm [TransformSpec](https://github.com/uber/petastorm/blob/master/petastorm/transform.py) for more details. Note that this function constructs another function which should perform the transformation.
train_reader_num_workers – This parameter specifies the number of parallel processes that read the training data from data store and apply data transformations to it. Increasing this number will generally increase the reading rate but will also increase the memory footprint. More processes are particularly useful if the bandwidth to the data store is not high enough, or users need to apply transformation such as decompression or data augmentation on raw data.
val_reader_num_workers – Similar to the train_reader_num_workers.
reader_pool_type – Type of Petastorm worker pool used to parallelize reading data from the dataset. Should be one of [‘thread’, ‘process’, ‘dummy’]. Defaults to ‘thread’.
inmemory_cache_all – Boolean. Whether to cache the data in memory for training and validation. Default: False.
backend_env – Dict to add to the environment of the backend. Defaults to setting the Java heap size to 2G min and max for libhdfs through Petastorm.
use_gpu – Whether to use the GPU for training. Defaults to True.
mp_start_method – The method to use to start multiprocessing. Defaults to None.
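The two forms accepted by validation (a column name or a float split) can be illustrated on plain Python rows. The sketch below is not the estimator's implementation: `split_rows` is a hypothetical helper, rows are dicts rather than DataFrame rows, and the float is treated as a fraction between 0 and 1.

```python
import random


def split_rows(rows, validation, seed=0):
    """Split rows into (train, val) according to a column name or a fraction."""
    if validation is None:
        return list(rows), []
    if isinstance(validation, str):
        # Column form: rows flagged 1/True go to validation, 0/False to training.
        val = [r for r in rows if r[validation]]
        train = [r for r in rows if not r[validation]]
        return train, val
    if isinstance(validation, float):
        if not 0.0 <= validation < 1.0:
            raise ValueError("validation split must be in [0, 1)")
        # Fraction form: each row is randomly assigned to validation.
        rng = random.Random(seed)
        train, val = [], []
        for row in rows:
            (val if rng.random() < validation else train).append(row)
        return train, val
    raise TypeError("validation must be None, a column name, or a float")


rows = [
    {"x": 1, "is_val": 0},
    {"x": 2, "is_val": 1},
    {"x": 3, "is_val": False},
]
train_rows, val_rows = split_rows(rows, "is_val")
print([r["x"] for r in train_rows], [r["x"] for r in val_rows])  # [1, 3] [2]
```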
-
class
horovod.spark.keras.
KerasModel
(*args, **kwargs)[source]¶ Bases:
horovod.spark.common.estimator.HorovodModel, horovod.spark.keras.estimator.KerasEstimatorParamsReadable, horovod.spark.keras.estimator.KerasEstimatorParamsWritable
Spark Transformer wrapping a Keras model, used for making predictions on a DataFrame.
Retrieve the underlying Keras model by calling keras_model.getModel().
- Parameters
history – List of metrics, one entry per epoch during training.
model – Trained Keras model.
feature_columns – List of feature column names.
label_columns – List of label column names.
custom_objects – Keras custom objects.
run_id – ID of the run used to train the model.
horovod.spark.torch¶
-
class
horovod.spark.torch.
TorchEstimator
(*args, **kwargs)[source]¶ Bases:
horovod.spark.common.estimator.HorovodEstimator, horovod.spark.torch.estimator.TorchEstimatorParamsWritable, horovod.spark.torch.estimator.TorchEstimatorParamsReadable
Spark Estimator for fitting PyTorch models to a DataFrame.
- Parameters
num_proc – Number of Horovod processes. Defaults to spark.default.parallelism.
data_module – (Optional) DataModule class used for training and validation. If not set, defaults to the PetastormDataModule.
model – PyTorch model to train.
backend – Optional Backend object for running distributed training function. Defaults to SparkBackend with num_proc worker processes. Cannot be specified if num_proc is also provided.
store – Store object that abstracts reading and writing of intermediate data and run results.
optimizer – PyTorch optimizer to be converted into a hvd.DistributedOptimizer for training.
loss – PyTorch loss or list of losses.
loss_constructors – Optional functions that generate losses.
metrics – Optional metrics to record.
loss_weights – Optional list of float weight values to assign each loss.
sample_weight_col – Optional column indicating the weight of each sample.
gradient_compression – Gradient compression used by hvd.DistributedOptimizer.
feature_cols – Column names used as feature inputs to the model. Must be a list with each feature mapping to a sequential argument in the model’s forward() function.
continuous_cols – Column names of all columns with continuous features.
categorical_cols – Column names of all columns with categorical features.
input_shapes – List of shapes for each input tensor to the model.
validation – Optional validation column name (string) where every row in the column is either 1/True or 0/False, or validation split (float) giving percent of data to be randomly selected for validation.
label_cols – Column names used as labels. Must be a list with one label for each output of the model.
batch_size – Number of rows from the DataFrame per batch.
val_batch_size – Number of rows from the DataFrame per batch for validation. If not set, batch_size is used.
epochs – Number of epochs to train.
verbose – Verbosity level [0, 2] (default: 1).
random_seed – Optional random seed to use for Torch. Default: None.
shuffle_buffer_size – (Deprecated) Optional size of in-memory shuffle buffer in rows (on training data). Allocating a larger buffer size increases randomness of shuffling at the cost of more host memory. Defaults to estimating with an assumption of 4GB of memory per host. Setting shuffle_buffer_size=0 turns off shuffling.
shuffle – (Optional) Whether to shuffle training samples or not. Defaults to True.
partitions_per_process – Number of Parquet partitions to assign per worker process from num_proc (default: 10).
run_id – Optional unique ID for this run for organization in the Store. Will be automatically assigned if not provided.
train_minibatch_fn – Optional custom function to execute within the training loop. Defaults to standard gradient descent process.
train_steps_per_epoch – Number of steps to train each epoch. Useful for testing that model trains successfully. Defaults to training the entire dataset each epoch.
validation_steps_per_epoch – Number of validation steps to perform each epoch.
transformation_fn – Optional function that takes a row as its parameter and returns a modified row that is then fed into the train or validation step. This transformation is applied after batching. See Petastorm [TransformSpec](https://github.com/uber/petastorm/blob/master/petastorm/transform.py) for more details. Note that this function constructs another function which should perform the transformation.
train_reader_num_workers – This parameter specifies the number of parallel processes that read the training data from data store and apply data transformations to it. Increasing this number will generally increase the reading rate but will also increase the memory footprint. More processes are particularly useful if the bandwidth to the data store is not high enough, or users need to apply transformation such as decompression or data augmentation on raw data.
val_reader_num_workers – Similar to the train_reader_num_workers.
reader_pool_type – Type of Petastorm worker pool used to parallelize reading data from the dataset. Should be one of [‘thread’, ‘process’, ‘dummy’]. Defaults to ‘thread’.
inmemory_cache_all – (Optional) Cache the data in memory for training and validation.
use_gpu – Whether to use the GPU for training. Defaults to True.
mp_start_method – The method to use to start multiprocessing. Defaults to None.
backward_passes_per_step – Number of backward passes to perform before calling hvd.allreduce. This allows accumulating updates over multiple mini-batches before reducing and applying them. Defaults to 1.
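The effect of backward_passes_per_step can be pictured as gradient accumulation. The following is a scalar toy model, not the estimator's training loop: `accumulate_and_step` is a hypothetical helper, the counter marks where a reduction across workers would take place, and averaging the accumulated gradient before the update is a choice made for this sketch.

```python
def accumulate_and_step(gradients, backward_passes_per_step=1, lr=0.5):
    """Apply one update per group of backward passes; return (weight, reductions)."""
    weight = 0.0
    accumulated = 0.0
    reductions = 0
    for step, grad in enumerate(gradients, start=1):
        accumulated += grad  # local accumulation, no communication yet
        if step % backward_passes_per_step == 0:
            reductions += 1  # a reduction across workers would happen here
            weight -= lr * accumulated / backward_passes_per_step
            accumulated = 0.0
    return weight, reductions


every_step = accumulate_and_step([1.0, 1.0, 1.0, 1.0], backward_passes_per_step=1)
every_other = accumulate_and_step([1.0, 1.0, 1.0, 1.0], backward_passes_per_step=2)
print(every_step)   # (-2.0, 4)
print(every_other)  # (-1.0, 2)
```

With a value of 2, the same four mini-batches trigger half as many reductions, which is the communication saving the parameter is meant to provide.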
-
class
horovod.spark.torch.
TorchModel
(*args, **kwargs)[source]¶ Bases:
horovod.spark.common.estimator.HorovodModel, horovod.spark.torch.estimator.TorchEstimatorParamsWritable, horovod.spark.torch.estimator.TorchEstimatorParamsReadable
Spark Transformer wrapping a PyTorch model, used for making predictions on a DataFrame.
Retrieve the underlying PyTorch model by calling torch_model.getModel().
- Parameters
history – List of metrics, one entry per epoch during training.
model – Trained PyTorch model.
feature_columns – List of feature column names.
label_columns – List of label column names.
optimizer – PyTorch optimizer used during training, containing updated state.
run_id – ID of the run used to train the model.
loss – PyTorch loss(es).
loss_constructors – PyTorch loss constructors.
horovod.spark.common¶
-
class
horovod.spark.common.estimator.
HorovodEstimator
[source]¶
-
class
horovod.spark.common.estimator.
HorovodModel
[source]¶ -
transform
(df, params=None)[source]¶ Transforms the input dataset with prediction columns representing model predictions.
Prediction column names default to <label_column>__output. Override column names by calling transformer.setOutputCols(col_names).
- Parameters
df – Input dataset, which is an instance of pyspark.sql.DataFrame.
params – An optional param map that overrides embedded params.
- Returns
Transformed dataset.
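The default naming rule for prediction columns is simple enough to state in code. `default_output_cols` is a hypothetical helper that only mirrors the `<label_column>__output` pattern described above; it is not part of Horovod.

```python
def default_output_cols(label_cols):
    """Derive the default prediction column name for each label column."""
    return [f"{col}__output" for col in label_cols]


output_cols = default_output_cols(["label", "score"])
print(output_cols)  # ['label__output', 'score__output']
```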
-
-
class
horovod.spark.common.backend.
Backend
[source]¶ Bases:
object
Interface for remote execution of the distributed training function.
A custom backend can be used in cases where the training environment running Horovod is different from the Spark application running the HorovodEstimator.
-
run
(fn, args=(), kwargs={}, env=None)[source]¶ Executes the training fn and returns results from each worker in a list (ordered by ascending rank).
- Parameters
fn – Function to run.
args – Arguments to pass to fn.
kwargs – Keyword arguments to pass to fn.
env – Environment dictionary to use in Horovod run. Defaults to os.environ.
- Returns
List of results returned by running fn on each rank.
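As a rough illustration of the run() contract, the sketch below calls fn once per simulated worker and returns the results ordered by ascending rank. It is a stand-in for experimentation only: `SequentialBackend` is a hypothetical class that does not subclass the real Backend, runs workers one after another in the current process, and exposes the rank through a made-up SKETCH_RANK environment variable.

```python
import os
from unittest import mock


class SequentialBackend:
    """Hypothetical backend that follows the documented run() signature."""

    def __init__(self, num_proc=2):
        self.num_proc = num_proc

    def run(self, fn, args=(), kwargs=None, env=None):
        kwargs = {} if kwargs is None else kwargs
        # Default to a copy of the current environment, as documented for env.
        base_env = dict(os.environ if env is None else env)
        results = []
        for rank in range(self.num_proc):
            worker_env = dict(base_env, SKETCH_RANK=str(rank))
            # Temporarily replace os.environ for this simulated worker.
            with mock.patch.dict(os.environ, worker_env, clear=True):
                results.append(fn(*args, **kwargs))
        return results  # ordered by ascending rank


def train_fn(scale):
    return int(os.environ["SKETCH_RANK"]) * scale


results = SequentialBackend(num_proc=3).run(train_fn, args=(10,))
print(results)  # [0, 10, 20]
```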
-
-
class
horovod.spark.common.backend.
SparkBackend
(num_proc=None, env=None, **kwargs)[source]¶ Bases:
horovod.spark.common.backend.Backend
Uses horovod.spark.run to execute the distributed training fn.
-
run
(fn, args=(), kwargs={}, env=None)[source]¶ Executes the training fn and returns results from each worker in a list (ordered by ascending rank).
- Parameters
fn – Function to run.
args – Arguments to pass to fn.
kwargs – Keyword arguments to pass to fn.
env – Environment dictionary to use in Horovod run. Defaults to os.environ.
- Returns
List of results returned by running fn on each rank.
-
-
horovod.spark.common.store.
host_hash
(salt=None)[source]¶ Computes this host’s host hash by invoking horovod.runner.common.util.host_hash.host_hash.
Considers the environment variable CONTAINER_ID, which is present when running Spark via YARN. A YARN container does not share memory with other containers on the same host, so it must be considered a host in the sense of the host_hash.
- Parameters
salt – Extra information to include in the hash. Falsy values are ignored.
- Returns
host hash
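The roles of CONTAINER_ID and salt can be illustrated with a simplified hash. This sketch is not the implementation in horovod.runner.common.util.host_hash: `sketch_host_hash` is a hypothetical function, and the choice of hostname as the base identity and SHA-256 as the hash are assumptions made for the example.

```python
import hashlib
import os
import socket


def sketch_host_hash(salt=None, environ=None):
    """Hash the hostname, the YARN container ID if present, and a truthy salt."""
    environ = os.environ if environ is None else environ
    parts = [socket.gethostname()]
    container_id = environ.get("CONTAINER_ID")
    if container_id:
        # Each YARN container counts as its own host.
        parts.append(container_id)
    if salt:
        # Falsy salts (None, "", 0) are ignored.
        parts.append(str(salt))
    return hashlib.sha256("-".join(parts).encode("utf-8")).hexdigest()


plain = sketch_host_hash(environ={})
in_container = sketch_host_hash(environ={"CONTAINER_ID": "container_01"})
salted = sketch_host_hash(salt="job-42", environ={})
print(plain != in_container, plain != salted)  # True True
```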
-
class
horovod.spark.common.store.
Store
[source]¶ Bases:
object
Storage layer for intermediate files (materialized DataFrames) and training artifacts (checkpoints, logs).
Store provides an abstraction over a filesystem (e.g., local vs HDFS) or blob storage database. It provides the basic semantics for reading and writing objects, and how to access objects with certain definitions.
The store exposes a generic interface that is not coupled to a specific DataFrame, model, or runtime. Every run of an Estimator should result in a separate run directory containing checkpoints and logs, and every variation in dataset should produce a separate intermediate data path.
In order to allow for caching but to prevent overuse of disk space on intermediate data, intermediate datasets are named in a deterministic sequence. When a dataset is done being used for training, the intermediate files can be reclaimed to free up disk space, but will not be automatically removed so that they can be reused as needed. This is to support both parallel training processes using the same store on multiple DataFrames, as well as iterative training using the same DataFrame on different model variations.
-
get_checkpoints
(run_id, suffix='.ckpt')[source]¶ Returns a list of paths for all checkpoints saved this run.
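As a rough illustration of the layout described for Store, the toy class below keeps one directory per run, gives intermediate datasets deterministic index-based names, and lists checkpoints by suffix. ToyStore, its directory names, and its local-filesystem backing are hypothetical and are not Horovod's implementation.

```python
import os


class ToyStore:
    """Hypothetical stand-in for a Store backed by a local directory."""

    def __init__(self, prefix_path):
        self.prefix_path = prefix_path

    def get_run_path(self, run_id):
        # One directory per Estimator run, holding checkpoints and logs.
        return os.path.join(self.prefix_path, "runs", run_id)

    def get_train_data_path(self, idx):
        # Deterministic, index-based name so a dataset can be located and reused.
        return os.path.join(self.prefix_path, f"intermediate_train_data.{idx}")

    def get_checkpoints(self, run_id, suffix=".ckpt"):
        # Return sorted paths of all files in the run directory with the suffix.
        run_path = self.get_run_path(run_id)
        if not os.path.isdir(run_path):
            return []
        return sorted(
            os.path.join(run_path, name)
            for name in os.listdir(run_path)
            if name.endswith(suffix)
        )
```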
-
-
class
horovod.spark.common.store.
AbstractFilesystemStore
(prefix_path, train_path=None, val_path=None, test_path=None, runs_path=None, save_runs=True, storage_options=None, checkpoint_filename=None, **kwargs)[source]¶ Bases:
horovod.spark.common.store.Store
Abstract class for stores that use a filesystem for underlying storage.
-
read_serialized_keras_model
(ckpt_path, model, custom_objects)[source]¶ Reads the checkpoint file of the Keras model into model bytes and returns the base64-encoded model bytes.
- Parameters
ckpt_path – A string containing the path to the checkpoint file.
model – A Keras model. This parameter is used in DBFSLocalStore.read_serialized_keras_model() when the ckpt_path only contains model weights.
custom_objects – This parameter is used in DBFSLocalStore.read_serialized_keras_model() when loading the Keras model.
- Returns
The base64-encoded model bytes of the checkpoint model.
-
-
class
horovod.spark.common.store.
FilesystemStore
(prefix_path, *args, **kwargs)[source]¶ Bases:
horovod.spark.common.store.AbstractFilesystemStore
Concrete filesystem store that delegates to fsspec.
-
sync_fn
(run_id)[source]¶ Returns a function that synchronises given path recursively into run path for run_id.
-
copy
(lpath, rpath, recursive=False, callback=<MagicMock id='140055834504160'>, **kwargs)[source]¶ Copies the contents of the local source directory to the target directory. Unlike fsspec’s put(), it does not copy the source folder itself into the target directory when the target directory already exists.
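The "copy the contents, not the folder" behaviour can be shown on the local filesystem with the standard library. This is only a sketch of the semantics described above, not the FilesystemStore code; copy_contents is a hypothetical helper and it relies on shutil.copytree with dirs_exist_ok, which requires Python 3.8 or later.

```python
import shutil


def copy_contents(lpath, rpath):
    """Copy the contents of directory lpath into rpath, creating rpath if needed."""
    # dirs_exist_ok=True merges into an existing target directory instead of
    # failing, so the result is rpath/<files>, never rpath/<basename of lpath>/<files>.
    shutil.copytree(lpath, rpath, dirs_exist_ok=True)
```

With a source directory src containing sub/f.txt and an existing target dst, the call leaves dst/sub/f.txt in place and does not create dst/src.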
-
-
class
horovod.spark.common.store.
LocalStore
(*args, **kwargs)[source]¶ Bases:
horovod.spark.common.store.FilesystemStore
Uses the local filesystem as a store of intermediate data and training artifacts.
This class is deprecated and now just resolves to FilesystemStore.
-
class
horovod.spark.common.store.
HDFSStore
(prefix_path, host=None, port=None, user=None, kerb_ticket=None, driver='libhdfs', extra_conf=None, *args, **kwargs)[source]¶ Bases:
horovod.spark.common.store.AbstractFilesystemStore
Uses HDFS as a store of intermediate data and training artifacts.
Initialized from a prefix_path that can take one of the following forms:
“hdfs://namenode01:8020/user/test/horovod”
“hdfs:///user/test/horovod”
“/user/test/horovod”
The full path (including prefix, host, and port) will be used for all reads and writes to HDFS through Spark. If host and port are not provided, they will be omitted. If the prefix is not provided (case 3), it will be prepended to the full path regardless.
The localized path (without prefix, host, and port) will be used for interaction with PyArrow. Parsed host and port information will be used to initialize PyArrow HadoopFilesystem if they are not provided through the host and port arguments to this initializer. These parameters default to “default” and 0 if neither the path URL nor the arguments provide this information.
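The three accepted forms and the resulting full path, localized path, host, and port can be sketched with urllib.parse. The helper parse_hdfs_prefix_path and the constant HDFS_PREFIX are hypothetical names, and the sketch assumes the prefix is hdfs:// and that explicit host and port arguments take precedence over values parsed from the URL, as the description above suggests; it is not HDFSStore's own parsing code.

```python
from urllib.parse import urlparse

HDFS_PREFIX = "hdfs://"  # assumed prefix, for illustration


def parse_hdfs_prefix_path(prefix_path, host=None, port=None):
    """Return (full_path, localized_path, host, port) for a prefix_path."""
    parsed = urlparse(prefix_path)

    # Full path: always carries the prefix; host and port only if the URL had them.
    full_path = HDFS_PREFIX + parsed.netloc + parsed.path

    # Localized path: no prefix, host, or port.
    localized_path = parsed.path

    # Explicit arguments first, then the URL, then the documented defaults.
    resolved_host = host or parsed.hostname or "default"
    resolved_port = port or parsed.port or 0
    return full_path, localized_path, resolved_host, resolved_port
```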
-
class
horovod.spark.common.store.
DBFSLocalStore
(prefix_path, *args, **kwargs)[source]¶ Bases:
horovod.spark.common.store.FilesystemStore
Uses Databricks File System (DBFS) local file APIs as a store of intermediate data and training artifacts.
Initialized from a prefix_path that starts with /dbfs/…, file:///dbfs/…, file:/dbfs/… or dbfs:/…; see https://docs.databricks.com/data/databricks-file-system.html#local-file-apis.
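One way to picture the four accepted prefix forms is a helper that maps each of them to a local /dbfs/ path. The function to_local_dbfs_path is hypothetical, and the assumption that all forms resolve to the same /dbfs/… local path is made for illustration; it is not DBFSLocalStore's actual normalization code.

```python
# Accepted prefix forms, as listed in the DBFSLocalStore description.
DBFS_PREFIXES = ("file:///dbfs/", "file:/dbfs/", "dbfs:/", "/dbfs/")


def to_local_dbfs_path(prefix_path):
    """Map any accepted DBFS prefix form to a local-file-API path under /dbfs/."""
    for prefix in DBFS_PREFIXES:
        if prefix_path.startswith(prefix):
            return "/dbfs/" + prefix_path[len(prefix):]
    raise ValueError(f"not a DBFS path: {prefix_path!r}")
```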
horovod.ray¶
-
class
horovod.ray.
RayExecutor
(settings, num_workers: Optional[int] = None, num_hosts: Optional[int] = None, num_workers_per_host: int = 1, cpus_per_worker: int = 1, use_gpu: bool = False, gpus_per_worker: Optional[int] = None, use_current_placement_group: bool = True, min_workers: Optional[int] = None, max_workers: Optional[int] = None, reset_limit: Optional[int] = None, cooldown_range: Optional[List[int]] = None, elastic_timeout: int = 600, override_discovery: bool = True)[source]¶ Job class for Horovod + Ray integration.
- Parameters
settings (horovod.Settings) – Configuration for job setup. You can use a standard Horovod Settings object or create one directly from RayExecutor.create_settings.
num_workers (int) – Number of workers to use for training.
cpus_per_worker (int) – Number of CPU resources to allocate to each worker.
use_gpu (bool) – Whether to use GPU for allocation. TODO: this can be removed.
gpus_per_worker (int) – Number of GPU resources to allocate to each worker.
num_hosts (int) – Alternative API to num_workers. Number of machines to execute the job on. Used to enforce equal number of workers on each machine.
num_workers_per_host (int) – Alternative API to num_workers. Number of workers to be placed on each machine. Used to enforce equal number of workers on each machine. Only used in conjunction with num_hosts.
use_current_placement_group (bool) – Whether to use the current placement group instead of creating a new one. Defaults to True.
min_workers (int) – Minimum number of processes running for training to continue. If number of available processes dips below this threshold, then training will wait for more instances to become available.
max_workers (int) – Maximum number of training processes, beyond which no additional processes will be created. If not specified, then will be unbounded.
reset_limit (int) – Maximum number of times that the training job can scale up or down the number of workers after which the job is terminated.
elastic_timeout (int) – Timeout for elastic initialisation after re-scaling the cluster. The default value is 600 seconds. Alternatively, the environment variable HOROVOD_ELASTIC_TIMEOUT can also be used.
override_discovery (bool) – Whether the ElasticRayExecutor should automatically provide a discovery mechanism for ElasticSettings.
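The relationship between num_workers and the alternative num_hosts / num_workers_per_host pair can be sketched as follows for the non-elastic case. The helper resolve_num_workers is hypothetical, and rejecting the case where both are set is an assumption about how the two APIs exclude each other, not a statement of RayExecutor's exact validation.

```python
from typing import Optional


def resolve_num_workers(num_workers: Optional[int] = None,
                        num_hosts: Optional[int] = None,
                        num_workers_per_host: int = 1) -> int:
    """Return the total worker count implied by either way of specifying it."""
    if num_workers is not None and num_hosts is not None:
        # Assumed: the two APIs are alternatives and cannot be combined.
        raise ValueError("Set either num_workers or num_hosts, not both.")
    if num_workers is not None:
        return num_workers
    if num_hosts is not None:
        # Equal number of workers on every machine.
        return num_hosts * num_workers_per_host
    raise ValueError("Either num_workers or num_hosts must be set.")
```

For example, num_hosts=2 with num_workers_per_host=4 describes the same total of 8 workers as num_workers=8, but additionally pins 4 workers to each machine.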
-
classmethod
create_settings
(timeout_s=30, ssh_identity_file=None, ssh_str=None, placement_group_timeout_s=100, nics=None)[source]¶ Creates a MiniSettings object.
- Parameters
timeout_s (int) – Timeout parameter for Gloo rendezvous.
ssh_identity_file (str) – Path to the identity file to ssh into different hosts on the cluster.
ssh_str (str) – CAUTION WHEN USING THIS. Private key file contents. Writes the private key to ssh_identity_file.
placement_group_timeout_s (int) – Timeout parameter for Ray Placement Group creation.
nics (set) – Network interfaces that can be used for communication.
- Returns
MiniSettings object.
-
start
(executable_cls: Optional[type] = None, executable_args: Optional[List] = None, executable_kwargs: Optional[Dict] = None, extra_env_vars: Optional[Dict] = None)[source]¶ Starts the workers and colocates them on all machines.
We implement a node grouping because it seems like our implementation doesn’t quite work for imbalanced nodes. Also, colocation performance is typically much better than non-colocated workers.
- Parameters
executable_cls (type) – The class that will be created within an actor (BaseHorovodWorker). This will allow Horovod to establish its connections and set env vars.
executable_args (List) – Arguments to be passed into the worker class upon initialization.
executable_kwargs (Dict) – Keyword arguments to be passed into the worker class upon initialization.
extra_env_vars (Dict) – Environment variables to be set on the actors (worker processes) before initialization.
-
execute
(fn: Callable[[executable_cls], Any], callbacks: Optional[List[Callable]] = None) → List[Any][source]¶ Executes the provided function on all workers.
- Parameters
fn – Target function to be invoked on every object.
callbacks – List of callables. Each callback must either be a callable function or a class that implements __call__. Every callback will be invoked on every value logged by the rank 0 worker.
- Returns
Deserialized return values from the target function.
-
run
(fn: Callable[[Any], Any], args: Optional[List] = None, kwargs: Optional[Dict] = None, callbacks: Optional[List[Callable]] = None) → List[Any][source]¶ Executes the provided function on all workers.
- Parameters
fn – Target function that can be executed with arbitrary args and keyword arguments.
args – List of arguments to be passed into the target function.
kwargs – Dictionary of keyword arguments to be passed into the target function.
callbacks – List of callables. Each callback must either be a callable function or a class that implements __call__. Every callback will be invoked on every value logged by the rank 0 worker.
- Returns
Deserialized return values from the target function.
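A callback, as described for the callbacks parameter, can be a plain function or an object implementing __call__. The sketch below shows both shapes; the single-argument callback signature is an assumption, and dispatch is a hypothetical stand-in for the executor's delivery of values logged by the rank 0 worker, not part of horovod.ray.

```python
class CollectingCallback:
    """Callable object that stores every value it receives."""

    def __init__(self):
        self.values = []

    def __call__(self, value):
        self.values.append(value)


def print_callback(value):
    """Plain-function callback that prints each logged value."""
    print(f"rank 0 logged: {value!r}")


def dispatch(logged_values, callbacks):
    """Hypothetical stand-in for the executor: every callback sees every value."""
    for value in logged_values:
        for callback in callbacks:
            callback(value)
```

With a real executor, objects like these would be passed as callbacks=[CollectingCallback(), print_callback].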
-
run_remote
(fn: Callable[[Any], Any], args: Optional[List] = None, kwargs: Optional[Dict] = None) → List[Any][source]¶ Executes the provided function on all workers.
- Parameters
fn – Target function that can be executed with arbitrary args and keyword arguments.
args – List of arguments to be passed into the target function.
kwargs – Dictionary of keyword arguments to be passed into the target function.
- Returns
List of ObjectRefs that you can run ray.get on to retrieve values.
- Return type
list
-
class
horovod.ray.
ElasticRayExecutor
(settings: horovod.runner.elastic.settings.ElasticSettings, use_gpu: bool = False, cpus_per_slot: int = 1, gpus_per_slot: Optional[int] = None, env_vars: Optional[dict] = None, override_discovery=True)[source]¶ Executor for elastic jobs using Ray.
Leverages the Ray global state to detect available hosts and slots. Assumes that the entire Ray cluster is available for the Executor to use.
- Parameters
settings – Configuration for the elastic job setup. You can use a standard Horovod ElasticSettings object or create one directly from ElasticRayExecutor.create_settings.
use_gpu (bool) – Whether to use GPU for allocation.
cpus_per_slot (int) – Number of CPU resources to allocate to each worker.
gpus_per_slot (int) – Number of GPU resources to allocate to each worker.
env_vars (Dict) – Environment variables to be set on the actors (worker processes) before initialization.
override_discovery (bool) – Whether the ElasticRayExecutor should automatically provide a discovery mechanism for ElasticSettings.
Example:
    import ray
    ray.init(address="auto")
    settings = ElasticRayExecutor.create_settings(verbose=True)
    executor = ElasticRayExecutor(settings, use_gpu=True, cpus_per_slot=2)
    executor.start()
    executor.run(train_fn)
Warning: Deprecated since version 0.25.0.
-
static
create_settings
(min_num_proc: int = 1, max_num_proc: Optional[int] = None, reset_limit: Optional[int] = None, elastic_timeout: int = 600, timeout_s: int = 30, ssh_identity_file: Optional[str] = None, nics: Optional[str] = None, min_np=None, max_np=None, **kwargs)[source]¶ Returns a Settings object for ElasticRayExecutor.
Note that the discovery property will be set at runtime.
- Parameters
min_num_proc (int) – Minimum number of processes running for training to continue. If number of available processes dips below this threshold, then training will wait for more instances to become available.
max_num_proc (int) – Maximum number of training processes, beyond which no additional processes will be created. If not specified, then will be unbounded.
reset_limit (int) – Maximum number of times that the training job can scale up or down the number of workers after which the job is terminated.
elastic_timeout (int) – Timeout for elastic initialisation after re-scaling the cluster. The default value is 600 seconds. Alternatively, the environment variable HOROVOD_ELASTIC_TIMEOUT can also be used.
timeout_s (int) – Horovod performs all the checks and starts the processes before the specified timeout. The default value is 30 seconds.
ssh_identity_file (str) – File on the driver from which the identity (private key) is read.
nics (set) – Network interfaces that can be used for communication.
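The effect of reset_limit can be pictured with a toy counter: each change in the number of workers consumes one reset, and the job is terminated once the limit would be exceeded. ResetBudget is a hypothetical class, and treating a limit of None as unbounded is an assumption based on the parameter's default; this is not Horovod's elastic driver logic.

```python
from typing import Optional


class ResetBudget:
    """Toy counter for membership changes in an elastic job."""

    def __init__(self, reset_limit: Optional[int] = None):
        self.reset_limit = reset_limit  # None is assumed to mean "no limit"
        self.resets = 0

    def record_reset(self) -> None:
        """Count one scale-up or scale-down; raise once the limit is exhausted."""
        if self.reset_limit is not None and self.resets >= self.reset_limit:
            raise RuntimeError(f"reset limit of {self.reset_limit} exceeded")
        self.resets += 1
```

Under this sketch a limit of 0 rejects the very first resize, while a limit of 1 allows exactly one.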
-
run
(worker_fn: Callable, callbacks: Optional[List[Callable]] = None) → List[Any][source]¶ Executes the provided function on all workers.
- Parameters
worker_fn – Target elastic function that can be executed.
callbacks – List of callables. Each callback must either be a callable function or a class that implements __call__. Every callback will be invoked on every value logged by the rank 0 worker.
- Returns
List of return values from every completed worker.
horovod.run¶
Launch a Horovod job to run the specified process function and get the return value.
- param func
The function to be run in Horovod job processes. The function return value will be collected as the corresponding Horovod process return value. This function must be compatible with pickle.
- param args
Arguments to pass to func.
- param kwargs
Keyword arguments to pass to func.
- param num_proc
Number of Horovod processes.
- param min_num_proc
Minimum number of processes running for training to continue. If number of available processes dips below this threshold, then training will wait for more instances to become available. Defaults to num_proc.
- param max_num_proc
Maximum number of training processes, beyond which no additional processes will be created. If not specified, then will be unbounded.
- param slots
Number of slots for processes per host. Normally 1 slot per GPU per host. If slots are provided by the output of the host discovery script, then that value will override this parameter.
- param reset_limit
Maximum number of times that the training job can scale up or down the number of workers after which the job is terminated. A reset event occurs when workers are added or removed from the job after the initial registration. So a reset_limit of 0 would mean the job cannot change membership after its initial set of workers. A reset_limit of 1 means it can resize at most once, etc.
- param cooldown_range
Range of seconds (min, max) that a failing host will remain blacklisted.
- param hosts
List of host names and the number of available slots for running processes on each, of the form: <hostname>:<slots> (e.g.: host1:2,host2:4,host3:1 indicating 2 processes can run on host1, 4 on host2, and 1 on host3). If not specified, defaults to using localhost:<num_proc>
- param hostfile
Path to a host file containing the list of host names and the number of available slots. Each line of the file must be of the form: <hostname> slots=<slots>
- param host_discovery_script
Used for elastic training (autoscaling and fault tolerance). An executable script that will print to stdout every available host (one per newline character) that can be used to run worker processes. Optionally specifies number of slots on the same line as the hostname as: “hostname:slots”. Providing a discovery script enables elastic training. The job will fail immediately if execution of the script returns a non-zero exit code on the first call. Subsequent calls will be retried until timeout.
- param start_timeout
Horovodrun has to perform all the checks and start the processes before the specified timeout. The default value is 30 seconds. Alternatively, the environment variable HOROVOD_START_TIMEOUT can also be used to specify the initialization timeout.
- param ssh_port
SSH port on all the hosts.
- param ssh_identity_file
SSH identity (private key) file.
- param disable_cache
If the flag is not set, horovodrun will perform the initialization checks only once every 60 minutes – if the checks successfully pass. Otherwise, all the checks will run every time horovodrun is called.
- param output_filename
For Gloo, writes stdout / stderr of all processes to a filename of the form <output_filename>/rank.<rank>/<stdout | stderr>. The <rank> will be padded with 0 characters to ensure lexicographical order. For MPI, delegates its behavior to mpirun.
- param verbose
If this flag is set, extra messages will be printed.
- param use_gloo
Run Horovod using the Gloo controller. This will be the default if Horovod was not built with MPI support.
- param use_mpi
Run Horovod using the MPI controller. This will be the default if Horovod was built with MPI support.
- param mpi_args
Extra arguments for the MPI controller. This is only used when use_mpi is True.
- param network_interfaces
List of network interfaces to use for communication. If not specified, Horovod will find the common NICs among all the workers. Example: [“eth0”, “eth1”].
- param executable
Optional executable to run when launching the workers. Defaults to sys.executable.
- return
Returns a list containing the values returned by all Horovod processes. The index of the list corresponds to the rank of each Horovod process. Returns only the first min_num_proc results, if set.
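The two host formats described for the hosts and hostfile parameters can be parsed with a few lines of standard-library Python. The helpers parse_hosts and parse_hostfile_line are hypothetical and only illustrate the documented string formats; they are not the parsers used by horovod.run.

```python
def parse_hosts(hosts: str) -> dict:
    """Parse '<hostname>:<slots>,...' into a {hostname: slots} mapping."""
    slots = {}
    for entry in hosts.split(","):
        hostname, _, count = entry.strip().partition(":")
        slots[hostname] = int(count)
    return slots


def parse_hostfile_line(line: str) -> tuple:
    """Parse one hostfile line of the form '<hostname> slots=<slots>'."""
    hostname, slots_field = line.split()
    key, _, value = slots_field.partition("=")
    if key != "slots":
        raise ValueError(f"expected 'slots=<n>', got {slots_field!r}")
    return hostname, int(value)
```

For the example string host1:2,host2:4,host3:1, the mapping sums to 7 slots, which is the largest number of processes those hosts could run.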