Source code for horovod.common.process_sets

from typing import *
# for type annotations, importing mpi4py has dangerous side effects
try:
    from horovod.common.basics import HorovodBasics
except ImportError:
    class HorovodBasics:
        ...
class MPI:
    class Comm:
        ...

from horovod.common.util import is_iterable


_basics = None  # type: Optional[HorovodBasics]


[docs]class ProcessSet: """ Representation of a set of Horovod processes that will run collective operations together Initialize a ProcessSet with a list of process ranks or an MPI communicator. Then pass this instance to hvd.init() or hvd.add_process_set(). If a valid process set has been initialized, process_set_id will be set to a numeric value. """ process_set_id = None ranks = None mpi_comm = None def __init__(self, ranks_or_comm: Union[Sequence[int], MPI.Comm]): if is_iterable(ranks_or_comm): ranks_or_comm = sorted(ranks_or_comm) if any(not isinstance(rk, int) for rk in ranks_or_comm): raise ValueError( "ProcessSet should be initialized with a list of process ranks or an mpi4py Comm object") self.ranks = ranks_or_comm else: assert _basics is not None, "process_sets._setup() must be called first" if not _basics.mpi_built(): raise ValueError( "Apparently you tried to build a ProcessSet from an MPI communicator, " "but Horovod has not been built with MPI support. Ensure MPI is installed and " "reinstall Horovod with HOROVOD_WITH_MPI=1 to debug the build error.") from mpi4py import MPI if not isinstance(ranks_or_comm, MPI.Comm): raise ValueError( "ProcessSet should be initialized with a list of process ranks or an mpi4py Comm object") self.mpi_comm = ranks_or_comm def _invalidate(self): self.process_set_id = None
[docs] def size(self) -> Optional[int]: """ Return size of the process set or None if not initialized. """ if self.process_set_id is None: return None return _basics._process_set_size(self.process_set_id)
[docs] def rank(self) -> Optional[int]: """ Return rank relative to this process set or None if not initialized. This is useful, e.g., to process the result of hvd.allgather(). Please note that, even with process sets, Horovod operations like hvd.broadcast() are not parameterized by this relative rank, but by the global rank as obtained from hvd.rank(). """ if self.process_set_id is None: return None return _basics._process_set_rank(self.process_set_id)
[docs] def included(self) -> Optional[bool]: """ Return whether the current process is part of this process set or None if not initialized. """ if self.ranks is None: return None return _basics.rank() in self.ranks
def __str__(self) -> str: return f"ProcessSet(process_set_id={self.process_set_id}, ranks={self.ranks}, mpi_comm={self.mpi_comm})"
def _temp_process_set_object(process_set_id: int) -> ProcessSet: """ For Horovod-internal usage where we don't have a ProcessSet instance at hand but know a valid process_set_id. """ ps = ProcessSet.__new__(ProcessSet) ps.process_set_id = process_set_id return ps global_process_set = ProcessSet([]) global_process_set.process_set_id = 0 def _setup(basics): # type: (Optional[HorovodBasics]) -> None """" Horovod internal, to be called after the Horovod C++ module has been loaded. """ global _basics _basics = basics def _init_process_sets(process_set_list: List[ProcessSet]): """ Update process_set_id and ranks entries of all passed process set objects and invalidate any clones. Horovod internal, to be called from hvd.init(). """ # Update process set objects in passed list: ids_seen_in_process_set_list = {0} # global_process_set is not in list id_to_ranks_dict = _basics._get_process_set_ids_and_ranks() ranks_to_id_dict = {tuple(ranks): process_set_id for process_set_id, ranks in id_to_ranks_dict.items()} for ps in process_set_list: if ps.ranks is not None: ps.process_set_id = ranks_to_id_dict[tuple(ps.ranks)] elif ps.mpi_comm is not None: ps.process_set_id = _basics._comm_process_set_id(ps.mpi_comm) ps.ranks = list(id_to_ranks_dict[ps.process_set_id]) if ps.process_set_id in ids_seen_in_process_set_list: ps._invalidate() else: ids_seen_in_process_set_list.add(ps.process_set_id) # Update ranks in global process set object if global_process_set.ranks != id_to_ranks_dict[0]: global_process_set.ranks = id_to_ranks_dict[0]
[docs]def add_process_set(process_set: Union[ProcessSet, Sequence[int]]) -> ProcessSet: """ Add a new process_set after Horovod initialization and return it. Requires running with HOROVOD_DYNAMIC_PROCESS_SETS=1. No process set containing the same ranks may exist already. The returned process set will be fully initialized. """ assert _basics is not None if not isinstance(process_set, ProcessSet): process_set = ProcessSet(process_set) if process_set.ranks is None and process_set.mpi_comm is not None: raise NotImplementedError( "Dynamically adding process sets defined by an MPI communicator is not implemented. " "Please build the process set via a list of ranks.") assert process_set.ranks is not None process_set_id = _basics._add_process_set_impl(process_set.ranks) if process_set_id is None: raise ValueError(f"Attempted to add a duplicate process set: {process_set}") process_set.process_set_id = process_set_id return process_set
[docs]def remove_process_set(process_set: ProcessSet) -> bool: """ Attempt to remove process set and return whether this attempt is successful. Requires running with HOROVOD_DYNAMIC_PROCESS_SETS=1. If removal is successful, we will invalidate the process_set object. """ assert _basics is not None process_set_id = process_set.process_set_id if process_set_id is None: # process set has not been initialized return False if process_set_id == 0: # will not remove the global process set return False process_set._invalidate() returned_id = _basics._remove_process_set_impl(process_set_id) return returned_id is not None