Horovod with TensorFlow

To use Horovod, make the following additions to your program. This example uses TensorFlow.

  1. Run hvd.init().

  1. Pin a server GPU to be used by this process using config.gpu_options.visible_device_list.

    With the typical setup of one GPU per process, you can set this to local rank. In that case, the first process on the server will be allocated the first GPU, the second process will be allocated the second GPU, and so forth.

  1. Scale the learning rate by the number of workers.

    Effective batch size in synchronous distributed training is scaled by the number of workers. An increase in learning rate compensates for the increased batch size.

  1. Wrap the optimizer in hvd.DistributedOptimizer.

    The distributed optimizer delegates gradient computation to the original optimizer, averages gradients using allreduce or allgather, and then applies those averaged gradients.

  1. Add hvd.BroadcastGlobalVariablesHook(0) to broadcast initial variable states from rank 0 to all other processes.

    This is necessary to ensure consistent initialization of all workers when training is started with random weights or restored from a checkpoint. Alternatively, if you’re not using MonitoredTrainingSession, you can execute the hvd.broadcast_global_variables op after global variables have been initialized.

  1. Modify your code to save checkpoints only on worker 0 to prevent other workers from corrupting them.

    Accomplish this by passing checkpoint_dir=None to tf.train.MonitoredTrainingSession if hvd.rank() != 0.

Example (see the examples directory for full training examples):

import tensorflow as tf
import horovod.tensorflow as hvd


# Initialize Horovod
hvd.init()

# Pin GPU to be used to process local rank (one GPU per process)
config = tf.ConfigProto()
config.gpu_options.visible_device_list = str(hvd.local_rank())

# Build model...
loss = ...
opt = tf.train.AdagradOptimizer(0.01 * hvd.size())

# Add Horovod Distributed Optimizer
opt = hvd.DistributedOptimizer(opt)

# Add hook to broadcast variables from rank 0 to all other processes during
# initialization.
hooks = [hvd.BroadcastGlobalVariablesHook(0)]

# Make training operation
train_op = opt.minimize(loss)

# Save checkpoints only on worker 0 to prevent other workers from corrupting them.
checkpoint_dir = '/tmp/train_logs' if hvd.rank() == 0 else None

# The MonitoredTrainingSession takes care of session initialization,
# restoring from a checkpoint, saving to a checkpoint, and closing when done
# or an error occurs.
with tf.train.MonitoredTrainingSession(checkpoint_dir=checkpoint_dir,
                                       config=config,
                                       hooks=hooks) as mon_sess:
  while not mon_sess.should_stop():
    # Perform synchronous training.
    mon_sess.run(train_op)