[fleet_executor] Add compute interceptor#37376
[fleet_executor] Add compute interceptor#37376wangxicoding merged 1 commit intoPaddlePaddle:developfrom
Conversation
|
Thanks for your contribution! |
6782f96 to
2e75832
Compare
acd19ea to
3793ee5
Compare
3793ee5 to
d6a4958
Compare
| void ComputeInterceptor::Compute(const InterceptorMessage& msg) { | ||
| if (msg.message_type() == DATA_IS_READY) { | ||
| auto src_id = msg.src_id(); | ||
| upstream_deps_.erase(src_id); |
There was a problem hiding this comment.
这里感觉erase不太好,这个Compute会被调用很多次,它的上下游是固定的话,一次compute清空,下一次算还要加回来,感觉。
There was a problem hiding this comment.
是的,我也觉得不太好。特别是有多个上游情况下,如果有个上游产出多次,这个还会出错。不过目前是demo,所以没那么讲究。
可能还是新建一个空的,然后填充对比比较好一些,但这样还是有上游产出多次的问题需要考虑下怎么解决。
There was a problem hiding this comment.
我觉得完全不考虑micro scope可能有问题🤨
比如C同时依赖A,B的情况,且A的运行速度比B快。需要跑两个micro steps。
| time | event | C's upstream_deps() |
|---|---|---|
| 0 | C初始化 | (A, B) |
| 1 | A的micro step 0完成,A->C: DATA_IS_READY | (B) |
| 2 | A的micro step 1完成,A->C: DATA_IS_READY | (B) |
| 3 | B的micro step 0完成,B->C: DATA_IS_READY | (),C开始执行micro step 0 |
| 4 | C重新构建upstream_deps | (A, B) |
| 5 | B的micro step 1完成,B->C: DATA_IS_READY | (A) |
这样时间2这一刻,A给C发送的micro step1的DATA_IS_READY怎么处理?且在最后,AB都结束了两个micro steps的运行,但是C永远会在等A的第二个micro step的DATA_IS_READY。
现阶段我们的上下游依赖很简单,应该都是单依赖的。
There was a problem hiding this comment.
嗯,现在这个写的是个demo compute,后续需要有buffer作为流控,需要一个个buffer写,一个buffer写满了才能计算一个
| void Interceptor::PoolTheMailbox() { | ||
| // pool the local mailbox, parse the Message | ||
| while (true) { | ||
| for (;;) { |
| void ComputeInterceptor::Compute(const InterceptorMessage& msg) { | ||
| if (msg.message_type() == DATA_IS_READY) { | ||
| auto src_id = msg.src_id(); | ||
| upstream_deps_.erase(src_id); |
There was a problem hiding this comment.
我觉得完全不考虑micro scope可能有问题🤨
比如C同时依赖A,B的情况,且A的运行速度比B快。需要跑两个micro steps。
| time | event | C's upstream_deps() |
|---|---|---|
| 0 | C初始化 | (A, B) |
| 1 | A的micro step 0完成,A->C: DATA_IS_READY | (B) |
| 2 | A的micro step 1完成,A->C: DATA_IS_READY | (B) |
| 3 | B的micro step 0完成,B->C: DATA_IS_READY | (),C开始执行micro step 0 |
| 4 | C重新构建upstream_deps | (A, B) |
| 5 | B的micro step 1完成,B->C: DATA_IS_READY | (A) |
这样时间2这一刻,A给C发送的micro step1的DATA_IS_READY怎么处理?且在最后,AB都结束了两个micro steps的运行,但是C永远会在等A的第二个micro step的DATA_IS_READY。
现阶段我们的上下游依赖很简单,应该都是单依赖的。
| int64_t max_run_times_; | ||
| int64_t max_slot_nums_; | ||
|
|
||
| std::string type_; |
There was a problem hiding this comment.
这个是做什么的?区分interceptor种类用的?
There was a problem hiding this comment.
是的,carrier构建用的,这段逻辑还没有加
|
|
||
| InterceptorMessage msg; | ||
| msg.set_message_type(DATA_IS_READY); | ||
| a->Send(1, msg); |
There was a problem hiding this comment.
是不是可以给StopInterceptor加一个 finish的flag,这里wait那个flag。然后就可以delete那三个new出来的指针了?
There was a problem hiding this comment.
这个后续讨论一下,我觉得应该是通过析构来
PR types
Others
PR changes
Others
Describe
Add
democompute interceptor