# Source code for horovod.spark.common.backend

# Copyright 2019 Uber Technologies, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

import horovod.spark.common._namedtuple_fix

import os

import pyspark

import horovod.spark

def default_num_proc():
    """Returns the `defaultParallelism` of the currently active Spark context.

    The context is read from `pyspark.SparkContext._active_spark_context`.
    """
    # NOTE(review): presumably `_active_spark_context` is None when no Spark context has been
    # started, in which case this attribute access fails -- confirm against callers.
    return pyspark.SparkContext._active_spark_context.defaultParallelism

class Backend(object):
    """Interface for remote execution of the distributed training function.

    A custom backend can be used in cases where the training environment running Horovod is
    different from the Spark application running the HorovodEstimator.
    """

    # NOTE: the `kwargs={}` default is kept so the signature stays identical for subclasses and
    # callers; it is never mutated in this method.
    def run(self, fn, args=(), kwargs={}, env=None):
        """Executes the training `fn` and returns results from each worker in a list
        (ordered by ascending rank).

        Args:
            fn: Function to run.
            args: Arguments to pass to `fn`.
            kwargs: Keyword arguments to pass to `fn`.
            env: Environment dictionary to use in Horovod run.  Defaults to `os.environ`.

        Returns:
            List of results returned by running `fn` on each rank.

        Raises:
            NotImplementedError: Always; subclasses must override this method.
        """
        raise NotImplementedError()

    def num_processes(self):
        """Returns the number of processes to use for training.

        Raises:
            NotImplementedError: Always; subclasses must override this method.
        """
        raise NotImplementedError()
class SparkBackend(Backend):
    """Uses `horovod.spark.run` to execute the distributed training `fn`."""

    def __init__(self, num_proc=None, env=None, **kwargs):
        """
        Args:
            num_proc: Number of Horovod processes.  Defaults to `spark.default.parallelism`.
            env: Environment dictionary to use in Horovod run.  Defaults to `os.environ`.
            **kwargs: Additional arguments passed to `horovod.spark.run` at training time.
        """
        # A falsy `num_proc` (None or 0) falls back to the Spark default parallelism.
        self._num_proc = num_proc or default_num_proc()
        self._env = env
        self._kwargs = kwargs

    def run(self, fn, args=(), kwargs={}, env=None):
        """Executes the training `fn` through `horovod.spark.run`.

        Args:
            fn: Function to run.
            args: Arguments to pass to `fn`.
            kwargs: Keyword arguments to pass to `fn`.
            env: Extra environment entries layered on top of the backend's environment for
                this call only.

        Returns:
            Whatever `horovod.spark.run` returns for the given `fn`.
        """
        # Work on a private copy: updating or deleting keys directly on `self._env` would alter
        # the dictionary supplied by the caller and leak per-call `env` entries into later runs.
        # An unset or empty `self._env` falls back to a copy of `os.environ`.
        full_env = dict(self._env) if self._env else os.environ.copy()
        if env:
            full_env.update(env)

        # In TensorFlow 2.0, we set this before calling `run` in order to prevent TensorFlow
        # from allocating memory on the GPU outside the training process.  Once we submit the
        # function for execution, we want to ensure that TensorFlow has visibility into GPUs on
        # the device so we can use them for training, which is why we need to unset this.
        full_env.pop('CUDA_VISIBLE_DEVICES', None)

        return horovod.spark.run(fn, args=args, kwargs=kwargs,
                                 num_proc=self._num_proc, env=full_env,
                                 **self._kwargs)

    def num_processes(self):
        """Returns the number of Horovod processes this backend was configured with."""
        return self._num_proc