Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions python/paddle/fluid/incubate/fleet/collective/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import os
import sys
import six


class LambConfig(object):
Expand Down Expand Up @@ -99,14 +100,16 @@ def __init__(self):
self.use_local_sgd = False
self.use_dist_fc = False

self.local_sgd_config = None # LocalSGDConfig
self.dist_fc_config = None # DistFCConfig
self.mode = "nccl2" # or collective
self.collective_mode = None # local_sgd or grad_allreduce
self.nccl_comm_num = 1

self.exec_strategy = fluid.ExecutionStrategy()

# configurations below are used for unit test
self._ut4grad_allreduce = False


class CollectiveOpBasedOptimizer(DistributedOptimizer):
"""
Expand Down Expand Up @@ -161,7 +164,7 @@ def apply_gradients(self, params_grads):
return self._optimizer.apply_gradients(params_grads)

def _check_condition(self, name, **kwargs):
for k, v in kwargs.iterms():
for k, v in six.iteritems(kwargs):
if v is True:
assert False, "you can't use %s and %s together" % (name, k)

Expand All @@ -170,12 +173,13 @@ def _check_collective_mode(self, main_program, optimizer, strategy):
Check the conflict conditions.
"""
if strategy.use_local_sgd:
strategy.mode = "collective"
strategy.collective_mode = "local_sgd"
self._check_condition(
"use_local_sgd",
use_dgc=main_program._enable_dgc,
use_dist_fc=strategy.use_dist_fc,
use_lamb=main_program._use_lamb)
assert strategy.local_sgd_config is not None, "DistributedStrategy.local_sgd_config should be set"

if strategy.use_dist_fc:
self._check_condition(
Expand All @@ -185,6 +189,14 @@ def _check_collective_mode(self, main_program, optimizer, strategy):
use_lamb=main_program._use_lamb)
assert strategy.dist_fc_config is not None, "DistributedStrategy.dist_fc_config should be set"

if strategy._ut4grad_allreduce:
strategy.mode = "collective"
strategy.collective_mode = "grad_allreduce"
self._check_condition(
"_ut4grad_allreduce",
use_dgc=main_program._enable_dgc,
use_lamb=main_program._use_lamb)

if self._strategy.collective_mode=="local_sgd" \
or self._strategy.collective_mode == "grad_allreduce":
assert self._strategy.mode == "collective", \
Expand Down
12 changes: 12 additions & 0 deletions python/paddle/fluid/tests/unittests/test_dist_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ def run_gpu_fleet_api_trainer(self, args):
dist_strategy.exec_strategy = exec_strategy
dist_strategy.fuse_memory_size = 1 #MB
dist_strategy.fuse_laryer_size = 1
if args.use_local_sgd:
dist_strategy.use_local_sgd = True
if args.ut4grad_allreduce:
dist_strategy._ut4grad_allreduce = True

role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
Expand Down Expand Up @@ -396,6 +400,8 @@ def runtime_main(test_class):
parser.add_argument('--enable_backward_deps', action='store_true')
parser.add_argument('--use_hallreduce', action='store_true')
parser.add_argument('--gpu_fleet_api', action='store_true')
parser.add_argument('--use_local_sgd', action='store_true')
parser.add_argument('--ut4grad_allreduce', action='store_true')
parser.add_argument(
'--hallreduce_inter_nranks', type=int, required=False, default=2)
parser.add_argument(
Expand Down Expand Up @@ -478,6 +484,8 @@ def setUp(self):
self._nccl_comm_num = 1
self._enable_backward_deps = False
self._gpu_fleet_api = False
self._use_local_sgd = False
self._ut4grad_allreduce = False
self._use_hallreduce = False
self._setup_config()
self._after_setup_config()
Expand Down Expand Up @@ -731,6 +739,10 @@ def _get_nccl2_trainer_cmd(self, model, ep, update_method, trainer_id,

if self._gpu_fleet_api:
tr_cmd += " --gpu_fleet_api"
if self._use_local_sgd:
tr_cmd += " --use_local_sgd"
if self._ut4grad_allreduce:
tr_cmd += " --ut4grad_allreduce"

if os.getenv('WITH_COVERAGE', 'OFF') == 'ON':
env['COVERAGE_FILE'] = os.getenv('COVERAGE_FILE', '')
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function
import unittest
from test_dist_base import TestDistBase


class TestDistMnistLocalSGDFleetApi(TestDistBase):
    """Distributed MNIST training through the fleet API with the
    ``local_sgd`` collective mode enabled."""

    def _setup_config(self):
        # NCCL2 collective run via the GPU fleet API; reduce/reader-alloc
        # variants are disabled so only local SGD is exercised.
        self._use_reduce = False
        self._use_reader_alloc = False
        self._sync_mode = True
        self._nccl2_mode = True
        self._gpu_fleet_api = True
        self._use_local_sgd = True

    def test_dist_train(self):
        import paddle.fluid as fluid
        # Collective training needs CUDA; do nothing on CPU-only builds.
        if not fluid.core.is_compiled_with_cuda():
            return
        self.check_with_place("dist_mnist.py", delta=1e-5)


class TestDistMnistGradAllReduceFleetApi(TestDistBase):
    """Distributed MNIST training through the fleet API with the
    unit-test-only ``grad_allreduce`` collective mode enabled."""

    def _setup_config(self):
        # NCCL2 collective run via the GPU fleet API; _ut4grad_allreduce
        # flips the strategy into grad_allreduce mode for this unit test.
        self._use_reduce = False
        self._use_reader_alloc = False
        self._sync_mode = True
        self._nccl2_mode = True
        self._gpu_fleet_api = True
        self._ut4grad_allreduce = True

    def test_dist_train(self):
        import paddle.fluid as fluid
        # Collective training needs CUDA; do nothing on CPU-only builds.
        if not fluid.core.is_compiled_with_cuda():
            return
        self.check_with_place("dist_mnist.py", delta=1e-5)


# Allow running this test module directly from the command line.
if __name__ == "__main__":
    unittest.main()
6 changes: 4 additions & 2 deletions python/paddle/fluid/transpiler/collective.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,10 +278,12 @@ def _transpile_startup_program(self):
Collective._transpile_startup_program(self)

block = self.startup_program.global_block()
non_dist_params = []
for param in block.iter_parameters():
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixes a unit-test failure on Python 3.5: the block's ordered parameter set must not be modified while it is being iterated, so the non-distributed parameters are collected first and processed in a second pass.

if param.is_distributed:
continue
if not param.is_distributed:
non_dist_params.append(param)

for param in non_dist_params:
snapshot = block.create_var(
name=self.snapshot_name(param.name),
shape=param.shape,
Expand Down
4 changes: 2 additions & 2 deletions python/paddle/fluid/transpiler/distribute_transpiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,9 +334,9 @@ def _transpile_collective(self,

transpiler = None
if collective_mode == 'grad_allreduce':
transpiler = collective.GradAllReduce()
transpiler = collective.GradAllReduce(self.config.nccl_comm_num)
elif collective_mode == 'local_sgd':
transpiler = collective.LocalSGD()
transpiler = collective.LocalSGD(self.config.nccl_comm_num)
else:
raise ValueError('invalid collective_mode: %s' % collective_mode)

Expand Down