Batch barrier in send/recv op #7847
| void SetCond(int cond); | ||
| void WaitClientGet(int count); | ||
| bool CondEqualTo(int cond); | ||
| void SubCond(int arg); |
SubCond is not used in this PR.
Yep, this PR is still a work in progress. This interface will be used in rpc_server when it receives a BatchBarrier RPC call; I will implement that shortly.
| return true; | ||
| } | ||
|
|
||
| bool RPCClient::AsyncBatchBarrier(const std::string& ep, int64_t time_out) { |
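BatchBarrier is a noun. Judging from this implementation, the method should be named AsyncSendBatchBarrier; alternatively, it might be more convenient to turn this implementation into a synchronous call.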
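Done. A server that supports both synchronous and asynchronous requests could get fairly complicated. The current approach can reuse the AsyncSendVariable interface to send the barrier signal, which keeps the code much simpler. If a strong need for synchronous requests comes up later, shall we implement the synchronous interface then?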
|
|
||
| cq_send_ = builder.AddCompletionQueue(); | ||
| cq_get_ = builder.AddCompletionQueue(); | ||
| cq_batch_barrier_ = builder.AddCompletionQueue(); |
cq_batch_barrier_ never seems to be used.
Done. Sorry, I forgot to delete the old code.
| volatile bool is_shut_down_ = false; | ||
| std::unique_ptr<grpc::ServerCompletionQueue> cq_send_; | ||
| std::unique_ptr<grpc::ServerCompletionQueue> cq_get_; | ||
| std::unique_ptr<grpc::ServerCompletionQueue> cq_batch_barrier_; |
cq_batch_barrier_ never seems to be used.
paddle/operators/recv_op.cc
Outdated
| batch_barrier++; | ||
| continue; | ||
| } | ||
| barrier_size++; |
barrier_size is only used for logging; it can be removed or renamed.
paddle/operators/recv_op.cc
Outdated
| for (size_t i = 0; i < barrier_size; ++i) { | ||
| size_t barrier_size = 0; | ||
| int batch_barrier = 0; | ||
| while (batch_barrier != fan_in || !rpc_service_->IsRecvQueueEmpty()) { |
Not sure why !rpc_service_->IsRecvQueueEmpty() is needed. rpc_service_->Get() is a blocking call which will wait until a new message arrives.
Thanks, it's not used :)
I deleted !rpc_service_->IsRecvQueueEmpty(), because the send op sends the barrier signal last. If RecvOp receives a barrier signal, it must be the last message from that trainer.
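For readers following the thread, the receive loop discussed above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the code in this PR: `BlockingQueue`, `RecvOneBatch`, and the sentinel name `kBatchBarrierName` are hypothetical stand-ins, and messages are reduced to plain variable names. It shows why a blocking `Get()` plus a barrier counter is enough once every trainer sends its barrier last.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical sentinel name; the marker actually used by the PR is not shown in this thread.
const std::string kBatchBarrierName = "BATCH_BARRIER";

// Hypothetical stand-in for the RPC service's receive queue.
class BlockingQueue {
 public:
  void Push(std::string name) {
    {
      std::lock_guard<std::mutex> lock(mu_);
      items_.push_back(std::move(name));
    }
    cv_.notify_one();
  }

  // Blocks until a message is available, mirroring the blocking Get() described above.
  std::string Get() {
    std::unique_lock<std::mutex> lock(mu_);
    cv_.wait(lock, [this] { return !items_.empty(); });
    std::string name = std::move(items_.front());
    items_.pop_front();
    return name;
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::deque<std::string> items_;
};

// Collects variable names until one barrier per trainer (fan_in in total) has arrived.
// No "queue is empty" check is needed: each trainer's barrier is its last message.
std::vector<std::string> RecvOneBatch(BlockingQueue* queue, int fan_in) {
  assert(fan_in >= 0);
  std::vector<std::string> received;
  int batch_barrier = 0;
  while (batch_barrier != fan_in) {
    std::string name = queue->Get();
    if (name == kBatchBarrierName) {
      ++batch_barrier;  // one more trainer has finished this batch
      continue;
    }
    received.push_back(std::move(name));
  }
  return received;
}
```

With `fan_in` trainers, the loop exits exactly when the last trainer's barrier arrives, so anything still queued afterwards belongs to the next batch.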
typhoonzero
left a comment
LGTM, one minor comment can be updated later.
paddle/operators/send_op.cc
Outdated
| } | ||
| PADDLE_ENFORCE(client_.Wait()); | ||
|
|
||
| std::set<std::string> epset(epmap.begin(), epmap.end()); |
Using the endpoints attribute would do the same thing.
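As a rough sketch of the send side under discussion: variables go out first, the client waits, and then one barrier is sent to each distinct endpoint through the same asynchronous send path. `RecordingClient`, its `AsyncSend` method, `SendBatch`, and `kBatchBarrierName` are hypothetical stand-ins for illustration; the real client's `AsyncSendVariable` takes different arguments. The `std::set` here plays the role that the deduplicated endpoints attribute would play.

```cpp
#include <cassert>
#include <cstddef>
#include <set>
#include <string>
#include <utility>
#include <vector>

// Hypothetical sentinel name for the barrier message.
const std::string kBatchBarrierName = "BATCH_BARRIER";

// Hypothetical client that only records what would be sent, as (endpoint, name) pairs.
struct RecordingClient {
  std::vector<std::pair<std::string, std::string>> sent;

  void AsyncSend(const std::string& ep, const std::string& name) {
    sent.emplace_back(ep, name);
  }

  // A real client would block here until outstanding requests complete.
  bool Wait() { return true; }
};

// Sends vars[i] to epmap[i], then one barrier to every distinct endpoint.
void SendBatch(RecordingClient* client, const std::vector<std::string>& vars,
               const std::vector<std::string>& epmap) {
  assert(vars.size() == epmap.size());
  for (std::size_t i = 0; i < vars.size(); ++i) {
    client->AsyncSend(epmap[i], vars[i]);
  }
  bool ok = client->Wait();
  assert(ok);

  // Deduplicate endpoints so each server gets exactly one barrier per batch.
  std::set<std::string> epset(epmap.begin(), epmap.end());
  for (const std::string& ep : epset) {
    client->AsyncSend(ep, kBatchBarrierName);
  }
  ok = client->Wait();
  assert(ok);
  (void)ok;  // silence unused-variable warnings when asserts are compiled out
}
```

Because the barrier reuses the ordinary send path, the server only needs to recognize the sentinel name rather than handle a separate RPC type.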
Fixed #7764