From d98be69971160359874419d32cb88a406108e731 Mon Sep 17 00:00:00 2001 From: seemingwang Date: Mon, 15 Mar 2021 06:47:25 +0000 Subject: [PATCH] separate server and client --- .../distributed/service/graph_py_service.cc | 176 ++++++++++++-- .../distributed/service/graph_py_service.h | 214 +++++------------- .../fluid/distributed/test/graph_node_test.cc | 61 ++++- paddle/fluid/pybind/fleet_py.cc | 26 ++- paddle/fluid/pybind/fleet_py.h | 5 +- paddle/fluid/pybind/pybind.cc | 4 +- 6 files changed, 290 insertions(+), 196 deletions(-) diff --git a/paddle/fluid/distributed/service/graph_py_service.cc b/paddle/fluid/distributed/service/graph_py_service.cc index 04c04930e43e8d..2df3843c96afec 100644 --- a/paddle/fluid/distributed/service/graph_py_service.cc +++ b/paddle/fluid/distributed/service/graph_py_service.cc @@ -13,9 +13,15 @@ // limitations under the License. #include "paddle/fluid/distributed/service/graph_py_service.h" +#include // NOLINT +#include "butil/endpoint.h" +#include "iomanip" +#include "paddle/fluid/distributed/table/table.h" +#include "paddle/fluid/framework/archive.h" +#include "paddle/fluid/platform/profiler.h" namespace paddle { namespace distributed { -std::vector GraphPyService::split(std::string &str, +std::vector GraphPyService::split(std::string& str, const char pattern) { std::vector res; std::stringstream input(str); @@ -26,18 +32,17 @@ std::vector GraphPyService::split(std::string &str, return res; } -void GraphPyService::set_up(std::string ips_str, int shard_num, int rank, - int client_id, std::vector edge_types) { +void GraphPyService::set_up(std::string ips_str, int shard_num, + std::vector edge_types) { set_shard_num(shard_num); - set_client_Id(client_id); - set_rank(rank); + // set_client_Id(client_id); + // set_rank(rank); - this -> table_id_map[std::string("")] = 0; + this->table_id_map[std::string("")] = 0; // Table 0 are for nodes - for(size_t table_id = 0; table_id < edge_types.size(); table_id ++ ) { - this -> 
table_id_map[edge_types[table_id]] = int(table_id + 1);
- }
- server_thread = client_thread = NULL;
+ for (size_t table_id = 0; table_id < edge_types.size(); table_id++) {
+ this->table_id_map[edge_types[table_id]] = int(table_id + 1);
+ }
 std::istringstream stream(ips_str);
 std::string ip;
 server_size = 0;
@@ -52,10 +57,153 @@ void GraphPyService::set_up(std::string ips_str, int shard_num, int rank,
 host_sign_list.push_back(ph_host.serialize_to_string());
 index++;
 }
