Merged
16 commits
268163c
Add gloo-based barrier op for CPU device, and support to call it as f…
bliu3650 Aug 13, 2021
c54a762
Add example code: Example -> Examples.
bliu3650 Aug 14, 2021
16e4fdf
Enable both barrier op and barrier func, share same init process and …
bliu3650 Aug 14, 2021
1044b71
Correct gloo barrier examples which should not be pseudo code.
bliu3650 Aug 16, 2021
f7dbe46
Update Gloo barrier api examples with multiprocessing.
bliu3650 Aug 16, 2021
ab13104
fix typo in release_gloo examples.
bliu3650 Aug 16, 2021
2e2a64d
Implement gloo barrier test as standard python unittest.
bliu3650 Aug 16, 2021
35d241a
Config CMakelist.txt for gloo barrier unittest to be called with DIST…
bliu3650 Aug 16, 2021
1aceaff
Debug CI why test_collective_cpu_barrier_with_gloo failed.
bliu3650 Aug 16, 2021
1b2b81b
Update docs in new APIs and keep sync with FluidDoc.
bliu3650 Aug 18, 2021
27908ce
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
bliu3650 Aug 18, 2021
504e58a
Rename CPU barrier APIs, and change gloo release behavior to not asse…
bliu3650 Aug 19, 2021
a736ebb
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
bliu3650 Aug 19, 2021
f4cd745
Update api examples according to the changes of the names of gloo bar…
bliu3650 Aug 19, 2021
06cac4c
Reduce CPU barrier unittest TIMEOUT.
bliu3650 Aug 20, 2021
a9e6665
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
bliu3650 Aug 20, 2021
15 changes: 15 additions & 0 deletions paddle/fluid/platform/gloo_context.cc
@@ -27,6 +27,21 @@ void GlooParallelContext::Init() {
strategy_.scope);
gloo_ptr->Init();
}

void GlooParallelContext::Barrier() {
auto gloo_ptr = paddle::framework::GlooWrapper::GetInstance();
PADDLE_ENFORCE_EQ(gloo_ptr->IsInitialized(), true,
paddle::platform::errors::Unavailable(
"Gloo context is not initialized."));
gloo_ptr->Barrier();
}

void GlooParallelContext::ReleaseContext() {
auto gloo_ptr = paddle::framework::GlooWrapper::GetInstance();
  if (gloo_ptr->IsInitialized()) {
gloo_ptr.reset();
}
}
#endif

} // namespace platform
4 changes: 4 additions & 0 deletions paddle/fluid/platform/gloo_context.h
@@ -41,6 +41,10 @@ class GlooParallelContext {

virtual void Init();

virtual void Barrier();

virtual void ReleaseContext();

protected:
GlooParallelStrategy strategy_;
};
6 changes: 5 additions & 1 deletion paddle/fluid/pybind/gloo_context_py.cc
@@ -93,7 +93,11 @@ void BindGlooContext(py::module *m) {

py::class_<platform::GlooParallelContext> gloo_ctx(*m, "GlooParallelContext");
gloo_ctx.def(py::init<const platform::GlooParallelStrategy &>())
.def("init", [](platform::GlooParallelContext &self) { self.Init(); });
.def("init", [](platform::GlooParallelContext &self) { self.Init(); })
.def("barrier",
[](platform::GlooParallelContext &self) { self.Barrier(); })
.def("release",
[](platform::GlooParallelContext &self) { self.ReleaseContext(); });
#endif
}

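For reference, the three bindings above expose Init, Barrier, and ReleaseContext on core.GlooParallelContext. The sketch below is a minimal illustration of how they are driven from Python, mirroring what gloo_init_parallel_env in parallel_with_gloo.py (added below) does once the HTTP KV server used for rendezvous is already up; the endpoint values are placeholders, not part of this diff.

# Minimal sketch with a placeholder endpoint; assumes the KV server for gloo
# rendezvous is already reachable at 127.0.0.1:6170 and that another process
# runs the same code with rank = 1.
from paddle.fluid import core

strategy = core.GlooParallelStrategy()
strategy.rank = 0
strategy.rank_num = 2
strategy.ip_address = "127.0.0.1"
strategy.ip_port = 6170
strategy.init_seconds = 3600
strategy.run_seconds = 9999999

ctx = core.GlooParallelContext(strategy)
ctx.init()     # GlooParallelContext::Init()
ctx.barrier()  # new binding -> GlooParallelContext::Barrier()
ctx.release()  # new binding -> GlooParallelContext::ReleaseContext()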
7 changes: 7 additions & 0 deletions python/paddle/distributed/__init__.py
@@ -18,6 +18,10 @@
from .parallel import get_rank # noqa: F401
from .parallel import get_world_size # noqa: F401

from .parallel_with_gloo import gloo_init_parallel_env
from .parallel_with_gloo import gloo_barrier
from .parallel_with_gloo import gloo_release

from paddle.distributed.fleet.dataset import InMemoryDataset # noqa: F401
from paddle.distributed.fleet.dataset import QueueDataset # noqa: F401

@@ -60,6 +64,9 @@
"ParallelEnv",
"new_group",
"init_parallel_env",
"gloo_init_parallel_env",
"gloo_barrier",
"gloo_release",
"QueueDataset",
"split",
"CountFilterEntry",
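The new exports make the gloo-based CPU APIs available directly under paddle.distributed. The per-rank call sequence is sketched below in condensed form; every one of the rank_num processes runs it with its own rank_id, and the full multiprocessing launchers are shown in the docstring examples of parallel_with_gloo.py further down. The endpoint value is a placeholder.

# Per-rank call sequence (each of the rank_num processes runs this with its
# own rank_id); a single process alone will block in the barrier.
import paddle

rank_id, rank_num = 0, 2          # per-process values in a real launch
endpoint = "127.0.0.1:6170"       # placeholder free port shared by all ranks

paddle.distributed.gloo_init_parallel_env(rank_id, rank_num, endpoint)
paddle.distributed.gloo_barrier()  # returns once all rank_num ranks arrive
paddle.distributed.gloo_release()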
249 changes: 249 additions & 0 deletions python/paddle/distributed/parallel_with_gloo.py
@@ -0,0 +1,249 @@
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import sys
import time
import warnings
from multiprocessing import Process, Manager

# deprecated module import
from paddle.fluid import core
from paddle.distributed.fleet.base.private_helper_function import wait_server_ready

__all__ = []

_global_gloo_ctx = None


def _start_kv_server(port, http_server_d, size):
from paddle.distributed.fleet.utils.http_server import KVServer
http_server = KVServer(int(port), size=size)
http_server.start()
wait_seconds = 3
while http_server_d.get("running", False) or not http_server.should_stop():
time.sleep(wait_seconds)
http_server.stop()


def gloo_init_parallel_env(rank_id, rank_num, server_endpoint):
"""
    Initialize the parallel environment with gloo for CPU only.

Args:
        rank_id (int, required): the index of the current rank.
        rank_num (int, required): the number of ranks in this parallel environment.
        server_endpoint (str, required): the endpoint of the server used to initialize the gloo context, in ip:port format.

Returns:
None

Examples:
.. code-block:: python

import paddle
import multiprocessing
from contextlib import closing
import socket

port_set = set()

def find_free_port():
def _free_port():
with closing(socket.socket(socket.AF_INET,
socket.SOCK_STREAM)) as s:
s.bind(('', 0))
return s.getsockname()[1]
while True:
port = _free_port()
if port not in port_set:
port_set.add(port)
return port

def test_gloo_init(id, rank_num, server_endpoint):
paddle.distributed.gloo_init_parallel_env(
id, rank_num, server_endpoint)

def test_gloo_init_with_multiprocess(num_of_ranks):
jobs = []
server_endpoint = "127.0.0.1:%s" % (find_free_port())
for id in range(num_of_ranks):
p = multiprocessing.Process(
target=test_gloo_init,
args=(id, num_of_ranks, server_endpoint))
jobs.append(p)
p.start()
for proc in jobs:
proc.join()

if __name__ == '__main__':
# Arg: number of ranks (processes)
test_gloo_init_with_multiprocess(2)
"""

    assert rank_num >= 2, \
        "rank_num should be greater than or equal to 2 for parallel environment initialization."

# init gloo context
manager = Manager()
# global dict to store status
http_server_status = manager.dict()
http_server_status["running"] = False
if rank_id == 0:
# The scope for worker used by http server is '_worker'
size = {'_worker': rank_num}
http_server_proc = Process(
target=_start_kv_server,
args=(int(server_endpoint.split(":")[1]), http_server_status, size))
http_server_proc.daemon = True
http_server_status["running"] = True
http_server_proc.start()

# all processes in this parallel environment should wait until server is ready
wait_server_ready([server_endpoint])

gloo_strategy = core.GlooParallelStrategy()
gloo_strategy.rank = rank_id
gloo_strategy.rank_num = rank_num
gloo_strategy.ip_address = server_endpoint.split(":")[0]
gloo_strategy.ip_port = int(server_endpoint.split(":")[1])
# default_init_timeout_seconds
gloo_strategy.init_seconds = 3600
# default_run_timeout_seconds
gloo_strategy.run_seconds = 9999999

global _global_gloo_ctx
_global_gloo_ctx = core.GlooParallelContext(gloo_strategy)
_global_gloo_ctx.init()

if rank_id == 0:
http_server_status["running"] = False
http_server_proc.join()


def gloo_barrier():
"""
    Call the barrier function with the initialized gloo context.

Args:
None

Returns:
None

Examples:
.. code-block:: python

import paddle
import multiprocessing
from contextlib import closing
import socket

port_set = set()

def find_free_port():
def _free_port():
with closing(socket.socket(socket.AF_INET,
socket.SOCK_STREAM)) as s:
s.bind(('', 0))
return s.getsockname()[1]
while True:
port = _free_port()
if port not in port_set:
port_set.add(port)
return port

def test_gloo_barrier(id, rank_num, server_endpoint):
paddle.distributed.gloo_init_parallel_env(
id, rank_num, server_endpoint)
paddle.distributed.gloo_barrier()

def test_gloo_barrier_with_multiprocess(num_of_ranks):
jobs = []
server_endpoint = "127.0.0.1:%s" % (find_free_port())
for id in range(num_of_ranks):
p = multiprocessing.Process(
target=test_gloo_barrier,
args=(id, num_of_ranks, server_endpoint))
jobs.append(p)
p.start()
for proc in jobs:
proc.join()

if __name__ == '__main__':
# Arg: number of ranks (processes)
test_gloo_barrier_with_multiprocess(2)
"""

    assert _global_gloo_ctx is not None, "gloo context is not initialized."
_global_gloo_ctx.barrier()


def gloo_release():
"""
    Release the parallel environment initialized by gloo.

Args:
None

Returns:
None

Examples:
.. code-block:: python

import paddle
import multiprocessing
from contextlib import closing
import socket

port_set = set()

def find_free_port():
def _free_port():
with closing(socket.socket(socket.AF_INET,
socket.SOCK_STREAM)) as s:
s.bind(('', 0))
return s.getsockname()[1]
while True:
port = _free_port()
if port not in port_set:
port_set.add(port)
return port

def test_gloo_release(id, rank_num, server_endpoint):
paddle.distributed.gloo_init_parallel_env(
id, rank_num, server_endpoint)
paddle.distributed.gloo_barrier()
paddle.distributed.gloo_release()

def test_gloo_release_with_multiprocess(num_of_ranks):
jobs = []
server_endpoint = "127.0.0.1:%s" % (find_free_port())
for id in range(num_of_ranks):
p = multiprocessing.Process(
target=test_gloo_release,
args=(id, num_of_ranks, server_endpoint))
jobs.append(p)
p.start()
for proc in jobs:
proc.join()

if __name__ == '__main__':
# Arg: number of ranks (processes)
test_gloo_release_with_multiprocess(2)
"""

if _global_gloo_ctx is not None:
_global_gloo_ctx.release()
2 changes: 2 additions & 0 deletions python/paddle/fluid/tests/unittests/CMakeLists.txt
@@ -145,6 +145,7 @@ if(NOT WITH_DISTRIBUTE OR WIN32)
LIST(REMOVE_ITEM TEST_OPS test_fleet_ps)
LIST(REMOVE_ITEM TEST_OPS test_fleet_rolemaker_2)
LIST(REMOVE_ITEM TEST_OPS test_fleet_utils)
LIST(REMOVE_ITEM TEST_OPS test_collective_cpu_barrier_with_gloo)

# TODO: Fix these unittests failed on Windows
list(REMOVE_ITEM TEST_OPS test_fake_init_op)
@@ -740,6 +741,7 @@ endif()

if (WITH_DISTRIBUTE AND NOT WIN32)
set_tests_properties(test_fleet_utils PROPERTIES TIMEOUT 120)
set_tests_properties(test_collective_cpu_barrier_with_gloo PROPERTIES TIMEOUT 40)
endif()

if (WITH_DISTRIBUTE)
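The CMake entries above register test_collective_cpu_barrier_with_gloo only for distributed, non-Windows builds, with a 40-second timeout; the test file itself is not part of this diff. The sketch below is a hypothetical reconstruction of how such a unittest could exercise the three new APIs, following the multiprocessing pattern used in the docstring examples; names and structure are assumptions, not the actual test.

# Hypothetical unittest sketch (not the actual test file in the PR).
import multiprocessing
import socket
import unittest
from contextlib import closing

import paddle


def find_free_port():
    # Ask the OS for an unused TCP port to use as the rendezvous endpoint.
    with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
        s.bind(('', 0))
        return s.getsockname()[1]


def worker(rank_id, rank_num, endpoint):
    # Every rank initializes the gloo env, hits the barrier, then releases it.
    paddle.distributed.gloo_init_parallel_env(rank_id, rank_num, endpoint)
    paddle.distributed.gloo_barrier()
    paddle.distributed.gloo_release()


class TestCollectiveCPUBarrierWithGloo(unittest.TestCase):
    def test_barrier_with_two_ranks(self):
        rank_num = 2
        endpoint = "127.0.0.1:%s" % find_free_port()
        procs = []
        for rank_id in range(rank_num):
            p = multiprocessing.Process(
                target=worker, args=(rank_id, rank_num, endpoint))
            p.start()
            procs.append(p)
        for p in procs:
            p.join()
            self.assertEqual(p.exitcode, 0)


if __name__ == '__main__':
    unittest.main()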