Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions paddle/fluid/imperative/reducer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,21 @@ void Reducer::PrepareDeps(const std::unordered_set<GradOpNode *> &init_nodes) {
PADDLE_ENFORCE_NOT_NULL(
grad_pending_node,
platform::errors::NotFound("Grad pending node should not be null"));
// py_layer is not supported in DataParallel
auto begin = grad_pending_node->begin();
auto end = grad_pending_node->end();
for (auto op_base = begin; op_base != end; op_base++) {
PADDLE_ENFORCE_EQ(
op_base->Type() != "py_layer", true,
platform::errors::PreconditionNotMet(
"Note: Currently PyLayer is not supported in DataParallel. For "
"using PyLayer in a DataParallel model, you can skip gradient "
"synchronization among multiple cards by 'no_sync', and "
"manually implement 'all_reduce' before model optimization. "
"There is an example showing specific implemetation processing "
"in offical docs: https://www.paddlepaddle.org.cn/documentation"
"/docs/api/paddle/DataParallel_cn.html"));
}
++node_deps_[grad_pending_node.get()];
if (visited.count(grad_pending_node.get()) == 0) {
visited.insert(grad_pending_node.get());
Expand Down
12 changes: 9 additions & 3 deletions python/paddle/distributed/fleet/utils/hybrid_parallel_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from paddle import framework
import paddle
from paddle.fluid import core
import paddle.distributed as dist
from paddle.fluid.dygraph.parallel import _split_tensors, sync_params_buffers, build_groups
from collections import OrderedDict
from .log_util import logger
Expand All @@ -44,8 +45,13 @@ def _apply_collective_grads(parameters, comm_group):

for coalesced_grad, _, _ in coalesced_grads_and_vars:
# need to div nranks
div_factor = paddle.to_tensor(
comm_group.nranks, dtype=coalesced_grad.dtype)
if comm_group is None:
# support for DataParallel
div_factor = paddle.to_tensor(
dist.get_world_size(), dtype=coalesced_grad.dtype)
else:
div_factor = paddle.to_tensor(
comm_group.nranks, dtype=coalesced_grad.dtype)
paddle.fluid.framework._dygraph_tracer().trace_op(
type="elementwise_div",
inputs={'X': coalesced_grad,
Expand Down Expand Up @@ -115,7 +121,7 @@ def broadcast_dp_parameters(model, hcg):


def fused_allreduce_gradients(parameter_list, hcg):
data_parallel_group = hcg.get_data_parallel_group()
data_parallel_group = None if hcg is None else hcg.get_data_parallel_group()
logger.debug("dp start fuse allreduce gradients")
with framework.no_grad():
_apply_collective_grads(parameter_list, data_parallel_group)
Expand Down
148 changes: 108 additions & 40 deletions python/paddle/fluid/dygraph/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,51 +426,119 @@ class DataParallel(layers.Layer):
Layer: The data paralleled module.

Examples:

.. code-block:: python

# required: distributed
import paddle
import paddle.nn as nn
import paddle.optimizer as opt
import paddle.distributed as dist
:name: dp-example

# required: distributed
import paddle
import paddle.nn as nn
import paddle.optimizer as opt
import paddle.distributed as dist

class LinearNet(nn.Layer):
def __init__(self):
super(LinearNet, self).__init__()
self._linear1 = nn.Linear(10, 10)
self._linear2 = nn.Linear(10, 1)

def forward(self, x):
return self._linear2(self._linear1(x))

def train():
# 1. initialize parallel environment
dist.init_parallel_env()

# 2. create data parallel layer & optimizer
layer = LinearNet()
dp_layer = paddle.DataParallel(layer)

loss_fn = nn.MSELoss()
adam = opt.Adam(
learning_rate=0.001, parameters=dp_layer.parameters())

class LinearNet(nn.Layer):
def __init__(self):
super(LinearNet, self).__init__()
self._linear1 = nn.Linear(10, 10)
self._linear2 = nn.Linear(10, 1)
# 3. run layer
inputs = paddle.randn([10, 10], 'float32')
outputs = dp_layer(inputs)
labels = paddle.randn([10, 1], 'float32')
loss = loss_fn(outputs, labels)

def forward(self, x):
return self._linear2(self._linear1(x))
loss.backward()

def train():
# 1. initialize parallel environment
dist.init_parallel_env()
adam.step()
adam.clear_grad()

if __name__ == '__main__':
# 1. start by ``paddle.distributed.spawn`` (default)
dist.spawn(train, nprocs=2)
# 2. start by ``paddle.distributed.launch``
# train()


.. note::
``PyLayer`` is not supported in DataParallel. To solve problems of this kind,
it's recommended to skip gradient synchronization among multiple cards by 'no_sync',
and manually implement 'all_reduce' before model optimization. There is an example
showing specific implemetation processing.

Examples:

.. code-block:: python
:name: dp-pylayer-example

# required: distributed
import numpy
import paddle
import paddle.distributed as dist
from paddle.autograd import PyLayer
from paddle.distributed.fleet.utils.hybrid_parallel_util import fused_allreduce_gradients

class cus_tanh(PyLayer):
@staticmethod
def forward(ctx, x):
y = paddle.tanh(x)
ctx.save_for_backward(y)
return y

@staticmethod
def backward(ctx, dy):
y, = ctx.saved_tensor()
grad = dy * (1 - paddle.square(y))
return grad

class SimpleNet(paddle.nn.Layer):
def __init__(self):
super(SimpleNet, self).__init__()
self.linear = paddle.nn.Linear(2, 2)

def forward(self, inputs):
inputs = cus_tanh.apply(inputs)
return self.linear(inputs)

if __name__ == '__main__':
dist.init_parallel_env()

model = SimpleNet()
model = paddle.DataParallel(model)
opt = paddle.optimizer.SGD(learning_rate=0.01, parameters=model.parameters())

for step in range(10):
x_data = numpy.random.randn(2, 2).astype(numpy.float32)
x = paddle.to_tensor(x_data)
x.stop_gradient = False

# step 1 : skip gradient synchronization by 'no_sync'
with model.no_sync():
y_pred = model(x)
loss = y_pred.mean()
loss.backward()

# step 2 : fuse + allreduce manually before optimization
fused_allreduce_gradients(list(model.parameters()), None)

opt.step()
opt.clear_grad()

# 2. create data parallel layer & optimizer
layer = LinearNet()
dp_layer = paddle.DataParallel(layer)

loss_fn = nn.MSELoss()
adam = opt.Adam(
learning_rate=0.001, parameters=dp_layer.parameters())

# 3. run layer
inputs = paddle.randn([10, 10], 'float32')
outputs = dp_layer(inputs)
labels = paddle.randn([10, 1], 'float32')
loss = loss_fn(outputs, labels)

loss.backward()

adam.step()
adam.clear_grad()

if __name__ == '__main__':
# 1. start by ``paddle.distributed.spawn`` (default)
dist.spawn(train, nprocs=2)
# 2. start by ``paddle.distributed.launch``
# train()
"""

def __init__(self,
Expand Down
3 changes: 3 additions & 0 deletions python/paddle/fluid/tests/unittests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ list(APPEND DIST_TEST_OPS test_parallel_dygraph_control_flow)
list(APPEND DIST_TEST_OPS test_parallel_dygraph_no_sync)
list(APPEND DIST_TEST_OPS test_parallel_dygraph_no_sync_gradient_check)
list(APPEND DIST_TEST_OPS test_parallel_dygraph_dataparallel)
list(APPEND DIST_TEST_OPS test_parallel_dygraph_dataparallel_with_pylayer)
list(APPEND DIST_TEST_OPS test_parallel_dygraph_pipeline_parallel)
list(APPEND DIST_TEST_OPS test_parallel_dygraph_tensor_parallel)
list(APPEND DIST_TEST_OPS test_parallel_dygraph_sharding_parallel)
Expand Down Expand Up @@ -203,6 +204,7 @@ if ((NOT WITH_GPU) AND (NOT WITH_ROCM))
list(REMOVE_ITEM TEST_OPS test_parallel_dygraph_no_sync)
list(REMOVE_ITEM TEST_OPS test_parallel_dygraph_no_sync_gradient_check)
list(REMOVE_ITEM TEST_OPS test_parallel_dygraph_dataparallel)
list(REMOVE_ITEM TEST_OPS test_parallel_dygraph_dataparallel_with_pylayer)
list(REMOVE_ITEM TEST_OPS test_parallel_dygraph_pipeline_parallel)
list(REMOVE_ITEM TEST_OPS test_parallel_dygraph_tensor_parallel)
list(REMOVE_ITEM TEST_OPS test_parallel_dygraph_sharding_parallel)
Expand Down Expand Up @@ -953,6 +955,7 @@ if(WITH_DISTRIBUTE AND WITH_GPU AND WITH_NCCL)
set_tests_properties(test_parallel_dygraph_control_flow PROPERTIES TIMEOUT 120)
set_tests_properties(test_parallel_dygraph_no_sync PROPERTIES TIMEOUT 120)
set_tests_properties(test_parallel_dygraph_no_sync_gradient_check PROPERTIES TIMEOUT 30)
set_tests_properties(test_parallel_dygraph_dataparallel_with_pylayer PROPERTIES TIMEOUT 30)
set_tests_properties(test_parallel_dygraph_pipeline_parallel PROPERTIES TIMEOUT 120)
set_tests_properties(test_parallel_dygraph_tensor_parallel PROPERTIES TIMEOUT 200)
set_tests_properties(test_parallel_dygraph_sharding_parallel PROPERTIES TIMEOUT 120)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import division
from __future__ import print_function

import unittest

import paddle
import numpy as np
import paddle.distributed as dist
from paddle.fluid.dygraph.nn import Linear
from paddle.autograd import PyLayer
from paddle.distributed.fleet.utils.hybrid_parallel_util import fused_allreduce_gradients

batch = 5
in_dim = 20
out_dim = 10


class cus_tanh(PyLayer):
@staticmethod
def forward(ctx, x):
y = paddle.tanh(x)
ctx.save_for_backward(y)
return y

@staticmethod
def backward(ctx, dy):
y, = ctx.saved_tensor()
grad = dy * (1 - paddle.square(y))
return grad


class SimpleNet(paddle.nn.Layer):
def __init__(self, train_id, model_id):
super(SimpleNet, self).__init__()
self.w = self.create_parameter(shape=[in_dim, batch], dtype="float32")
self.linear = paddle.nn.Linear(in_dim, out_dim)
self.tanh = paddle.tanh

self.trainer_id = train_id
self.model_id = model_id

def forward(self, inputs):
if self.model_id == 0:
inputs = cus_tanh.apply(inputs)
else:
inputs = self.tanh(inputs)

inputs = paddle.matmul(self.w, inputs)
return self.linear(inputs)


class TestDistTraning(unittest.TestCase):
def test_multiple_gpus(self):
self.trainer_id = dist.get_rank()
dist.init_parallel_env()

model_a = SimpleNet(self.trainer_id, 0)
model_b = SimpleNet(self.trainer_id, 1)

state_dict = model_a.state_dict()
model_b.set_state_dict(state_dict)

model_a = paddle.DataParallel(model_a)
model_b = paddle.DataParallel(model_b)

for step in range(10):
x_data = np.random.randn(batch, in_dim).astype(np.float32)
x = paddle.to_tensor(x_data)
x.stop_gradient = False

with model_a.no_sync():
y_pred_a = model_a(x)
loss_a = y_pred_a.mean()
loss_a.backward()
fused_allreduce_gradients(list(model_a.parameters()), None)

y_pred_b = model_b(x)
loss_b = y_pred_b.mean()
loss_b.backward()

self.check_gradient(model_a.parameters())
self.check_gradient(model_b.parameters())

self.check_acc(model_a._layers.w.grad, model_b._layers.w.grad)

model_a.clear_gradients()
model_b.clear_gradients()

def check_acc(self, grad, acc_grad):
grad = grad.numpy() if grad is not None else None
acc_grad = acc_grad.numpy() if acc_grad is not None else None
return np.testing.assert_allclose(grad, acc_grad, rtol=1e-6)

def broadcast_param(self, param, root):
paddle.distributed.broadcast(param, root)
return param

def check_gradient(self, params):
other_param = []
for param in params:
if param.trainable and (param._grad_ivar() is not None):
grad = param._grad_ivar()
other_grad = self.broadcast_param(grad.clone(), root=1)
if self.trainer_id == 0:
np.testing.assert_allclose(other_grad.numpy(), grad.numpy())


if __name__ == '__main__':
unittest.main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import unittest
import paddle.fluid as fluid

from test_parallel_dygraph_dataparallel import TestMultipleGpus


class TestDataParallelLayer(TestMultipleGpus):
def test_parallel_dygraph_dataparallel_with_pylayer(self):
self.run_mnist_2gpu('parallel_dygraph_dataparallel_with_pylayer.py')


if __name__ == "__main__":
unittest.main()
Loading