Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion paddle/operators/detail/grpc_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ class RequestBase {

CallStatus Status() { return status_; }
void SetStatus(CallStatus status) { status_ = status; }
virtual std::string GetReqName() { assert(false); }
// Fallback for request types that carry no name; subclasses that do are
// expected to override this.
// NOTE(review): assert(false) is compiled out under NDEBUG, so in release
// builds this silently returns "" — consider a pure-virtual method or an
// unconditional failure macro instead; confirm against how callers use it.
virtual std::string GetReqName() {
assert(false);
return "";
}

protected:
grpc::ServerContext ctx_;
Expand Down
14 changes: 7 additions & 7 deletions paddle/operators/recv_op.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ limitations under the License. */
namespace paddle {
namespace operators {

constexpr char kOptimizeBlock[] = "OptimizeBlock";

void RunServer(std::shared_ptr<detail::AsyncGRPCServer> service) {
service->RunSyncUpdate();
VLOG(4) << "RunServer thread end";
Expand Down Expand Up @@ -150,14 +152,12 @@ class RecvOp : public framework::OperatorBase {

rpc_service_->Reset();

std::string program_str = Attr<std::string>("OptimizeProgram");
framework::proto::ProgramDesc program_desc;
program_desc.ParseFromString(program_str);
framework::ProgramDesc program(program_desc);
auto *block = Attr<framework::BlockDesc *>(kOptimizeBlock);
auto *program = block->Program();
framework::Executor executor(dev_place);
// Run sub graph to get optimized tensor
try {
executor.Run(program, &recv_scope, 0, /*global_block*/
executor.Run(*program, &recv_scope, block->ID(), /*global_block*/
false /*create_local_scope*/, false /*create_vars*/);
} catch (std::exception &e) {
LOG(ERROR) << "run sub program error " << e.what();
Expand Down Expand Up @@ -189,8 +189,8 @@ This operator will recv tensor from send_op
"IP address to listen on.")
.SetDefault("127.0.0.1:6164")
.AddCustomChecker([](const std::string &ip) { return !ip.empty(); });
AddAttr<std::string>("OptimizeProgram", "type string",
"Serialized ProgramDesc string for recv to run.");
AddAttr<framework::BlockDesc *>(
kOptimizeBlock, "BlockDesc of the sub-block to run on the pserver.");
AddAttr<std::vector<std::string>>(
"ParamList", "type list of string",
"grad->param name mapping to find which param to optimize.")
Expand Down
5 changes: 1 addition & 4 deletions paddle/operators/send_recv_op_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,7 @@ void StartServerNet(bool is_sparse) {
attrs.insert({"endpoint", std::string("127.0.0.1:6174")});
attrs.insert({"ParamList", std::vector<std::string>({"Out"})});
attrs.insert({"GradList", std::vector<std::string>({"x1"})});
std::string program_proto;
PADDLE_ENFORCE(program.Proto()->SerializeToString(&program_proto));

attrs.insert({"OptimizeProgram", program_proto});
attrs.insert({"OptimizeBlock", block});
recv_op = f::OpRegistry::CreateOp("recv", {{"RX", {"x1"}}}, {}, attrs);
recv_op->Run(scope, place);
}
Expand Down
2 changes: 1 addition & 1 deletion python/paddle/v2/fluid/distribute_transpiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ def get_pserver_program(self, endpoint, optimize_ops):
}, # grads to recv
outputs={},
attrs={
"OptimizeProgram": optimize_sub_program.desc,
"OptimizeBlock": optimize_sub_program.global_block(),
"endpoint": endpoint,
"ParamList": [
p.name
Expand Down
2 changes: 1 addition & 1 deletion python/paddle/v2/fluid/distribute_transpiler_simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ def get_pserver_program(self, endpoint, optimize_ops):
self.param_grad_map[endpoint]["grads"]}, # grads to recv
outputs={},
attrs={
"OptimizeProgram": optimize_sub_program.desc,
"OptimizeBlock": optimize_sub_program.global_block(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can merge optimize_sub_program into the pserver_program as its second block (block_id 1); then we don't need to create variables in both programs.

"endpoint": endpoint,
"ParamList":
[p.name for p in self.param_grad_map[endpoint]["params"]],
Expand Down