Merged
Changes from all commits (126 commits)
addd5fc
miss format (#34771)
b3602sss Aug 11, 2021
45af4f2
[NPU] add elementwise_min_grad_op_npu,test=develop (#34731)
andyjiang1116 Aug 11, 2021
b5ec65e
[NPU] Add exp and exp_grad npu op (#34612)
0x45f Aug 11, 2021
88f2f4a
[HybridParallel] Support save/load for PipeLineParallel (#34768)
ForFishes Aug 11, 2021
3f962e7
add the basic apis for auto_parallel (#33804)
Aug 11, 2021
4d7af37
[hybrid] pp+dp support fp16 allreduce (#34762)
wangxicoding Aug 11, 2021
3f011d8
Add ext_tensor.slice() API (#34227)
Aug 11, 2021
9ed5db2
[NPU] add batch_norm_op_npu and test (#34056)
ronny1996 Aug 11, 2021
f6fab55
[NPU] add reduce_mean_op_npu and test (#34053)
ronny1996 Aug 11, 2021
9e3e08f
[NPU] add momentum_op_npu and test (#34082)
ronny1996 Aug 11, 2021
d45d311
split_op for npu (#34699)
sljlp Aug 11, 2021
234c21a
[NPU] add while, read_from_array and write_to_array npu op (#34755)
pangyoki Aug 11, 2021
fc537d4
[NPU] Support npu op flatten_contiguous_range_grad (#34798)
WorgenZhang Aug 11, 2021
3429c04
[Paddle TRT]fix_fc_int8_convert; fix_reshape_convert (#34787)
Wangzheee Aug 11, 2021
9d02313
`set_value_grad` propagate gradients to `Input` and `TensorValue` (#…
hbwx24 Aug 11, 2021
99f8f5c
[AMP] add state_dict and load_state_dict and unittest for class GradS…
zhangbo9674 Aug 11, 2021
0a5c99e
[oneDNN] Fix to issue #34554 (#34623)
jczaja Aug 11, 2021
1c31d9d
add det_mv3_db & LeViT test case in pr-ci-inference (#34803)
OliverLPH Aug 12, 2021
bc543e3
[NPU] Support npu op expand_v2 and expand_v2_grad (#34764)
WorgenZhang Aug 12, 2021
cfa6913
[NPU] Support npu kernel for smooth_l1_loss op (#34674)
wuhuachaocoding Aug 12, 2021
589d13c
[HybridParallel]Add Recompute for PipeLineParallel (#34607)
ForFishes Aug 12, 2021
0e28c8b
Fix safety-bug of functional.linear (#34696)
Ray2020BD Aug 12, 2021
016cc56
transformer c files (#34706)
xingfeng01 Aug 12, 2021
6326c3e
[Inference] Inference python api support fp16 (#34676)
jiweibo Aug 12, 2021
dffb0b2
fix set_grad_ivar bug of Tensor.backward (#34819)
zhwesky2010 Aug 12, 2021
dc62a22
Revert "[oneDNN] Fix to issue #34554 (#34623)" (#34838)
chenwhql Aug 12, 2021
572adcc
Remove incorrect signal error stack trace (#34842)
chenwhql Aug 12, 2021
3f71e8d
[NPU] add meshgrid, test=develop (#34576)
qili93 Aug 12, 2021
2164ad6
[npu]add unsqueeze2_grad,test=develop (#34733)
andyjiang1116 Aug 13, 2021
e92f038
add retry for gethostbyname (#34855)
Baibaifan Aug 13, 2021
507ea06
[Bug-Fix]fix bug of py36 import utils (#34873)
ForFishes Aug 13, 2021
17a9976
fix npu_finalize (#34857)
ronny1996 Aug 13, 2021
5b86b99
[NPU] fix bce_loss_npu, test=develop (#34876)
qili93 Aug 13, 2021
8bc4d85
Support sccache distributed storage on windows (#34879)
zhwesky2010 Aug 13, 2021
ac56d54
Add EmptyGradOpMaker CI Approval (#34810)
Aug 13, 2021
f421741
fix generator thread safety bug (#34888)
sneaxiy Aug 13, 2021
fc6b4a5
Bug fix : Can't load multiple modules of custom c++ op (#34505)
zyfncg Aug 13, 2021
ff4bdac
fix a bug of slice by none index (#34877)
zyfncg Aug 13, 2021
8c8667f
New Einsum API (#33821)
tongxin Aug 13, 2021
2cd05d5
[hybrid] refine pipeline stage and mp send/recv check (#34870)
wangxicoding Aug 13, 2021
7316018
[NPU] add p_norm_op_npu (#34695)
ronny1996 Aug 16, 2021
8fb17fc
Op-benchmark CI cpu and gpu (#34631)
tianshuo78520a Aug 16, 2021
34d188b
Check whl size (#34767)
tianshuo78520a Aug 16, 2021
28279f6
[NPU] remove npu int64 kernel for increment op (#34909)
zhiqiu Aug 16, 2021
e84b2e9
Add bcast semantics checks at C++ level to BroadcastTensorsOp (#34874)
jim19930609 Aug 16, 2021
ad6c3b9
[dev] fix dice_loss bug (#34757)
ghostxsl Aug 16, 2021
fd92d94
Support npu op hard_swish and hard_swish_grad (#34608)
zyfncg Aug 16, 2021
dc439a1
Enhance tensor shape check for dist op. (#34915)
ZHUI Aug 16, 2021
b0cb414
support margin loss (arcface, cosface, sphereface) for single GPU and…
GuoxiaWang Aug 16, 2021
e29c2d1
[amp] dygraph amp support param_group (#34899)
zhiqiu Aug 16, 2021
875cfd5
add unique_consecutive_op (#34334)
firestonelib Aug 16, 2021
e4e8cc9
[NPU] Support NPU kernel for nearest_interp and nearest_interp_grad o…
From00 Aug 16, 2021
3b9f040
[NPU] add nearest_interp_v2 and nearest_interp_v2_grad, test=develop …
qili93 Aug 16, 2021
f6d8ab5
fix iscan bug in test file (#34912)
lelelelelez Aug 16, 2021
9cb6565
[oneDNN] Fix to 34554 (same as previous PR but should build with GPU)…
jczaja Aug 16, 2021
ae80df9
Fix elementwise_add quantization (#34820)
wozna Aug 16, 2021
d028214
[CPU-PSLIB] Add config for scale_sparse_grad in config_fleet.py,test=…
WorgenZhang Aug 16, 2021
2e30134
Change the invoking method of settiem by Ellipsis and None index from…
zyfncg Aug 16, 2021
4981894
[NPU] Add size npu op (#34636)
0x45f Aug 16, 2021
6b4b9fe
hccl init sync (#34918)
Baibaifan Aug 16, 2021
2a4ed08
Jetson nano bilinear (#34751)
Aug 16, 2021
b1cc4a4
[NPU] Support npu op:(1)arg_min (2)arg_max (#34867)
veyron95 Aug 16, 2021
35ef418
Fix typos in English docs for diag and diagflat. (#34869)
limin2021 Aug 16, 2021
ed6624a
concurrent (#34908)
lelelelelez Aug 16, 2021
181f7ce
fix a bug in nlp: text_matching/sentence_transformers when last dim i…
AnnaTrainingG Aug 17, 2021
1614608
fix drop_last not work on IterableDataset (#34801)
heavengate Aug 17, 2021
5de576b
add api fill_diagonal_inplace (#34460)
zhiboniu Aug 17, 2021
8046e33
Add some passes which can be applied to Program (#34730)
sneaxiy Aug 17, 2021
10f9644
Align CTC grad scale same with ESPNet (#34729)
zh794390558 Aug 17, 2021
9d4f00b
add mkl multi-thread test cases in PR-CI-INFERENCE (#34946)
OliverLPH Aug 17, 2021
8ef1bf8
[bug fix] fix unfold negative_size_param (#34943)
ghostxsl Aug 17, 2021
01a3a2e
Modify the name of class in unittest with the same name (#34952)
hbwx24 Aug 17, 2021
7b3295a
add exclude rules of pre-commit for paddle/utils and third_party (#34…
MingMingShangTian Aug 17, 2021
f1c1d9e
[oneDNN ] disabling more ops caching (#34830)
jczaja Aug 17, 2021
9be4144
Copy boost optional to Paddle (#34780)
MingMingShangTian Aug 17, 2021
690f583
Update op-benchmark CI (#34962)
tianshuo78520a Aug 17, 2021
1ef2185
[NPU] add where_index op and tests (#34951)
Aganlengzi Aug 17, 2021
b4474fb
[NPU]Adamw skip update for npu (#34897)
sljlp Aug 17, 2021
c7070cb
[Paddle-TRT] unitest_quant_dequant (#34929)
Wangzheee Aug 18, 2021
100db44
support class center sample of PartialFC (#34106)
GuoxiaWang Aug 18, 2021
52a7b0c
[NPU] add retry on HcclGetRootInfo to fix "bind fail" (#34977)
zhiqiu Aug 18, 2021
12bf046
add the safe check for the some ops (#34978)
wawltor Aug 18, 2021
a137371
NPU use squared_l2_norm in GradientClipByGlobalNorm (#34836)
wangxicoding Aug 18, 2021
2e9a31e
Fix bug in alltoall (#34975)
Aug 18, 2021
dd533dd
Add function to disable paddle signal handler (#34577)
jim19930609 Aug 18, 2021
4d88cdb
[CustomOp] Fix ext_tensor.cast failed bug (#34884)
chenwhql Aug 18, 2021
8967a66
support quantization of conv2d_transpose (#34547)
XGZhang11 Aug 18, 2021
209075a
[CPU-PSLIB] Add consistency insepection of use_var_list and data_gene…
WorgenZhang Aug 18, 2021
51939c8
Fix the parameter name for atan2 API (#34812)
ronny1996 Aug 18, 2021
a9673b4
[Hybrid Performance] Move the cast op of AMP which cast fp32 param to…
wangxicoding Aug 18, 2021
40f6273
[NPU] Add leaky Relu (#34894)
Jackwaterveg Aug 18, 2021
1b71a71
[NPU] Add square grad (#34889)
Jackwaterveg Aug 18, 2021
1b747de
add paddle detection model in pr-ci-inference (#34986)
OliverLPH Aug 18, 2021
40d4d83
code refactoring for new executor (#34970)
wanghuancoder Aug 18, 2021
248e27b
fix pad outliers err (#34979)
littletomatodonkey Aug 18, 2021
755c8a1
Add NPU kernel for norm Op: float16 and float32 (#34609)
2742195759 Aug 18, 2021
9cbba97
[NPU]add rmsprop op (#34864)
lzzyzlbb Aug 18, 2021
22da190
Abstract DeviceEvent to manage cross-platform Event implementation (#…
Aurelius84 Aug 19, 2021
c4e05e1
Fix op-benchmark cpu/gpu error (#34997)
tianshuo78520a Aug 19, 2021
26213a7
Fix Inference CI CPU/GPU (#34931)
tianshuo78520a Aug 19, 2021
255fc7d
add the auto scan test for TensorRT convert,test=develop (#34980)
winter-wang Aug 19, 2021
ca7f520
fix batch_norm and instance norm when input is [] (#34107)
ceci3 Aug 19, 2021
a2e0865
Add dimension check for inverse to avoid dividing by 0 error when inp…
Xreki Aug 19, 2021
97cae5e
add resnet50_quant model in PR-CI-INFERENCE (#35012)
OliverLPH Aug 19, 2021
ef024c8
remove unused statements in test_dist_base.py (#35017)
Aug 19, 2021
ed9a14e
Fix op-benchmark cpu/gpu; test=document_fix (#35027)
tianshuo78520a Aug 19, 2021
866c1ea
fix reshape when is a number (#35016)
parap1uie-s Aug 19, 2021
4641e8f
[NPU] Support npu kernel for sin op (#34844)
JZZ-NOTE Aug 19, 2021
096b0f2
Add op benchmark run function log (#35034)
tianshuo78520a Aug 20, 2021
1aa2bde
[bug fix] fix spectral_norm bug (#35005)
ghostxsl Aug 20, 2021
4637151
add (N,C,*) input support for GroupNorm (#34773)
zoooo0820 Aug 20, 2021
f927b65
temporary disable resnet50-quant multi-thread test (#35035)
OliverLPH Aug 20, 2021
d082955
[NPU] Support npu op where and where grad (#34587)
zhaoyinglia Aug 20, 2021
4c115a8
[NPU] Support npu op depthwise_conv2d (#34853)
zhaoyinglia Aug 20, 2021
4416c79
fix set_lod in data_feed (#35000)
esythan Aug 20, 2021
6bacfb0
use spin lock in auto growth allocator (#34910)
wanghuancoder Aug 20, 2021
ef517a5
[NPU] Support npu kernel for pad3d op (#34815)
betterpig Aug 20, 2021
99ffeff
[npu]Add argsort op (#34865)
lzzyzlbb Aug 20, 2021
f6015d0
fix model-benchmark build error (#35041)
tianshuo78520a Aug 20, 2021
4d9b2d6
[hybrid performance] Grad fuse for gradient merge under pipeline mode…
FeixLiu Aug 20, 2021
e2241a4
Add paddle.linalg.matrix_power OP (#34667)
Aug 20, 2021
56c5e21
implementation of broadcast add backward by reduce (#34143)
ZzSean Aug 22, 2021
cf99c0d
Add cuda.device_count api (#34811)
linjieccc Aug 23, 2021
77a8a39
add adamw cuda kernel (#35020)
zhaoyinglia Aug 23, 2021
c3efabe
set node feature (#34994)
seemingwang Aug 23, 2021
aefec22
Fix a bug of strided_slice op, about the axes parameter access memory…
TeslaZhao Aug 23, 2021
5 changes: 4 additions & 1 deletion .pre-commit-config.yaml
@@ -49,4 +49,7 @@ repos:
entry: python ./tools/codestyle/copyright.hook
language: system
files: \.(c|cc|cxx|cpp|cu|h|hpp|hxx|proto|py|sh)$
exclude: (?!.*third_party)^.*$ | (?!.*book)^.*$
exclude: |
(?x)^(
paddle/utils/.*
)$
1 change: 1 addition & 0 deletions cmake/cupti.cmake
@@ -9,6 +9,7 @@ find_path(CUPTI_INCLUDE_DIR cupti.h
$ENV{CUPTI_ROOT} $ENV{CUPTI_ROOT}/include
${CUDA_TOOLKIT_ROOT_DIR}/extras/CUPTI/include
${CUDA_TOOLKIT_ROOT_DIR}/targets/x86_64-linux/include
${CUDA_TOOLKIT_ROOT_DIR}/targets/aarch64-linux/include
NO_DEFAULT_PATH
)

96 changes: 96 additions & 0 deletions paddle/fluid/distributed/service/graph_brpc_client.cc
@@ -479,6 +479,102 @@ std::future<int32_t> GraphBrpcClient::pull_graph_list(
closure);
return fut;
}

std::future<int32_t> GraphBrpcClient::set_node_feat(
const uint32_t &table_id, const std::vector<uint64_t> &node_ids,
const std::vector<std::string> &feature_names,
const std::vector<std::vector<std::string>> &features) {
std::vector<int> request2server;
std::vector<int> server2request(server_size, -1);
for (int query_idx = 0; query_idx < node_ids.size(); ++query_idx) {
int server_index = get_server_index_by_id(node_ids[query_idx]);
if (server2request[server_index] == -1) {
server2request[server_index] = request2server.size();
request2server.push_back(server_index);
}
}
size_t request_call_num = request2server.size();
std::vector<std::vector<uint64_t>> node_id_buckets(request_call_num);
std::vector<std::vector<int>> query_idx_buckets(request_call_num);
std::vector<std::vector<std::vector<std::string>>> features_idx_buckets(
request_call_num);
for (int query_idx = 0; query_idx < node_ids.size(); ++query_idx) {
int server_index = get_server_index_by_id(node_ids[query_idx]);
int request_idx = server2request[server_index];
node_id_buckets[request_idx].push_back(node_ids[query_idx]);
query_idx_buckets[request_idx].push_back(query_idx);
if (features_idx_buckets[request_idx].size() == 0) {
features_idx_buckets[request_idx].resize(feature_names.size());
}
for (int feat_idx = 0; feat_idx < feature_names.size(); ++feat_idx) {
features_idx_buckets[request_idx][feat_idx].push_back(
features[feat_idx][query_idx]);
}
}

DownpourBrpcClosure *closure = new DownpourBrpcClosure(
request_call_num,
[&, node_id_buckets, query_idx_buckets, request_call_num](void *done) {
int ret = 0;
auto *closure = (DownpourBrpcClosure *)done;
size_t fail_num = 0;
for (int request_idx = 0; request_idx < request_call_num;
++request_idx) {
if (closure->check_response(request_idx, PS_GRAPH_SET_NODE_FEAT) !=
0) {
++fail_num;
}
if (fail_num == request_call_num) {
ret = -1;
}
}
closure->set_promise_value(ret);
});

auto promise = std::make_shared<std::promise<int32_t>>();
closure->add_promise(promise);
std::future<int> fut = promise->get_future();

for (int request_idx = 0; request_idx < request_call_num; ++request_idx) {
int server_index = request2server[request_idx];
closure->request(request_idx)->set_cmd_id(PS_GRAPH_SET_NODE_FEAT);
closure->request(request_idx)->set_table_id(table_id);
closure->request(request_idx)->set_client_id(_client_id);
size_t node_num = node_id_buckets[request_idx].size();

closure->request(request_idx)
->add_params((char *)node_id_buckets[request_idx].data(),
sizeof(uint64_t) * node_num);
std::string joint_feature_name =
paddle::string::join_strings(feature_names, '\t');
closure->request(request_idx)
->add_params(joint_feature_name.c_str(), joint_feature_name.size());

// set features
std::string set_feature = "";
for (size_t feat_idx = 0; feat_idx < feature_names.size(); ++feat_idx) {
for (size_t node_idx = 0; node_idx < node_num; ++node_idx) {
size_t feat_len =
features_idx_buckets[request_idx][feat_idx][node_idx].size();
set_feature.append((char *)&feat_len, sizeof(size_t));
set_feature.append(
features_idx_buckets[request_idx][feat_idx][node_idx].data(),
feat_len);
}
}
closure->request(request_idx)
->add_params(set_feature.c_str(), set_feature.size());

GraphPsService_Stub rpc_stub =
getServiceStub(get_cmd_channel(server_index));
closure->cntl(request_idx)->set_log_id(butil::gettimeofday_ms());
rpc_stub.service(closure->cntl(request_idx), closure->request(request_idx),
closure->response(request_idx), closure);
}

return fut;
}

int32_t GraphBrpcClient::initialize() {
// set_shard_num(_config.shard_num());
BrpcPsClient::initialize();
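A caller-side sketch of the new client API above (the `client` object, table id, and values are illustrative, not taken from this PR). The returned future resolves to 0 on success and to -1 only when every per-server request failed:

// Illustrative only: `client` is assumed to be a connected GraphBrpcClient.
std::vector<uint64_t> node_ids = {37, 96};
std::vector<std::string> feature_names = {"c"};
// features is indexed [feature][node], matching get_node_feat's layout.
std::vector<std::vector<std::string>> features = {{"hello", "world"}};
std::future<int32_t> fut =
    client.set_node_feat(/*table_id=*/0, node_ids, feature_names, features);
int32_t ret = fut.get();  // 0 on success; -1 if every per-server RPC failed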
5 changes: 5 additions & 0 deletions paddle/fluid/distributed/service/graph_brpc_client.h
@@ -79,6 +79,11 @@ class GraphBrpcClient : public BrpcPsClient {
const std::vector<std::string>& feature_names,
std::vector<std::vector<std::string>>& res);

virtual std::future<int32_t> set_node_feat(
const uint32_t& table_id, const std::vector<uint64_t>& node_ids,
const std::vector<std::string>& feature_names,
const std::vector<std::vector<std::string>>& features);

virtual std::future<int32_t> clear_nodes(uint32_t table_id);
virtual std::future<int32_t> add_graph_node(
uint32_t table_id, std::vector<uint64_t>& node_id_list,
42 changes: 42 additions & 0 deletions paddle/fluid/distributed/service/graph_brpc_server.cc
@@ -16,6 +16,7 @@
#include "paddle/fluid/distributed/service/brpc_ps_server.h"

#include <thread> // NOLINT
#include <utility>
#include "butil/endpoint.h"
#include "iomanip"
#include "paddle/fluid/distributed/service/brpc_ps_client.h"
@@ -157,6 +158,8 @@ int32_t GraphBrpcService::initialize() {
&GraphBrpcService::add_graph_node;
_service_handler_map[PS_GRAPH_REMOVE_GRAPH_NODE] =
&GraphBrpcService::remove_graph_node;
_service_handler_map[PS_GRAPH_SET_NODE_FEAT] =
&GraphBrpcService::graph_set_node_feat;
// Shard initialization; the shard info of server_list is only available from env after the server starts
initialize_shard_info();

@@ -400,5 +403,44 @@ int32_t GraphBrpcService::graph_get_node_feat(Table *table,

return 0;
}

int32_t GraphBrpcService::graph_set_node_feat(Table *table,
const PsRequestMessage &request,
PsResponseMessage &response,
brpc::Controller *cntl) {
CHECK_TABLE_EXIST(table, request, response)
if (request.params_size() < 3) {
set_response_code(
response, -1,
"graph_set_node_feat request requires at least 2 arguments");
return 0;
}
size_t node_num = request.params(0).size() / sizeof(uint64_t);
uint64_t *node_data = (uint64_t *)(request.params(0).c_str());
std::vector<uint64_t> node_ids(node_data, node_data + node_num);

std::vector<std::string> feature_names =
paddle::string::split_string<std::string>(request.params(1), "\t");

std::vector<std::vector<std::string>> features(
feature_names.size(), std::vector<std::string>(node_num));

const char *buffer = request.params(2).c_str();

for (size_t feat_idx = 0; feat_idx < feature_names.size(); ++feat_idx) {
for (size_t node_idx = 0; node_idx < node_num; ++node_idx) {
size_t feat_len = *(size_t *)(buffer);
buffer += sizeof(size_t);
auto feat = std::string(buffer, feat_len);
features[feat_idx][node_idx] = feat;
buffer += feat_len;
}
}

((GraphTable *)table)->set_node_feat(node_ids, feature_names, features);

return 0;
}

} // namespace distributed
} // namespace paddle
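On the wire, the third request parameter packs all feature values into one byte string: feature-major, each value prefixed by its raw size_t length (so client and server must agree on word size and endianness). A self-contained sketch of that framing, mirroring the pack loop in graph_brpc_client.cc and the unpack loop above — function names here are illustrative, nothing beyond the standard library is assumed:

#include <cstring>
#include <iostream>
#include <string>
#include <vector>

// Pack features[feat_idx][node_idx] the way the client does:
// feature-major, each value prefixed with its size_t length.
std::string PackFeatures(const std::vector<std::vector<std::string>> &features) {
  std::string buf;
  for (const auto &per_feature : features) {
    for (const auto &value : per_feature) {
      size_t len = value.size();
      buf.append(reinterpret_cast<const char *>(&len), sizeof(size_t));
      buf.append(value.data(), len);
    }
  }
  return buf;
}

// Unpack mirrors GraphBrpcService::graph_set_node_feat.
std::vector<std::vector<std::string>> UnpackFeatures(const std::string &buf,
                                                     size_t num_features,
                                                     size_t num_nodes) {
  std::vector<std::vector<std::string>> out(
      num_features, std::vector<std::string>(num_nodes));
  const char *p = buf.data();
  for (size_t f = 0; f < num_features; ++f) {
    for (size_t n = 0; n < num_nodes; ++n) {
      size_t len;
      std::memcpy(&len, p, sizeof(size_t));  // memcpy avoids unaligned reads
      p += sizeof(size_t);
      out[f][n].assign(p, len);
      p += len;
    }
  }
  return out;
}

int main() {
  auto packed = PackFeatures({{"hello", "world"}});
  auto round_trip = UnpackFeatures(packed, /*num_features=*/1, /*num_nodes=*/2);
  std::cout << round_trip[0][0] << " " << round_trip[0][1] << "\n";  // hello world
}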
4 changes: 4 additions & 0 deletions paddle/fluid/distributed/service/graph_brpc_server.h
@@ -83,9 +83,13 @@ class GraphBrpcService : public PsBaseService {
const PsRequestMessage &request,
PsResponseMessage &response,
brpc::Controller *cntl);

int32_t graph_get_node_feat(Table *table, const PsRequestMessage &request,
PsResponseMessage &response,
brpc::Controller *cntl);
int32_t graph_set_node_feat(Table *table, const PsRequestMessage &request,
PsResponseMessage &response,
brpc::Controller *cntl);
int32_t clear_nodes(Table *table, const PsRequestMessage &request,
PsResponseMessage &response, brpc::Controller *cntl);
int32_t add_graph_node(Table *table, const PsRequestMessage &request,
13 changes: 13 additions & 0 deletions paddle/fluid/distributed/service/graph_py_service.cc
@@ -330,6 +330,19 @@ std::vector<std::vector<std::string>> GraphPyClient::get_node_feat(
return v;
}

void GraphPyClient::set_node_feat(
std::string node_type, std::vector<uint64_t> node_ids,
std::vector<std::string> feature_names,
const std::vector<std::vector<std::string>> features) {
if (this->table_id_map.count(node_type)) {
uint32_t table_id = this->table_id_map[node_type];
auto status =
worker_ptr->set_node_feat(table_id, node_ids, feature_names, features);
status.wait();
}
return;
}

std::vector<FeatureNode> GraphPyClient::pull_graph_list(std::string name,
int server_index,
int start, int size,
3 changes: 3 additions & 0 deletions paddle/fluid/distributed/service/graph_py_service.h
@@ -155,6 +155,9 @@ class GraphPyClient : public GraphPyService {
std::vector<std::vector<std::string>> get_node_feat(
std::string node_type, std::vector<uint64_t> node_ids,
std::vector<std::string> feature_names);
void set_node_feat(std::string node_type, std::vector<uint64_t> node_ids,
std::vector<std::string> feature_names,
const std::vector<std::vector<std::string>> features);
std::vector<FeatureNode> pull_graph_list(std::string name, int server_index,
int start, int size, int step = 1);
::paddle::distributed::PSParameter GetWorkerProto();
1 change: 1 addition & 0 deletions paddle/fluid/distributed/service/sendrecv.proto
@@ -55,6 +55,7 @@ enum PsCmdID {
PS_GRAPH_CLEAR = 34;
PS_GRAPH_ADD_GRAPH_NODE = 35;
PS_GRAPH_REMOVE_GRAPH_NODE = 36;
PS_GRAPH_SET_NODE_FEAT = 37;
}

message PsRequestMessage {
28 changes: 28 additions & 0 deletions paddle/fluid/distributed/table/common_graph_table.cc
@@ -469,6 +469,34 @@ int32_t GraphTable::get_node_feat(const std::vector<uint64_t> &node_ids,
return 0;
}

int32_t GraphTable::set_node_feat(
const std::vector<uint64_t> &node_ids,
const std::vector<std::string> &feature_names,
const std::vector<std::vector<std::string>> &res) {
size_t node_num = node_ids.size();
std::vector<std::future<int>> tasks;
for (size_t idx = 0; idx < node_num; ++idx) {
uint64_t node_id = node_ids[idx];
tasks.push_back(_shards_task_pool[get_thread_pool_index(node_id)]->enqueue(
[&, idx, node_id]() -> int {
size_t index = node_id % this->shard_num - this->shard_start;
auto node = shards[index].add_feature_node(node_id);
node->set_feature_size(this->feat_name.size());
for (int feat_idx = 0; feat_idx < feature_names.size(); ++feat_idx) {
const std::string &feature_name = feature_names[feat_idx];
if (feat_id_map.find(feature_name) != feat_id_map.end()) {
node->set_feature(feat_id_map[feature_name], res[feat_idx][idx]);
}
}
return 0;
}));
}
for (size_t idx = 0; idx < node_num; ++idx) {
tasks[idx].get();
}
return 0;
}

std::pair<int32_t, std::string> GraphTable::parse_feature(
std::string feat_str) {
// Return (feat_id, bytes) if name is in this->feat_name, else return (-1,
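In GraphTable::set_node_feat above, each write is enqueued on the thread pool that owns the node's shard, so updates to a given node are serialized. A standalone sketch of that routing rule (the shard counts are made-up example values):

#include <cstddef>
#include <cstdint>
#include <iostream>

// Mirrors the shard lookup in GraphTable::set_node_feat:
// global shard = node_id % shard_num; local index = global shard - shard_start.
size_t LocalShardIndex(uint64_t node_id, size_t shard_num, size_t shard_start) {
  return node_id % shard_num - shard_start;
}

int main() {
  // Example: 8 shards total, this server owns shards [4, 8).
  const size_t shard_num = 8, shard_start = 4;
  std::cout << LocalShardIndex(37, shard_num, shard_start) << "\n";  // 37 % 8 - 4 = 1
}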
6 changes: 6 additions & 0 deletions paddle/fluid/distributed/table/common_graph_table.h
@@ -46,6 +46,7 @@ class GraphShard {
}
return res;
}

GraphNode *add_graph_node(uint64_t id);
FeatureNode *add_feature_node(uint64_t id);
Node *find_node(uint64_t id);
@@ -122,6 +123,11 @@ class GraphTable : public SparseTable {
const std::vector<std::string> &feature_names,
std::vector<std::vector<std::string>> &res);

virtual int32_t set_node_feat(
const std::vector<uint64_t> &node_ids,
const std::vector<std::string> &feature_names,
const std::vector<std::vector<std::string>> &res);

protected:
std::vector<GraphShard> shards;
size_t shard_start, shard_end, server_num, shard_num_per_table, shard_num;
11 changes: 11 additions & 0 deletions paddle/fluid/distributed/test/graph_node_test.cc
@@ -558,6 +558,17 @@ void RunBrpcPushSparse() {
VLOG(0) << "get_node_feat: " << node_feat[1][0];
VLOG(0) << "get_node_feat: " << node_feat[1][1];

node_feat[1][0] = "helloworld";

client1.set_node_feat(std::string("user"), node_ids, feature_names,
node_feat);

// sleep(5);
node_feat =
client1.get_node_feat(std::string("user"), node_ids, feature_names);
VLOG(0) << "get_node_feat: " << node_feat[1][0];
ASSERT_TRUE(node_feat[1][0] == "helloworld");

// Test string
node_ids.clear();
node_ids.push_back(37);
12 changes: 11 additions & 1 deletion paddle/fluid/extension/include/ext_tensor.h
@@ -88,10 +88,20 @@ class PD_DLL_DECL Tensor {
/// It's usually used to set the input tensor data.
/// \param PlaceType of target place, of which
/// the tensor will copy to.

template <typename T>
Tensor copy_to(const PlaceType& place) const;

/// \brief Return a sub-tensor of the given tensor.
/// It is usually used to extract a sub-tensor (which supports
/// modifying the data of the original tensor) to perform further
/// operations.
/// \param begin_idx The index of the start row (inclusive) to slice.
/// The index number begins from 0.
/// \param end_idx The index of the end row (exclusive) to slice.
/// The index number begins from begin_idx + 1.
/// \return The sliced tensor.
Tensor slice(const int64_t begin_idx, const int64_t end_idx) const;

/// \brief Return the shape of the Tensor.
std::vector<int64_t> shape() const;

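A hedged usage sketch for the new slice() API from inside a custom operator; the tensor `x`, its shape, and the include path are assumptions based on Paddle's custom-op docs, not taken from this PR:

#include "paddle/extension.h"  // assumed custom-op API header

// Sketch: take rows [1, 3) of a tensor `x` with shape {4, 16}.
paddle::Tensor rows = x.slice(/*begin_idx=*/1, /*end_idx=*/3);
// rows.shape() is {2, 16}; per the doc comment above, slice() shares
// storage with `x`, so writes through `rows` also mutate `x`.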
25 changes: 25 additions & 0 deletions paddle/fluid/extension/src/ext_tensor.cc
@@ -124,6 +124,21 @@ void DeviceCopy(T *src, T *dst, PlaceType src_plc, PlaceType dst_plc,
} \
auto *tensor = static_cast<framework::LoDTensor *>(tensor_.get());

#define GET_INNER_PLACE \
platform::Place place; \
switch (place_) { \
case PlaceType::kCPU: \
place = platform::CPUPlace(); \
break; \
case PlaceType::kGPU: \
place = platform::CUDAPlace(); \
break; \
default: \
PADDLE_THROW(platform::errors::Unavailable( \
"Custom operator unsupported place id(%d)", \
static_cast<int>(place_))); \
}

void Tensor::reshape(const std::vector<int64_t> &shape) {
GET_CASTED_TENSOR
auto new_dim = framework::make_ddim(shape);
@@ -257,6 +272,16 @@ Tensor Tensor::copy_to(const PlaceType &target_place) const {
return target;
}

Tensor Tensor::slice(const int64_t begin_idx, const int64_t end_idx) const {
GET_CASTED_TENSOR
GET_INNER_PLACE
framework::Tensor intermediate = tensor->Slice(begin_idx, end_idx);
Tensor target = Tensor(place_);
framework::CustomTensorUtils::ShareDataFrom(
static_cast<const void *>(&intermediate), target);
return target;
}

template PD_DLL_DECL Tensor
Tensor::copy_to<float>(const PlaceType &target_place) const;
template PD_DLL_DECL Tensor
1 change: 1 addition & 0 deletions paddle/fluid/extension/src/ext_tensor.cu