Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 35 additions & 2 deletions python/paddle/distributed/fleet/base/fleet_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import warnings
import paddle
import os
from types import MethodType
import numpy as np
from paddle.fluid.framework import dygraph_only, _global_flags
from paddle.fluid import compiler
Expand All @@ -33,7 +34,7 @@
from ..meta_parallel import TensorParallel, model_parallel_random_seed
from ..meta_parallel import PipelineParallel, ShardingParallel
from ..meta_optimizers import HybridParallelOptimizer
from ..meta_optimizers import HybridParallelGradScaler
from paddle import _C_ops

__all__ = []

Expand Down Expand Up @@ -1540,4 +1541,36 @@ def minimize(self,

@dygraph_only
def distributed_scaler(self, scaler):
    """Wrap an AMP ``GradScaler`` so it works under hybrid parallelism.

    Replaces the scaler's ``_unscale`` method with a variant that, after
    the usual check-finite-and-unscale pass, all-reduces the ``found_inf``
    flag across all ranks so every worker agrees on whether to skip the
    update. Only tensor-parallel and pipeline-parallel modes need this;
    other modes get the scaler back unmodified.

    Args:
        scaler: a ``paddle.amp.GradScaler`` instance to adapt.

    Returns:
        The same ``scaler`` object, possibly with ``_unscale`` patched.
    """

    def unscale_method(self, optimizer):
        # NOTE: inside this function `self` is the *scaler* instance it is
        # bound to via MethodType below, not the fleet object.
        if not self._enable:
            return
        if getattr(optimizer, '_param_groups', None) and isinstance(
                optimizer._param_groups[0], dict):
            # Optimizer built from param groups: flatten every group.
            param_grads = []
            for group in optimizer._param_groups:
                for param in group['params']:
                    if param._grad_ivar() is not None:
                        param_grads.append(param._grad_ivar())
        else:
            # Plain parameter list: keep only params that have gradients.
            param_grads = [
                param._grad_ivar() for param in optimizer._parameter_list
                if param._grad_ivar() is not None
            ]
        # Unscale in place and record whether any grad is inf/nan.
        _C_ops.check_finite_and_unscale(param_grads, self._scale,
                                        param_grads, self._found_inf)

        self._found_inf = paddle.cast(self._found_inf, dtype="int32")

        # TODO(shenliang03): Since dp allreduce in the optimizer runs
        # after the gradscaler, check_finite needs to synchronize global
        # information. In the future, use a check_group to speed this up.
        paddle.distributed.all_reduce(
            self._found_inf, op=paddle.distributed.ReduceOp.MAX, group=None)
        self._found_inf = paddle.cast(self._found_inf, dtype="bool")

    # Only tensor_parallel and pipeline_parallel need to modify the scaler;
    # pure data parallel relies on the reducer for gradient agreement.
    if self._hcg.get_parallel_mode() in (ParallelMode.TENSOR_PARALLEL,
                                         ParallelMode.PIPELINE_PARALLEL):
        scaler._unscale = MethodType(unscale_method, scaler)

    return scaler
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,19 @@
__all__ = []


def _obtain_optimizer_parameters_list(optimizer):
if getattr(optimizer, '_param_groups', None) and isinstance(
optimizer._param_groups[0], dict):
parameters_list = []
for group in optimizer._param_groups:
for param in group['params']:
parameters_list.append(param)
else:
parameters_list = [param for param in optimizer._parameter_list]

return parameters_list


class HybridParallelClipGrad:
def __init__(self, clip, hcg):
self._clip = clip
Expand Down Expand Up @@ -98,27 +111,31 @@ def __init__(self, optimizer, hcg, strategy):

self._need_dp = (self._hcg.get_data_parallel_world_size() > 1)

# NOTE(shenliang03): Because of the pure DataParallel mode, the gradient synchronization
# is achieved through the reducer, so there is no need to call fuse_allreduce in the optimizer.
self._dp_enable = not self._use_dp_mode and self._need_dp

self._sharding_enable = (
self._hcg.get_sharding_parallel_world_size() > 1)

if isinstance(self._inner_opt._grad_clip,
ClipGradByGlobalNorm) and not self._use_dp_mode:
logger.warning("using ClipGradByGlobalNorm in TensorParallel, the origin " \
"optmizer'grad clip will be changed.")

self._inner_opt._grad_clip = HybridParallelClipGrad(
self._inner_opt._grad_clip, hcg)

@imperative_base.no_grad
@framework.dygraph_only
def step(self):
    """Synchronize gradients as required by the parallel mode, then step.

    Sharding mode reduce-scatters gradients; hybrid data parallel
    all-reduces them (pure DP is handled by the reducer instead, so
    ``_dp_enable`` is False there). Finally delegates to the wrapped
    optimizer's ``step``.
    """
    # Works for both param-group and plain-list optimizers.
    parameters_list = _obtain_optimizer_parameters_list(self._inner_opt)
    if self._sharding_enable:
        sharding_reduce_gradients(list(parameters_list), self._hcg)

    if self._dp_enable:
        fused_allreduce_gradients(list(parameters_list), self._hcg)

    self._inner_opt.step()

@imperative_base.no_grad
Expand All @@ -128,16 +145,18 @@ def minimize(self,
parameters=None,
no_grad_set=None):

# minimize does not support parameters in the form of param_group,
# so no need use _obtain_optimizer_parameters_list
parameter_list = parameters if parameters \
else self._inner_opt._parameter_list

# Here shardinng should use global parameter list
# Here sharding should use global parameter list
if self._sharding_enable:
sharding_reduce_gradients(
list(self._inner_opt._parameter_list), self._hcg)
sharding_reduce_gradients(list(parameter_list), self._hcg)

if not self._use_dp_mode and self._need_dp:
if self._dp_enable:
fused_allreduce_gradients(list(parameter_list), self._hcg)

return self._inner_opt.minimize(loss, startup_program, parameter_list,
no_grad_set)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,7 @@ def __init__(self, layers, hcg, strategy):
def train_batch(self, data, optimizer, lr_scheduler=None, scaler=None):
assert isinstance(optimizer, HybridParallelOptimizer), (
'optimizer should be HybridParallelOptimizer subclass.')
if scaler is not None:
assert isinstance(scaler, HybridParallelGradScaler), (
'scaler should be HybridParallelGradScaler subclass or None.')

assert fluid.framework._dygraph_tracer()._has_grad, (
'Please enable the generation of gradients.')

Expand Down Expand Up @@ -212,7 +210,12 @@ def eval_batch(self, data, compute_loss=False):
if not last_iter:
input_tensor = p2p.recv_forward()

return self.total_loss if self._compute_loss else output_buffers
if self._compute_loss:
self.train_loss = self._broadcast_final_loss()
else:
self.train_loss = output_buffers

return self.train_loss

def _forward_step(self, input_tensor):
if self.stage_id == 0:
Expand Down Expand Up @@ -325,7 +328,7 @@ def _broadcast_final_loss(self):

def _optimizer_step(self):
if self.scaler:
self.scaler.minimize(self.optimizer, self.train_loss)
self.scaler.step(self.optimizer)
else:
self.optimizer.step()

Expand Down
8 changes: 6 additions & 2 deletions python/paddle/fluid/tests/unittests/hybrid_parallel_mp_amp.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@ def build_optimizer(self, model):
learning_rate=0.001, gamma=0.999, verbose=True)
optimizer = paddle.optimizer.SGD(scheduler,
grad_clip=grad_clip,
parameters=model.parameters())
parameters=[{
'params': model.parameters(),
'weight_decay': 0.001,
'learning_rate': 0.1
}])
return optimizer

def train_batch(self, batch, model, optimizer, is_mp):
Expand All @@ -43,7 +47,7 @@ def train_batch(self, batch, model, optimizer, is_mp):
scaled = scaler.scale(loss) # scale the loss
scaled.backward() # do backward

scaler.minimize(optimizer, scaled) # update parameters
scaler.step(optimizer) # update parameters
optimizer.clear_grad()
return scaled

Expand Down