From 26ce6f9ca887cdb2e82abaf321694724bc341292 Mon Sep 17 00:00:00 2001 From: shenliang03 Date: Tue, 10 Aug 2021 18:43:33 +0800 Subject: [PATCH 1/8] add save/load for pipelineparallel --- .../parallel_layers/pp_layers.py | 55 ++++++++++++++++++- 1 file changed, 53 insertions(+), 2 deletions(-) diff --git a/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py b/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py index a3c6a5b5fb665f..31d09392b3bcf8 100644 --- a/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py +++ b/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py @@ -11,12 +11,16 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + import math -import paddle import re +import glob +import os +from functools import partial + +import paddle from paddle.fluid.dygraph.layers import Layer from ...utils.log_util import logger, layer_to_str -from functools import partial __all__ = [] @@ -310,3 +314,50 @@ def forward(self, input): for layer in self.run_function: input = layer(input) return input + + def save_state_dict(self, path): + if self._topo.get_coord(self.global_rank).data != 0: + return + + def _offset_dirname(ckpt_dir, local_layer_idx): + idx = local_layer_idx + self._start_pos + model_rank = self._topo.get_coord(self.global_rank).model + rank_message = "-tensor_" + "{:0>2d}".format(model_rank) + layer_save_path = os.path.join(ckpt_dir, + 'layer_{:0>2d}'.format(idx)) + layer_save_path = layer_save_path + rank_message + '-model_states.pdparams' + return layer_save_path + + os.makedirs(path, exist_ok=True) + for idx, layer in enumerate(self.run_function): + model_save_path = _offset_dirname(path, idx) + if not hasattr(layer, 'state_dict'): + continue + paddle.save(layer.state_dict(), model_save_path) + + logger.info("save model state 
successfully...") + + def set_state_dir(self, path): + assert os.path.exists( + path), "{} not found, please check the path".format(path) + + for idx, layer in enumerate(self.run_function): + if not hasattr(layer, 'set_state_dict'): + continue + layer_idx = idx + self._start_pos + layer_save_path = os.path.join(path, + 'layer_{0:0>2d}'.format(layer_idx)) + model_files = glob.glob(layer_save_path + "*model_states.pdparams") + model_files.sort() + mp_rank = self._topo.get_coord(self.global_rank).model + mp_world_size = self._topo.get_dim('model') + num_files = len(model_files) + assert num_files == mp_world_size, "Currently, only support num_files({}) is equal to mp_world_size({})".format( + num_files, mp_world_size) + load_param_path = model_files[mp_rank] + model_state_dict = paddle.load(load_param_path) + layer.set_state_dict(model_state_dict) + + self._synchronize_shared_weights() + + logger.info("load model state successfully...") From 764dc74e380825531971baceebe0b0ac2aeac833 Mon Sep 17 00:00:00 2001 From: shenliang03 Date: Tue, 10 Aug 2021 21:12:31 +0800 Subject: [PATCH 2/8] add save/load --- .../fleet/meta_parallel/parallel_layers/pp_layers.py | 3 ++- .../fluid/tests/unittests/hybrid_parallel_pp_transformer.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py b/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py index 31d09392b3bcf8..3b85724185ec9a 100644 --- a/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py +++ b/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py @@ -315,7 +315,7 @@ def forward(self, input): input = layer(input) return input - def save_state_dict(self, path): + def save_state_dict(self, path, save_rng=False): if self._topo.get_coord(self.global_rank).data != 0: return @@ -335,6 +335,7 @@ def _offset_dirname(ckpt_dir, local_layer_idx): continue paddle.save(layer.state_dict(), 
model_save_path) + # save rng state logger.info("save model state successfully...") def set_state_dir(self, path): diff --git a/python/paddle/fluid/tests/unittests/hybrid_parallel_pp_transformer.py b/python/paddle/fluid/tests/unittests/hybrid_parallel_pp_transformer.py index 62b1a8b1da6797..b2041423ff3b4b 100644 --- a/python/paddle/fluid/tests/unittests/hybrid_parallel_pp_transformer.py +++ b/python/paddle/fluid/tests/unittests/hybrid_parallel_pp_transformer.py @@ -86,7 +86,7 @@ def forward(self, x, mask): product = layers.matmul(x=q, y=k, transpose_y=True, alpha=d_model**-0.5) weights = F.softmax(product + mask) - weights = F.dropout(weights, 0.2) + # weights = F.dropout(weights, 0.2) tgt = layers.matmul(weights, v) residual = tgt tgt = self.norm1(tgt) From 12b4b452727f24762e7078240aeb791e80274a9c Mon Sep 17 00:00:00 2001 From: shenliang03 Date: Tue, 10 Aug 2021 21:12:53 +0800 Subject: [PATCH 3/8] add save/load --- .../unittests/hybrid_parallel_pp_save_load.py | 118 ++++++++++++++++++ 1 file changed, 118 insertions(+) create mode 100644 python/paddle/fluid/tests/unittests/hybrid_parallel_pp_save_load.py diff --git a/python/paddle/fluid/tests/unittests/hybrid_parallel_pp_save_load.py b/python/paddle/fluid/tests/unittests/hybrid_parallel_pp_save_load.py new file mode 100644 index 00000000000000..bfb925c6adec7d --- /dev/null +++ b/python/paddle/fluid/tests/unittests/hybrid_parallel_pp_save_load.py @@ -0,0 +1,118 @@ +# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import division +from __future__ import print_function + +import unittest +import paddle +import numpy as np +import random +import paddle.distributed as dist +import paddle.distributed.fleet as fleet +from paddle.fluid import layers +import paddle.nn.functional as F +from paddle.distributed.fleet.meta_parallel import PipelineLayer, LayerDesc +from paddle.fluid.dygraph.layers import Layer +import paddle.nn as nn +from hybrid_parallel_pp_transformer import ModelPipe, set_random_seed +import os + +batch_size = 8 +length = 8 +micro_batch_size = 2 +vocab_size = 128 +hidden_size = 16 +d_model = hidden_size +dim_feedforward = 4 * d_model + + +class TestDistPPSaveLoadTraning(unittest.TestCase): + def setUp(self): + strategy = fleet.DistributedStrategy() + self.model_parallel_size = 1 + self.data_parallel_size = 1 + self.pipeline_parallel_size = 2 + strategy.hybrid_configs = { + "dp_degree": self.data_parallel_size, + "mp_degree": self.model_parallel_size, + "pp_degree": self.pipeline_parallel_size, + } + strategy.pipeline_configs = { + "accumulate_steps": batch_size // micro_batch_size, + "micro_batch_size": micro_batch_size + } + fleet.init(is_collective=True, strategy=strategy) + + def test_pp_model(self): + hcg = fleet.get_hybrid_communicate_group() + word_size = hcg.get_model_parallel_world_size() + dp_id = hcg.get_data_parallel_rank() + pp_id = hcg.get_stage_id() + rank_id = dist.get_rank() + topology = hcg.topology() + set_random_seed(1024, dp_id, rank_id) + + model = ModelPipe(topology) + scheduler = paddle.optimizer.lr.PiecewiseDecay( + boundaries=[2], values=[0.001, 0.002], verbose=True) + optimizer = paddle.optimizer.SGD(learning_rate=scheduler, + parameters=model.parameters()) + + model = fleet.distributed_model(model) + optimizer = fleet.distributed_optimizer(optimizer) + + output_dir = "./pp_save_load" + # warmup step + for step_id in 
range(3): + x_data = np.random.randint(0, vocab_size, size=[batch_size, length]) + x = paddle.to_tensor(x_data) + x.stop_gradient = True + loss = model.train_batch([x, x], optimizer, scheduler) + + model._layers.save_state_dict(output_dir) + paddle.save(optimizer.state_dict(), + os.path.join(output_dir, "model_state.pdopt")) + + # construct data + test_steps = 5 + np_data = np.random.randint( + 0, vocab_size, size=[test_steps, batch_size, length]) + + origin_loss = [] + for step_id in range(5): + x_data = np_data[step_id, :] + x = paddle.to_tensor(x_data) + x.stop_gradient = True + loss = model.train_batch([x, x], optimizer, scheduler) + # print("loss: ", loss.numpy()) + origin_loss.append(loss.numpy()) + + # test step + model._layers.set_state_dir(output_dir) + opt_dict = paddle.load(os.path.join(output_dir, "model_state.pdopt")) + optimizer.set_state_dict(opt_dict) + + for step_id in range(5): + x_data = np_data[step_id, :] + x = paddle.to_tensor(x_data) + x.stop_gradient = True + loss = model.train_batch([x, x], optimizer, scheduler) + print("origin loss: ", origin_loss[step_id], "current loss: ", + loss.numpy()) + np.testing.assert_allclose(loss.numpy(), origin_loss[step_id]) + + +if __name__ == "__main__": + unittest.main() From 8db02aed101855e586006e3fc9b11541e9d7b665 Mon Sep 17 00:00:00 2001 From: shenliang03 Date: Tue, 10 Aug 2021 22:31:21 +0800 Subject: [PATCH 4/8] add rng for save/load --- .../parallel_layers/pp_layers.py | 37 ++++++++++++++++++- .../meta_parallel/parallel_layers/random.py | 12 ++++++ .../unittests/hybrid_parallel_pp_save_load.py | 2 +- 3 files changed, 49 insertions(+), 2 deletions(-) diff --git a/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py b/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py index 3b85724185ec9a..a2bff8467e6635 100644 --- a/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py +++ 
b/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py @@ -16,11 +16,14 @@ import re import glob import os +import numpy as np +import random from functools import partial import paddle from paddle.fluid.dygraph.layers import Layer from ...utils.log_util import logger, layer_to_str +from .random import get_rng_state_tracker __all__ = [] @@ -316,6 +319,24 @@ def forward(self, input): return input def save_state_dict(self, path, save_rng=False): + # save rng state + dp_rank = self._topo.get_coord(self.global_rank).data + pp_rank = self._topo.get_coord(self.global_rank).pipe + mp_rank = self._topo.get_coord(self.global_rank).model + + if save_rng: + rng_dict = {} + rng_dict['random_rng_state'] = random.getstate() + rng_dict['np_rng_state'] = np.random.get_state() + rng_dict['cuda_rng_state'] = paddle.get_cuda_rng_state() + rng_dict['rng_tracker_states'] \ + = get_rng_state_tracker().get_states_tracker() + paddle.save(rng_dict, + os.path.join( + path, + "dp{:0>2d}_pp{:0>2d}_mp{:0>2d}_rng.pdparams".format( + dp_rank, pp_rank, mp_rank))) + if self._topo.get_coord(self.global_rank).data != 0: return @@ -335,7 +356,6 @@ def _offset_dirname(ckpt_dir, local_layer_idx): continue paddle.save(layer.state_dict(), model_save_path) - # save rng state logger.info("save model state successfully...") def set_state_dir(self, path): @@ -361,4 +381,19 @@ def set_state_dir(self, path): self._synchronize_shared_weights() + # set rng + dp_rank = self._topo.get_coord(self.global_rank).data + pp_rank = self._topo.get_coord(self.global_rank).pipe + mp_rank = self._topo.get_coord(self.global_rank).model + rng_save_path = os.path.join( + path, "dp{:0>2d}_pp{:0>2d}_mp{:0>2d}_rng.pdparams".format( + dp_rank, pp_rank, mp_rank)) + if os.path.exists(rng_save_path): + rng_dict = paddle.load(rng_save_path) + random.setstate(rng_dict['random_rng_state']) + np.random.set_state(rng_dict['np_rng_state']) + paddle.set_cuda_rng_state(rng_dict['cuda_rng_state']) + 
get_rng_state_tracker().set_states_tracker(rng_dict[ + 'cuda_rng_state']) + logger.info("load model state successfully...") diff --git a/python/paddle/distributed/fleet/meta_parallel/parallel_layers/random.py b/python/paddle/distributed/fleet/meta_parallel/parallel_layers/random.py index 70daa3b25365e4..ec80ba71036c06 100644 --- a/python/paddle/distributed/fleet/meta_parallel/parallel_layers/random.py +++ b/python/paddle/distributed/fleet/meta_parallel/parallel_layers/random.py @@ -20,6 +20,9 @@ MODEL_PARALLEL_RNG = 'model_parallel_rng' +# This file is inspired by Megatron to control random states for MP: +# https://github.com/NVIDIA/Megatron-LM/blob/main/megatron/mpu/random.py + class RNGStatesTracker: """ @@ -46,6 +49,15 @@ def add(self, name, seed): self.states_[name] = paddle.get_cuda_rng_state() paddle.set_cuda_rng_state(orig_rng_state) + def get_states_tracker(self): + states = {} + for name in self.states_: + states[name] = self.states_[name] + return states + + def set_states_tracker(self, states): + self.states_ = states + @contextlib.contextmanager def rng_state(self, name=MODEL_PARALLEL_RNG): if name not in self.states_: diff --git a/python/paddle/fluid/tests/unittests/hybrid_parallel_pp_save_load.py b/python/paddle/fluid/tests/unittests/hybrid_parallel_pp_save_load.py index bfb925c6adec7d..ea4fa477ae3cd2 100644 --- a/python/paddle/fluid/tests/unittests/hybrid_parallel_pp_save_load.py +++ b/python/paddle/fluid/tests/unittests/hybrid_parallel_pp_save_load.py @@ -81,7 +81,7 @@ def test_pp_model(self): x.stop_gradient = True loss = model.train_batch([x, x], optimizer, scheduler) - model._layers.save_state_dict(output_dir) + model._layers.save_state_dict(output_dir, save_rng=True) paddle.save(optimizer.state_dict(), os.path.join(output_dir, "model_state.pdopt")) From b0fe11dd3bbde2e1df1bd6925be74fe7216c70cf Mon Sep 17 00:00:00 2001 From: shenliang03 Date: Tue, 10 Aug 2021 22:32:51 +0800 Subject: [PATCH 5/8] add save/load --- 
.../parallel_layers/pp_layers.py | 34 ------------------- .../meta_parallel/parallel_layers/random.py | 12 ------- 2 files changed, 46 deletions(-) diff --git a/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py b/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py index a2bff8467e6635..fe0e51344cd654 100644 --- a/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py +++ b/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py @@ -319,24 +319,6 @@ def forward(self, input): return input def save_state_dict(self, path, save_rng=False): - # save rng state - dp_rank = self._topo.get_coord(self.global_rank).data - pp_rank = self._topo.get_coord(self.global_rank).pipe - mp_rank = self._topo.get_coord(self.global_rank).model - - if save_rng: - rng_dict = {} - rng_dict['random_rng_state'] = random.getstate() - rng_dict['np_rng_state'] = np.random.get_state() - rng_dict['cuda_rng_state'] = paddle.get_cuda_rng_state() - rng_dict['rng_tracker_states'] \ - = get_rng_state_tracker().get_states_tracker() - paddle.save(rng_dict, - os.path.join( - path, - "dp{:0>2d}_pp{:0>2d}_mp{:0>2d}_rng.pdparams".format( - dp_rank, pp_rank, mp_rank))) - if self._topo.get_coord(self.global_rank).data != 0: return @@ -380,20 +362,4 @@ def set_state_dir(self, path): layer.set_state_dict(model_state_dict) self._synchronize_shared_weights() - - # set rng - dp_rank = self._topo.get_coord(self.global_rank).data - pp_rank = self._topo.get_coord(self.global_rank).pipe - mp_rank = self._topo.get_coord(self.global_rank).model - rng_save_path = os.path.join( - path, "dp{:0>2d}_pp{:0>2d}_mp{:0>2d}_rng.pdparams".format( - dp_rank, pp_rank, mp_rank)) - if os.path.exists(rng_save_path): - rng_dict = paddle.load(rng_save_path) - random.setstate(rng_dict['random_rng_state']) - np.random.set_state(rng_dict['np_rng_state']) - paddle.set_cuda_rng_state(rng_dict['cuda_rng_state']) - 
get_rng_state_tracker().set_states_tracker(rng_dict[ - 'cuda_rng_state']) - logger.info("load model state successfully...") diff --git a/python/paddle/distributed/fleet/meta_parallel/parallel_layers/random.py b/python/paddle/distributed/fleet/meta_parallel/parallel_layers/random.py index ec80ba71036c06..70daa3b25365e4 100644 --- a/python/paddle/distributed/fleet/meta_parallel/parallel_layers/random.py +++ b/python/paddle/distributed/fleet/meta_parallel/parallel_layers/random.py @@ -20,9 +20,6 @@ MODEL_PARALLEL_RNG = 'model_parallel_rng' -# This file is inspired by Megatron to control random states for MP: -# https://github.com/NVIDIA/Megatron-LM/blob/main/megatron/mpu/random.py - class RNGStatesTracker: """ @@ -49,15 +46,6 @@ def add(self, name, seed): self.states_[name] = paddle.get_cuda_rng_state() paddle.set_cuda_rng_state(orig_rng_state) - def get_states_tracker(self): - states = {} - for name in self.states_: - states[name] = self.states_[name] - return states - - def set_states_tracker(self, states): - self.states_ = states - @contextlib.contextmanager def rng_state(self, name=MODEL_PARALLEL_RNG): if name not in self.states_: From 70e4f2ddb114298ce142bb9d9b8e36abe39e0632 Mon Sep 17 00:00:00 2001 From: shenliang03 Date: Tue, 10 Aug 2021 22:47:45 +0800 Subject: [PATCH 6/8] add utest for save/load --- .../parallel_layers/pp_layers.py | 2 +- .../unittests/hybrid_parallel_pp_save_load.py | 22 ++++++++----------- ...test_parallel_dygraph_pipeline_parallel.py | 3 +++ 3 files changed, 13 insertions(+), 14 deletions(-) diff --git a/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py b/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py index fe0e51344cd654..79664f0ed4ffa9 100644 --- a/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py +++ b/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py @@ -318,7 +318,7 @@ def forward(self, input): input = layer(input) 
return input - def save_state_dict(self, path, save_rng=False): + def save_state_dict(self, path): if self._topo.get_coord(self.global_rank).data != 0: return diff --git a/python/paddle/fluid/tests/unittests/hybrid_parallel_pp_save_load.py b/python/paddle/fluid/tests/unittests/hybrid_parallel_pp_save_load.py index ea4fa477ae3cd2..e6e27bbb41a8a4 100644 --- a/python/paddle/fluid/tests/unittests/hybrid_parallel_pp_save_load.py +++ b/python/paddle/fluid/tests/unittests/hybrid_parallel_pp_save_load.py @@ -19,23 +19,17 @@ import paddle import numpy as np import random +import os +import shutil +import tempfile import paddle.distributed as dist import paddle.distributed.fleet as fleet -from paddle.fluid import layers -import paddle.nn.functional as F -from paddle.distributed.fleet.meta_parallel import PipelineLayer, LayerDesc -from paddle.fluid.dygraph.layers import Layer -import paddle.nn as nn from hybrid_parallel_pp_transformer import ModelPipe, set_random_seed -import os batch_size = 8 length = 8 micro_batch_size = 2 vocab_size = 128 -hidden_size = 16 -d_model = hidden_size -dim_feedforward = 4 * d_model class TestDistPPSaveLoadTraning(unittest.TestCase): @@ -72,16 +66,16 @@ def test_pp_model(self): model = fleet.distributed_model(model) optimizer = fleet.distributed_optimizer(optimizer) + output_dir = tempfile.mkdtemp() - output_dir = "./pp_save_load" # warmup step - for step_id in range(3): + for step_id in range(2): x_data = np.random.randint(0, vocab_size, size=[batch_size, length]) x = paddle.to_tensor(x_data) x.stop_gradient = True loss = model.train_batch([x, x], optimizer, scheduler) - model._layers.save_state_dict(output_dir, save_rng=True) + model._layers.save_state_dict(output_dir) paddle.save(optimizer.state_dict(), os.path.join(output_dir, "model_state.pdopt")) @@ -96,7 +90,6 @@ def test_pp_model(self): x = paddle.to_tensor(x_data) x.stop_gradient = True loss = model.train_batch([x, x], optimizer, scheduler) - # print("loss: ", loss.numpy()) 
 origin_loss.append(loss.numpy()) # test step model._layers.set_state_dir(output_dir) opt_dict = paddle.load(os.path.join(output_dir, "model_state.pdopt")) optimizer.set_state_dict(opt_dict) @@ -113,6 +106,9 @@ def test_pp_model(self): loss.numpy()) np.testing.assert_allclose(loss.numpy(), origin_loss[step_id]) + # finally, remove the model/optimizer path + shutil.rmtree(output_dir) + if __name__ == "__main__": unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_parallel_dygraph_pipeline_parallel.py b/python/paddle/fluid/tests/unittests/test_parallel_dygraph_pipeline_parallel.py index 62e781678c9fc8..003e0c1685cae7 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_dygraph_pipeline_parallel.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_dygraph_pipeline_parallel.py @@ -36,6 +36,9 @@ def test_pipeline_parallel(self): def test_hybrid_parallel_transformer(self): self.run_mnist_2gpu('hybrid_parallel_pp_transformer.py') + def test_hybrid_parallel_save_load(self): + self.run_mnist_2gpu('hybrid_parallel_pp_save_load.py') if __name__ == "__main__": unittest.main() From ab50feb704c67fdf87f65cd2b50548d960da0124 Mon Sep 17 00:00:00 2001 From: shenliang03 Date: Tue, 10 Aug 2021 22:51:16 +0800 Subject: [PATCH 7/8] add utest for save/load --- .../distributed/fleet/meta_parallel/parallel_layers/pp_layers.py | 1 - .../fluid/tests/unittests/hybrid_parallel_pp_transformer.py | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py b/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py index 79664f0ed4ffa9..8da0668f319289 100644 --- a/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py +++ b/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py @@ -23,7 +23,6 @@ import paddle from paddle.fluid.dygraph.layers import Layer from ...utils.log_util import logger, layer_to_str -from .random import get_rng_state_tracker __all__ = [] diff --git a/python/paddle/fluid/tests/unittests/hybrid_parallel_pp_transformer.py
 b/python/paddle/fluid/tests/unittests/hybrid_parallel_pp_transformer.py index b2041423ff3b4b..524099c6ab05e8 100644 --- a/python/paddle/fluid/tests/unittests/hybrid_parallel_pp_transformer.py +++ b/python/paddle/fluid/tests/unittests/hybrid_parallel_pp_transformer.py @@ -86,6 +86,7 @@ def forward(self, x, mask): product = layers.matmul(x=q, y=k, transpose_y=True, alpha=d_model**-0.5) weights = F.softmax(product + mask) + # TODO(shenliang03): save/load in PipelineParallel can't support dropout temporarily. # weights = F.dropout(weights, 0.2) tgt = layers.matmul(weights, v) residual = tgt From 994b694203df15891166be7dbbdd50a154f60965 Mon Sep 17 00:00:00 2001 From: shenliang03 Date: Wed, 11 Aug 2021 12:12:15 +0800 Subject: [PATCH 8/8] add save/load --- .../fleet/meta_parallel/parallel_layers/pp_layers.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py b/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py index 8da0668f319289..f546adc65ea714 100644 --- a/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py +++ b/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py @@ -354,9 +354,8 @@ def set_state_dir(self, path): mp_rank = self._topo.get_coord(self.global_rank).model mp_world_size = self._topo.get_dim('model') num_files = len(model_files) - assert num_files == mp_world_size, "Currently, only support num_files({}) is equal to mp_world_size({})".format( - num_files, mp_world_size) - load_param_path = model_files[mp_rank] + + load_param_path = model_files[mp_rank * num_files // mp_world_size] model_state_dict = paddle.load(load_param_path) layer.set_state_dict(model_state_dict)