Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
228eb89
cinn_launch_op: skip checking input variables must be used (#37119)
CtfGo Nov 13, 2021
895692e
[PTen]Reshape Kernel Refactor (#37164)
YuanRisheng Nov 14, 2021
31cd914
[heterps]bug fix for local training with --heter_worker_num (#37166)
zmxdream Nov 15, 2021
6b0cc2b
modify sparse_attention docs, test=document_fix (#36554)
Liu-xiandong Nov 15, 2021
444a735
Optimize Matmul_v2 (#37037)
linjieccc Nov 15, 2021
10cc040
add fetch op for cinn graph output node of build_cinn_pass (#37172)
thisjiang Nov 15, 2021
f2a56c6
fix bug of indexing with ellipsis (#37182)
zyfncg Nov 15, 2021
b44db69
graph-engine cache optimization (#37168)
seemingwang Nov 15, 2021
12339fa
Add distributed pass framework: including PassBase/PassTest/PassUtils…
sneaxiy Nov 15, 2021
84b0ec9
Accessor 20211112 2 (#37181)
zhaocaibei123 Nov 15, 2021
59fdf4d
[New features] Add elementwise_mul triple grad kernel (#37152)
veyron95 Nov 15, 2021
83eef6d
fix cinn_compile_test not pass problem (#37190)
thisjiang Nov 15, 2021
df7cc45
Added BF16 to mean op (#37104)
arlesniak Nov 15, 2021
b628c31
fix:delete macro INFERENCE (#37130)
Nov 15, 2021
8358d61
fix 3 bug of new_executor (#37142)
wanghuancoder Nov 15, 2021
cf958f2
fix ctest depent probs (#37203)
FeixLiu Nov 15, 2021
6b21bb0
remove input dim check in op_teller and update ut (#37097)
baoachun Nov 15, 2021
9c59170
remove needless declare (#37195)
chenwhql Nov 15, 2021
584b4b2
[new-exec] fix stream analysis (#37161)
zhiqiu Nov 15, 2021
1e598f1
[Pten] Refactor the implementation of custom operator (#37122)
chenwhql Nov 15, 2021
f49c2c2
[fleet_executor] Add sync method (#37167)
FeixLiu Nov 16, 2021
c5ccff7
supports the slice of upper tensor, test=develop (#37215)
Shixiaowei02 Nov 16, 2021
a9e7a85
Fix attn_bias_add bug. (#37147)
limin2021 Nov 16, 2021
ae40ee3
added onednn elu kernel (#37149)
jakpiase Nov 16, 2021
5091fed
modify long time ut list (#37220)
betterpig Nov 16, 2021
ea47d21
Make FLAGS_determinstic effective in conv2d forward. (#37173)
Xreki Nov 16, 2021
a01e27c
Make Distributed Pass UT Timeout Smaller (#37199)
sneaxiy Nov 16, 2021
56810f4
test=document_fix (#37234)
tianshuo78520a Nov 16, 2021
6ebc318
for pure fp16 (#37230)
Nov 16, 2021
79b49c2
Add API and unit test for reshape (#37232)
YuanRisheng Nov 16, 2021
70b7c7e
Removed unnecessary ENFORCE statement (#37219)
veyron95 Nov 16, 2021
4c160be
refine pass by removing CommOpt, CalcOpt, ParallelOpt (#37206)
sneaxiy Nov 16, 2021
f29a3c6
Fix the logic of VarBase _to func (#37193)
veyron95 Nov 16, 2021
62ec644
[psgpu]fix pipe bug:save and pull overlap; test=develop (#37233)
danleifeng Nov 16, 2021
f95d44a
Added BF16 Pool2d grad (#37081)
arlesniak Nov 16, 2021
d8982c5
decrease pten log level (#37239)
chenwhql Nov 16, 2021
d943459
Dependence analysis (#37231)
2742195759 Nov 17, 2021
5237cc0
[Einsum] correct output dimension errors. (#37222)
tongxin Nov 17, 2021
762819a
[npu][hybrid] support offload (#37224)
wangxicoding Nov 17, 2021
0daa69d
[Fleet Executor] Construct runtime graph (#37158)
LiYuRio Nov 17, 2021
1e9b3a3
rename TensorBase interface data_type() to dtype() (#37257)
zyfncg Nov 17, 2021
5e4b419
copy beta pow to same place when skip_update=1 (#37245)
zhiqiu Nov 17, 2021
1223238
add ut parallel (#37211)
lelelelelez Nov 17, 2021
0057c12
fix compile error when pslib use cpu branch;test=develop (#37248)
danleifeng Nov 17, 2021
54d2626
[heterps]Refactor heterogenous worker (#37244)
zmxdream Nov 17, 2021
ca8c4f3
update dataset (#37194)
zhaocaibei123 Nov 17, 2021
06847bc
optimize the data structure from c++ to python to speed up sampling i…
Liwb5 Nov 17, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 23 additions & 5 deletions cmake/inference_lib.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -216,18 +216,36 @@ copy(inference_lib_dist
DSTS ${PADDLE_INFERENCE_INSTALL_DIR}/paddle/include/crypto/)
include_directories(${CMAKE_BINARY_DIR}/../paddle/fluid/framework/io)

# TODO(chenweihang, before 11.27) Here, the header file of pten is copied to
# the experimental directory, the include path needs to be changed, so the
# header file path needs to be processed here
# copy api headers for custom op
copy(inference_lib_dist
SRCS ${PADDLE_SOURCE_DIR}/paddle/fluid/extension/include/*
DSTS ${PADDLE_INFERENCE_INSTALL_DIR}/paddle/include/experimental/)
SRCS ${PADDLE_SOURCE_DIR}/paddle/pten/api/ext/*
DSTS ${PADDLE_INFERENCE_INSTALL_DIR}/paddle/include/experimental/pten/api/ext/)
copy(inference_lib_dist
SRCS ${PADDLE_SOURCE_DIR}/paddle/pten/api/include/*
DSTS ${PADDLE_INFERENCE_INSTALL_DIR}/paddle/include/experimental/pten/api/include/)
copy(inference_lib_dist
SRCS ${PADDLE_SOURCE_DIR}/paddle/pten/api/all.h
DSTS ${PADDLE_INFERENCE_INSTALL_DIR}/paddle/include/experimental/pten/api/)
copy(inference_lib_dist
SRCS ${PADDLE_SOURCE_DIR}/paddle/pten/common/*
DSTS ${PADDLE_INFERENCE_INSTALL_DIR}/paddle/include/experimental/pten/common/)
copy(inference_lib_dist
SRCS ${PADDLE_SOURCE_DIR}/paddle/fluid/platform/complex.h
DSTS ${PADDLE_INFERENCE_INSTALL_DIR}/paddle/include/experimental/)
DSTS ${PADDLE_INFERENCE_INSTALL_DIR}/paddle/include/experimental/pten/common/)
copy(inference_lib_dist
SRCS ${PADDLE_SOURCE_DIR}/paddle/fluid/platform/float16.h
DSTS ${PADDLE_INFERENCE_INSTALL_DIR}/paddle/include/experimental/)
DSTS ${PADDLE_INFERENCE_INSTALL_DIR}/paddle/include/experimental/pten/common/)
copy(inference_lib_dist
SRCS ${PADDLE_SOURCE_DIR}/paddle/utils/any.h
DSTS ${PADDLE_INFERENCE_INSTALL_DIR}/paddle/include/experimental/)
DSTS ${PADDLE_INFERENCE_INSTALL_DIR}/paddle/include/experimental/utils/)
# In order to be compatible with the original behavior, the header file name needs to be changed
copy(inference_lib_dist
SRCS ${PADDLE_SOURCE_DIR}/paddle/extension.h
DSTS ${PADDLE_INFERENCE_INSTALL_DIR}/paddle/include/experimental/ext_all.h)


# CAPI inference library for only inference
set(PADDLE_INFERENCE_C_INSTALL_DIR "${CMAKE_BINARY_DIR}/paddle_inference_c_install_dir" CACHE STRING
Expand Down
2 changes: 1 addition & 1 deletion paddle/extension.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ limitations under the License. */
#pragma once

// All paddle apis in C++ frontend
#include "paddle/extension/include/ext_all.h"
#include "paddle/pten/api/all.h"
5 changes: 5 additions & 0 deletions paddle/fluid/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
# Adapt to custom op mechanism: Include the header files related to the data type
# to avoid exposing the path of the underlying file, remove it after moving
# float16.h/complex.h/bfloat16.h into pten
include_directories(${PADDLE_SOURCE_DIR}/paddle/fluid/platform)

add_subdirectory(memory)
add_subdirectory(platform)
add_subdirectory(distributed)
Expand Down
191 changes: 172 additions & 19 deletions paddle/fluid/distributed/fleet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,15 @@ uint64_t FleetWrapper::RunServer(const std::string& ip, uint32_t port) {

std::vector<uint64_t> FleetWrapper::GetClientsInfo() {
VLOG(3) << "Going to get client info";
return pserver_ptr_->get_client_info();
return std::vector<uint64_t>();
auto* communicator = Communicator::GetInstance();
std::vector<uint64_t> res = communicator->GetClientInfo();
return res;
}

void FleetWrapper::CreateClient2ClientConnection() {
VLOG(3) << "Going to create client2client connection";
pserver_ptr_->create_client2client_connection(
VLOG(1) << "Going to create client2client connection";
auto* communicator = Communicator::GetInstance();
communicator->_worker_ptr->create_client2client_connection(
client2client_request_timeout_ms_, client2client_connect_timeout_ms_,
client2client_max_retry_);
}
Expand Down Expand Up @@ -370,12 +372,26 @@ void FleetWrapper::PushDenseVarsAsync(
const std::vector<std::string>& var_names,
std::vector<std::future<int32_t>>* push_sparse_status, float scale_datanorm,
int batch_size) {
auto* communicator = Communicator::GetInstance();
PADDLE_ENFORCE_EQ(
communicator->Check(table_id), true,
platform::errors::InvalidArgument(
"can not find table: %s, please check your config", table_id));
communicator->Send(var_names, scope);
auto place = platform::CPUPlace();
std::vector<paddle::distributed::Region> regions;
for (auto& t : var_names) {
Variable* var = scope.FindVar(t);
CHECK(var != nullptr) << "var[" << t << "] not found";
LoDTensor* tensor = var->GetMutable<LoDTensor>();
float* g = tensor->mutable_data<float>(place);
paddle::distributed::Region reg(g, tensor->numel());
regions.emplace_back(std::move(reg));
VLOG(3) << "FleetWrapper::PushDenseVarsAsync Var " << t << " talbe_id "
<< table_id << " Temp_data[0] " << g[0] << " Temp_data[-1] "
<< g[tensor->numel() - 1];
}

auto* communicator =
dynamic_cast<AsyncCommunicator*>(Communicator::GetInstance());
auto push_status = communicator->_worker_ptr->push_dense(
regions.data(), regions.size(), table_id);

communicator->PushDensePostProcessing();
}

void FleetWrapper::PushSparseVarsAsync(
Expand Down Expand Up @@ -417,10 +433,140 @@ void FleetWrapper::PushSparseFromTensorWithLabelAsync(
return;
}

void FleetWrapper::LoadModel(const std::string& path, const std::string& mode) {
void FleetWrapper::PushSparseFromTensorAsync(
const uint64_t table_id, int fea_dim, uint64_t padding_id,
platform::Place place, std::vector<const LoDTensor*>* inputs,
const LoDTensor* shows, const LoDTensor* clks,
std::vector<LoDTensor*>* outputs) {
int batch_size = -1;
for (auto* input : *inputs) {
int cur_batch_size =
input->lod().size() ? input->lod()[0].size() - 1 : input->dims()[0];
if (batch_size == -1) {
batch_size = cur_batch_size;
} else {
CHECK(batch_size == cur_batch_size); // NOLINT
}
}
CHECK(batch_size > 0); // NOLINT

int show_size =
shows->lod().size() ? shows->lod()[0].size() - 1 : shows->dims()[0];
CHECK(show_size == batch_size || show_size == 1);
int clk_size =
clks->lod().size() ? clks->lod()[0].size() - 1 : clks->dims()[0];
CHECK(clk_size == batch_size || clk_size == 1);

std::vector<float> g;
for (framework::LoDTensor* g_tensor : *outputs) {
float* g_ori = g_tensor->data<float>();
// no cvm
if (true) { // TODO(zhaocaibei123): add config
// scale_sparse_gradient_with_batch_size_
Eigen::Map<
Eigen::Matrix<float, Eigen::Dynamic, Eigen::Dynamic, Eigen::RowMajor>>
g_mat(g_ori, g_tensor->numel() / fea_dim, fea_dim);
g_mat.rightCols(fea_dim) *= batch_size;
}

size_t origin = g.size();
size_t add = g_tensor->numel();
g.resize(origin + add);

memcpy(g.data() + origin, g_tensor->data<float>(), add * sizeof(float));
}

std::vector<uint64_t> push_keys;
push_keys.reserve(MAX_FEASIGN_NUM / 100);
std::vector<std::vector<float>> push_values;
push_values.reserve(MAX_FEASIGN_NUM / 100);
size_t output_len = 0;
size_t input_idx = 0;

VLOG(2) << "fleet.cc::emb_dim: " << fea_dim;

// TODO(zhaocaibei123): check type of show/clk is int? float? uint64?
// const long int* show_tensor = shows->data<int64_t>();
// const long int* clk_tensor = clks->data<int64_t>();
const int64_t* show_tensor = shows->data<int64_t>();
const int64_t* clk_tensor = clks->data<int64_t>();

for (size_t index = 0; index < inputs->size(); ++index) {
const framework::LoDTensor* tensor = inputs->at(index);
const int64_t* ids = tensor->data<int64_t>();
size_t len = tensor->numel();

if (tensor->lod().size() > 0) {
for (size_t i = 0; i < tensor->lod()[0].size() - 1; ++i) {
for (int j = tensor->lod()[0][i]; j < tensor->lod()[0][i + 1];
++j, output_len += fea_dim) {
uint64_t real_id = static_cast<uint64_t>(ids[j]);
if (real_id == padding_id) {
continue;
}
push_keys.emplace_back(real_id);
push_values.emplace_back(fea_dim + 3);
// slot show clk grad... consistent with CtrCommonPushValue defined in
// ctr_accessor.h
push_values.back()[0] = 2; // TODO(zhaocaibei123): slot
push_values.back()[1] =
(i >= show_size ? 1 : static_cast<float>(show_tensor[i]));
push_values.back()[2] =
(i >= clk_size ? 0 : static_cast<float>(clk_tensor[i]));

float* data = push_values.back().data() + 3;

memcpy(data, g.data() + output_len, sizeof(float) * fea_dim);

++input_idx;
}
}
} else {
for (size_t i = 0; i < len; ++i, output_len += fea_dim) {
uint64_t real_id = static_cast<uint64_t>(ids[i]);
if (real_id == padding_id) {
continue;
}
push_keys.emplace_back(real_id);
push_values.emplace_back(fea_dim + 3);
// slot show clk grad... consistent with CtrCommonPushValue defined in
// ctr_accessor.h
push_values.back()[0] = 2; // TODO(zhaocaibei123): slot
push_values.back()[1] =
(i >= show_size ? 1 : static_cast<float>(show_tensor[i]));
push_values.back()[2] =
(i >= clk_size ? 0 : static_cast<float>(clk_tensor[i]));

float* data = push_values.back().data() + 3;

memcpy(data, g.data() + output_len, sizeof(float) * fea_dim);

++input_idx;
}
}
}
VLOG(1) << "output_len: " << output_len << " g.size(): " << g.size();
CHECK(output_len == g.size());

std::vector<float*> push_g_vec(input_idx, nullptr);

for (auto i = 0u; i < push_keys.size(); ++i) {
push_g_vec[i] = push_values.at(i).data();
}

auto* communicator = Communicator::GetInstance();
PADDLE_ENFORCE_EQ(
communicator->Check(table_id), true,
platform::errors::InvalidArgument(
"can not find table: %s, please check your config", table_id));
auto status = communicator->_worker_ptr->push_sparse(
table_id, push_keys.data(), (const float**)push_g_vec.data(),
push_keys.size());
}

void FleetWrapper::LoadModel(const std::string& path, const int mode) {
auto* communicator = Communicator::GetInstance();
auto ret = communicator->_worker_ptr->load(path, mode);
// auto ret = pserver_ptr_->_worker_ptr->load(path, std::to_string(mode));
auto ret = communicator->_worker_ptr->load(path, std::to_string(mode));
ret.wait();
if (ret.get() != 0) {
LOG(ERROR) << "load model from path:" << path << " failed";
Expand Down Expand Up @@ -562,16 +708,23 @@ void FleetWrapper::ClientFlush() {

int FleetWrapper::RegisterClientToClientMsgHandler(int msg_type,
MsgHandlerFunc handler) {
VLOG(3) << "calling FleetWrapper::RegisterClientToClientMsgHandler";
VLOG(3) << "pserver_ptr_=" << pserver_ptr_;
VLOG(3) << "_worker_ptr=" << pserver_ptr_->_worker_ptr;
return pserver_ptr_->_worker_ptr->registe_client2client_msg_handler(msg_type,
handler);
VLOG(1) << "calling FleetWrapper::RegisterClientToClientMsgHandler";
auto* communicator = Communicator::GetInstance();
// for unittest which does not call fleet.init_worker() first
if (communicator == nullptr) {
VLOG(0) << "FleetWrapper::RegisterClientToClientMsgHandler communicator is "
"null";
return -1;
} else {
return communicator->_worker_ptr->registe_client2client_msg_handler(
msg_type, handler);
}
}

std::future<int32_t> FleetWrapper::SendClientToClientMsg(
int msg_type, int to_client_id, const std::string& msg) {
return pserver_ptr_->_worker_ptr->send_client2client_msg(msg_type,
auto* communicator = Communicator::GetInstance();
return communicator->_worker_ptr->send_client2client_msg(msg_type,
to_client_id, msg);
}

Expand Down
9 changes: 7 additions & 2 deletions paddle/fluid/distributed/fleet.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,12 @@ class FleetWrapper {
const std::vector<std::string>& input_names,
std::vector<const LoDTensor*>* inputs, // NOLINT
std::vector<const LoDTensor*>* outputs); // NOLINT

void PushSparseFromTensorAsync(const uint64_t table_id, int fea_dim,
uint64_t padding_id, platform::Place place,
std::vector<const LoDTensor*>* inputs,
const LoDTensor* shows,
const LoDTensor* clicks,
std::vector<LoDTensor*>* outputs);
// Push sparse variables to server in Async mode
// Param<In>: scope, table_id, fea_keys, sparse_grad_names
// Param<Out>: push_values, push_sparse_status
Expand Down Expand Up @@ -200,7 +205,7 @@ class FleetWrapper {
void PrintTableStat(const uint64_t table_id);
// mode = 0, load all feature
// mode = 1, load delta feature, which means load diff
void LoadModel(const std::string& path, const std::string& mode);
void LoadModel(const std::string& path, const int mode);
// mode = 0, load all feature
// mode = 1, load delta feature, which means load diff
void LoadModelOneTable(const uint64_t table_id, const std::string& path,
Expand Down
4 changes: 2 additions & 2 deletions paddle/fluid/distributed/fleet_executor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ if(WITH_PYTHON)
endif()
proto_library(interceptor_message_proto SRCS interceptor_message.proto)

if(WITH_DISTRIBUTE AND NOT (WITH_ASCEND OR WITH_ASCEND_CL))
if(WITH_DISTRIBUTE AND WITH_PSCORE AND NOT (WITH_ASCEND OR WITH_ASCEND_CL))
set(BRPC_DEPS brpc ssl crypto protobuf gflags glog zlib leveldb snappy gflags glog)
else()
set(BRPC_DEPS "")
endif()

cc_library(fleet_executor SRCS fleet_executor.cc carrier.cc
cc_library(fleet_executor SRCS fleet_executor.cc carrier.cc task_node.cc runtime_graph.cc
interceptor.cc interceptor_message_service.cc message_bus.cc
DEPS proto_desc fleet_executor_desc_proto interceptor_message_proto ${BRPC_DEPS})

Expand Down
50 changes: 43 additions & 7 deletions paddle/fluid/distributed/fleet_executor/carrier.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ bool Carrier::EnqueueInterceptorMessage(
// handle control message
return true;
} else {
if (creating_interceptors_) {
// Cannot handle the message to interceptor since interceptors
// are still under creating. Will enqueue into a tmp stack.
VLOG(3) << "Receiving message while creating interceptors.";
message_tmp_.emplace_back(interceptor_message);
return true;
}
int64_t dst_id = interceptor_message.dst_id();
Interceptor* dst_interceptor = GetInterceptor(dst_id);
bool rst =
Expand Down Expand Up @@ -70,16 +77,45 @@ Interceptor* Carrier::SetInterceptor(int64_t interceptor_id,
return ptr;
}

void Carrier::SetCreatingFlag(bool flag) {
// set the creating flag
VLOG(3) << "Carrier is set the creating flag from " << creating_interceptors_
<< " to " << flag << ".";
creating_interceptors_ = flag;
if (!flag) {
// finish create interceptors outside, handle tmp messsages
HandleTmpMessages();
}
}

void Carrier::HandleTmpMessages() {
VLOG(3) << "Carrier has received " << message_tmp_.size()
<< " messages during creating interceptors.";
for (const auto& msg : message_tmp_) {
EnqueueInterceptorMessage(msg);
}
message_tmp_.clear();
}

void Carrier::CreateInterceptors() {
// create each Interceptor
for (const auto& item : interceptor_id_to_node_) {
int64_t interceptor_id = item.first;
TaskNode* task_node = item.second;
if (!interceptor_id_to_node_.empty()) {
// no auto init since there is no config
for (const auto& item : interceptor_id_to_node_) {
int64_t interceptor_id = item.first;
TaskNode* task_node = item.second;

// TODO(wangxi): use node_type to select different Interceptor
auto interceptor = std::make_unique<Interceptor>(interceptor_id, task_node);
SetInterceptor(interceptor_id, std::move(interceptor));
VLOG(3) << "Create Interceptor for " << interceptor_id;
// TODO(wangxi): use node_type to select different Interceptor
auto interceptor =
std::make_unique<Interceptor>(interceptor_id, task_node);
SetInterceptor(interceptor_id, std::move(interceptor));
VLOG(3) << "Create Interceptor with interceptor id: " << interceptor_id
<< ".";
}
// The carrier will be always waiting for outside initializer
// since there is no interceptor has been created during auto init
creating_interceptors_ = false;
HandleTmpMessages();
}
}

Expand Down
Loading