From 9e95cf72951b7b1f74129cc8821fee1dc396ea5b Mon Sep 17 00:00:00 2001 From: liuyuang Date: Fri, 12 Nov 2021 15:47:54 +0800 Subject: [PATCH 01/20] ping pong with brpc --- .../test/interceptor_ping_pong_test.cc | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test.cc b/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test.cc index 856bbb4754738e..ef5ae91cf3f0f8 100644 --- a/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test.cc +++ b/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test.cc @@ -12,6 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ +#include #include #include @@ -66,5 +67,36 @@ TEST(InterceptorTest, PingPong) { a->Send(1, msg); } +TEST(InterceptorTestRemote, RemotePingPong) { + std::cout << "Test ping pong through brpc."; + pid_t pid = fork(); + if (pid < 0) { + std::cout << "Fork error, exit remote ping pong test." << std::endl; + } else if (pid == 0) { + MessageBus& msg_bus = MessageBus::Instance(); + msg_bus.Init({{0, 0}, {1, 0}}, + {{0, "127.0.0.1:8000"}, {1, "127.0.0.1:8001"}}, + "127.0.0.1:8001"); + + Carrier& carrier = Carrier::Instance(); + + Interceptor* a = carrier.SetInterceptor( + 1, std::make_unique(1, nullptr)); + } else { + MessageBus& msg_bus = MessageBus::Instance(); + msg_bus.Init({{0, 0}, {1, 1}}, + {{0, "127.0.0.1:8000"}, {1, "127.0.0.1:8001"}}, + "127.0.0.1:8000"); + + Carrier& carrier = Carrier::Instance(); + + Interceptor* a = carrier.SetInterceptor( + 0, std::make_unique(0, nullptr)); + + InterceptorMessage msg; + a->Send(1, msg); + } +} + } // namespace distributed } // namespace paddle From b291662d83769dfb3b45fe632d6641e506fdbc50 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Fri, 12 Nov 2021 15:59:51 +0800 Subject: [PATCH 02/20] modify --- .../fleet_executor/test/interceptor_ping_pong_test.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test.cc b/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test.cc index ef5ae91cf3f0f8..bec2b6c2ceedd9 100644 --- a/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test.cc +++ b/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test.cc @@ -80,8 +80,8 @@ TEST(InterceptorTestRemote, RemotePingPong) { Carrier& carrier = Carrier::Instance(); - Interceptor* a = carrier.SetInterceptor( - 1, std::make_unique(1, nullptr)); + carrier.SetInterceptor(1, + std::make_unique(1, nullptr)); } else { MessageBus& msg_bus = MessageBus::Instance(); msg_bus.Init({{0, 0}, {1, 1}}, From fe276d0302715668954eaecaf9f3f13e4c30862e Mon Sep 17 00:00:00 2001 From: liuyuang Date: Fri, 12 Nov 2021 16:06:16 +0800 Subject: [PATCH 03/20] add compile flag --- .../fleet_executor/test/interceptor_ping_pong_test.cc | 3 +++ 1 file changed, 3 insertions(+) diff --git a/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test.cc b/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test.cc index bec2b6c2ceedd9..0734a8a700615a 100644 --- a/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test.cc +++ b/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test.cc @@ -67,6 +67,8 @@ TEST(InterceptorTest, PingPong) { a->Send(1, msg); } +#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \ + !defined(PADDLE_WITH_ASCEND_CL) TEST(InterceptorTestRemote, RemotePingPong) { std::cout << "Test ping pong through brpc."; pid_t pid = fork(); @@ -97,6 +99,7 @@ TEST(InterceptorTestRemote, RemotePingPong) { a->Send(1, msg); } } +#endif } // namespace distributed } // namespace paddle From 642d8df49fefe6d5cdc6b8271e0400ab1bfb1172 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Fri, 12 Nov 2021 16:38:52 +0800 Subject: [PATCH 04/20] move the multi devices out of single device --- .../fleet_executor/test/CMakeLists.txt | 1 + .../test/interceptor_ping_pong_test.cc | 34 ------- ...nterceptor_ping_pong_test_multi_devices.cc | 90 +++++++++++++++++++ 3 files changed, 91 insertions(+), 34 deletions(-) create mode 100644 paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test_multi_devices.cc diff --git a/paddle/fluid/distributed/fleet_executor/test/CMakeLists.txt b/paddle/fluid/distributed/fleet_executor/test/CMakeLists.txt index 524aebe3b959f5..364982c528b834 100644 --- a/paddle/fluid/distributed/fleet_executor/test/CMakeLists.txt +++ b/paddle/fluid/distributed/fleet_executor/test/CMakeLists.txt @@ -1,2 +1,3 @@ set_source_files_properties(interceptor_ping_pong_test.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) cc_test(interceptor_ping_pong_test SRCS interceptor_ping_pong_test.cc DEPS fleet_executor ${BRPC_DEPS}) +cc_test(interceptor_ping_pong_test_multi_devices SRCS interceptor_ping_pong_test_multi_devices.cc DEPS fleet_executor ${BRPC_DEPS}) diff --git a/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test.cc b/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test.cc index 0734a8a700615a..e97e6dd87e4e4f 100644 --- a/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test.cc +++ b/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test.cc @@ -67,39 +67,5 @@ TEST(InterceptorTest, PingPong) { a->Send(1, msg); } -#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \ - !defined(PADDLE_WITH_ASCEND_CL) -TEST(InterceptorTestRemote, RemotePingPong) { - std::cout << "Test ping pong through brpc."; - pid_t pid = fork(); - if (pid < 0) { - std::cout << "Fork error, exit remote ping pong test." << std::endl; - } else if (pid == 0) { - MessageBus& msg_bus = MessageBus::Instance(); - msg_bus.Init({{0, 0}, {1, 0}}, - {{0, "127.0.0.1:8000"}, {1, "127.0.0.1:8001"}}, - "127.0.0.1:8001"); - - Carrier& carrier = Carrier::Instance(); - - carrier.SetInterceptor(1, - std::make_unique(1, nullptr)); - } else { - MessageBus& msg_bus = MessageBus::Instance(); - msg_bus.Init({{0, 0}, {1, 1}}, - {{0, "127.0.0.1:8000"}, {1, "127.0.0.1:8001"}}, - "127.0.0.1:8000"); - - Carrier& carrier = Carrier::Instance(); - - Interceptor* a = carrier.SetInterceptor( - 0, std::make_unique(0, nullptr)); - - InterceptorMessage msg; - a->Send(1, msg); - } -} -#endif - } // namespace distributed } // namespace paddle diff --git a/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test_multi_devices.cc b/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test_multi_devices.cc new file mode 100644 index 00000000000000..42829472ef7b34 --- /dev/null +++ b/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test_multi_devices.cc @@ -0,0 +1,90 @@ +/* Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#include +#include +#include + +#include "gtest/gtest.h" + +#include "paddle/fluid/distributed/fleet_executor/carrier.h" +#include "paddle/fluid/distributed/fleet_executor/interceptor.h" +#include "paddle/fluid/distributed/fleet_executor/message_bus.h" + +namespace paddle { +namespace distributed { + +class PingPongInterceptor : public Interceptor { + public: + PingPongInterceptor(int64_t interceptor_id, TaskNode* node) + : Interceptor(interceptor_id, node) { + RegisterMsgHandle([this](const InterceptorMessage& msg) { PingPong(msg); }); + } + + void PingPong(const InterceptorMessage& msg) { + std::cout << GetInterceptorId() << " recv msg, count=" << count_ + << std::endl; + ++count_; + if (count_ == 20) { + InterceptorMessage stop; + stop.set_message_type(STOP); + Send(0, stop); + Send(1, stop); + return; + } + + InterceptorMessage resp; + Send(msg.src_id(), resp); + } + + private: + int count_{0}; +}; + +#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \ + !defined(PADDLE_WITH_ASCEND_CL) +TEST(InterceptorTestRemote, RemotePingPong) { + std::cout << "Test ping pong through brpc."; + pid_t pid = fork(); + if (pid < 0) { + std::cout << "Fork error, exit remote ping pong test." << std::endl; + } else if (pid == 0) { + MessageBus& msg_bus = MessageBus::Instance(); + msg_bus.Init({{0, 0}, {1, 0}}, + {{0, "127.0.0.1:8000"}, {1, "127.0.0.1:8001"}}, + "127.0.0.1:8001"); + + Carrier& carrier = Carrier::Instance(); + + carrier.SetInterceptor(1, + std::make_unique(1, nullptr)); + } else { + MessageBus& msg_bus = MessageBus::Instance(); + msg_bus.Init({{0, 0}, {1, 1}}, + {{0, "127.0.0.1:8000"}, {1, "127.0.0.1:8001"}}, + "127.0.0.1:8000"); + + Carrier& carrier = Carrier::Instance(); + + Interceptor* a = carrier.SetInterceptor( + 0, std::make_unique(0, nullptr)); + + InterceptorMessage msg; + a->Send(1, msg); + } +} +#endif + +} // namespace distributed +} // namespace paddle From f6185d2a9aad5faf06f64016b3d29fbb40e1dd81 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Fri, 12 Nov 2021 16:47:13 +0800 Subject: [PATCH 05/20] add return value for send --- paddle/fluid/distributed/fleet_executor/interceptor.cc | 4 ++-- paddle/fluid/distributed/fleet_executor/interceptor.h | 2 +- paddle/fluid/distributed/fleet_executor/test/CMakeLists.txt | 1 + .../test/interceptor_ping_pong_test_multi_devices.cc | 3 ++- 4 files changed, 6 insertions(+), 4 deletions(-) diff --git a/paddle/fluid/distributed/fleet_executor/interceptor.cc b/paddle/fluid/distributed/fleet_executor/interceptor.cc index dbee46afcf86fa..2b301a2ea35022 100644 --- a/paddle/fluid/distributed/fleet_executor/interceptor.cc +++ b/paddle/fluid/distributed/fleet_executor/interceptor.cc @@ -56,10 +56,10 @@ bool Interceptor::EnqueueRemoteInterceptorMessage( return true; } -void Interceptor::Send(int64_t dst_id, InterceptorMessage& msg) { +bool Interceptor::Send(int64_t dst_id, InterceptorMessage& msg) { msg.set_src_id(interceptor_id_); msg.set_dst_id(dst_id); - MessageBus::Instance().Send(msg); + return MessageBus::Instance().Send(msg); } void Interceptor::PoolTheMailbox() { diff --git a/paddle/fluid/distributed/fleet_executor/interceptor.h b/paddle/fluid/distributed/fleet_executor/interceptor.h index 24fad8331863e2..2e86dc2fe525d4 100644 --- a/paddle/fluid/distributed/fleet_executor/interceptor.h +++ b/paddle/fluid/distributed/fleet_executor/interceptor.h @@ -58,7 +58,7 @@ class Interceptor { bool EnqueueRemoteInterceptorMessage( const InterceptorMessage& interceptor_message); - void Send(int64_t dst_id, InterceptorMessage& msg); // NOLINT + bool Send(int64_t dst_id, InterceptorMessage& msg); // NOLINT DISABLE_COPY_AND_ASSIGN(Interceptor); diff --git a/paddle/fluid/distributed/fleet_executor/test/CMakeLists.txt b/paddle/fluid/distributed/fleet_executor/test/CMakeLists.txt index 364982c528b834..82a1b028b32893 100644 --- a/paddle/fluid/distributed/fleet_executor/test/CMakeLists.txt +++ b/paddle/fluid/distributed/fleet_executor/test/CMakeLists.txt @@ -1,3 +1,4 @@ set_source_files_properties(interceptor_ping_pong_test.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) cc_test(interceptor_ping_pong_test SRCS interceptor_ping_pong_test.cc DEPS fleet_executor ${BRPC_DEPS}) +set_source_files_properties(interceptor_ping_pong_test_multi_devices.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) cc_test(interceptor_ping_pong_test_multi_devices SRCS interceptor_ping_pong_test_multi_devices.cc DEPS fleet_executor ${BRPC_DEPS}) diff --git a/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test_multi_devices.cc b/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test_multi_devices.cc index 42829472ef7b34..b739da59692b4d 100644 --- a/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test_multi_devices.cc +++ b/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test_multi_devices.cc @@ -81,7 +81,8 @@ TEST(InterceptorTestRemote, RemotePingPong) { 0, std::make_unique(0, nullptr)); InterceptorMessage msg; - a->Send(1, msg); + while (!a->Send(1, msg)) { + } } } #endif From 9431f2b206e09d001f09bdfe1119f00dec75feb9 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Sat, 13 Nov 2021 09:37:36 +0800 Subject: [PATCH 06/20] update ut --- .../test/interceptor_ping_pong_test.cc | 1 - ...nterceptor_ping_pong_test_multi_devices.cc | 21 +++++++++++-------- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test.cc b/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test.cc index e97e6dd87e4e4f..856bbb4754738e 100644 --- a/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test.cc +++ b/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test.cc @@ -12,7 +12,6 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -#include #include #include diff --git a/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test_multi_devices.cc b/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test_multi_devices.cc index b739da59692b4d..7ae9758f03a33b 100644 --- a/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test_multi_devices.cc +++ b/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test_multi_devices.cc @@ -39,8 +39,7 @@ class PingPongInterceptor : public Interceptor { if (count_ == 20) { InterceptorMessage stop; stop.set_message_type(STOP); - Send(0, stop); - Send(1, stop); + Send(GetInterceptorId() == 0 ? 1 : 0, stop); return; } @@ -61,19 +60,23 @@ TEST(InterceptorTestRemote, RemotePingPong) { std::cout << "Fork error, exit remote ping pong test." << std::endl; } else if (pid == 0) { MessageBus& msg_bus = MessageBus::Instance(); - msg_bus.Init({{0, 0}, {1, 0}}, - {{0, "127.0.0.1:8000"}, {1, "127.0.0.1:8001"}}, - "127.0.0.1:8001"); + msg_bus.Init({{0, 0}, {1, 1}}, + {{0, "127.0.0.1:6912"}, {1, "127.0.0.1:7950"}}, + "127.0.0.1:7950"); Carrier& carrier = Carrier::Instance(); - carrier.SetInterceptor(1, - std::make_unique(1, nullptr)); + Interceptor* a = carrier.SetInterceptor( + 1, std::make_unique(1, nullptr)); + + InterceptorMessage msg; + while (!a->Send(0, msg)) { + } } else { MessageBus& msg_bus = MessageBus::Instance(); msg_bus.Init({{0, 0}, {1, 1}}, - {{0, "127.0.0.1:8000"}, {1, "127.0.0.1:8001"}}, - "127.0.0.1:8000"); + {{0, "127.0.0.1:6912"}, {1, "127.0.0.1:7950"}}, + "127.0.0.1:6912"); Carrier& carrier = Carrier::Instance(); From a3ad0faaff73b1aeb0deda6a591e69f7fd650806 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Sat, 13 Nov 2021 09:47:54 +0800 Subject: [PATCH 07/20] add sync method --- paddle/fluid/distributed/fleet_executor/carrier.cc | 8 ++++++++ paddle/fluid/distributed/fleet_executor/carrier.h | 3 +++ .../interceptor_ping_pong_test_multi_devices.cc | 13 ++++++------- 3 files changed, 17 insertions(+), 7 deletions(-) diff --git a/paddle/fluid/distributed/fleet_executor/carrier.cc b/paddle/fluid/distributed/fleet_executor/carrier.cc index 0e79656edea091..3c5456852b6032 100644 --- a/paddle/fluid/distributed/fleet_executor/carrier.cc +++ b/paddle/fluid/distributed/fleet_executor/carrier.cc @@ -33,6 +33,10 @@ bool Carrier::EnqueueInterceptorMessage( // handle control message return true; } else { + if (creating_interceptors_) { + tmp_stack_.emplace_back(interceptor_message); + return true; + } int64_t dst_id = interceptor_message.dst_id(); Interceptor* dst_interceptor = GetInterceptor(dst_id); bool rst = @@ -81,6 +85,10 @@ void Carrier::CreateInterceptors() { SetInterceptor(interceptor_id, std::move(interceptor)); VLOG(3) << "Create Interceptor for " << interceptor_id; } + creating_interceptors_ = false; + for (const auto& msg : tmp_stack_) { + EnqueueInterceptorMessage(msg); + } } } // namespace distributed diff --git a/paddle/fluid/distributed/fleet_executor/carrier.h b/paddle/fluid/distributed/fleet_executor/carrier.h index 64974714f7b1c7..2e3f2e9bd2b470 100644 --- a/paddle/fluid/distributed/fleet_executor/carrier.h +++ b/paddle/fluid/distributed/fleet_executor/carrier.h @@ -67,6 +67,9 @@ class Carrier final { // interceptor logic id to actually interceptor std::unordered_map> interceptor_idx_to_interceptor_; + + std::vector tmp_stack_; + bool creating_interceptors_{true}; }; } // namespace distributed diff --git a/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test_multi_devices.cc b/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test_multi_devices.cc index 7ae9758f03a33b..8f6a72daac5eaa 100644 --- a/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test_multi_devices.cc +++ b/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test_multi_devices.cc @@ -39,7 +39,8 @@ class PingPongInterceptor : public Interceptor { if (count_ == 20) { InterceptorMessage stop; stop.set_message_type(STOP); - Send(GetInterceptorId() == 0 ? 1 : 0, stop); + Send(0, stop); + Send(1, stop); return; } @@ -65,9 +66,8 @@ TEST(InterceptorTestRemote, RemotePingPong) { "127.0.0.1:7950"); Carrier& carrier = Carrier::Instance(); - - Interceptor* a = carrier.SetInterceptor( - 1, std::make_unique(1, nullptr)); + carrier.Init({{1, nullptr}}); + Interceptor* a = carrier.GetInterceptor(1); InterceptorMessage msg; while (!a->Send(0, msg)) { @@ -79,9 +79,8 @@ TEST(InterceptorTestRemote, RemotePingPong) { "127.0.0.1:6912"); Carrier& carrier = Carrier::Instance(); - - Interceptor* a = carrier.SetInterceptor( - 0, std::make_unique(0, nullptr)); + carrier.Init({{0, nullptr}}); + Interceptor* a = carrier.GetInterceptor(0); InterceptorMessage msg; while (!a->Send(1, msg)) { From adafddcd3c9c6303f3588955d845b97c565244df Mon Sep 17 00:00:00 2001 From: liuyuang Date: Sat, 13 Nov 2021 10:01:54 +0800 Subject: [PATCH 08/20] udpate vlog --- paddle/fluid/distributed/fleet_executor/carrier.cc | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/paddle/fluid/distributed/fleet_executor/carrier.cc b/paddle/fluid/distributed/fleet_executor/carrier.cc index 3c5456852b6032..96354f31e73917 100644 --- a/paddle/fluid/distributed/fleet_executor/carrier.cc +++ b/paddle/fluid/distributed/fleet_executor/carrier.cc @@ -34,6 +34,7 @@ bool Carrier::EnqueueInterceptorMessage( return true; } else { if (creating_interceptors_) { + VLOG(3) << "Receiving message while creating interceptors."; tmp_stack_.emplace_back(interceptor_message); return true; } @@ -83,10 +84,13 @@ void Carrier::CreateInterceptors() { // TODO(wangxi): use node_type to select different Interceptor auto interceptor = std::make_unique(interceptor_id, task_node); SetInterceptor(interceptor_id, std::move(interceptor)); - VLOG(3) << "Create Interceptor for " << interceptor_id; + VLOG(3) << "Create Interceptor with interceptor id: " << interceptor_id + << "."; } creating_interceptors_ = false; for (const auto& msg : tmp_stack_) { + VLOG(3) << "Received " << tmp_stack_.size() + << " messages during creating interceptors."; EnqueueInterceptorMessage(msg); } } From 84b8d7ee2a027a2dcfa1026162d2c627e77cd4d3 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Sat, 13 Nov 2021 10:04:12 +0800 Subject: [PATCH 09/20] update vlog --- .../fleet_executor/interceptor_message_service.cc | 4 ++-- paddle/fluid/distributed/fleet_executor/message_bus.cc | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/paddle/fluid/distributed/fleet_executor/interceptor_message_service.cc b/paddle/fluid/distributed/fleet_executor/interceptor_message_service.cc index 2205c6e5544bb5..44195467045c34 100644 --- a/paddle/fluid/distributed/fleet_executor/interceptor_message_service.cc +++ b/paddle/fluid/distributed/fleet_executor/interceptor_message_service.cc @@ -26,8 +26,8 @@ void InterceptorMessageServiceImpl::InterceptorMessageService( const InterceptorMessage* request, InterceptorResponse* response, google::protobuf::Closure* done) { brpc::ClosureGuard done_guard(done); - VLOG(3) << "Interceptor Message Service receives a message from: " - << request->src_id() + VLOG(3) << "Interceptor Message Service receives a message from interceptor " + << request->src_id() << " to interceptor " << request->dst_id() << ", with the message: " << request->message_type(); response->set_rst(true); // call interceptor manager's method to handle the message diff --git a/paddle/fluid/distributed/fleet_executor/message_bus.cc b/paddle/fluid/distributed/fleet_executor/message_bus.cc index 27a1f90767fe6e..e25b5406c032f5 100644 --- a/paddle/fluid/distributed/fleet_executor/message_bus.cc +++ b/paddle/fluid/distributed/fleet_executor/message_bus.cc @@ -55,12 +55,12 @@ bool MessageBus::Send(const InterceptorMessage& interceptor_message) { int64_t src_id = interceptor_message.src_id(); int64_t dst_id = interceptor_message.dst_id(); if (IsSameRank(src_id, dst_id)) { - VLOG(3) << "Send a message from rank " << src_id << " to rank " << dst_id - << ", which are same ranks."; + VLOG(3) << "Send a message from interceptor " << src_id + << " to interceptor " << dst_id << ", which are same ranks."; return SendIntraRank(interceptor_message); } else { - VLOG(3) << "Send a message from rank " << src_id << " to rank " << dst_id - << ", which are different ranks."; + VLOG(3) << "Send a message from interceptor " << src_id + << " to interceptor " << dst_id << ", which are different ranks."; #if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \ !defined(PADDLE_WITH_ASCEND_CL) int retry_time = 0; // message bus will retry sending for 10 times From 244a5ae1e28f8cff9c2d40a7e9358ef139054b33 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Sat, 13 Nov 2021 10:16:22 +0800 Subject: [PATCH 10/20] update vlog --- paddle/fluid/distributed/fleet_executor/carrier.cc | 6 +++--- paddle/fluid/distributed/fleet_executor/interceptor.cc | 5 +++-- paddle/fluid/distributed/fleet_executor/message_bus.cc | 5 +++-- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/paddle/fluid/distributed/fleet_executor/carrier.cc b/paddle/fluid/distributed/fleet_executor/carrier.cc index 96354f31e73917..ebbdc038702e67 100644 --- a/paddle/fluid/distributed/fleet_executor/carrier.cc +++ b/paddle/fluid/distributed/fleet_executor/carrier.cc @@ -88,10 +88,10 @@ void Carrier::CreateInterceptors() { << "."; } creating_interceptors_ = false; + VLOG(3) << "Carrier has received " << tmp_stack_.size() + << " messages during creating interceptors."; for (const auto& msg : tmp_stack_) { - VLOG(3) << "Received " << tmp_stack_.size() - << " messages during creating interceptors."; - EnqueueInterceptorMessage(msg); + EnqueueInterceptorMessage(std::move(msg)); } } diff --git a/paddle/fluid/distributed/fleet_executor/interceptor.cc b/paddle/fluid/distributed/fleet_executor/interceptor.cc index 2b301a2ea35022..d263af77fc705f 100644 --- a/paddle/fluid/distributed/fleet_executor/interceptor.cc +++ b/paddle/fluid/distributed/fleet_executor/interceptor.cc @@ -76,8 +76,9 @@ void Interceptor::PoolTheMailbox() { const InterceptorMessage interceptor_message = local_mailbox_.front(); local_mailbox_.pop(); const MessageType message_type = interceptor_message.message_type(); - VLOG(3) << interceptor_id_ << " has received a message: " << message_type - << "."; + VLOG(3) << "Interceptor " << interceptor_id_ + << " has received a message: " << message_type + << " from interceptor " << interceptor_message.src_id() << "."; if (message_type == STOP) { // break the pooling thread break; diff --git a/paddle/fluid/distributed/fleet_executor/message_bus.cc b/paddle/fluid/distributed/fleet_executor/message_bus.cc index e25b5406c032f5..67c66103dceaaf 100644 --- a/paddle/fluid/distributed/fleet_executor/message_bus.cc +++ b/paddle/fluid/distributed/fleet_executor/message_bus.cc @@ -56,11 +56,12 @@ bool MessageBus::Send(const InterceptorMessage& interceptor_message) { int64_t dst_id = interceptor_message.dst_id(); if (IsSameRank(src_id, dst_id)) { VLOG(3) << "Send a message from interceptor " << src_id - << " to interceptor " << dst_id << ", which are same ranks."; + << " to interceptor " << dst_id << ", which are in the same ranks."; return SendIntraRank(interceptor_message); } else { VLOG(3) << "Send a message from interceptor " << src_id - << " to interceptor " << dst_id << ", which are different ranks."; + << " to interceptor " << dst_id + << ", which are in different ranks."; #if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \ !defined(PADDLE_WITH_ASCEND_CL) int retry_time = 0; // message bus will retry sending for 10 times From d61ab36ba3c6950e6be65fb842cd83ec659399c2 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Sat, 13 Nov 2021 10:21:51 +0800 Subject: [PATCH 11/20] add finish flag --- .../interceptor_ping_pong_test_multi_devices.cc | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test_multi_devices.cc b/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test_multi_devices.cc index 8f6a72daac5eaa..c6e3ca1ef62699 100644 --- a/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test_multi_devices.cc +++ b/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test_multi_devices.cc @@ -41,6 +41,7 @@ class PingPongInterceptor : public Interceptor { stop.set_message_type(STOP); Send(0, stop); Send(1, stop); + finish_ = true; return; } @@ -48,8 +49,11 @@ class PingPongInterceptor : public Interceptor { Send(msg.src_id(), resp); } + bool IsFinish() { return finish_; } + private: int count_{0}; + bool finish_{false}; }; #if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \ @@ -66,12 +70,14 @@ TEST(InterceptorTestRemote, RemotePingPong) { "127.0.0.1:7950"); Carrier& carrier = Carrier::Instance(); - carrier.Init({{1, nullptr}}); - Interceptor* a = carrier.GetInterceptor(1); + Interceptor* a = carrier.SetInterceptor( + 1, std::make_unique(1, nullptr)); InterceptorMessage msg; while (!a->Send(0, msg)) { } + while (!a.IsFinish()) { + } } else { MessageBus& msg_bus = MessageBus::Instance(); msg_bus.Init({{0, 0}, {1, 1}}, @@ -79,12 +85,14 @@ TEST(InterceptorTestRemote, RemotePingPong) { "127.0.0.1:6912"); Carrier& carrier = Carrier::Instance(); - carrier.Init({{0, nullptr}}); - Interceptor* a = carrier.GetInterceptor(0); + Interceptor* a = carrier.SetInterceptor( + 0, std::make_unique(0, nullptr)); InterceptorMessage msg; while (!a->Send(1, msg)) { } + while (!a.IsFinish()) { + } } } #endif From 7e3ec1f155917376636970060c2f45183be9d67b Mon Sep 17 00:00:00 2001 From: liuyuang Date: Sat, 13 Nov 2021 10:36:48 +0800 Subject: [PATCH 12/20] update sync logic --- .../distributed/fleet_executor/carrier.cc | 40 ++++++++++++------- .../distributed/fleet_executor/carrier.h | 2 + ...nterceptor_ping_pong_test_multi_devices.cc | 4 +- 3 files changed, 30 insertions(+), 16 deletions(-) diff --git a/paddle/fluid/distributed/fleet_executor/carrier.cc b/paddle/fluid/distributed/fleet_executor/carrier.cc index ebbdc038702e67..363962120ecd85 100644 --- a/paddle/fluid/distributed/fleet_executor/carrier.cc +++ b/paddle/fluid/distributed/fleet_executor/carrier.cc @@ -34,6 +34,7 @@ bool Carrier::EnqueueInterceptorMessage( return true; } else { if (creating_interceptors_) { + // cannot handle the messag to interceptor since sting creating VLOG(3) << "Receiving message while creating interceptors."; tmp_stack_.emplace_back(interceptor_message); return true; @@ -75,23 +76,34 @@ Interceptor* Carrier::SetInterceptor(int64_t interceptor_id, return ptr; } +void Carrier::SetCreatingFlag(bool flag) { + // set the creating flag + creating_interceptors_ = flag; +} + void Carrier::CreateInterceptors() { // create each Interceptor - for (const auto& item : interceptor_id_to_node_) { - int64_t interceptor_id = item.first; - TaskNode* task_node = item.second; + if (!interceptor_id_to_node_.empty()) { + // no auto init since there is no config + for (const auto& item : interceptor_id_to_node_) { + int64_t interceptor_id = item.first; + TaskNode* task_node = item.second; - // TODO(wangxi): use node_type to select different Interceptor - auto interceptor = std::make_unique(interceptor_id, task_node); - SetInterceptor(interceptor_id, std::move(interceptor)); - VLOG(3) << "Create Interceptor with interceptor id: " << interceptor_id - << "."; - } - creating_interceptors_ = false; - VLOG(3) << "Carrier has received " << tmp_stack_.size() - << " messages during creating interceptors."; - for (const auto& msg : tmp_stack_) { - EnqueueInterceptorMessage(std::move(msg)); + // TODO(wangxi): use node_type to select different Interceptor + auto interceptor = + std::make_unique(interceptor_id, task_node); + SetInterceptor(interceptor_id, std::move(interceptor)); + VLOG(3) << "Create Interceptor with interceptor id: " << interceptor_id + << "."; + } + // The carrier will be always waiting for outside initializer + // since there is no interceptor has been created during auto init + creating_interceptors_ = false; + VLOG(3) << "Carrier has received " << tmp_stack_.size() + << " messages during creating interceptors."; + for (const auto& msg : tmp_stack_) { + EnqueueInterceptorMessage(std::move(msg)); + } } } diff --git a/paddle/fluid/distributed/fleet_executor/carrier.h b/paddle/fluid/distributed/fleet_executor/carrier.h index 2e3f2e9bd2b470..46cd3dbe0a8f2e 100644 --- a/paddle/fluid/distributed/fleet_executor/carrier.h +++ b/paddle/fluid/distributed/fleet_executor/carrier.h @@ -53,6 +53,8 @@ class Carrier final { Interceptor* SetInterceptor(int64_t interceptor_id, std::unique_ptr); + void SetCreatingFlag(bool flag); + DISABLE_COPY_AND_ASSIGN(Carrier); private: diff --git a/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test_multi_devices.cc b/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test_multi_devices.cc index c6e3ca1ef62699..ba5845bfe816d0 100644 --- a/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test_multi_devices.cc +++ b/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test_multi_devices.cc @@ -76,7 +76,7 @@ TEST(InterceptorTestRemote, RemotePingPong) { InterceptorMessage msg; while (!a->Send(0, msg)) { } - while (!a.IsFinish()) { + while (!(static_cast(a))->IsFinish()) { } } else { MessageBus& msg_bus = MessageBus::Instance(); @@ -91,7 +91,7 @@ TEST(InterceptorTestRemote, RemotePingPong) { InterceptorMessage msg; while (!a->Send(1, msg)) { } - while (!a.IsFinish()) { + while (!(static_cast(a))->IsFinish()) { } } } From a51256b45ec9d8de3e4b6e81519df07e177ad293 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Sat, 13 Nov 2021 10:50:39 +0800 Subject: [PATCH 13/20] add more vlog --- paddle/fluid/distributed/fleet_executor/carrier.cc | 3 ++- paddle/fluid/distributed/fleet_executor/message_bus.cc | 1 + .../fleet_executor/test/interceptor_ping_pong_test.cc | 2 +- .../test/interceptor_ping_pong_test_multi_devices.cc | 2 ++ 4 files changed, 6 insertions(+), 2 deletions(-) diff --git a/paddle/fluid/distributed/fleet_executor/carrier.cc b/paddle/fluid/distributed/fleet_executor/carrier.cc index 363962120ecd85..a253ba04c26f62 100644 --- a/paddle/fluid/distributed/fleet_executor/carrier.cc +++ b/paddle/fluid/distributed/fleet_executor/carrier.cc @@ -34,7 +34,8 @@ bool Carrier::EnqueueInterceptorMessage( return true; } else { if (creating_interceptors_) { - // cannot handle the messag to interceptor since sting creating + // Cannot handle the message to interceptor since interceptors + // are still under creating. Will enqueue into a tmp stack. VLOG(3) << "Receiving message while creating interceptors."; tmp_stack_.emplace_back(interceptor_message); return true; diff --git a/paddle/fluid/distributed/fleet_executor/message_bus.cc b/paddle/fluid/distributed/fleet_executor/message_bus.cc index 67c66103dceaaf..309982bc04bebd 100644 --- a/paddle/fluid/distributed/fleet_executor/message_bus.cc +++ b/paddle/fluid/distributed/fleet_executor/message_bus.cc @@ -156,6 +156,7 @@ bool MessageBus::SendInterRank(const InterceptorMessage& interceptor_message) { "Cannot find rank for dst interceptor id %lld. " "Init error.", dst_id)); + VLOG(3) << "Message bus sending to addr: " << dst_ip->second; const char* dst_ip_for_brpc = dst_ip->second.c_str(); brpc::Channel channel; brpc::ChannelOptions options; diff --git a/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test.cc b/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test.cc index 856bbb4754738e..783c924398a703 100644 --- a/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test.cc +++ b/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test.cc @@ -59,8 +59,8 @@ TEST(InterceptorTest, PingPong) { Interceptor* a = carrier.SetInterceptor( 0, std::make_unique(0, nullptr)); - carrier.SetInterceptor(1, std::make_unique(1, nullptr)); + carrier.SetCreatingFlag(false); InterceptorMessage msg; a->Send(1, msg); diff --git a/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test_multi_devices.cc b/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test_multi_devices.cc index ba5845bfe816d0..3452d2c7cdc177 100644 --- a/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test_multi_devices.cc +++ b/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test_multi_devices.cc @@ -72,6 +72,7 @@ TEST(InterceptorTestRemote, RemotePingPong) { Carrier& carrier = Carrier::Instance(); Interceptor* a = carrier.SetInterceptor( 1, std::make_unique(1, nullptr)); + carrier.SetCreatingFlag(false); InterceptorMessage msg; while (!a->Send(0, msg)) { @@ -87,6 +88,7 @@ TEST(InterceptorTestRemote, RemotePingPong) { Carrier& carrier = Carrier::Instance(); Interceptor* a = carrier.SetInterceptor( 0, std::make_unique(0, nullptr)); + carrier.SetCreatingFlag(false); InterceptorMessage msg; while (!a->Send(1, msg)) { From 22eb9d437df52ef7ffcac42f10e51fe7217dcc43 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Sat, 13 Nov 2021 11:05:59 +0800 Subject: [PATCH 14/20] remove ut --- .../fleet_executor/test/CMakeLists.txt | 2 - ...nterceptor_ping_pong_test_multi_devices.cc | 103 ------------------ 2 files changed, 105 deletions(-) delete mode 100644 paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test_multi_devices.cc diff --git a/paddle/fluid/distributed/fleet_executor/test/CMakeLists.txt b/paddle/fluid/distributed/fleet_executor/test/CMakeLists.txt index 82a1b028b32893..524aebe3b959f5 100644 --- a/paddle/fluid/distributed/fleet_executor/test/CMakeLists.txt +++ b/paddle/fluid/distributed/fleet_executor/test/CMakeLists.txt @@ -1,4 +1,2 @@ set_source_files_properties(interceptor_ping_pong_test.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) cc_test(interceptor_ping_pong_test SRCS interceptor_ping_pong_test.cc DEPS fleet_executor ${BRPC_DEPS}) -set_source_files_properties(interceptor_ping_pong_test_multi_devices.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) -cc_test(interceptor_ping_pong_test_multi_devices SRCS interceptor_ping_pong_test_multi_devices.cc DEPS fleet_executor ${BRPC_DEPS}) diff --git a/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test_multi_devices.cc b/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test_multi_devices.cc deleted file mode 100644 index 3452d2c7cdc177..00000000000000 --- a/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test_multi_devices.cc +++ /dev/null @@ -1,103 +0,0 @@ -/* Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. */ - -#include -#include -#include - -#include "gtest/gtest.h" - -#include "paddle/fluid/distributed/fleet_executor/carrier.h" -#include "paddle/fluid/distributed/fleet_executor/interceptor.h" -#include "paddle/fluid/distributed/fleet_executor/message_bus.h" - -namespace paddle { -namespace distributed { - -class PingPongInterceptor : public Interceptor { - public: - PingPongInterceptor(int64_t interceptor_id, TaskNode* node) - : Interceptor(interceptor_id, node) { - RegisterMsgHandle([this](const InterceptorMessage& msg) { PingPong(msg); }); - } - - void PingPong(const InterceptorMessage& msg) { - std::cout << GetInterceptorId() << " recv msg, count=" << count_ - << std::endl; - ++count_; - if (count_ == 20) { - InterceptorMessage stop; - stop.set_message_type(STOP); - Send(0, stop); - Send(1, stop); - finish_ = true; - return; - } - - InterceptorMessage resp; - Send(msg.src_id(), resp); - } - - bool IsFinish() { return finish_; } - - private: - int count_{0}; - bool finish_{false}; -}; - -#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \ - !defined(PADDLE_WITH_ASCEND_CL) -TEST(InterceptorTestRemote, RemotePingPong) { - std::cout << "Test ping pong through brpc."; - pid_t pid = fork(); - if (pid < 0) { - std::cout << "Fork error, exit remote ping pong test." << std::endl; - } else if (pid == 0) { - MessageBus& msg_bus = MessageBus::Instance(); - msg_bus.Init({{0, 0}, {1, 1}}, - {{0, "127.0.0.1:6912"}, {1, "127.0.0.1:7950"}}, - "127.0.0.1:7950"); - - Carrier& carrier = Carrier::Instance(); - Interceptor* a = carrier.SetInterceptor( - 1, std::make_unique(1, nullptr)); - carrier.SetCreatingFlag(false); - - InterceptorMessage msg; - while (!a->Send(0, msg)) { - } - while (!(static_cast(a))->IsFinish()) { - } - } else { - MessageBus& msg_bus = MessageBus::Instance(); - msg_bus.Init({{0, 0}, {1, 1}}, - {{0, "127.0.0.1:6912"}, {1, "127.0.0.1:7950"}}, - "127.0.0.1:6912"); - - Carrier& carrier = Carrier::Instance(); - Interceptor* a = carrier.SetInterceptor( - 0, std::make_unique(0, nullptr)); - carrier.SetCreatingFlag(false); - - InterceptorMessage msg; - while (!a->Send(1, msg)) { - } - while (!(static_cast(a))->IsFinish()) { - } - } -} -#endif - -} // namespace distributed -} // namespace paddle From 9b7acff96094c7e0356d2137f35d08d801e62a8d Mon Sep 17 00:00:00 2001 From: liuyuang Date: Sat, 13 Nov 2021 11:11:18 +0800 Subject: [PATCH 15/20] update vlog --- paddle/fluid/distributed/fleet_executor/interceptor.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/paddle/fluid/distributed/fleet_executor/interceptor.cc b/paddle/fluid/distributed/fleet_executor/interceptor.cc index d263af77fc705f..31ab0c4ab09025 100644 --- a/paddle/fluid/distributed/fleet_executor/interceptor.cc +++ b/paddle/fluid/distributed/fleet_executor/interceptor.cc @@ -76,9 +76,9 @@ void Interceptor::PoolTheMailbox() { const InterceptorMessage interceptor_message = local_mailbox_.front(); local_mailbox_.pop(); const MessageType message_type = interceptor_message.message_type(); - VLOG(3) << "Interceptor " << interceptor_id_ - << " has received a message: " << message_type - << " from interceptor " << interceptor_message.src_id() << "."; + VLOG(3) << "Interceptor " << interceptor_id_ << " has received a message" + << " from interceptor " << interceptor_message.src_id() + << " with message: " << message_type << "."; if (message_type == STOP) { // break the pooling thread break; From ea55c68016be32b892e14adcf71749d9839a8b45 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Sat, 13 Nov 2021 11:13:07 +0800 Subject: [PATCH 16/20] update vlog --- paddle/fluid/distributed/fleet_executor/carrier.cc | 2 ++ paddle/fluid/distributed/fleet_executor/interceptor.cc | 1 + 2 files changed, 3 insertions(+) diff --git a/paddle/fluid/distributed/fleet_executor/carrier.cc b/paddle/fluid/distributed/fleet_executor/carrier.cc index a253ba04c26f62..ef1631aaf92c33 100644 --- a/paddle/fluid/distributed/fleet_executor/carrier.cc +++ b/paddle/fluid/distributed/fleet_executor/carrier.cc @@ -79,6 +79,8 @@ Interceptor* Carrier::SetInterceptor(int64_t interceptor_id, void Carrier::SetCreatingFlag(bool flag) { // set the creating flag + VLOG(3) << "Carrier is set the creating flag from " << creating_interceptors_ + << " to " << flag << "."; creating_interceptors_ = flag; } diff --git a/paddle/fluid/distributed/fleet_executor/interceptor.cc b/paddle/fluid/distributed/fleet_executor/interceptor.cc index 31ab0c4ab09025..6b606290fa160e 100644 --- a/paddle/fluid/distributed/fleet_executor/interceptor.cc +++ b/paddle/fluid/distributed/fleet_executor/interceptor.cc @@ -81,6 +81,7 @@ void Interceptor::PoolTheMailbox() { << " with message: " << message_type << "."; if (message_type == STOP) { // break the pooling thread + VLOG(3) << "Interceptor " << interceptor_id_ << " is quiting."; break; } From 20ac7fdea61d08c9f483c91ead30fa6d4a32d1ce Mon Sep 17 00:00:00 2001 From: liuyuang Date: Sat, 13 Nov 2021 11:15:40 +0800 Subject: [PATCH 17/20] update vlog, rename var --- paddle/fluid/distributed/fleet_executor/carrier.cc | 6 +++--- paddle/fluid/distributed/fleet_executor/carrier.h | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/paddle/fluid/distributed/fleet_executor/carrier.cc b/paddle/fluid/distributed/fleet_executor/carrier.cc index ef1631aaf92c33..c4ed63bf31cae2 100644 --- a/paddle/fluid/distributed/fleet_executor/carrier.cc +++ b/paddle/fluid/distributed/fleet_executor/carrier.cc @@ -37,7 +37,7 @@ bool Carrier::EnqueueInterceptorMessage( // Cannot handle the message to interceptor since interceptors // are still under creating. Will enqueue into a tmp stack. VLOG(3) << "Receiving message while creating interceptors."; - tmp_stack_.emplace_back(interceptor_message); + message_tmp_queue_.emplace_back(interceptor_message); return true; } int64_t dst_id = interceptor_message.dst_id(); @@ -102,9 +102,9 @@ void Carrier::CreateInterceptors() { // The carrier will be always waiting for outside initializer // since there is no interceptor has been created during auto init creating_interceptors_ = false; - VLOG(3) << "Carrier has received " << tmp_stack_.size() + VLOG(3) << "Carrier has received " << message_tmp_queue_.size() << " messages during creating interceptors."; - for (const auto& msg : tmp_stack_) { + for (const auto& msg : message_tmp_queue_) { EnqueueInterceptorMessage(std::move(msg)); } } diff --git a/paddle/fluid/distributed/fleet_executor/carrier.h b/paddle/fluid/distributed/fleet_executor/carrier.h index 46cd3dbe0a8f2e..7704dea4649542 100644 --- a/paddle/fluid/distributed/fleet_executor/carrier.h +++ b/paddle/fluid/distributed/fleet_executor/carrier.h @@ -70,7 +70,7 @@ class Carrier final { std::unordered_map> interceptor_idx_to_interceptor_; - std::vector tmp_stack_; + std::vector message_tmp_queue_{}; bool creating_interceptors_{true}; }; From 4c6822b21bd8917f2989ad5d540f7908735724c2 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Mon, 15 Nov 2021 09:19:09 +0800 Subject: [PATCH 18/20] bug fix --- .../fluid/distributed/fleet_executor/carrier.cc | 17 +++++++++++------ .../fluid/distributed/fleet_executor/carrier.h | 5 ++++- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/paddle/fluid/distributed/fleet_executor/carrier.cc b/paddle/fluid/distributed/fleet_executor/carrier.cc index c4ed63bf31cae2..3202d89950a3a1 100644 --- a/paddle/fluid/distributed/fleet_executor/carrier.cc +++ b/paddle/fluid/distributed/fleet_executor/carrier.cc @@ -37,7 +37,7 @@ bool Carrier::EnqueueInterceptorMessage( // Cannot handle the message to interceptor since interceptors // are still under creating. Will enqueue into a tmp stack. VLOG(3) << "Receiving message while creating interceptors."; - message_tmp_queue_.emplace_back(interceptor_message); + message_tmp_.emplace_back(interceptor_message); return true; } int64_t dst_id = interceptor_message.dst_id(); @@ -82,6 +82,15 @@ void Carrier::SetCreatingFlag(bool flag) { VLOG(3) << "Carrier is set the creating flag from " << creating_interceptors_ << " to " << flag << "."; creating_interceptors_ = flag; + HandleTmpMessages(); +} + +void Carrier::HandleTmpMessages() { + VLOG(3) << "Carrier has received " << message_tmp_.size() + << " messages during creating interceptors."; + for (const auto& msg : message_tmp_) { + EnqueueInterceptorMessage(std::move(msg)); + } } void Carrier::CreateInterceptors() { @@ -102,11 +111,7 @@ void Carrier::CreateInterceptors() { // The carrier will be always waiting for outside initializer // since there is no interceptor has been created during auto init creating_interceptors_ = false; - VLOG(3) << "Carrier has received " << message_tmp_queue_.size() - << " messages during creating interceptors."; - for (const auto& msg : message_tmp_queue_) { - EnqueueInterceptorMessage(std::move(msg)); - } + HandleTmpMessages(); } } diff --git a/paddle/fluid/distributed/fleet_executor/carrier.h b/paddle/fluid/distributed/fleet_executor/carrier.h index 7704dea4649542..95f9ffcdf4960f 100644 --- a/paddle/fluid/distributed/fleet_executor/carrier.h +++ b/paddle/fluid/distributed/fleet_executor/carrier.h @@ -17,6 +17,7 @@ #include #include #include +#include #include "paddle/fluid/distributed/fleet_executor/interceptor.h" #include "paddle/fluid/distributed/fleet_executor/interceptor_message.pb.h" @@ -63,6 +64,8 @@ class Carrier final { // create each Interceptor void CreateInterceptors(); + void HandleTmpMessages(); + // interceptor logic id to the Nodes info std::unordered_map interceptor_id_to_node_; @@ -70,7 +73,7 @@ class Carrier final { std::unordered_map> interceptor_idx_to_interceptor_; - std::vector message_tmp_queue_{}; + std::vector message_tmp_{}; bool creating_interceptors_{true}; }; From 97c5b7622f3f2f2f6dceb6c94ad5ba7c7203b9e9 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Mon, 15 Nov 2021 09:25:12 +0800 Subject: [PATCH 19/20] add more logic --- paddle/fluid/distributed/fleet_executor/carrier.cc | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/paddle/fluid/distributed/fleet_executor/carrier.cc b/paddle/fluid/distributed/fleet_executor/carrier.cc index 3202d89950a3a1..6126e551946c30 100644 --- a/paddle/fluid/distributed/fleet_executor/carrier.cc +++ b/paddle/fluid/distributed/fleet_executor/carrier.cc @@ -82,7 +82,10 @@ void Carrier::SetCreatingFlag(bool flag) { VLOG(3) << "Carrier is set the creating flag from " << creating_interceptors_ << " to " << flag << "."; creating_interceptors_ = flag; - HandleTmpMessages(); + if (!flag) { + // finish create interceptors outside, handle tmp messsages + HandleTmpMessages(); + } } void Carrier::HandleTmpMessages() { From 3261b605b855a93832db14c27915c4bf90571dbe Mon Sep 17 00:00:00 2001 From: liuyuang Date: Mon, 15 Nov 2021 16:50:13 +0800 Subject: [PATCH 20/20] refine code --- paddle/fluid/distributed/fleet_executor/carrier.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/paddle/fluid/distributed/fleet_executor/carrier.cc b/paddle/fluid/distributed/fleet_executor/carrier.cc index 6126e551946c30..b87f48bc27c544 100644 --- a/paddle/fluid/distributed/fleet_executor/carrier.cc +++ b/paddle/fluid/distributed/fleet_executor/carrier.cc @@ -92,8 +92,9 @@ void Carrier::HandleTmpMessages() { VLOG(3) << "Carrier has received " << message_tmp_.size() << " messages during creating interceptors."; for (const auto& msg : message_tmp_) { - EnqueueInterceptorMessage(std::move(msg)); + EnqueueInterceptorMessage(msg); } + message_tmp_.clear(); } void Carrier::CreateInterceptors() {