- //VLOG(0) << "IN set up rank = " << rank;
- start_client();
- start_server(server_list[rank], std::stoul(port_list[rank]));
- sleep(1);
+ // VLOG(0) << "IN set up rank = " << rank;
+ // start_client();
+ // start_server(server_list[rank], std::stoul(port_list[rank]));
+}
+void GraphPyClient::start_client() {
+ std::map> dense_regions;
+ dense_regions.insert(
+ std::pair>(0, {}));
+ auto regions = dense_regions[0];
+ ::paddle::distributed::PSParameter worker_proto = GetWorkerProto();
+ paddle::distributed::PaddlePSEnvironment _ps_env;
+ auto servers_ = host_sign_list.size();
+ _ps_env = paddle::distributed::PaddlePSEnvironment();
+ _ps_env.set_ps_servers(&host_sign_list, servers_);
+ worker_ptr = std::shared_ptr(
+ paddle::distributed::PSClientFactory::create(worker_proto));
+ worker_ptr->configure(worker_proto, dense_regions, _ps_env, client_id);
+}
+void GraphPyServer::start_server() {
+ std::string ip = server_list[rank];
+ uint32_t port = std::stoul(port_list[rank]);
+ server_thread = new std::thread([this, ip, port]() {
+ std::function func = [this, ip, port]() {
+ ::paddle::distributed::PSParameter server_proto = this->GetServerProto();
+
+ auto _ps_env = paddle::distributed::PaddlePSEnvironment();
+ _ps_env.set_ps_servers(&this->host_sign_list,
+ this->host_sign_list.size()); // test
+ pserver_ptr = std::shared_ptr(
+ paddle::distributed::PSServerFactory::create(server_proto));
+ VLOG(0) << "pserver-ptr created ";
+ std::vector empty_vec;
+ framework::ProgramDesc empty_prog;
+ 
empty_vec.push_back(empty_prog); + pserver_ptr->configure(server_proto, _ps_env, rank, empty_vec); + pserver_ptr->start(ip, port); + }; + std::thread t1(func); + t1.join(); + }); + sleep(3); +} +::paddle::distributed::PSParameter GraphPyServer::GetServerProto() { + // Generate server proto desc + ::paddle::distributed::PSParameter server_fleet_desc; + server_fleet_desc.set_shard_num(get_shard_num()); + ::paddle::distributed::ServerParameter* server_proto = + server_fleet_desc.mutable_server_param(); + ::paddle::distributed::DownpourServerParameter* downpour_server_proto = + server_proto->mutable_downpour_server_param(); + ::paddle::distributed::ServerServiceParameter* server_service_proto = + downpour_server_proto->mutable_service_param(); + server_service_proto->set_service_class("GraphBrpcService"); + server_service_proto->set_server_class("GraphBrpcServer"); + server_service_proto->set_client_class("GraphBrpcClient"); + server_service_proto->set_start_server_port(0); + server_service_proto->set_server_thread_num(12); + + for (auto& tuple : this->table_id_map) { + VLOG(0) << " make a new table " << tuple.second; + ::paddle::distributed::TableParameter* sparse_table_proto = + downpour_server_proto->add_downpour_table_param(); + GetDownpourSparseTableProto(sparse_table_proto, tuple.second); + } + + return server_fleet_desc; +} + +::paddle::distributed::PSParameter GraphPyClient::GetWorkerProto() { + ::paddle::distributed::PSParameter worker_fleet_desc; + worker_fleet_desc.set_shard_num(get_shard_num()); + ::paddle::distributed::WorkerParameter* worker_proto = + worker_fleet_desc.mutable_worker_param(); + + ::paddle::distributed::DownpourWorkerParameter* downpour_worker_proto = + worker_proto->mutable_downpour_worker_param(); + + for (auto& tuple : this->table_id_map) { + ::paddle::distributed::TableParameter* worker_sparse_table_proto = + downpour_worker_proto->add_downpour_table_param(); + GetDownpourSparseTableProto(worker_sparse_table_proto, tuple.second); + } + 
+ ::paddle::distributed::ServerParameter* server_proto = + worker_fleet_desc.mutable_server_param(); + ::paddle::distributed::DownpourServerParameter* downpour_server_proto = + server_proto->mutable_downpour_server_param(); + ::paddle::distributed::ServerServiceParameter* server_service_proto = + downpour_server_proto->mutable_service_param(); + server_service_proto->set_service_class("GraphBrpcService"); + server_service_proto->set_server_class("GraphBrpcServer"); + server_service_proto->set_client_class("GraphBrpcClient"); + server_service_proto->set_start_server_port(0); + server_service_proto->set_server_thread_num(12); + + for (auto& tuple : this->table_id_map) { + ::paddle::distributed::TableParameter* sparse_table_proto = + downpour_server_proto->add_downpour_table_param(); + GetDownpourSparseTableProto(sparse_table_proto, tuple.second); + } + + return worker_fleet_desc; +} +void GraphPyClient::load_edge_file(std::string name, std::string filepath, + bool reverse) { + std::string params = "edge"; + if (reverse) { + params += "|reverse"; + } + if (this->table_id_map.count(name)) { + uint32_t table_id = this->table_id_map[name]; + auto status = + get_ps_client()->load(table_id, std::string(filepath), params); + status.wait(); + } +} + +void GraphPyClient::load_node_file(std::string name, std::string filepath) { + std::string params = "node"; + if (this->table_id_map.count(name)) { + uint32_t table_id = this->table_id_map[name]; + auto status = + get_ps_client()->load(table_id, std::string(filepath), params); + status.wait(); + } +} +std::vector> GraphPyClient::sample_k( + std::string name, uint64_t node_id, int sample_size) { + std::vector> v; + if (this->table_id_map.count(name)) { + uint32_t table_id = this->table_id_map[name]; + auto status = worker_ptr->sample(table_id, node_id, sample_size, v); + status.wait(); + } + return v; +} +std::vector GraphPyClient::pull_graph_list(std::string name, + int server_index, + int start, int size) { + std::vector res; + 
if (this->table_id_map.count(name)) { + uint32_t table_id = this->table_id_map[name]; + auto status = + worker_ptr->pull_graph_list(table_id, server_index, start, size, res); + status.wait(); + } + return res; } } } diff --git a/paddle/fluid/distributed/service/graph_py_service.h b/paddle/fluid/distributed/service/graph_py_service.h index 82272deabf721e..56b47d71093f63 100644 --- a/paddle/fluid/distributed/service/graph_py_service.h +++ b/paddle/fluid/distributed/service/graph_py_service.h @@ -44,27 +44,23 @@ namespace paddle { namespace distributed { class GraphPyService { - std::vector keys; + protected: std::vector server_list, port_list, host_sign_list; - int server_size, shard_num, rank, client_id; + int server_size, shard_num; std::unordered_map table_id_map; - std::thread *server_thread, *client_thread; + // std::thread *server_thread, *client_thread; - std::shared_ptr pserver_ptr; + // std::shared_ptr pserver_ptr; - std::shared_ptr worker_ptr; + // std::shared_ptr worker_ptr; public: - std::shared_ptr get_ps_server() { - return pserver_ptr; - } - std::shared_ptr get_ps_client() { - return worker_ptr; - } - int get_client_id() { return client_id; } - void set_client_Id(int client_Id) { this->client_id = client_id; } - int get_rank() { return rank; } - void set_rank(int rank) { this->rank = rank; } + // std::shared_ptr get_ps_server() { + // return pserver_ptr; + // } + // std::shared_ptr get_ps_client() { + // return worker_ptr; + // } int get_shard_num() { return shard_num; } void set_shard_num(int shard_num) { this->shard_num = shard_num; } void GetDownpourSparseTableProto( @@ -82,162 +78,58 @@ class GraphPyService { accessor_proto->set_accessor_class("CommMergeAccessor"); } - ::paddle::distributed::PSParameter GetServerProto() { - // Generate server proto desc - ::paddle::distributed::PSParameter server_fleet_desc; - server_fleet_desc.set_shard_num(shard_num); - ::paddle::distributed::ServerParameter* server_proto = - 
server_fleet_desc.mutable_server_param(); - ::paddle::distributed::DownpourServerParameter* downpour_server_proto = - server_proto->mutable_downpour_server_param(); - ::paddle::distributed::ServerServiceParameter* server_service_proto = - downpour_server_proto->mutable_service_param(); - server_service_proto->set_service_class("GraphBrpcService"); - server_service_proto->set_server_class("GraphBrpcServer"); - server_service_proto->set_client_class("GraphBrpcClient"); - server_service_proto->set_start_server_port(0); - server_service_proto->set_server_thread_num(12); - - for (auto& tuple : this->table_id_map) { - ::paddle::distributed::TableParameter* sparse_table_proto = - downpour_server_proto->add_downpour_table_param(); - GetDownpourSparseTableProto(sparse_table_proto, tuple.second); - } - - return server_fleet_desc; - } - - ::paddle::distributed::PSParameter GetWorkerProto() { - ::paddle::distributed::PSParameter worker_fleet_desc; - worker_fleet_desc.set_shard_num(shard_num); - ::paddle::distributed::WorkerParameter* worker_proto = - worker_fleet_desc.mutable_worker_param(); - - ::paddle::distributed::DownpourWorkerParameter* downpour_worker_proto = - worker_proto->mutable_downpour_worker_param(); - - for (auto& tuple : this->table_id_map) { - ::paddle::distributed::TableParameter* worker_sparse_table_proto = - downpour_worker_proto->add_downpour_table_param(); - GetDownpourSparseTableProto(worker_sparse_table_proto, tuple.second); - } - - ::paddle::distributed::ServerParameter* server_proto = - worker_fleet_desc.mutable_server_param(); - ::paddle::distributed::DownpourServerParameter* downpour_server_proto = - server_proto->mutable_downpour_server_param(); - ::paddle::distributed::ServerServiceParameter* server_service_proto = - downpour_server_proto->mutable_service_param(); - server_service_proto->set_service_class("GraphBrpcService"); - server_service_proto->set_server_class("GraphBrpcServer"); - server_service_proto->set_client_class("GraphBrpcClient"); - 
server_service_proto->set_start_server_port(0); - server_service_proto->set_server_thread_num(12); - - for (auto& tuple : this->table_id_map) { - ::paddle::distributed::TableParameter* sparse_table_proto = - downpour_server_proto->add_downpour_table_param(); - GetDownpourSparseTableProto(sparse_table_proto, tuple.second); - } - - return worker_fleet_desc; - } void set_server_size(int server_size) { this->server_size = server_size; } int get_server_size(int server_size) { return server_size; } std::vector split(std::string& str, const char pattern); - - void load_edge_file(std::string name, std::string filepath, bool reverse) { - std::string params = "edge"; - if (reverse) { - params += "|reverse"; - } - if (this->table_id_map.count(name)) { - uint32_t table_id = this->table_id_map[name]; - auto status = - get_ps_client()->load(table_id, std::string(filepath), params); - status.wait(); - } + void set_up(std::string ips_str, int shard_num, + std::vector edge_types); +}; +class GraphPyServer : public GraphPyService { + public: + void set_up(std::string ips_str, int shard_num, + std::vector edge_types, int rank) { + set_rank(rank); + GraphPyService::set_up(ips_str, shard_num, edge_types); } - - void load_node_file(std::string name, std::string filepath) { - std::string params = "node"; - if (this->table_id_map.count(name)) { - uint32_t table_id = this->table_id_map[name]; - auto status = - get_ps_client()->load(table_id, std::string(filepath), params); - status.wait(); - } + int get_rank() { return rank; } + void set_rank(int rank) { this->rank = rank; } + void start_server(); + ::paddle::distributed::PSParameter GetServerProto(); + std::shared_ptr get_ps_server() { + return pserver_ptr; } + protected: + int rank; + std::shared_ptr pserver_ptr; + std::thread* server_thread; +}; +class GraphPyClient : public GraphPyService { + public: + void set_up(std::string ips_str, int shard_num, + std::vector edge_types, int client_id) { + set_client_id(client_id); + 
GraphPyService::set_up(ips_str, shard_num, edge_types); + } + std::shared_ptr get_ps_client() { + return worker_ptr; + } + void load_edge_file(std::string name, std::string filepath, bool reverse); + void load_node_file(std::string name, std::string filepath); + int get_client_id() { return client_id; } + void set_client_id(int client_id) { this->client_id = client_id; } + void start_client(); std::vector> sample_k(std::string name, uint64_t node_id, - int sample_size) { - std::vector> v; - if (this->table_id_map.count(name)) { - uint32_t table_id = this->table_id_map[name]; - auto status = worker_ptr->sample(table_id, node_id, sample_size, v); - status.wait(); - } - return v; - } + int sample_size); std::vector pull_graph_list(std::string name, int server_index, - int start, int size) { - std::vector res; - if (this->table_id_map.count(name)) { - uint32_t table_id = this->table_id_map[name]; - auto status = - worker_ptr->pull_graph_list(table_id, server_index, start, size, res); - status.wait(); - } - return res; - } - void start_server(std::string ip, uint32_t port) { - server_thread = new std::thread([this, &ip, &port]() { - std::function func = [this, &ip, &port]() { - VLOG(0) << "enter inner function "; - ::paddle::distributed::PSParameter server_proto = - this->GetServerProto(); + int start, int size); + ::paddle::distributed::PSParameter GetWorkerProto(); - auto _ps_env = paddle::distributed::PaddlePSEnvironment(); - _ps_env.set_ps_servers(&this->host_sign_list, - this->host_sign_list.size()); // test - pserver_ptr = std::shared_ptr( - paddle::distributed::PSServerFactory::create(server_proto)); - VLOG(0) << "pserver-ptr created "; - std::vector empty_vec; - framework::ProgramDesc empty_prog; - empty_vec.push_back(empty_prog); - pserver_ptr->configure(server_proto, _ps_env, rank, empty_vec); - VLOG(0) << " starting server " << ip << " " << port; - pserver_ptr->start(ip, port); - }; - std::thread t1(func); - t1.join(); - }); - } - void start_client() { - 
VLOG(0) << "in start_client " << rank; - std::map> dense_regions; - dense_regions.insert( - std::pair>(0, {})); - auto regions = dense_regions[0]; - ::paddle::distributed::PSParameter worker_proto = GetWorkerProto(); - paddle::distributed::PaddlePSEnvironment _ps_env; - auto servers_ = host_sign_list.size(); - _ps_env = paddle::distributed::PaddlePSEnvironment(); - _ps_env.set_ps_servers(&host_sign_list, servers_); - worker_ptr = std::shared_ptr( - paddle::distributed::PSClientFactory::create(worker_proto)); - worker_ptr->configure(worker_proto, dense_regions, _ps_env, client_id); - } - void set_up(std::string ips_str, int shard_num, int rank, int client_id, - std::vector edge_types); - void set_keys(std::vector keys) { // just for test - this->keys = keys; - } - std::vector get_keys(int start, int size) { // just for test - return std::vector(keys.begin() + start, keys.begin() + start + size); - } + protected: + int client_id; + std::shared_ptr worker_ptr; + std::thread* client_thread; }; } } diff --git a/paddle/fluid/distributed/test/graph_node_test.cc b/paddle/fluid/distributed/test/graph_node_test.cc index fccca80a173021..9d0aa046db1017 100644 --- a/paddle/fluid/distributed/test/graph_node_test.cc +++ b/paddle/fluid/distributed/test/graph_node_test.cc @@ -222,17 +222,15 @@ void RunBrpcPushSparse() { std::vector> v; pull_status = worker_ptr_->sample(0, 37, 4, v); pull_status.wait(); - // for (auto g : v) { - // std::cout << g.get_id() << " " << g.get_graph_node_type() << std::endl; - // } ASSERT_EQ(v.size(), 3); v.clear(); pull_status = worker_ptr_->sample(0, 96, 4, v); pull_status.wait(); - std::unordered_set s = { 111, 48, 247 } ASSERT_EQ(3, v.size()); + std::unordered_set s = {111, 48, 247}; + ASSERT_EQ(3, v.size()); for (auto g : v) { // std::cout << g.first << std::endl; - ASSERT_EQ(true, s.find(g.first) != s.end()) + ASSERT_EQ(true, s.find(g.first) != s.end()); } v.clear(); std::vector nodes; @@ -253,22 +251,63 @@ void RunBrpcPushSparse() { std::cout << 
g.get_id() << std::endl;
 }
- distributed::GraphPyService gps1, gps2;
+ // distributed::GraphPyService gps1, gps2;
+ distributed::GraphPyServer server1, server2;
+ distributed::GraphPyClient client1, client2;
 std::string ips_str = "127.0.0.1:4211;127.0.0.1:4212";
 std::vector edge_types = {std::string("user2item")};
- gps1.set_up(ips_str, 127, 0, 0, edge_types);
- gps2.set_up(ips_str, 127, 1, 1, edge_types);
- gps1.load_edge_file(std::string("user2item"), std::string(file_name), 0);
+ server1.set_up(ips_str, 127, edge_types, 0);
+ server2.set_up(ips_str, 127, edge_types, 1);
+ client1.set_up(ips_str, 127, edge_types, 0);
+ client2.set_up(ips_str, 127, edge_types, 1);
+ server1.start_server();
+ std::cout << "first server done" << std::endl;
+ server2.start_server();
+ std::cout << "second server done" << std::endl;
+ client1.start_client();
+ std::cout << "first client done" << std::endl;
+ client2.start_client();
+ std::cout << "second client done" << std::endl;
+ std::cout << "started" << std::endl;
+ client1.load_edge_file(std::string("user2item"), std::string(file_name), 0);
+ // client2.load_edge_file(std::string("user2item"), std::string(file_name),
+ // 0);
 nodes.clear();
- nodes = gps2.pull_graph_list(std::string("user2item"), 0, 1, 4);
+ nodes = client2.pull_graph_list(std::string("user2item"), 0, 1, 4);
 ASSERT_EQ(nodes[0].get_id(), 59);
 nodes.clear();
- v = gps2.sample_k(std::string("user2item"), 96, 4);
+ v = client1.sample_k(std::string("user2item"), 96, 4);
 ASSERT_EQ(v.size(), 3);
 std::cout << "sample result" << std::endl;
 for (auto p : v) {
 std::cout << p.first << " " << p.second << std::endl;
 }
+ /*
+from paddle.fluid.core import GraphPyServer, GraphPyClient
+ips_str = "127.0.0.1:4211;127.0.0.1:4212"
+server1 = GraphPyServer()
+server2 = GraphPyServer()
+client1 = GraphPyClient()
+client2 = GraphPyClient()
+edge_types = ["user2item"]
+server1.set_up(ips_str,127,edge_types,0);
+server2.set_up(ips_str,127,edge_types,1);
+client1.set_up(ips_str,127,edge_types,0); 
+client2.set_up(ips_str,127,edge_types,1);
+server1.start_server();
+server2.start_server();
+client1.start_client();
+client2.start_client();
+client1.load_edge_file("user2item", "input.txt", 0);
+list = client2.pull_graph_list("user2item",0,1,4)
+for x in list:
+ print(x.get_id())
+
+list = client1.sample_k("user2item",96, 4);
+for x in list:
+ print(x.get_id())
+ */
+
 // to test in python,try this:
 // from paddle.fluid.core import GraphPyService
 // ips_str = "127.0.0.1:4211;127.0.0.1:4212"
diff --git a/paddle/fluid/pybind/fleet_py.cc b/paddle/fluid/pybind/fleet_py.cc
index e967acee77d20f..ac40a611475127 100644
--- a/paddle/fluid/pybind/fleet_py.cc
+++ b/paddle/fluid/pybind/fleet_py.cc
@@ -43,6 +43,8 @@ using paddle::distributed::FleetWrapper;
 using paddle::distributed::HeterClient;
 using paddle::distributed::GraphPyService;
 using paddle::distributed::GraphNode;
+using paddle::distributed::GraphPyServer;
+using paddle::distributed::GraphPyClient;
 namespace paddle {
 namespace pybind {
@@ -162,14 +164,24 @@ void BindGraphNode(py::module* m) {
 .def("get_id", &GraphNode::get_id)
 .def("get_feature", &GraphNode::get_feature);
 }
-void BindGraphService(py::module* m) {
- py::class_(*m, "GraphPyService")
+void BindGraphPyService(py::module* m) {
+ py::class_(*m, "GraphPyService").def(py::init<>());
+}
+void BindGraphPyServer(py::module* m) {
+ py::class_(*m, "GraphPyServer")
+ .def(py::init<>())
+ .def("start_server", &GraphPyServer::start_server)
+ .def("set_up", &GraphPyServer::set_up);
+}
+void BindGraphPyClient(py::module* m) {
+ py::class_(*m, "GraphPyClient")
 .def(py::init<>())
- .def("load_edge_file", &GraphPyService::load_edge_file)
- .def("load_node_file", &GraphPyService::load_node_file)
- .def("set_up", &GraphPyService::set_up)
- .def("pull_graph_list", &GraphPyService::pull_graph_list)
- .def("sample_k", &GraphPyService::sample_k);
+ .def("load_edge_file", &GraphPyClient::load_edge_file)
+ .def("load_node_file", &GraphPyClient::load_node_file)
+ .def("set_up", 
&GraphPyClient::set_up) + .def("pull_graph_list", &GraphPyClient::pull_graph_list) + .def("sample_k", &GraphPyClient::sample_k) + .def("start_client", &GraphPyClient::start_client); } } // end namespace pybind diff --git a/paddle/fluid/pybind/fleet_py.h b/paddle/fluid/pybind/fleet_py.h index 57861a8e2ed6e4..aea5e69ca0df19 100644 --- a/paddle/fluid/pybind/fleet_py.h +++ b/paddle/fluid/pybind/fleet_py.h @@ -28,7 +28,8 @@ void BindCommunicatorContext(py::module* m); void BindDistCommunicator(py::module* m); void BindHeterClient(py::module* m); void BindGraphNode(py::module* m); -void BindGraphService(py::module* m); - +void BindGraphPyService(py::module* m); +void BindGraphPyServer(py::module* m); +void BindGraphPyClient(py::module* m); } // namespace pybind } // namespace paddle diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index 34a44e74c1b341..9bba6ab74e3caa 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -2847,7 +2847,9 @@ All parameter, weight, gradient are variables in Paddle. BindDistCommunicator(&m); BindHeterClient(&m); BindGraphNode(&m); - BindGraphService(&m); + BindGraphPyService(&m); + BindGraphPyServer(&m); + BindGraphPyClient(&m); #endif } } // namespace pybind