# coding=utf-8
# Copyright 2017 The Tensor2Tensor Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Model building."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import copy
import math
# Dependency imports
import numpy as np
import six
# pylint: disable=redefined-builtin
from six.moves import xrange
# pylint: enable=redefined-builtin
from tensor2tensor import models # pylint: disable=unused-import
from tensor2tensor.utils import devices
from tensor2tensor.utils import input_fn_builder
from tensor2tensor.utils import metrics
from tensor2tensor.utils import optimize
from tensor2tensor.utils import registry
import tensorflow as tf
from tensorflow.python.framework import dtypes
def model_fn(model,
features,
mode,
hparams,
problem_names,
train_steps=100000,
worker_id=0,
worker_replicas=1,
eval_run_autoregressive=False,
decode_hparams=None):
"""Builds the model for all modes.
* TRAIN: Constructs loss and train_op
* EVAL: Constructs the loss and eval metrics
* PREDICT: Constructs the predictions
Args:
model: str, name of model.
features: dict<feature name, Tensor>. Expected to have keys
{inputs, targets, problem_choice}.
mode: tf.estimator.ModeKeys.
hparams: model HParams.
problem_names: list of str, names of the problems.
train_steps: int, total number of training steps. Used to compute learning
rate decay.
worker_id: int, id of this worker.
worker_replicas: int, number of workers.
eval_run_autoregressive: bool, whether to run evaluation autoregressively.
decode_hparams: HParams for decode settings. Used when mode == PREDICT.
Returns:
tf.estimator.EstimatorSpec
"""
assert len(problem_names) == len(hparams.problem_instances)
decode_hp = decode_hparams
# TODO(rsepassi): This still depends on FLAGS. Rm eventually.
dp = devices.data_parallelism()
# Set the variable initializer for the current variable scope.
tf.get_variable_scope().set_initializer(_get_variable_initializer(hparams))
is_training = mode == tf.estimator.ModeKeys.TRAIN
# Add input statistics for incoming features.
with tf.name_scope("input_stats"):
for (k, v) in six.iteritems(features):
if isinstance(v, tf.Tensor) and v.get_shape().ndims > 1:
tf.summary.scalar("%s_batch" % k, tf.shape(v)[0] // dp.n)
tf.summary.scalar("%s_length" % k, tf.shape(v)[1])
nonpadding = tf.to_float(tf.not_equal(v, 0))
nonpadding_tokens = tf.reduce_sum(nonpadding)  # number of non-padding tokens
if k == "targets":
targets_nonpadding_tokens = nonpadding_tokens
tf.summary.scalar("%s_nonpadding_tokens" % k, nonpadding_tokens)
tf.summary.scalar("%s_nonpadding_fraction" % k,
tf.reduce_mean(nonpadding))
# Get multi-problem logits and loss based on features["problem_choice"].
loss_variable_names = []
def nth_model(n):
"""Build the model for the n-th problem, plus some added variables."""
model_class = registry.model(model)(
hparams,
mode,
hparams.problems[n],
n,
dp,
devices.ps_devices(all_workers=True),
decode_hparams=decode_hparams)  # instantiate the registered model class with its hparams and modalities
if mode == tf.estimator.ModeKeys.PREDICT:
return model_class.infer(
features,
beam_size=decode_hp.beam_size,
top_beams=(decode_hp.beam_size if decode_hp.return_beams else 1),
alpha=decode_hp.alpha,
decode_length=decode_hp.extra_length)
# In distributed mode, we build graph for problem=0 and problem=worker_id.
skipping_is_on = hparams.problem_choice == "distributed" and is_training
problem_worker_id = worker_id % len(hparams.problems)
skip_this_one = n != 0 and n % worker_replicas != problem_worker_id
# On worker 0 also build graph for problems <= 1.
# TODO(lukaszkaiser): why is this hack needed for variables init? Repair.
skip_this_one = skip_this_one and (worker_id != 0 or n > 1)
mrt_samples = getattr(hparams, 'mrt_samples', None)
if eval_run_autoregressive and mode == tf.estimator.ModeKeys.EVAL:  # autoregressive evaluation
sharded_logits, losses_dict = model_class.eval_autoregressive(features)
else:  # standard (teacher-forced) pass
if hparams.rl:
# Generate sample data; it is sharded automatically. Samples have shape [batch, time, 1, 1].
if model_class._num_datashards == 1:  # single GPU card: use fast sampling
print("### Working on a single GPU card, using fast decode. ###")
train_beam = getattr(hparams, 'train_beam', None)
if mrt_samples:
samples, _ = model_class._fast_decode(features, decode_length=50,
beam_size=mrt_samples, top_beams=mrt_samples)
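# The fast decoder above returns mrt_samples candidate samples per source
# sentence, so below we tile inputs and targets mrt_samples times along the
# batch axis (and restore the [batch, length, 1, 1] layout) so that every
# candidate is paired with its own copy of the source and reference.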
inputs = tf.squeeze(tf.squeeze(features["inputs"], axis=-1), axis=-1)
targets = tf.squeeze(tf.squeeze(features["targets"], axis=-1), axis=-1)
batch_size = tf.shape(inputs)[0]
inputs_len = tf.shape(inputs)[1]
targets_len = tf.shape(targets)[1]
inputs_tile = tf.tile(inputs, [1, mrt_samples])
targets_tile = tf.tile(targets, [1, mrt_samples])
inputs_reshape = tf.reshape(inputs_tile, [batch_size*mrt_samples, inputs_len])
targets_reshape = tf.reshape(targets_tile, [batch_size*mrt_samples, targets_len])
inputs_feed = tf.expand_dims(tf.expand_dims(inputs_reshape, axis=-1), axis=-1)
targets_feed = tf.expand_dims(tf.expand_dims(targets_reshape, axis=-1), axis=-1)
features["inputs"] = inputs_feed
features["targets"] = targets_feed
elif train_beam and train_beam != 1:  # beam search with beam size hparams.train_beam, keeping only the top sample
samples, _ = model_class._fast_decode(features, decode_length=50, beam_size=hparams.train_beam)
else:
targets_beam = getattr(hparams, 'targets_beam', None)
if targets_beam:
targets_samples, _ = model_class._fast_decode(features, decode_length=50,
beam_size=4, sampling_method='argmax')
targets_samples = tf.reshape(targets_samples, [tf.shape(targets_samples)[0], tf.shape(targets_samples)[1], 1, 1])
features["targets"] = targets_samples
samples, _ = model_class._fast_decode(features, decode_length=50)
samples = tf.expand_dims(samples, axis=-1)
samples = tf.expand_dims(samples, axis=-1)  # add two trailing dims to restore the [batch, time, 1, 1] layout
else:  # multiple GPU cards: only slow sampling is supported
print("### Working on multiple GPU cards, using slow decode. ###")
samples, _, _ = model_class._slow_greedy_infer(features, decode_length=50)
samples = tf.stop_gradient(samples)
# Compute the BLEU-based reward with the APPROX_BLEU_TRAIN metric function.
train_metric_fn = metrics.METRICS_FNS[metrics.Metrics.APPROX_BLEU_TRAIN]
labels = features.get("targets", None)
samples.set_shape([None, None, 1, 1])
# hparams.delta_reward: True for per-step delta reward, False for total reward.
metric_value = train_metric_fn(samples, labels, delta_reward=hparams.delta_reward)
metric_value = tf.stop_gradient(metric_value)  # do not backpropagate through the reward
metric_value.set_shape([None, None, 1, 1])
# According to metrics.py, tf.metrics.mean ensures correct aggregation.
# metric_value holds the reward (total or delta, depending on hparams.delta_reward).
features["samples"] = samples
features["values"] = metric_value
# del samples
# del labels
sharded_logits, losses_dict = model_class.model_fn(
features, skip=(skipping_is_on and skip_this_one), mrt=mrt_samples)
# if hparams.rl:
# training_loss = losses_dict["training"] * metric_value # losses_dict["training"]: [batch, timesteps]
# training_loss_sum = tf.reduce_sum(training_loss) # sum the training_loss
# losses_dict["training"] = training_loss_sum # log_prob * r (current r is total_reward)
with tf.variable_scope("losses_avg"):
total_loss, ops = 0.0, []
for loss_key, loss_value in six.iteritems(losses_dict):
if hparams.rl:
baseline_loss_weight = getattr(hparams, 'baseline_loss_weight', 1.0)
training_loss_weight = getattr(hparams, 'training_loss_weight', 1.0)
mle_training_loss_weight = getattr(hparams, 'mle_training_loss_weight', 0.3)
if loss_key == "training":
loss_value = loss_value * training_loss_weight
elif loss_key == "training_baseline":
loss_value = loss_value * baseline_loss_weight
elif loss_key == "mle_training":
loss_value = loss_value * mle_training_loss_weight
loss_name = "problem_%d/%s_loss" % (n, loss_key)
loss_moving_avg = tf.get_variable(
loss_name, initializer=100.0, trainable=False)
loss_variable_names.append(loss_name)
ops.append(
loss_moving_avg.assign(loss_moving_avg * 0.9 + loss_value * 0.1))
total_loss += loss_value
try: # Total loss avg might be reused or not, we try both.
with tf.variable_scope(tf.get_variable_scope(), reuse=True):
# Total loss was already constructed on input.
loss_moving_avg = tf.get_variable("problem_%d/total_loss" % n)
except ValueError:
loss_moving_avg = tf.get_variable(
"problem_%d/total_loss" % n, initializer=100.0, trainable=False)
ops.append(
loss_moving_avg.assign(loss_moving_avg * 0.9 + total_loss * 0.1))
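# The moving averages above (decay 0.9, initialized to 100.0) are read back
# under the "losses_avg" scope for the loss summaries in the training-stats
# section below.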
with tf.variable_scope("train_stats"): # Count steps for this problem.
problem_steps = tf.get_variable(
"problem_%d_steps" % n, initializer=0, trainable=False)
ops.append(problem_steps.assign_add(1))
with tf.control_dependencies(ops): # Make sure the ops run.
# Ensure the loss is a scalar here.
total_loss = tf.reshape(total_loss, [], name="total_loss_control_id")
return [total_loss, tf.concat(sharded_logits, 0)]
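# Dispatch on features["problem_choice"]: cond_on_index runs nth_model for
# the problem index selected at runtime (index 0 .. len(problems) - 1).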
model_output = input_fn_builder.cond_on_index(
nth_model,
index_tensor=features["problem_choice"],
max_idx=len(hparams.problems) - 1)  # returns [total_loss, concatenated sharded_logits] (or predictions in PREDICT mode)
if mode == tf.estimator.ModeKeys.PREDICT:
# If beam searching, model_output will be a dict with keys "outputs" and
# "scores".
if isinstance(model_output, dict): # beam search
outputs = model_output["outputs"]
scores = model_output["scores"]
else:
outputs = model_output
scores = None
batched_problem_choice = (
features["problem_choice"] * tf.ones(
(tf.shape(features["inputs"])[0],), dtype=tf.int32))
predictions = {
"outputs": outputs,
"scores": scores,
"inputs": features.get("inputs", None),
"targets": features.get("infer_targets", None),
"problem_choice": batched_problem_choice,
}
_del_dict_nones(predictions)  # drop entries whose value is None
export_out = {"outputs": predictions["outputs"]}
if "scores" in predictions:
export_out["scores"] = predictions["scores"]
return tf.estimator.EstimatorSpec(
mode,
predictions=predictions,
export_outputs={
"output": tf.estimator.export.PredictOutput(export_out)
})
total_loss, logits = model_output
if mode == tf.estimator.ModeKeys.EVAL:
eval_metrics_fns = metrics.create_evaluation_metrics(
hparams.problem_instances, hparams)
eval_metrics = {}
for metric_name, metric_fn in six.iteritems(eval_metrics_fns):
eval_metrics[metric_name] = metric_fn(logits, features)
return tf.estimator.EstimatorSpec(
mode,
predictions={"predictions": logits},
eval_metric_ops=eval_metrics,
loss=total_loss)
assert mode == tf.estimator.ModeKeys.TRAIN
# Set learning rate
learning_rate = hparams.learning_rate * optimize.learning_rate_decay(
hparams, num_worker_replicas=worker_replicas, num_train_steps=train_steps)
learning_rate /= math.sqrt(float(worker_replicas))
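# e.g. with worker_replicas = 4 the learning rate above is divided by sqrt(4) = 2.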
# Get global step
global_step = tf.train.get_or_create_global_step()
# Some training statistics.
with tf.name_scope("training_stats"):
tf.summary.scalar("learning_rate", learning_rate)
for n in xrange(len(hparams.problems)):
names_and_vars = []
with tf.variable_scope("losses_avg", reuse=True):
total_loss_var = tf.get_variable("problem_%d/total_loss" % n)
names_and_vars.append(("total_loss", total_loss_var))
with tf.variable_scope("losses_avg", reuse=True):
for loss_name in loss_variable_names:
if loss_name.startswith("problem_%d/" % n):
loss_var = tf.get_variable(loss_name)
loss_suffix = loss_name[loss_name.index("/") + 1:]
names_and_vars.append((loss_suffix, loss_var))
for (loss_name, loss_var) in names_and_vars:
tf.summary.scalar("loss_avg_%d/%s" % (n, loss_name), loss_var)
with tf.variable_scope("train_stats", reuse=True):
nth_steps = tf.get_variable("problem_%d_steps" % n, dtype=tf.int32)
tf.summary.scalar("problem_%d_frequency" % n,
tf.to_float(nth_steps) /
(tf.to_float(global_step) + 1.0))
# Add weight decay and noise.
total_size, weight_decay_loss = 0, 0.0
all_weights = {v.name: v for v in tf.trainable_variables()}
for v_name in sorted(list(all_weights)):
v = all_weights[v_name]
v_size = int(np.prod(np.array(v.shape.as_list())))
total_size += v_size
if hparams.weight_decay > 0.0 and len(v.shape.as_list()) > 1:
# Add weight regularization if set and the weight is not a bias (dim>1).
with tf.device(v._ref().device): # pylint: disable=protected-access
v_loss = tf.nn.l2_loss(v) / v_size
weight_decay_loss += v_loss
is_body = len(v_name) > 5 and v_name[:5] == "body/"
if hparams.weight_noise > 0.0 and is_body:
# Add weight noise if set in hparams.
with tf.device(v._ref().device): # pylint: disable=protected-access
scale = learning_rate * 0.001
noise = tf.truncated_normal(v.shape) * hparams.weight_noise * scale
noise_op = v.assign_add(noise)
with tf.control_dependencies([noise_op]):
total_loss = tf.identity(total_loss)
if hparams.weight_decay > 0.0:
total_loss += weight_decay_loss * hparams.weight_decay
# The new data reader occasionally emits very small batches, which
# cause the examples in those batches to be grossly overweighted.
# We decrease the loss proportionally to the ratio of the size of this
# batch to the size of the largest training batch ever.
# TODO(noam): to be more sophisticated, we could keep separate
# maxima based on problem choice.
max_nonpadding_var = tf.get_variable(
"max_nonpadding",
shape=[],
initializer=tf.ones_initializer(),
trainable=False)
max_nonpadding = tf.maximum(max_nonpadding_var, targets_nonpadding_tokens)
with tf.control_dependencies([tf.assign(max_nonpadding_var, max_nonpadding)]):
small_batch_multiplier = targets_nonpadding_tokens / max_nonpadding
tf.summary.scalar("small_batch_multiplier", small_batch_multiplier)
total_loss *= small_batch_multiplier
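# Worked example: if this batch contains 1,000 non-padding target tokens and
# the largest batch seen so far contained 4,000, the loss is scaled by
# 1000 / 4000 = 0.25.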
# Log variable sizes
_log_variable_sizes(tf.trainable_variables(), "Trainable Variables")
diet_vars = [
v for v in tf.global_variables() if v.dtype == dtypes.float16_ref
]
_log_variable_sizes(diet_vars, "Diet Variables")
# Optimize
train_op = optimize.optimize(total_loss, learning_rate, hparams)
# Remove summaries that will fail to run because they are in conditionals.
# TODO(cwhipkey): Test with this code removed, later in 2017.
summaries = tf.get_collection_ref(tf.GraphKeys.SUMMARIES)
for i in reversed(range(len(summaries))):
if summaries[i].name.startswith("cond_"):
del summaries[i]
tf.logging.info("Global model_fn finished.")
return tf.estimator.EstimatorSpec(
mode,
predictions={"problem_choice": features["problem_choice"]},
loss=total_loss,
train_op=train_op)
def build_model_fn(model, **kwargs):
"""Returns a function to build the model. See model_fn."""
# Model function as expected by Estimator
def wrapping_model_fn(features, labels, mode, params):
# Deep-copy the model hparams between modes to eliminate
# side-effects caused by abuse of the linked problem_hparams
# objects which are used to share modality objects between
# problems. We do not want to share the modality objects between
# modes, since the modality objects may decide to do something
# mode-specific. A better fix would be to stop abusing the
# hparams in this way and instead use a separate dictionary to
# share the modality objects between problems. This dictionary
# could be created once per mode and passed to the constructor of
# t2t_model.
hparams = copy.deepcopy(params)
del params
if labels is not None:
features["targets"] = labels
del labels
return model_fn(model, features, mode, hparams, **kwargs)
return wrapping_model_fn
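# Illustrative usage sketch (not part of this module; `output_dir`,
# `my_hparams` and `my_train_input_fn` are placeholders): the function
# returned by build_model_fn is typically handed to a tf.estimator.Estimator,
# with the model hparams passed through `params`:
#
#   estimator = tf.estimator.Estimator(
#       model_fn=build_model_fn("transformer", problem_names=["my_problem"]),
#       model_dir=output_dir,
#       params=my_hparams)
#   estimator.train(input_fn=my_train_input_fn, max_steps=100000)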
def _log_variable_sizes(var_list, tag):
"""Log the sizes and shapes of variables, and the total size.
Args:
var_list: a list of variables
tag: a string
"""
name_to_var = {v.name: v for v in var_list}
total_size = 0
for v_name in sorted(list(name_to_var)):
v = name_to_var[v_name]
v_size = int(np.prod(np.array(v.shape.as_list())))
tf.logging.info("Weight %s\tshape %s\tsize %d",
v.name[:-2].ljust(80),
str(v.shape).ljust(20), v_size)
total_size += v_size
tf.logging.info("%s Total size: %d", tag, total_size)
def _get_variable_initializer(hparams):
if hparams.initializer == "orthogonal":
return tf.orthogonal_initializer(gain=hparams.initializer_gain)
elif hparams.initializer == "uniform":
max_val = 0.1 * hparams.initializer_gain
return tf.random_uniform_initializer(-max_val, max_val)
elif hparams.initializer == "normal_unit_scaling":
return tf.variance_scaling_initializer(
hparams.initializer_gain, mode="fan_avg", distribution="normal")
elif hparams.initializer == "uniform_unit_scaling":
return tf.variance_scaling_initializer(
hparams.initializer_gain, mode="fan_avg", distribution="uniform")
else:
raise ValueError("Unrecognized initializer: %s" % hparams.initializer)
def _del_dict_nones(d):
for k in list(d.keys()):
if d[k] is None:
del d[k]
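# For example, _del_dict_nones({"outputs": o, "scores": None}) removes the
# "scores" key in place and keeps "outputs".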