From 924c909e340912623886dc5abbaebc96636fbbc9 Mon Sep 17 00:00:00 2001 From: shenliang03 Date: Tue, 15 Jun 2021 10:49:57 +0800 Subject: [PATCH 1/4] add pplayer --- python/paddle/distributed/collective.py | 4 +- .../paddle/distributed/fleet/base/topology.py | 15 +- .../fleet/meta_parallel/__init__.py | 1 + .../meta_parallel/parallel_layers/__init__.py | 1 + .../parallel_layers/pp_layers.py | 110 +++++++++ .../fleet/meta_parallel/pipeline_parallel.py | 2 + .../hybrid_parallel_shared_weight.py | 233 ++++++++++++++++++ 7 files changed, 361 insertions(+), 5 deletions(-) create mode 100644 python/paddle/fluid/tests/unittests/hybrid_parallel_shared_weight.py diff --git a/python/paddle/distributed/collective.py b/python/paddle/distributed/collective.py index f10b0736ef97fe..1bf30fd4a598aa 100644 --- a/python/paddle/distributed/collective.py +++ b/python/paddle/distributed/collective.py @@ -267,7 +267,9 @@ def new_group(ranks=None, backend=None): # TODO(shenliang03): This is a temporary solution to solve the problem of # hang caused by cross-creation of new_group - tmp = fill_constant([0], dtype="int32", value="1") + tmp = paddle.to_tensor( + [1], dtype="int32") if in_dygraph_mode() else fill_constant( + [0], dtype="int32", value="1") paddle.distributed.all_reduce(tmp, use_calc_stream=True) paddle.distributed.wait(tmp) return gp diff --git a/python/paddle/distributed/fleet/base/topology.py b/python/paddle/distributed/fleet/base/topology.py index 04d8417fdcbf3f..2108203d48cd64 100644 --- a/python/paddle/distributed/fleet/base/topology.py +++ b/python/paddle/distributed/fleet/base/topology.py @@ -107,6 +107,11 @@ def get_comm_list(self, axis_name): return all_result + def get_rank_from_stage(self, global_rank, **kwargs): + coord = self.get_coord(global_rank) + tf = coord._replace(**kwargs)._asdict() + return self.get_rank(**tf) + class HybridCommunicateGroup(object): def __init__(self, topology): @@ -254,7 +259,9 @@ def get_pipe_parallel_group(self): def get_check_parallel_group(self): return self._check_comm_group - def get_rank_from_stage(self, stage_id): - coord = self._topo.get_coord(self.global_rank) - tf = coord._replace(pipe=stage_id)._asdict() - return self._topo.get_rank(**tf) + def get_rank_from_stage(self, stage_id, **kwargs): + return self._topo.get_rank_from_stage( + self.global_rank, pipe=stage_id, **kwargs) + # coord = self._topo.get_coord(self.global_rank) + # tf = coord._replace(pipe=stage_id, **kwargs)._asdict() + # return self._topo.get_rank(**tf) diff --git a/python/paddle/distributed/fleet/meta_parallel/__init__.py b/python/paddle/distributed/fleet/meta_parallel/__init__.py index 0750c2c250e2bb..4e32ff5723c418 100644 --- a/python/paddle/distributed/fleet/meta_parallel/__init__.py +++ b/python/paddle/distributed/fleet/meta_parallel/__init__.py @@ -17,6 +17,7 @@ from .parallel_layers import RowParallelLinear # noqa: F401 from .parallel_layers import ParallelCrossEntropy # noqa: F401 from .parallel_layers import LayerDesc # noqa: F401 +from .parallel_layers import SharedLayerDesc # noqa: F401 from .parallel_layers import PipelineLayer # noqa: F401 from .parallel_layers import RNGStatesTracker # noqa: F401 from .parallel_layers import model_parallel_random_seed # noqa: F401 diff --git a/python/paddle/distributed/fleet/meta_parallel/parallel_layers/__init__.py b/python/paddle/distributed/fleet/meta_parallel/parallel_layers/__init__.py index 72da962b8914eb..fd977857490737 100644 --- a/python/paddle/distributed/fleet/meta_parallel/parallel_layers/__init__.py +++ 
b/python/paddle/distributed/fleet/meta_parallel/parallel_layers/__init__.py @@ -17,6 +17,7 @@ from .mp_layers import RowParallelLinear # noqa: F401 from .mp_layers import ParallelCrossEntropy # noqa: F401 from .pp_layers import LayerDesc # noqa: F401 +from .pp_layers import SharedLayerDesc # noqa: F401 from .pp_layers import PipelineLayer # noqa: F401 from .random import RNGStatesTracker # noqa: F401 from .random import model_parallel_random_seed # noqa: F401 diff --git a/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py b/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py index 77be62ae6cf4b4..c0e484ed6feb1c 100644 --- a/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py +++ b/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py @@ -15,6 +15,7 @@ import paddle from paddle.fluid.dygraph.layers import Layer from ...utils.log_util import logger, layer_to_str +from functools import partial __all__ = [] @@ -58,6 +59,20 @@ def __repr__(self): **self.kwargs) +class SharedLayerDesc(LayerDesc): + def __init__(self, + key, + layer_func, + forward_func=None, + shared_weight_attr='weight', + *inputs, + **kwargs): + super(SharedLayerDesc, self).__init__(layer_func, *inputs, **kwargs) + self.key = key + self.forward_func = forward_func + self.shared_weight_attr = shared_weight_attr + + class PipelineLayer(Layer): def __init__(self, layers, @@ -104,11 +119,92 @@ def __init__(self, self._start_pos = 0 self._end_pos = self._num_layers - 1 self._segment_network(seg_method) + self.shared_layers = paddle.nn.LayerDict() + self.shared_weight_attrs = {} # construct layer self.run_function = [] self._build_layer() + self.shared_comm = self._index_shared_layers() + self._synchronize_shared_weights() + + def stage_owner(self, layer_idx): + assert 0 <= layer_idx < self._num_layers + for stage in range(self._topo.get_dim('pipe')): + if self.segment_parts[stage] <= layer_idx < self.segment_parts[stage + + 1]: + return stage + + def _index_shared_layers(self): + shared_comm = {} + if self._topo.get_dim("pipe") == 1: + return + + layers_desc = self._layers_desc + shared_keys = set( + s.key for s in layers_desc if isinstance(s, SharedLayerDesc)) + for key in shared_keys: + # Find the layers that the tied module appears in + shared_layers = [] + for idx, layer in enumerate(layers_desc): + if isinstance(layer, SharedLayerDesc) and layer.key == key: + shared_layers.append(idx) + + shared_stages = set(self.stage_owner(idx) for idx in shared_layers) + self._dp_degree = self._topo.get_dim('data') + self._mp_degree = self._topo.get_dim('model') + + shared_ranks = [] + for dp in range(self._dp_degree): + for mp in range(self._mp_degree): + shared_ranks = [] + for s in sorted(shared_stages): + shared_ranks.append( + self._topo.get_rank_from_stage( + self.global_rank, pipe=s, data=dp, model=mp)) + + group = paddle.distributed.new_group(ranks=shared_ranks) + # logger.info("info of shared layer: ", group) + # Record this tied module if we own a local copy of it. 
+ if self.global_rank in shared_ranks: + assert key in self.shared_layers + if key in self.shared_layers: + shared_comm[key] = { + 'ranks': shared_ranks, + 'group': group, + 'weight_attr': self.shared_weight_attrs[key], + 'layer': self.shared_layers[key], + } + # Only count the tied module once in the eyes of the FP16 optimizer + # if self.global_rank != shared_ranks[0]: + # for p in self.shared_layers[key].parameters(): + # p.is_distributed = False + return shared_comm + + def _synchronize_shared_weights(self): + for key, comm in self.shared_comm.items(): + paddle.distributed.broadcast( + getattr(comm['layer'], comm['weight_attr']), + src=min(comm['ranks']), + group=comm['group'], ) + + def allreduce_shared_weight_gradients(self): + '''All reduce the gradients of the tied weights between tied stages''' + for key, comm in self.shared_comm.items(): + param = getattr(self.shared_layers[key], comm['weight_attr']) + with paddle.framework.no_grad(): + paddle.fluid.framework._dygraph_tracer().trace_op( + type="c_allreduce_sum", + inputs={'X': param._grad_ivar()}, + outputs={'Out': param._grad_ivar()}, + attrs={ + 'ring_id': comm['group'].id, + 'use_calc_stream': True + }) + + # paddle.distributed.all_reduce(weight.grad, group=comm['group']) + def _segment_network(self, seg_method): logger.info("start segment network..") seg = SegmentLayers( @@ -142,6 +238,20 @@ def _build_layer(self): if isinstance(layer, Layer): self.run_function.append(layer) self.add_sublayer(str(layer_index), layer) + elif isinstance(layer, SharedLayerDesc): + if layer.key not in self.shared_layers: + self.shared_layers[layer.key] = layer.build_layer() + self.shared_weight_attrs[ + layer.key] = layer.shared_weight_attr + + if layer.forward_func is None: + self.run_function.append(self.shared_layers[layer.key]) + + else: + self.run_function.append( + partial(layer.forward_func, self.shared_layers[ + layer.key])) + elif isinstance(layer, LayerDesc): model = layer.build_layer() self.run_function.append(model) diff --git a/python/paddle/distributed/fleet/meta_parallel/pipeline_parallel.py b/python/paddle/distributed/fleet/meta_parallel/pipeline_parallel.py index 54324b389336d0..0bb6315290ed72 100644 --- a/python/paddle/distributed/fleet/meta_parallel/pipeline_parallel.py +++ b/python/paddle/distributed/fleet/meta_parallel/pipeline_parallel.py @@ -138,6 +138,8 @@ def train_batch(self, data, optimizer, lr_scheduler=None): self._backward(cache_id=backward_steps) backward_steps += 1 + self._layers.allreduce_shared_weight_gradients() + # optimizer self._step() self.train_loss = self._reduce_final_loss() diff --git a/python/paddle/fluid/tests/unittests/hybrid_parallel_shared_weight.py b/python/paddle/fluid/tests/unittests/hybrid_parallel_shared_weight.py new file mode 100644 index 00000000000000..9253f737bf9429 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/hybrid_parallel_shared_weight.py @@ -0,0 +1,233 @@ +# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import division +from __future__ import print_function + +import unittest +import paddle +import numpy as np +import random +import paddle +import paddle.distributed as dist +import paddle.distributed.fleet as fleet +from paddle.fluid.dygraph.container import Sequential +from paddle.distributed.fleet.meta_parallel import PipelineLayer +from paddle.fluid.dygraph.layers import Layer +import paddle.nn as nn +import paddle.fluid as fluid +from paddle.distributed.fleet.meta_parallel import LayerDesc, SharedLayerDesc + + +def print_hook_fn(grad): + print(grad) + + +def set_random_seed(seed, dp_id, rank_id): + """Set random seed for reproducability.""" + random.seed(seed) + np.random.seed(seed + dp_id) + paddle.seed(seed + dp_id) + + +batch_size = 8 +micro_batch_size = 2 +vocab_size = 128 +hidden_size = 16 + + +class SimpleNet(Layer): + def __init__(self): + super(SimpleNet, self).__init__() + self.word_embeddings = nn.Embedding(vocab_size, hidden_size) + + self.softmax_weight = self.create_parameter( + shape=[hidden_size, vocab_size]) + self.softmax_bias = self.create_parameter( + shape=[vocab_size], is_bias=False) + + def forward(self, x1, x2, y1): + x_emb = self.word_embeddings(x1) + fc = fluid.layers.matmul(x_emb, self.softmax_weight) + fc = fluid.layers.elementwise_add(fc, self.softmax_bias) + projection = fluid.layers.reshape(fc, shape=[-1, vocab_size]) + + projection = paddle.matmul(projection, self.word_embeddings.weight) + + loss = fluid.layers.softmax_with_cross_entropy( + logits=projection, label=y1, soft_label=False) + return loss.mean() + + +class EmbeddingPipe(Layer): + def __init__(self): + super(EmbeddingPipe, self).__init__() + self.word_embeddings = nn.Embedding(vocab_size, hidden_size) + + @property + def embedding_weight(self): + return self.word_embeddings.weight + + def forward(self, args): + x1, x2 = args + x_emb = self.word_embeddings(x1) + return x_emb, x2 + + +class MatmulNet(Layer): + def __init__(self): + super(MatmulNet, self).__init__() + self.softmax_weight = self.create_parameter( + shape=[hidden_size, vocab_size]) + + def forward(self, args): + x1, x2 = args + fc = fluid.layers.matmul(x1, self.softmax_weight) + + return fc, x2 + + +class BiasNet(Layer): + def __init__(self): + super(BiasNet, self).__init__() + self.softmax_bias = self.create_parameter(shape=[vocab_size]) + + def forward(self, args): + fc, x2 = args + fc = fluid.layers.elementwise_add(fc, self.softmax_bias) + projection = fluid.layers.reshape(fc, shape=[-1, vocab_size]) + return projection, x2 + + +class LossNet(Layer): + def __init__(self): + super(LossNet, self).__init__() + + def forward(self, args, y1): + projection = args + loss = fluid.layers.softmax_with_cross_entropy( + logits=projection, label=y1[0], soft_label=False) + return loss.mean() + + +class SimpleNetPipe(PipelineLayer): + def __init__(self, **kwargs): + self.descs = [] + self.descs.append( + SharedLayerDesc( + 'embed', EmbeddingPipe, shared_weight_attr='embedding_weight')) + self.descs.append(LayerDesc(MatmulNet)) + + self.descs.append(LayerDesc(BiasNet)) + + def _logits_helper(embedding, output): + return paddle.matmul(output[0], embedding.embedding_weight) + + self.descs.append( + SharedLayerDesc( + 'embed', + EmbeddingPipe, + forward_func=_logits_helper, + shared_weight_attr='embedding_weight')) + + super(SimpleNetPipe, self).__init__( + layers=self.descs, loss_fn=LossNet(), **kwargs) + + +class TestDistEmbeddingTraning(unittest.TestCase): + def setUp(self): + strategy = fleet.DistributedStrategy() + 
self.model_parallel_size = 1 + self.data_parallel_size = 1 + self.pipeline_parallel_size = 2 + strategy.hybrid_configs = { + "dp_degree": self.data_parallel_size, + "mp_degree": self.model_parallel_size, + "pp_degree": self.pipeline_parallel_size, + } + strategy.pipeline_configs = { + "accumulate_steps": batch_size // micro_batch_size, + "micro_batch_size": micro_batch_size + } + fleet.init(is_collective=True, strategy=strategy) + + def test_pp_model(self): + hcg = fleet.get_hybrid_communicate_group() + word_size = hcg.get_model_parallel_world_size() + dp_id = hcg.get_data_parallel_rank() + pp_id = hcg.get_stage_id() + rank_id = dist.get_rank() + set_random_seed(1024, dp_id, rank_id) + + #construct model a + model_a = SimpleNet() + scheduler_a = paddle.optimizer.lr.PiecewiseDecay( + boundaries=[2, 3, 4], values=[0.01, 0.02, 0.03, 0.04], verbose=True) + optimizer_a = paddle.optimizer.SGD(learning_rate=scheduler_a, + parameters=model_a.parameters()) + + model_b = SimpleNetPipe(topology=hcg.topology()) + + scheduler_b = paddle.optimizer.lr.PiecewiseDecay( + boundaries=[2, 3, 4], values=[0.01, 0.02, 0.03, 0.04], verbose=True) + optimizer_b = paddle.optimizer.SGD(learning_rate=scheduler_b, + parameters=model_b.parameters()) + model_b = fleet.distributed_model(model_b) + optimizer_b = fleet.distributed_optimizer(optimizer_b) + + param_len = len(model_a.parameters()) + + parameters = [] + for param in model_a.parameters(): + parameters.append(param.numpy()) + + model_b_params = model_b.parameters() + + if pp_id == 0: + model_b_params[0].set_value(parameters[2]) + model_b_params[1].set_value(parameters[0]) + + else: + model_b_params[0].set_value(parameters[2]) + model_b_params[1].set_value(parameters[1]) + + for step in range(5): + x1_data = np.random.randint(0, vocab_size, size=[batch_size, 1]) + x2_data = np.random.randint(0, vocab_size, size=[batch_size, 1]) + y1_data = np.random.randint(0, hidden_size, size=[batch_size, 1]) + + x1 = paddle.to_tensor(x1_data) + x2 = paddle.to_tensor(x2_data) + y1 = paddle.to_tensor(y1_data) + + x1.stop_gradient = True + x2.stop_gradient = True + y1.stop_gradient = True + + loss_a = model_a(x1, x2, y1) + loss_a.backward() + + optimizer_a.step() + optimizer_a.clear_grad() + scheduler_a.step() + + loss_b = model_b.train_batch([(x1, x2), (y1, )], optimizer_b, + scheduler_b) + + print("loss", loss_a.numpy(), loss_b.numpy()) + np.testing.assert_allclose(loss_a.numpy(), loss_b.numpy()) + + +if __name__ == "__main__": + unittest.main() From a345e2613eab26378c793987e9febacb9ca44db9 Mon Sep 17 00:00:00 2001 From: shenliang03 Date: Tue, 15 Jun 2021 16:24:49 +0800 Subject: [PATCH 2/4] add sharedlayerdesc --- .../paddle/distributed/fleet/base/topology.py | 3 -- .../parallel_layers/pp_layers.py | 53 +++++++++---------- 2 files changed, 24 insertions(+), 32 deletions(-) diff --git a/python/paddle/distributed/fleet/base/topology.py b/python/paddle/distributed/fleet/base/topology.py index 2108203d48cd64..850f3581421705 100644 --- a/python/paddle/distributed/fleet/base/topology.py +++ b/python/paddle/distributed/fleet/base/topology.py @@ -262,6 +262,3 @@ def get_check_parallel_group(self): def get_rank_from_stage(self, stage_id, **kwargs): return self._topo.get_rank_from_stage( self.global_rank, pipe=stage_id, **kwargs) - # coord = self._topo.get_coord(self.global_rank) - # tf = coord._replace(pipe=stage_id, **kwargs)._asdict() - # return self._topo.get_rank(**tf) diff --git a/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py 
b/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py index c0e484ed6feb1c..b31b2939695b33 100644 --- a/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py +++ b/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py @@ -68,7 +68,7 @@ def __init__(self, *inputs, **kwargs): super(SharedLayerDesc, self).__init__(layer_func, *inputs, **kwargs) - self.key = key + self.layer_name = key self.forward_func = forward_func self.shared_weight_attr = shared_weight_attr @@ -126,32 +126,33 @@ def __init__(self, self.run_function = [] self._build_layer() - self.shared_comm = self._index_shared_layers() + self.shared_comm = self._construct_shared_comm() self._synchronize_shared_weights() - def stage_owner(self, layer_idx): - assert 0 <= layer_idx < self._num_layers + def get_stage_from_index(self, layer_idx): + assert 0 <= layer_idx < self._num_layers, "layer_idx is out of bound" for stage in range(self._topo.get_dim('pipe')): if self.segment_parts[stage] <= layer_idx < self.segment_parts[stage + 1]: return stage - def _index_shared_layers(self): + def _construct_shared_comm(self): shared_comm = {} if self._topo.get_dim("pipe") == 1: return layers_desc = self._layers_desc - shared_keys = set( - s.key for s in layers_desc if isinstance(s, SharedLayerDesc)) - for key in shared_keys: - # Find the layers that the tied module appears in + shared_layer_names = set( + s.layer_name for s in layers_desc if isinstance(s, SharedLayerDesc)) + for key in shared_layer_names: shared_layers = [] for idx, layer in enumerate(layers_desc): - if isinstance(layer, SharedLayerDesc) and layer.key == key: + if isinstance(layer, + SharedLayerDesc) and layer.layer_name == key: shared_layers.append(idx) - shared_stages = set(self.stage_owner(idx) for idx in shared_layers) + shared_stages = set( + self.get_stage_from_index(idx) for idx in shared_layers) self._dp_degree = self._topo.get_dim('data') self._mp_degree = self._topo.get_dim('model') @@ -165,8 +166,6 @@ def _index_shared_layers(self): self.global_rank, pipe=s, data=dp, model=mp)) group = paddle.distributed.new_group(ranks=shared_ranks) - # logger.info("info of shared layer: ", group) - # Record this tied module if we own a local copy of it. 
if self.global_rank in shared_ranks: assert key in self.shared_layers if key in self.shared_layers: @@ -176,23 +175,20 @@ def _index_shared_layers(self): 'weight_attr': self.shared_weight_attrs[key], 'layer': self.shared_layers[key], } - # Only count the tied module once in the eyes of the FP16 optimizer - # if self.global_rank != shared_ranks[0]: - # for p in self.shared_layers[key].parameters(): - # p.is_distributed = False return shared_comm def _synchronize_shared_weights(self): for key, comm in self.shared_comm.items(): - paddle.distributed.broadcast( - getattr(comm['layer'], comm['weight_attr']), - src=min(comm['ranks']), - group=comm['group'], ) + with paddle.framework.no_grad(): + paddle.distributed.broadcast( + getattr(comm['layer'], comm['weight_attr']), + src=min(comm['ranks']), + group=comm['group']) def allreduce_shared_weight_gradients(self): - '''All reduce the gradients of the tied weights between tied stages''' for key, comm in self.shared_comm.items(): param = getattr(self.shared_layers[key], comm['weight_attr']) + # need use trace_op to allreduce weight with paddle.framework.no_grad(): paddle.fluid.framework._dygraph_tracer().trace_op( type="c_allreduce_sum", @@ -203,8 +199,6 @@ def allreduce_shared_weight_gradients(self): 'use_calc_stream': True }) - # paddle.distributed.all_reduce(weight.grad, group=comm['group']) - def _segment_network(self, seg_method): logger.info("start segment network..") seg = SegmentLayers( @@ -239,18 +233,19 @@ def _build_layer(self): self.run_function.append(layer) self.add_sublayer(str(layer_index), layer) elif isinstance(layer, SharedLayerDesc): - if layer.key not in self.shared_layers: - self.shared_layers[layer.key] = layer.build_layer() + if layer.layer_name not in self.shared_layers: + self.shared_layers[layer.layer_name] = layer.build_layer() self.shared_weight_attrs[ - layer.key] = layer.shared_weight_attr + layer.layer_name] = layer.shared_weight_attr if layer.forward_func is None: - self.run_function.append(self.shared_layers[layer.key]) + self.run_function.append(self.shared_layers[ + layer.layer_name]) else: self.run_function.append( partial(layer.forward_func, self.shared_layers[ - layer.key])) + layer.layer_name])) elif isinstance(layer, LayerDesc): model = layer.build_layer() From cc9fbd8bfa1678fc5d971eb7c9472c938bfa7894 Mon Sep 17 00:00:00 2001 From: shenliang03 Date: Tue, 15 Jun 2021 16:26:51 +0800 Subject: [PATCH 3/4] add sharedlayerdesc --- .../tests/unittests/test_parallel_dygraph_pipeline_parallel.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/paddle/fluid/tests/unittests/test_parallel_dygraph_pipeline_parallel.py b/python/paddle/fluid/tests/unittests/test_parallel_dygraph_pipeline_parallel.py index 1d06e168208b27..ef8ee2e4ad4454 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_dygraph_pipeline_parallel.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_dygraph_pipeline_parallel.py @@ -27,6 +27,9 @@ def test_hybrid_parallel_pp_layer(self): def test_hybrid_parallel_pp_tuple_inputs(self): self.run_mnist_2gpu('hybrid_parallel_pp_embedding.py') + def test_hybrid_parallel_pp_tuple_inputs(self): + self.run_mnist_2gpu('hybrid_parallel_shared_weight.py') + if __name__ == "__main__": unittest.main() From 2959e06215434121afc396336034852b24df68b1 Mon Sep 17 00:00:00 2001 From: shenliang03 Date: Tue, 15 Jun 2021 16:28:23 +0800 Subject: [PATCH 4/4] add sharedlayerdesc --- .../paddle/fluid/tests/unittests/hybrid_parallel_mp_layers.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff 
--git a/python/paddle/fluid/tests/unittests/hybrid_parallel_mp_layers.py b/python/paddle/fluid/tests/unittests/hybrid_parallel_mp_layers.py index 23dae317386918..317eb14ad069e2 100644 --- a/python/paddle/fluid/tests/unittests/hybrid_parallel_mp_layers.py +++ b/python/paddle/fluid/tests/unittests/hybrid_parallel_mp_layers.py @@ -270,8 +270,8 @@ def test_parallel_embedding(self): np.testing.assert_allclose(loss_a.numpy(), loss_b.numpy()) def test_parallel_cross_entropy(self): - batch_size = 2 - seq_length = 1 + batch_size = 8 + seq_length = 16 class_size_per_card = 2 vocab_size = class_size_per_card * self.model_parallel_size seed = 1025
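
A minimal usage sketch of the SharedLayerDesc API introduced in this series, assuming fleet has already been initialized with pp_degree=2 and reusing the EmbeddingPipe, MatmulNet, BiasNet and LossNet helpers defined in hybrid_parallel_shared_weight.py above (the lambda logits helper mirrors _logits_helper from that test):

    # Sketch only: EmbeddingPipe/MatmulNet/BiasNet/LossNet come from
    # hybrid_parallel_shared_weight.py; fleet.init(...) with pp_degree=2
    # is assumed to have run already.
    import paddle
    import paddle.distributed.fleet as fleet
    from paddle.distributed.fleet.meta_parallel import (PipelineLayer,
                                                        LayerDesc,
                                                        SharedLayerDesc)

    hcg = fleet.get_hybrid_communicate_group()

    descs = [
        # Both SharedLayerDesc entries share the key 'embed', so the first and
        # last pipeline stages each build a local EmbeddingPipe whose
        # 'embedding_weight' is broadcast once at construction
        # (_synchronize_shared_weights) and whose gradients are summed after
        # every train_batch (allreduce_shared_weight_gradients).
        SharedLayerDesc('embed', EmbeddingPipe,
                        shared_weight_attr='embedding_weight'),
        LayerDesc(MatmulNet),
        LayerDesc(BiasNet),
        # On the last stage the shared embedding is reused through forward_func
        # to produce the output logits.
        SharedLayerDesc('embed', EmbeddingPipe,
                        forward_func=lambda emb, out: paddle.matmul(
                            out[0], emb.embedding_weight),
                        shared_weight_attr='embedding_weight'),
    ]

    model = PipelineLayer(layers=descs, loss_fn=LossNet(),
                          topology=hcg.topology())
    model = fleet.distributed_model(model)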