RFC: Distribution Strategy - Revised API by guptapriya · Pull Request #25 · tensorflow/community (original) (raw)
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
Conversation73 Commits14 Checks0 Files changed
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.Learn more about bidirectional Unicode characters
[ Show hidden characters]({{ revealButtonHref }})
Review period closes 2018-11-08
Distribution Strategy - Revised API
Status | Proposed |
---|---|
Author(s) | cjfj@google.com, dominikg@google.com, jhseu@google.com, joshl@google.com, |
petebu@google.com, priyag@google.com, tomhennigan@google.com | |
Sponsor | wicke@google.com |
Updated | 2018-11-12 |
Objective
This document presents a proposal to seek feedback on a revised Distribution Strategy API and illustrate its usage in various situations. Distribution Strategy aims to allow users to easily distribute their computation across
different GPUs, TPUs and multiple machines. Until now, the recommended usage of Distribution Strategy has been through TensorFlow high level training frameworks such as tf.keras and Estimator. In this proposal, we want to show how one can use Distribution Strategy APIs directly for distributing custom training loops, and get feedback on those APIs. This use case is important for many users who want more control of their training loops, such as ML researchers. We have tested a similar version of this API internally with many researchers in Alphabet and the current proposal is based on their feedback.
This is also an opportune time to get public feedback as we are in the process of migrating Distribution Strategy APIs from tf.contrib
to core TensorFlow (as part of TF 2.0). As part of the move, we want to improve and reorganize the APIs to make them more user friendly and understandable.
I don't understand why this thing was proposed in the first place. The whole idea of Tensorflow 2.0 was to have an easy to use interface, eager execution and no redundancy
. However, this RFC contradicts it. It's like saying, we are having tf.layers.Dense()
for high-level API but we also have tf.nn.Dense()
if you want more flexibility. IMHO, this is going to create a mess again, two things each of which is able to achieve the same goal. Alternatively, the hooks
should be provided within the DistributedStrategy
API itself so that everyone is on the same page. If you are adding another API for researchers, then researchers would be using Replicator
while applied ML engineers will be using DistributedStrategy
.
mratsim, init27, satian7, godardt, jorellano, mxbi, rohts-patil, bignamehyp, Conchylicultor, gabrieldemarmiesse, and 3 more reacted with thumbs up emoji artmortal93 reacted with thumbs down emoji
Distribution Strategy’s low level APIs can be directly used to distribute
training programs written with low level TensorFlow APIs (i.e. without
Estimator/Keras). But it can be tedious and error prone as it exposes an
extensive and flexible API. There is a skeleton of a mid-level API within
Distribution Strategy where users can write their own main-loops more easily, but this has not yet
been developed fully or tested with real users.
IMO this is a pretty weak reason to create a second API. Why not just finish developing the mid-level API and make sure it works across real use cases?
I see that there was discussion of a num_steps_per_run
parameter, but will the Replicator API support some kind of gradient aggregation? I know we usually like to just throw more accelerators at a problem to increase the effective batch size, but for some of us who are more restricted in terms of compute power I wonder if we can support this kind of logic to have larger effective batch sizes. For example using N GPUs with batch size B, but aggregating gradients for R number of runs of the optimizer and then delay syncing to achieve an effective batch size of N*B*R.
@tfboyd Thank you so much. This will be extremely helpful in my current research projects. I greatly appreciate it.
I see that there was discussion of a
num_steps_per_run
parameter, but will the Replicator API support some kind of gradient aggregation? I know we usually like to just throw more accelerators at a problem to increase the effective batch size, but for some of us who are more restricted in terms of compute power I wonder if we can support this kind of logic to have larger effective batch sizes. For example using N GPUs with batch size B, but aggregating gradients for R number of runs of the optimizer and then delay syncing to achieve an effective batch size of N_B_R.
@terrykong I think that this is an orthogonal problem to the one that Replicator is trying to solve. I believe that it could be achieved using a relatively simple Optimizer
wrapper, with or without Replicator.
I see that there was discussion of a
num_steps_per_run
parameter, but will the Replicator API support some kind of gradient aggregation? I know we usually like to just throw more accelerators at a problem to increase the effective batch size, but for some of us who are more restricted in terms of compute power I wonder if we can support this kind of logic to have larger effective batch sizes. For example using N GPUs with batch size B, but aggregating gradients for R number of runs of the optimizer and then delay syncing to achieve an effective batch size of N_B_R.@terrykong I think that this is an orthogonal problem to the one that Replicator is trying to solve. I believe that it could be achieved using a relatively simple
Optimizer
wrapper, with or without Replicator.
@chr1sj0nes Gotcha, I see your point.
@AakashKumarNain @seanpmorgan thank you for the feedback (regarding comment 1, comment 2).
We've taken this feedback into consideration and are planning to re-organize this proposal (hence the delay in responding). We are leaning towards incorporating the new API into Distribution Strategy - and clearly documenting/marking how to use DistributionStrategy APIs depending on the use case. Please stay tuned for a revamped proposal.
@guptapriya Thanks for the clarification. Waiting for the revamped proposal.
passed to run. It takes an `InputFn` `fn` which either returns a |
---|
`Dataset` or a function which returns |
([nests](https://www.tensorflow.org/api\_docs/python/tf/contrib/framework/nest) |
of) input tensors. It also takes an optional `InputReplicationMode` |
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to support both Dataset and nests of tensors? Can't a dataset of nested tensors suffice for the latter use case, such that this always returns a dataset?
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This feature was added in response to internal user feedback. In an RL use-case, it is not always convenient to work with Dataset
s. Wrapping Tensor
s as a Dataset
is non-trivial, and introduces an unnecessary (albeit slight) overhead.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I recall, perhaps once upon a time, that there was a restriction that the Estimator input_fn had to return datasets to work with DistStrat, and that returning a tuple of (feature, label) would not be supported. Or maybe that was with replicate_model_fn? Does that restriction still hold, or does the acceptance of nested tensors imply that other return types are also acceptable?
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great point. I think once we add support for fn returning tensors, we should be able to support input_fn
returning (feature,label) tuples in Estimator as well.
optimizer = tf.train.MomentumOptimizer(learning_rate, 0.9) |
---|
def input_fn(ctx): |
assert effective_batch_size % ctx.num_replicas_in_sync == 0 |
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In practice, if this assert fails, what happens? Or, more broadly, how does error propagation work? Would this stop all replicas, or if one fails, the rest continue?
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since this type of error will happen during graph construction, in the in-graph case, this will just mean that the graph will not be constructed and no replicas will start training. (for multi-GPU and TPU strategy, this would be the case)
For the between-graph case (which is typically used for multi worker), there are a couple different modes - standalone client and independent worker mode. We don't talk about these in detail in this design, but there will be an upcoming design talking about how to setup the cluster etc for multi worker cases easily.
@yuefengz - can you comment on the current state of error propagation for between graph cases for multi worker training?
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the between-graph replication case, if this assert fails, it looks to me it will fail at graph creation time on all workers. Is there a chance that only some workers fail the assertion?
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah this case will fail on all of them. But Karmel's question is also about broad error propagation - if graph creation fails on one of the workers but not on all?
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is not any existing mechanism to broadcast errors. That depends on the outer loop. E.g. Kubeflow can monitor these workers and kill the whole training cluster if any of the worker fails with non-retryable errors.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How does checkpointing/restoring from checkpoint work in this world? Is failure handling left to the user?
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the current proposal we are not providing any support with checkpointing/restoring and these would be left to the user if they're not using Keras/Estimator.
There might be a proposal in the future to provide some help with that.
Incorporated Replicator API into Distribution Strategy API, and re-organized the latter.
We have significantly updated the proposal based on the feedback - the major shift is to incorporate Replicator API into the Distribution Strategy API and re-organize it. Please give it a read and provide your feedback. We would especially welcome your thoughts on the "open questions and discussion topics" at the end of the document.
The review period has been extended for a week until Nov. 8th.
guptapriya changed the title
RFC: Replicator RFC: Distribution Strategy - Revised API
bhack mentioned this pull request
An instance of `InputContext` is passed to the input function. |
---|
```python |
class InputContext(object): |
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an expected design for DistributionStrategy
. So how to integrate InputContext
into Estimator
and Keras
seamlessly ? Currently, the input_fn
in Estimator
does not have any parameters . But it would be reasonable to modify some interfaces on Estimator
and Keras
.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the review @wangsiyu .
For Keras, we have had some discussions around whether we should allow users passing an input function. From the feedback we've got so far, it seems that providing a dataset or numpy array directly, and letting DS figure out how to split/shard the input is the most user friendly approach for Keras APIs. So we are not planning to change Keras API to take an input function. We will instead work on efficient distribution of input from a single dataset.
For Estimator, this is under discussion right now, since it already supports input function. As you mentioned, it doesn't currently take this input context, but it would be reasonable to modify that to allow it for users who do wish to modify their input pipelines. Most likely we will add this support.
However, note that this would require users to modify their existing input code. Until now, our philosophy has been to allow keras/estimator users to be able to use their existing code as much as possible when adding DS. So we are trying to figure out what's the best user experience for users who don't wish to modify their existing input pipelines.
strategy = tf.distribute.MirroredStrategy() |
---|
model.compile(loss='mean_squared_error', |
optimizer=tf.train.GradientDescentOptimizer(learning_rate=0.2), |
distribute=strategy) |
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we wait until compile to pass in the strategy, how do we handle, for example, input sharding? Are there other limitations of the Keras flow, or is the expectation that we should be able to use DS to distribute Keras models to the same scale we can today with Estimators?
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The plan for input is to rely on datasets to be able to (eventually) efficiently shard input. And this can happen at the compile/fit stage, with whatever dataset is provided at that time. Note that the initial implementation may not be efficient, but we want to design the APIs such that it makes the most sense for users, even if we have to sacrifice performance in the short term.
AFAIK, there aren't any other limitations we've seen so far because of delaying strategy passing until compile that affect performance, but @anj-s maybe you can add more thoughts on this?
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe compile is the right stage to pass the strategy and set it as one of the model properties, similar to loss, optimizer etc. We also only have access to the input at the fit stage (and possibly information about the workers as well). There isn't any performance issues so far with the current flow. As guptapriya@ mentioned we will be using an initial implementation for input sharding which may have performance issues but this is not dependent on when we pass the strategy object.
# Start the actors. |
for actor_id in range(num_actors): |
threading.Thread(target=run_actor, args=(actor_id,)).start() |
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be nice to have a reinforcement learning story where the environments run in a Process
instead of a Thread
to support heavier computation.
The agent model call and FIFOQueue.enqueue would be problematic in a Process
.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the review jperl@. You're right that supporting separate processes for the actors and learner is important but the proposal doesn't preclude that. We already use the prototype implementation in exactly that way with two separate Python+TF processes. The agent model building isn't a problem - you can just build the same model twice, once in the learner and again the actors. To transfer the weights, you can serialize them out of graph and send them to the other process. The example uses threads for illustration - a multi-process example would be somewhat more involved and would rely on additional RPC libraries for the cross-process weight update.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the reply @petebu. I apologize if I am misunderstanding -- but can the Distribution Strategy API support mirrored variables on the same machine across processes, so that the end user does not need to manage the cross-process weight sync manually?
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar question to jperl@: When you say "To transfer the weights, you can serialize them out of graph and send them to the other process." does DS support weight transfer between two processes (running on one or multiple machines)? If not, can you outline how DS would be used in combination with other communication libraries to implement distributed RL with distributed data collection and one or more learners (with replay buffers/FIFO queue for experience trajectories)?
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any update on this? Currently trying to do this (have data gather loop in its own process) and have been using tf.train.replica_device_setter() building the graph in multiple processes and using the parameter server to share weights between agent collecting data and updater that updates the agent's policy model.
#### Worker |
The physical machine(s) containing the physical devices (e.g. GPUs, TPUs) on |
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this definition is a bit confusing to me - is a worker one physical machine or something that runs the replica code on one or more physical machines?
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A worker, as defined here, is the minimum of (one machine, one replica).
So in the most common case, it would correspond to one machine, and has one or multiple replicas.
But in the less common case where one replica needs to span > 1 machine, we are defining a worker to span the minimum number of machines needed for 1 replica.
I agreed this is a little confusing, and I am also worried this might be confusing with the "worker" task as used in TensorFlow in general (Although they correspond to each other in the common cases).
Do you have suggestions for a different name, or a different concept we should define here?
Updating the design doc with decisions on open questions that were discussed in the in-person design review.
If I have a model that was not trained using a DistributionStrategy, can I just call tf.train.import_meta_graph or tf.import_graph_def in a DS scope to distribute the model? If not, what else would I need to do?
@michaelstjules This is good question. The tf.train.import_meta_graph
currently won't work with run
or call_for_each_replica
method because it directly adds nodes to a TensorFlow graph but distribution strategy need to capture the creation of variables and ops (to replicate them or place them on right devices). We'll think of some way to support it to some degree, which is out of the scope of our v2 API.
| @@ -0,0 +1,1014 @@ | | | | -------------------------------------- | -------- | | | # Distribution Strategy - Revised API | | | | | | | | | Status | Proposed | |
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@guptapriya please amend to "Accepted" and then we can merge this, thank you!
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great, just did that. Thanks @ewilderj !
model.compile(loss='mean_squared_error', |
---|
optimizer=tf.train.GradientDescentOptimizer(learning_rate=0.2), |
distribute=strategy) |
model.fit(dataset, epochs=5, steps_per_epoch=10) |
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The interface between the model.fit and Dataset API is not clear. Is the input dataset supposed to be a single-epoch or should it be repeated? If the dataset is just returning the input dataset and fit is controlling number of epochs to iterate over it, why is steps_per_epoch necessary?
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is good feedback @sshrdp and it applies to model.fit and Dataset API in general. In fact @sb2nov had been looking at cleaning this up to make it more intuitive and usable (along the lines of not needing steps_per_epoch
for dataset case).
@sb2nov - can you shed more light?
mean_disc_loss = strategy.reduce(per_replica_disc_losses) |
---|
mean_gen_loss = strategy.reduce(per_replica_gen_losses) |
with tf.Session() as session: |
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The examples are using tf.Session
that is going to be removed in 2.0: are those examples supposed to work wrapping the following lines in a function and decorating it with @tf.function
?
Is this a work in progress or does the distribute strategy now support a clean way to efficiently implement custom training loops? I have seen examples of custom loops in tensorflow 1.14, but inside a session, is it possible to use tf.function in 1.14 (without any slowdown) or should we have to move to 2.0 to use the distribute strategy for custom training in eager mode as intended
@guptapriya I have looked at them, and when I tried implementing my model using TPU distribute strategy about a month back in 1.14(latest stable release),there were errors while distributing the dataset (in eager mode) and also it was slower than using an estimator.
As a side note, the 1.x style example (TPU custom training part) does not mention anything about how the losses are reduced over multiple replicas, where as the GPU mirrored strategy manually sums over all replicas. Does the TPU strategy behave differently here? And how about batch_norm mean and variances(are they per replica or combined?)
Also, is there support for non Keras models,let's say if they were built using tf.nn.layers? I am trying to port a GAN model that I am currently training using estimators without much success.
I even tried directly converting the DCGAN Keras example https://www.tensorflow.org/beta/tutorials/generative/dcgan by putting everything that needed to be put under strategy scope,but it throws an error in the current stable tensorflow build as of today. I'll take your suggestion and try nightly or 2.0 beta. Thanks
Reviewers
sshrdp sshrdp left review comments
alextp alextp left review comments
chr1sj0nes chr1sj0nes left review comments
karmel karmel left review comments
jperl jperl left review comments
petebu petebu left review comments
ewilderj ewilderj left review comments
yuefengz yuefengz left review comments
galeone galeone left review comments
goldiegadde goldiegadde left review comments
egonina egonina left review comments
mjlbach mjlbach left review comments
anj-s anj-s left review comments
wangsiyu wangsiyu approved these changes
martinwicke Awaiting requested review from martinwicke
Labels
TensorFlow 2.0 development
RFC Design Document: Accepted by Review