Skip to content
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
30744a5
add base classes
seiriosPlus Apr 10, 2019
593bfd3
implement distributed transpiler with fleet
seiriosPlus Apr 11, 2019
ce85700
implement distributed transpiler with fleet
seiriosPlus Apr 12, 2019
1391b01
implement pslib with fleet
seiriosPlus Apr 15, 2019
30835b0
implement pslib with fleet
seiriosPlus Apr 15, 2019
f03164b
implement pslib with fleet
seiriosPlus Apr 15, 2019
126ba5d
code structure update
seiriosPlus Apr 16, 2019
d13dfe3
code structure update, setup.py update
seiriosPlus Apr 16, 2019
9c1b1b9
update import in python
seiriosPlus Apr 17, 2019
29cf469
add the implementation for fleet collective
Apr 17, 2019
e0ad7d5
Merge branch 'feature/fleet' of https://github.com/seiriosPlus/Paddle…
Apr 17, 2019
02ce432
update role in pslib
seiriosPlus Apr 17, 2019
31c0f83
export 'get_worker_endpoints' and 'get_current_id'
Apr 17, 2019
61cda9b
update role in pslib
seiriosPlus Apr 17, 2019
24bbdf3
add split files in base
seiriosPlus Apr 17, 2019
0259d40
Merge branch 'feature/fleet' of ssh.github.com:seiriosPlus/Paddle int…
seiriosPlus Apr 17, 2019
de2a8f1
role maker fix, test=develop
seiriosPlus Apr 18, 2019
3206781
unify public apis for fleet collective mode
Apr 18, 2019
24c93d7
add API annotation, test=develop
seiriosPlus Apr 18, 2019
436fedd
add API annotation, test=develop
seiriosPlus Apr 18, 2019
f84a866
fix in rename, test=develop
seiriosPlus Apr 19, 2019
f51f727
upadte API.spec
seiriosPlus Apr 22, 2019
9d17d48
rename raise
seiriosPlus Apr 22, 2019
136f7a1
Merge branch 'develop' of github.com:PaddlePaddle/Paddle into feature…
seiriosPlus Apr 22, 2019
122de19
upadte API.spec, test=develop
seiriosPlus Apr 22, 2019
a0a3047
fix in test base, test=develop
seiriosPlus Apr 22, 2019
cffce47
fix sync in test_listen_and_serv_op
seiriosPlus Apr 23, 2019
03fa38f
fix sync in test_listen_and_serv_op, test=develop
seiriosPlus Apr 23, 2019
5d0719d
test=develop
seiriosPlus Apr 23, 2019
80049f9
upadte API.spec, test=develop
seiriosPlus Apr 23, 2019
2325f59
Merge branch 'feature/fleet' of ssh.github.com:seiriosPlus/Paddle int…
seiriosPlus Apr 23, 2019
b79d393
upadte comment, test=develop
seiriosPlus Apr 23, 2019
10f041a
remove Optimizer, test=develop
seiriosPlus Apr 24, 2019
b0f19f6
remove Optimizer, reformated comments, test=develop
seiriosPlus Apr 24, 2019
578e746
reformated comments, test=develop
seiriosPlus Apr 24, 2019
fabda81
update instance of Optimizer, test=develop
seiriosPlus Apr 24, 2019
16d1030
sync_mode in transpile compatibility, test=develop
seiriosPlus Apr 24, 2019
7ef165f
reformated api spec, test=develop
seiriosPlus Apr 25, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions paddle/fluid/API.spec
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ paddle.fluid.DistributeTranspiler.get_pserver_program (ArgSpec(args=['self', 'en
paddle.fluid.DistributeTranspiler.get_pserver_programs (ArgSpec(args=['self', 'endpoint'], varargs=None, keywords=None, defaults=None), ('document', '78f4949aedf317666a89ca74b3748ba8'))
paddle.fluid.DistributeTranspiler.get_startup_program (ArgSpec(args=['self', 'endpoint', 'pserver_program', 'startup_program'], varargs=None, keywords=None, defaults=(None, None)), ('document', 'd796fc0c8d51503b556fcf6dc15c4f0c'))
paddle.fluid.DistributeTranspiler.get_trainer_program (ArgSpec(args=['self', 'wait_port'], varargs=None, keywords=None, defaults=(True,)), ('document', '736330e31a7a54abccc0c7fd9119d9ff'))
paddle.fluid.DistributeTranspiler.transpile (ArgSpec(args=['self', 'trainer_id', 'program', 'pservers', 'trainers', 'sync_mode', 'startup_program', 'current_endpoint'], varargs=None, keywords=None, defaults=(None, '127.0.0.1:6174', 1, True, None, '127.0.0.1:6174')), ('document', '06ce55338dfe96311ad1078235ab3bf4'))
paddle.fluid.DistributeTranspiler.transpile (ArgSpec(args=['self', 'trainer_id', 'program', 'pservers', 'trainers', 'startup_program', 'current_endpoint'], varargs=None, keywords=None, defaults=(None, '127.0.0.1:6174', 1, None, '127.0.0.1:6174')), ('document', '951af0a910f9c264723da78ad555f3df'))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this change may affect previous examples I guess, could you check the API change for test_case or public examples?

paddle.fluid.memory_optimize (ArgSpec(args=['input_program', 'skip_opt_set', 'print_log', 'level', 'skip_grads'], varargs=None, keywords=None, defaults=(None, False, 0, False)), ('document', 'eda17d0f1639bc6ca215cecf87f588a4'))
paddle.fluid.release_memory (ArgSpec(args=['input_program', 'skip_opt_set'], varargs=None, keywords=None, defaults=(None,)), ('document', 'ac4114d3df16264f1946deb3a8434a6f'))
paddle.fluid.DistributeTranspilerConfig.__init__
Expand Down Expand Up @@ -429,7 +429,7 @@ paddle.fluid.transpiler.DistributeTranspiler.get_pserver_program (ArgSpec(args=[
paddle.fluid.transpiler.DistributeTranspiler.get_pserver_programs (ArgSpec(args=['self', 'endpoint'], varargs=None, keywords=None, defaults=None), ('document', '78f4949aedf317666a89ca74b3748ba8'))
paddle.fluid.transpiler.DistributeTranspiler.get_startup_program (ArgSpec(args=['self', 'endpoint', 'pserver_program', 'startup_program'], varargs=None, keywords=None, defaults=(None, None)), ('document', 'd796fc0c8d51503b556fcf6dc15c4f0c'))
paddle.fluid.transpiler.DistributeTranspiler.get_trainer_program (ArgSpec(args=['self', 'wait_port'], varargs=None, keywords=None, defaults=(True,)), ('document', '736330e31a7a54abccc0c7fd9119d9ff'))
paddle.fluid.transpiler.DistributeTranspiler.transpile (ArgSpec(args=['self', 'trainer_id', 'program', 'pservers', 'trainers', 'sync_mode', 'startup_program', 'current_endpoint'], varargs=None, keywords=None, defaults=(None, '127.0.0.1:6174', 1, True, None, '127.0.0.1:6174')), ('document', '06ce55338dfe96311ad1078235ab3bf4'))
paddle.fluid.transpiler.DistributeTranspiler.transpile (ArgSpec(args=['self', 'trainer_id', 'program', 'pservers', 'trainers', 'startup_program', 'current_endpoint'], varargs=None, keywords=None, defaults=(None, '127.0.0.1:6174', 1, None, '127.0.0.1:6174')), ('document', '951af0a910f9c264723da78ad555f3df'))
paddle.fluid.transpiler.memory_optimize (ArgSpec(args=['input_program', 'skip_opt_set', 'print_log', 'level', 'skip_grads'], varargs=None, keywords=None, defaults=(None, False, 0, False)), ('document', 'eda17d0f1639bc6ca215cecf87f588a4'))
paddle.fluid.transpiler.release_memory (ArgSpec(args=['input_program', 'skip_opt_set'], varargs=None, keywords=None, defaults=(None,)), ('document', 'ac4114d3df16264f1946deb3a8434a6f'))
paddle.fluid.transpiler.HashName.__init__ (ArgSpec(args=['self', 'pserver_endpoints'], varargs=None, keywords=None, defaults=None), ('document', '6adf97f83acf6453d4a6a4b1070f3754'))
Expand All @@ -444,6 +444,12 @@ paddle.fluid.nets.sequence_conv_pool (ArgSpec(args=['input', 'num_filters', 'fil
paddle.fluid.nets.glu (ArgSpec(args=['input', 'dim'], varargs=None, keywords=None, defaults=(-1,)), ('document', '08c1c57e1db6b20bf87b264cb7cf3ca8'))
paddle.fluid.nets.scaled_dot_product_attention (ArgSpec(args=['queries', 'keys', 'values', 'num_heads', 'dropout_rate'], varargs=None, keywords=None, defaults=(1, 0.0)), ('document', '921714c9bfb351b41403418265393203'))
paddle.fluid.nets.img_conv_group (ArgSpec(args=['input', 'conv_num_filter', 'pool_size', 'conv_padding', 'conv_filter_size', 'conv_act', 'param_attr', 'conv_with_batchnorm', 'conv_batchnorm_drop_rate', 'pool_stride', 'pool_type', 'use_cudnn'], varargs=None, keywords=None, defaults=(1, 3, None, None, False, 0.0, 1, 'max', True)), ('document', '3802be78fbfb206dae64a2d9f8480970'))
paddle.fluid.optimizer.Optimizer.__init__ (ArgSpec(args=['self', 'learning_rate', 'regularization', 'name'], varargs=None, keywords=None, defaults=(None, None)), ('document', '6adf97f83acf6453d4a6a4b1070f3754'))
paddle.fluid.optimizer.Optimizer.apply_gradients (ArgSpec(args=['self', 'params_grads'], varargs=None, keywords=None, defaults=None), ('document', 'bfe7305918552aaecfdaa22411dbe871'))
paddle.fluid.optimizer.Optimizer.apply_optimize (ArgSpec(args=['self', 'loss', 'startup_program', 'params_grads'], varargs=None, keywords=None, defaults=None), ('document', '5c46d1926a40f1f873ffe9f37ac89dae'))
paddle.fluid.optimizer.Optimizer.backward (ArgSpec(args=['self', 'loss', 'startup_program', 'parameter_list', 'no_grad_set', 'callbacks'], varargs=None, keywords=None, defaults=(None, None, None, None)), ('document', 'ba3a113d0229ff7bc9d39bda0a6d947f'))
paddle.fluid.optimizer.Optimizer.get_opti_var_name_list (ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None), ('document', '6adf97f83acf6453d4a6a4b1070f3754'))
paddle.fluid.optimizer.Optimizer.minimize (ArgSpec(args=['self', 'loss', 'startup_program', 'parameter_list', 'no_grad_set'], varargs=None, keywords=None, defaults=(None, None, None)), ('document', '35fd5d3330c97903528c7e0dacc7f6ea'))
paddle.fluid.optimizer.SGDOptimizer.__init__ (ArgSpec(args=['self', 'learning_rate', 'regularization', 'name'], varargs=None, keywords=None, defaults=(None, None)), ('document', '6adf97f83acf6453d4a6a4b1070f3754'))
paddle.fluid.optimizer.SGDOptimizer.apply_gradients (ArgSpec(args=['self', 'params_grads'], varargs=None, keywords=None, defaults=None), ('document', 'bfe7305918552aaecfdaa22411dbe871'))
paddle.fluid.optimizer.SGDOptimizer.apply_optimize (ArgSpec(args=['self', 'loss', 'startup_program', 'params_grads'], varargs=None, keywords=None, defaults=None), ('document', '5c46d1926a40f1f873ffe9f37ac89dae'))
Expand Down
2 changes: 1 addition & 1 deletion python/paddle/fluid/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ def global_shuffle(self, fleet=None):

Examples:
>>> import paddle.fluid as fluid
>>> import paddle.fluid.incubate.fleet.parameter_server as fleet
>>> from paddle.fluid.incubate.fleet.pslib import fleet
>>> dataset = fluid.DatasetFactory.create_dataset("InMemoryDataset")
>>> filelist = ["a.txt", "b.txt"]
>>> dataset.set_filelist(filelist)
Expand Down
303 changes: 303 additions & 0 deletions python/paddle/fluid/incubate/fleet/base/fleet_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,303 @@
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import abc
import sys

from enum import Enum

from paddle.fluid.optimizer import Optimizer

from role_maker import RoleMakerBase, Role
from role_maker import MPISymetricRoleMaker
from role_maker import UserDefinedRoleMaker


class Mode(Enum):
    """
    Distributed training modes supported by Fleet.

    TRANSPILER: parameter-server training driven by the distribute transpiler.
    PSLIB:      parameter-server training backed by the pslib runtime.
    COLLECTIVE: collective (all-reduce style) training.
    """
    # NOTE: no trailing commas here -- `TRANSPILER = 1,` would make the
    # member value the tuple (1,) instead of the int 1.
    TRANSPILER = 1
    PSLIB = 2
    COLLECTIVE = 3
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you explain the difference between those modes?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Three distributed training mode.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will add more information in Mode .



class Fleet(object):
    """
    Fleet is the base class, transpiler and pslib are implementation of Fleet.

    Concrete subclasses must implement the abstract lifecycle methods
    (init_worker/run_worker/init_server/run_server/stop_worker/stop),
    distributed_optimizer, save_inference_model and save_persistables.
    """
    __metaclass__ = abc.ABCMeta

    def __init__(self, mode):
        """
        Init Fleet with mode, mode must be an instance of Mode.

        Args:
            mode(Mode): the implementation of Fleet's mode.

        Returns:
            None
        """

        assert isinstance(mode, Mode)
        self.is_initialized = False  # flipped to True by init()
        self.mode = mode
        self.workers = 0
        self.servers = 0
        self.worker_endpoints = []
        self.server_endpoints = []
        self.role = Role.WORKER
        self.current_endpoint = None
        self.current_id = 0
        self.optimizer = None
        self.role_maker_ = None

    def is_first_worker(self):
        """
        Check whether the node is the first instance of worker.

        Returns:
            bool: True if this is the first node of worker,
                  False if not.
        """
        return self.is_worker() and self.current_id == 0

    def worker_id(self):
        """
        Get current worker id.

        NOTE(review): the exact meaning of the id depends on the mode
        (e.g. one id per device in collective mode vs. one id per trainer
        process in parameter-server mode) -- confirm with the concrete
        RoleMaker in use.

        Returns:
            int: node id
        """
        return self.current_id

    def get_workers(self):
        """
        Get current total worker number.

        Returns:
            int: worker number
        """
        return self.workers

    def is_worker(self):
        """
        Check whether the node is an instance of worker.

        Returns:
            bool: True if this is a node of worker,
                  False if not.
        """
        return self.role == Role.WORKER

    def is_server(self):
        """
        Check whether the node is an instance of server.

        Returns:
            bool: True if this is a node of server,
                  False if not.
        """
        return self.role == Role.SERVER

    def split_files(self, files):
        """
        split files before distributed training,
        for example, files is [a, b, c ,d, e] and trainer_num = 2,
        then trainer 0 gets [a, b, c] and trainer 1 gets [d, e]

        Args:
            files(list): file list need to be read.

        Returns:
            list: files belongs to this worker.

        Raises:
            ValueError: if there are more trainers than files.
        """
        file_num = len(files)
        trainer_id = self.worker_id()
        trainer_num = self.get_workers()
        if trainer_num > file_num:
            raise ValueError("trainer_num should be <= file_num : "
                             "%s > %s" % (trainer_num, file_num))
        start = 0
        end = 0
        for i in range(0, trainer_id + 1):
            # Use floor division so `length` stays an int under Python 3
            # (plain '/' would yield a float and break the slice below).
            # The first (file_num % trainer_num) trainers each take one
            # extra file.
            length = file_num // trainer_num + (i < (file_num % trainer_num))
            start = end
            end += length
        return files[start:end]

    def init(self, role_maker=None):
        """
        should be called only once in user's python scripts,
        init() will initialize RoleMaker which is used for identifying
        current node's role, e.g. worker, server, etc.

        Args:
            role_maker(RoleMakerBase): subclass of RoleMakerBase.

        Returns:
            None

        Raises:
            ValueError: if role_maker is not a MPISymetricRoleMaker or
                a UserDefinedRoleMaker.
        """

        if role_maker and not isinstance(role_maker, RoleMakerBase):
            raise ValueError("role_maker must be an instance of RoleMakerBase")

        self.role_maker_ = role_maker

        if isinstance(role_maker, MPISymetricRoleMaker):
            # The MPI role maker discovers the cluster topology itself.
            self.role_maker_._generate_role()
            self.role = Role.WORKER if role_maker._is_worker() else Role.SERVER
            self.workers = role_maker._worker_num()
            self.servers = role_maker._server_num()
            self.server_endpoints = role_maker._get_pserver_endpoints()
            self.worker_endpoints = role_maker._get_trainer_endpoints()
            self.current_id = role_maker._worker_index(
            ) if role_maker._is_worker() else role_maker._server_index()
            self.current_endpoint = self.worker_endpoints[self.current_id] \
                if role_maker._is_worker() else self.server_endpoints[self.current_id]

        elif isinstance(role_maker, UserDefinedRoleMaker):
            # The user supplied the topology explicitly; copy it verbatim.
            self.current_id = role_maker.current_id
            self.current_endpoint = role_maker.current_endpoint
            self.workers = role_maker.workers
            self.worker_endpoints = role_maker.worker_endpoints
            self.servers = role_maker.servers
            self.server_endpoints = role_maker.server_endpoints
            self.role = role_maker.role

        else:
            raise ValueError(
                "role_maker must be an instance of UserDefinedRoleMaker/MPISymetricRoleMaker"
            )

        self.is_initialized = True

    @abc.abstractmethod
    def init_worker(self, executor):
        """Initialize the worker side; implemented by subclasses."""
        pass

    @abc.abstractmethod
    def run_worker(self, executor, main_program=None):
        """Run training on the worker side; implemented by subclasses."""
        pass

    @abc.abstractmethod
    def init_server(self, executor, model_dir=None):
        """Initialize the server side; implemented by subclasses."""
        pass

    @abc.abstractmethod
    def run_server(self, executor):
        """Start the server loop; implemented by subclasses."""
        pass

    @abc.abstractmethod
    def stop_worker(self):
        """Stop the worker; implemented by subclasses."""
        pass

    @abc.abstractmethod
    def stop(self, executor):
        """Stop the whole job; implemented by subclasses."""
        pass

    @abc.abstractmethod
    def distributed_optimizer(self, optimizer, strategy=None):
        """Wrap a single-node optimizer into a DistributedOptimizer."""
        pass

    @abc.abstractmethod
    def save_inference_model(self,
                             executor,
                             dirname,
                             feeded_var_names,
                             target_vars,
                             main_program=None,
                             export_for_deployment=True):
        """Save an inference model; implemented by subclasses."""
        pass

    @abc.abstractmethod
    def save_persistables(self, executor, dirname, main_program=None):
        """Save persistable variables; implemented by subclasses."""
        pass

    def to_string(self):
        """
        Return a human-readable summary of the fleet's current state,
        for debugging/logging purposes.

        Returns:
            str: multi-line description of mode, workers, endpoints and role.
        """
        infos = """
        mode = {}
        workers = {}
        server_endpoints = {}
        role = {}
        current_endpoint = {}
        current_id = {}
        """.format(self.mode, self.workers, self.server_endpoints, self.role,
                   self.current_endpoint, self.current_id)
        return infos


class DistributedOptimizer(object):
    """
    DistributedOptimizer is a wrapper for paddle.fluid.optimizer
    A user should pass a paddle.fluid.optimizer to DistributedOptimizer
    minimize() function is implemented.
    DistributedOptimizer is the starting point for a user who wants to
    run distributed training. The optimized information will be stored in
    Fleet() instance who holds the global information about current distributed
    training.
    """
    __metaclass__ = abc.ABCMeta

    def __init__(self, optimizer, strategy=None):
        """
        Args:
            optimizer(Optimizer): the single-node optimizer to be wrapped
                for distributed training.
            strategy(dict|None): optional configuration for the distributed
                optimizer; the accepted keys are defined by each concrete
                subclass.

        Raises:
            ValueError: if optimizer is not an Optimizer, or strategy is
                given but is not a dict.
        """
        if not isinstance(optimizer, Optimizer):
            raise ValueError("optimizer must be an instance of Optimizer")

        # Compare against None explicitly: the previous truthiness check
        # (`if strategy and ...`) let falsy non-dicts such as [] or 0
        # slip through validation.
        if strategy is not None and not isinstance(strategy, dict):
            raise ValueError("strategy must be an instance of Dict")

        self._optimizer = optimizer
        self._strategy = strategy

    @abc.abstractmethod
    def backward(self,
                 loss,
                 startup_program=None,
                 parameter_list=None,
                 no_grad_set=None,
                 callbacks=None):
        """
        Currently, backward function can not be called through DistributedOptimizer
        """
        pass

    @abc.abstractmethod
    def apply_gradients(self, params_grads):
        """
        Currently, apply_gradients function can not be called through DistributedOptimizer
        """
        pass

    @abc.abstractmethod
    def minimize(self,
                 loss,
                 startup_program=None,
                 parameter_list=None,
                 no_grad_set=None):
        """
        minimize a program through loss, loss can be a list in DistributedOptimizer
        Args:
            loss (Variable|Variable List): loss variable or loss variable list to run optimization.
            startup_program (Program): startup_program for initializing parameters
                in `parameter_list`.
            parameter_list (list): list of Variables to update.
            no_grad_set (set|None): set of Variables should be ignored.
        Returns:
            tuple: (optimize_ops, params_grads) which are, list of operators appended;
            and list of (param, grad) Variables pair for optimization.
        Note that in parameter server mode, a worker will not get anything about optimize_ops
        Because optimizer algorithms run on pserver side. We will make this usable in pserver
        process, but currently the optimization part is written into Fleet(). A user does not
        need to care about how to startup a pserver node.
        """
        pass
Loading