Skip to content

Commit 504e58a

Browse files
committed
Rename CPU barrier APIs, and change gloo release behavior to not assert its init status.
1 parent 27908ce commit 504e58a

4 files changed

Lines changed: 62 additions & 49 deletions

File tree

paddle/fluid/platform/gloo_context.cc

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,17 @@ void GlooParallelContext::Init() {
3030

3131
void GlooParallelContext::Barrier() {
3232
auto gloo_ptr = paddle::framework::GlooWrapper::GetInstance();
33-
CHECK_EQ(gloo_ptr->IsInitialized(), true);
33+
PADDLE_ENFORCE_EQ(gloo_ptr->IsInitialized(), true,
34+
paddle::platform::errors::Unavailable(
35+
"Gloo context is not initialized."));
3436
gloo_ptr->Barrier();
3537
}
3638

3739
void GlooParallelContext::ReleaseContext() {
3840
auto gloo_ptr = paddle::framework::GlooWrapper::GetInstance();
39-
CHECK_EQ(gloo_ptr->IsInitialized(), true);
40-
gloo_ptr.reset();
41+
if (gloo_ptr->IsInitialized() == true) {
42+
gloo_ptr.reset();
43+
}
4144
}
4245
#endif
4346

python/paddle/distributed/__init__.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@
1818
from .parallel import get_rank # noqa: F401
1919
from .parallel import get_world_size # noqa: F401
2020

21-
from .parallel_with_gloo import init_gloo_parallel_env
22-
from .parallel_with_gloo import barrier_func
23-
from .parallel_with_gloo import release_gloo
21+
from .parallel_with_gloo import gloo_init_parallel_env
22+
from .parallel_with_gloo import gloo_barrier
23+
from .parallel_with_gloo import gloo_release
2424

2525
from paddle.distributed.fleet.dataset import InMemoryDataset # noqa: F401
2626
from paddle.distributed.fleet.dataset import QueueDataset # noqa: F401
@@ -64,9 +64,9 @@
6464
"ParallelEnv",
6565
"new_group",
6666
"init_parallel_env",
67-
"init_gloo_parallel_env",
68-
"barrier_func",
69-
"release_gloo",
67+
"gloo_init_parallel_env",
68+
"gloo_barrier",
69+
"gloo_release",
7070
"QueueDataset",
7171
"split",
7272
"CountFilterEntry",

python/paddle/distributed/parallel_with_gloo.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ def _start_kv_server(port, http_server_d, size):
3737
http_server.stop()
3838

3939

40-
def init_gloo_parallel_env(rank_id, rank_num, server_endpoint):
40+
def gloo_init_parallel_env(rank_id, rank_num, server_endpoint):
4141
"""
4242
Initialize parallel environment with gloo for cpu only.
4343
@@ -132,7 +132,7 @@ def test_init_gloo_with_multiprocess(num_of_ranks):
132132
http_server_proc.join()
133133

134134

135-
def barrier_func():
135+
def gloo_barrier():
136136
"""
137137
Call barrier function with initialized gloo context.
138138
@@ -186,10 +186,11 @@ def test_barrier_with_multiprocess(num_of_ranks):
186186
test_barrier_with_multiprocess(2)
187187
"""
188188

189+
assert _global_gloo_ctx is not None, "gloo context is not initialzed."
189190
_global_gloo_ctx.barrier()
190191

191192

192-
def release_gloo(rank_id):
193+
def gloo_release():
193194
"""
194195
Release the parallel environment initialized by gloo
195196
@@ -244,4 +245,5 @@ def test_release_gloo_with_multiprocess(num_of_ranks):
244245
test_release_gloo_with_multiprocess(2)
245246
"""
246247

247-
_global_gloo_ctx.release()
248+
if _global_gloo_ctx is not None:
249+
_global_gloo_ctx.release()

python/paddle/fluid/tests/unittests/test_collective_cpu_barrier_with_gloo.py

Lines changed: 44 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
paddle.enable_static()
2828

2929

30-
class CollectiveCPUBarrierText(unittest.TestCase):
30+
class CollectiveCPUBarrierWithGlooTest(unittest.TestCase):
3131
def find_free_port(self):
3232
def _free_port():
3333
with closing(socket.socket(socket.AF_INET,
@@ -42,42 +42,50 @@ def _free_port():
4242
return port
4343

4444
def barrier_func(self, id, rank_num, server_endpoint, out_dict, sleep_time):
45-
paddle.distributed.init_gloo_parallel_env(id, rank_num, server_endpoint)
46-
# 1st barrier
47-
# Run barrier to synchronize processes after starting
48-
paddle.distributed.barrier_func()
49-
# 2nd barrier
50-
# Let rank 0 sleep for one second and check that all processes
51-
# saw that artificial delay through the barrier
52-
start = time.time()
53-
if (id == 0):
54-
time.sleep(sleep_time)
55-
paddle.distributed.barrier_func()
56-
end = time.time()
57-
out_dict[id] = end - start
58-
# Release
59-
paddle.distributed.release_gloo(id)
45+
try:
46+
paddle.distributed.gloo_init_parallel_env(id, rank_num,
47+
server_endpoint)
48+
# 1st barrier
49+
# Run barrier to synchronize processes after starting
50+
paddle.distributed.gloo_barrier()
51+
# 2nd barrier
52+
# Let rank 0 sleep for one second and check that all processes
53+
# saw that artificial delay through the barrier
54+
start = time.time()
55+
if (id == 0):
56+
time.sleep(sleep_time)
57+
paddle.distributed.gloo_barrier()
58+
end = time.time()
59+
out_dict[id] = end - start
60+
# Release
61+
paddle.distributed.gloo_release()
62+
except:
63+
out_dict[id] = 0
6064

6165
def barrier_op(self, id, rank_num, server_endpoint, out_dict, sleep_time):
62-
main_prog = fluid.Program()
63-
startup_prog = fluid.Program()
64-
paddle.distributed.init_gloo_parallel_env(id, rank_num, server_endpoint)
65-
place = fluid.CPUPlace()
66-
with fluid.program_guard(main_prog, startup_prog):
67-
paddle.distributed.barrier()
68-
exe = fluid.Executor(place)
69-
# Run barrier to synchronize processes after starting
70-
exe.run(main_prog)
71-
# Let rank 0 sleep for one second and check that all processes
72-
# saw that artificial delay through the barrier
73-
start = time.time()
74-
if (id == 0):
75-
time.sleep(sleep_time)
76-
exe.run(main_prog)
77-
end = time.time()
78-
out_dict[id] = end - start
79-
# Release
80-
paddle.distributed.release_gloo(id)
66+
try:
67+
main_prog = fluid.Program()
68+
startup_prog = fluid.Program()
69+
paddle.distributed.gloo_init_parallel_env(id, rank_num,
70+
server_endpoint)
71+
place = fluid.CPUPlace()
72+
with fluid.program_guard(main_prog, startup_prog):
73+
paddle.distributed.barrier()
74+
exe = fluid.Executor(place)
75+
# Run barrier to synchronize processes after starting
76+
exe.run(main_prog)
77+
# Let rank 0 sleep for one second and check that all processes
78+
# saw that artificial delay through the barrier
79+
start = time.time()
80+
if (id == 0):
81+
time.sleep(sleep_time)
82+
exe.run(main_prog)
83+
end = time.time()
84+
out_dict[id] = end - start
85+
# Release
86+
paddle.distributed.gloo_release()
87+
except:
88+
out_dict[id] = 0
8189

8290
def test_barrier_func_with_multiprocess(self):
8391
num_of_ranks = 4
@@ -90,7 +98,7 @@ def test_barrier_func_with_multiprocess(self):
9098
jobs = []
9199
for id in range(num_of_ranks):
92100
p = multiprocessing.Process(
93-
target=self.barrier_op,
101+
target=self.barrier_func,
94102
args=(id, num_of_ranks, ep_str, procs_out_dict, sleep_time))
95103
jobs.append(p)
96104
p.start()

0 commit comments

Comments (0)