Merged
39 changes: 26 additions & 13 deletions paddle/fluid/framework/new_executor/interpretercore.cc
@@ -19,6 +19,7 @@
#include "paddle/fluid/framework/details/nan_inf_utils.h"
#include "paddle/fluid/framework/details/share_tensor_buffer_functor.h"
#include "paddle/fluid/framework/new_executor/interpretercore_util.h"
#include "paddle/fluid/framework/new_executor/threadpool_config.h"
#include "paddle/fluid/framework/operator.h"
#include "paddle/fluid/platform/device/gpu/gpu_info.h"
#include "paddle/fluid/platform/os_info.h"
@@ -47,9 +48,6 @@ constexpr const char* kTaskCompletion = "TaskCompletion";

namespace paddle {
namespace framework {
// NOTE(Aurelius84): Need a better strategy to determine it.
static constexpr size_t kHostNumThreads = 4;
static constexpr size_t kDeviceNumThreads = 1;

InterpreterCore::InterpreterCore(const platform::Place& place,
const BlockDesc& block,
@@ -293,8 +291,14 @@ bool InterpreterCore::BuildInplaceCheckVarIsOnlyInput(size_t var_index) {

std::shared_ptr<interpreter::AsyncWorkQueue> InterpreterCore::GetWorkQueue() {
if (async_work_queue_ == nullptr) {
async_work_queue_ = std::make_shared<interpreter::AsyncWorkQueue>(
kHostNumThreads, kDeviceNumThreads, &main_thread_blocker_);
int host_num_threads = 1, device_num_threads = 1, prepare_num_threads = 1;
std::tie(host_num_threads, device_num_threads, prepare_num_threads) =
interpreter::GetThreadPoolConfig(place_, vec_instruction_.size());
async_work_queue_ =
std::make_shared<interpreter::AsyncWorkQueue>(host_num_threads,
device_num_threads,
prepare_num_threads,
&main_thread_blocker_);
}
return async_work_queue_;
}
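
The hunk above follows a common lazy-initialization pattern: compute the three pool sizes with a tuple-returning helper, unpack via std::tie, and build the queue only on first use. A minimal standalone illustration with simplified stand-in types (Queue and GetConfig are placeholders, not Paddle's classes):

#include <memory>
#include <tuple>

// Placeholder for interpreter::GetThreadPoolConfig: returns
// (host, device, prepare) thread counts.
std::tuple<int, int, int> GetConfig() { return {4, 1, 1}; }

struct Queue {
  Queue(int /*host*/, int /*device*/, int /*prepare*/) {}
};

std::shared_ptr<Queue> queue;

std::shared_ptr<Queue> GetQueue() {
  if (queue == nullptr) {
    // Defaults matter: std::tie assigns into already-declared variables.
    int host = 1, device = 1, prepare = 1;
    std::tie(host, device, prepare) = GetConfig();
    queue = std::make_shared<Queue>(host, device, prepare);
  }
  return queue;
}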
@@ -773,14 +777,23 @@ void InterpreterCore::ExecuteInstructionList(

platform::RecordEvent record_prepare(
"PrepareAtomic", platform::TracerEventType::UserDefined, 1);
// NOTE(zhiqiu): get the prepared deps from std::future, and async prepare
// those for the next step
auto atomic_deps = atomic_deps_.get();
auto atomic_var_ref = atomic_var_ref_.get();

atomic_deps_ = async_work_queue_->PrepareAtomicDeps(dependecy_count_);
atomic_var_ref_ =
async_work_queue_->PrepareAtomicVarRef(var_scope_.VecMetaInfo());

std::unique_ptr<std::vector<std::atomic<size_t>>> atomic_deps = nullptr;
std::unique_ptr<std::vector<std::atomic<size_t>>> atomic_var_ref = nullptr;

if (async_work_queue_->QueueNumThreads(kPrepareWorkQueueIdx)) {
// NOTE(zhiqiu): get the prepared deps from std::future, and async prepare
// those for the next step
atomic_deps = atomic_deps_.get();
atomic_var_ref = atomic_var_ref_.get();

atomic_deps_ = async_work_queue_->PrepareAtomicDeps(dependecy_count_);
atomic_var_ref_ =
async_work_queue_->PrepareAtomicVarRef(var_scope_.VecMetaInfo());
} else {
atomic_deps = interpreter::PrepareAtomicDeps(dependecy_count_);
atomic_var_ref = interpreter::PrepareAtomicVarRef(var_scope_.VecMetaInfo());
}
record_prepare.End();

exception_holder_.Clear();
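The new branch above implements a double-buffered preparation scheme: when the prepare queue has threads, the current step's atomic counters come out of a std::future filled during the previous step, and preparation for the next step is launched immediately; with no prepare threads, the counters are built synchronously. A reduced, standalone sketch of that scheme using only the standard library (DoubleBufferedPrep and MakeCounters are illustrative names, not Paddle's API):

#include <atomic>
#include <cstddef>
#include <future>
#include <memory>
#include <vector>

using AtomicVec = std::vector<std::atomic<size_t>>;

// Builds a fresh vector of atomic counters from plain counts.
std::unique_ptr<AtomicVec> MakeCounters(std::vector<size_t> counts) {
  auto vec = std::make_unique<AtomicVec>(counts.size());
  for (size_t i = 0; i < counts.size(); ++i) (*vec)[i] = counts[i];
  return vec;
}

class DoubleBufferedPrep {
 public:
  explicit DoubleBufferedPrep(bool has_prepare_thread)
      : has_prepare_thread_(has_prepare_thread) {}

  // Called once per step: yields this step's counters and, if a prepare
  // thread is available, schedules preparation for the next step right away.
  std::unique_ptr<AtomicVec> Fetch(const std::vector<size_t>& counts) {
    if (!has_prepare_thread_) {
      return MakeCounters(counts);  // synchronous fallback
    }
    if (!next_.valid()) {  // first step: nothing was prepared in advance
      next_ = std::async(std::launch::async, MakeCounters, counts);
    }
    auto current = next_.get();
    // Overlap the next step's preparation with this step's execution.
    next_ = std::async(std::launch::async, MakeCounters, counts);
    return current;
  }

 private:
  bool has_prepare_thread_;
  std::future<std::unique_ptr<AtomicVec>> next_;
};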
2 changes: 1 addition & 1 deletion paddle/fluid/framework/new_executor/interpretercore.h
@@ -129,7 +129,7 @@ class InterpreterCore {

std::vector<Instruction> vec_instruction_; // deconstruct before OpFuncNode

// last_live_ops_[i] contains the id of operatos that last access var[i]
// last_live_ops_[i] contains the id of operators that last access var[i]
std::map<size_t, std::set<size_t>> last_live_ops_;

std::vector<size_t> dependecy_count_;
13 changes: 8 additions & 5 deletions paddle/fluid/framework/new_executor/interpretercore_util.cc
@@ -42,10 +42,12 @@ namespace framework {
namespace interpreter {

using VariableIdMap = std::map<std::string, std::vector<int>>;
constexpr size_t kPrepareWorkQueueIdx = 2;

const std::vector<WorkQueueOptions> ConstructWorkQueueOptions(
size_t host_num_threads, size_t device_num_threads, EventsWaiter* waiter) {
size_t host_num_threads,
size_t device_num_threads,
size_t prepare_num_threads,
EventsWaiter* waiter) {
std::vector<WorkQueueOptions> group_options;
// for execute host Kernel
group_options.emplace_back(/*name*/ "HostTasks",
@@ -65,7 +67,7 @@ const std::vector<WorkQueueOptions> ConstructWorkQueueOptions(
/*events_waiter*/ waiter);
// for prepare deps and others
group_options.emplace_back(/*name*/ "Prepare",
/*num_threads*/ 1,
/*num_threads*/ prepare_num_threads,
/*allow_spinning*/ true,
/*always_spinning*/ false,
/*track_task*/ false,
@@ -76,10 +78,11 @@ const std::vector<WorkQueueOptions> ConstructWorkQueueOptions(

AsyncWorkQueue::AsyncWorkQueue(size_t host_num_threads,
size_t device_num_threads,
size_t prepare_num_threads,
EventsWaiter* waiter)
: host_num_thread_(host_num_threads) {
queue_group_ = CreateWorkQueueGroup(
ConstructWorkQueueOptions(host_num_threads, device_num_threads, waiter));
queue_group_ = CreateWorkQueueGroup(ConstructWorkQueueOptions(
host_num_threads, device_num_threads, prepare_num_threads, waiter));
}

void AsyncWorkQueue::AddTask(const OpFuncType& op_func_type,
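One subtlety worth making explicit: the emplace order above is what gives kPrepareWorkQueueIdx its value of 2: host tasks land at index 0, device kernel launches at index 1, and the prepare queue at index 2. A tiny sketch of that index contract (only "HostTasks" and "Prepare" are visible in this hunk; the device queue's name is assumed):

#include <cstddef>
#include <string>
#include <vector>

struct Options {
  std::string name;
  size_t num_threads;
};

// Index contract implied by the emplace order in ConstructWorkQueueOptions.
constexpr size_t kHostQueueIdx = 0;     // "HostTasks"
constexpr size_t kDeviceQueueIdx = 1;   // device kernel launch (name assumed)
constexpr size_t kPrepareQueueIdx = 2;  // "Prepare", i.e. kPrepareWorkQueueIdx

std::vector<Options> Construct(size_t host, size_t device, size_t prepare) {
  std::vector<Options> opts;
  opts.push_back({"HostTasks", host});             // lands at index 0
  opts.push_back({"DeviceKernelLaunch", device});  // index 1, name assumed
  opts.push_back({"Prepare", prepare});            // lands at index 2
  return opts;
}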
6 changes: 6 additions & 0 deletions paddle/fluid/framework/new_executor/interpretercore_util.h
@@ -39,6 +39,7 @@
#include "paddle/fluid/platform/init.h"

using AtomicVectorSizeT = std::vector<std::atomic<size_t>>;
constexpr size_t kPrepareWorkQueueIdx = 2;

namespace paddle {
namespace framework {
@@ -48,6 +49,7 @@ class AsyncWorkQueue {
public:
AsyncWorkQueue(size_t host_num_threads,
size_t device_num_threads,
size_t prepare_num_threads,
EventsWaiter* waiter);

std::future<std::unique_ptr<AtomicVectorSizeT>> PrepareAtomicDeps(
@@ -61,6 +63,10 @@

void Cancel() { queue_group_->Cancel(); }

size_t QueueNumThreads(size_t idx) {
return queue_group_->QueueNumThreads(idx);
}

private:
size_t host_num_thread_;
std::unique_ptr<WorkQueueGroup> queue_group_;
136 changes: 136 additions & 0 deletions paddle/fluid/framework/new_executor/threadpool_config.h
@@ -0,0 +1,136 @@
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <thread>
#include "paddle/fluid/platform/device/ipu/ipu_info.h"
#include "paddle/fluid/platform/device/npu/npu_info.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/phi/backends/device_manager.h"
#include "paddle/phi/backends/gpu/gpu_info.h"
#include "paddle/phi/backends/xpu/xpu_info.h"

DECLARE_bool(new_executor_serial_run);

namespace paddle {
namespace framework {
namespace interpreter {

static constexpr size_t kHostNumThreads = 4;
static constexpr size_t kDeviceNumThreads = 1;
static constexpr size_t kNumGcThreads = 1;
static constexpr size_t kNumPrepareThreads = 0;

static constexpr size_t kMinOpNumForAsyncPrepare = 1000;

// By default, one interpretercore contains:
// 1-size thread pool for device kernel launch (or 0 for cpu execution),
// 1-size thread pool for host kernel launch (or more if the system contains
// enough processors).

// And it may contain:
// 1-size thread pool for gc if it cannot use FastGC,
// 1-size thread pool for preparation if the program contains too many ops
// (1000+).

// Note that the purpose of the config is to limit the total 'possible'
// threads introduced by interpretercore to avoid hurting performance.

inline std::tuple<int, int, int> GetThreadPoolConfig(const phi::Place place,
size_t op_num) {
int num_device_threads = kDeviceNumThreads,
num_host_threads = kHostNumThreads,
num_prepare_threads = kNumPrepareThreads;

if (op_num > kMinOpNumForAsyncPrepare) {
num_prepare_threads = 1;
}

int device_count = 0, processor_count = 0;
if (platform::is_cpu_place(place)) {
num_device_threads = 0;
num_host_threads = 4;
} else {
processor_count = std::thread::hardware_concurrency();
if (processor_count) {
if (platform::is_gpu_place(place)) {
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
device_count = phi::backends::gpu::GetGPUDeviceCount();
#endif
}
if (platform::is_xpu_place(place)) {
#if defined(PADDLE_WITH_XPU)
device_count = phi::backends::xpu::GetXPUDeviceCount();
#endif
}
if (platform::is_npu_place(place)) {
#if defined(PADDLE_WITH_ASCEND_CL)
device_count = platform::GetNPUDeviceCount();
#endif
}
if (platform::is_ipu_place(place)) {
#if defined(PADDLE_WITH_IPU)
device_count = platform::GetIPUDeviceCount();
#endif
}
if (platform::is_custom_place(place)) {
#if defined(PADDLE_WITH_CUSTOM_DEVICE)
device_count =
phi::DeviceManager::GetDeviceCount(place.GetDeviceType());
#endif
}

// Tricky implementation.
// In multi-card training, each process may set an env like
// CUDA_VISIBLE_DEVICES=0, so it sees only one device even though the
// machine has more cards. In that case, assume the accelerator has 8 cards.
if (device_count == 1) {
device_count = 8;  // in many cases, the accelerator has 8 cards.
}

// We expect processor_count = 2 * (the possible total threads when doing
// multi-card training), to make sure that the system will not slow down
// because of too many threads. Here, 2 is an empirical value. Since each
// device has one interpretercore, the possible total threads when doing
// multi-card training = device_count * (the possible total threads in one
// interpretercore).

if (device_count) {
auto num = processor_count / device_count / 2 -
(kNumGcThreads + kNumPrepareThreads + num_device_threads);
num_host_threads =
num > 0 ? (num > kHostNumThreads ? kHostNumThreads : num) : 1;
}
}
}

// In serial run, only one 1-size thread pool is used
if (FLAGS_new_executor_serial_run) {
num_host_threads = 0;
num_device_threads = 1;
}

VLOG(4) << "place:" << place << ", processor_count:" << processor_count
<< ", device_count:" << device_count
<< ", serial_run:" << FLAGS_new_executor_serial_run
<< ", num_host_threads:" << num_host_threads
<< ", num_device_threads:" << num_device_threads
<< ", num_prepare_threads:" << num_prepare_threads;
return std::make_tuple(
num_host_threads, num_device_threads, num_prepare_threads);
}

} // namespace interpreter
} // namespace framework
} // namespace paddle
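
A worked instance of the sizing formula in GetThreadPoolConfig may help: assume a 64-processor host driving 8 accelerator cards (illustrative numbers, not a measured configuration). The sketch below mirrors the arithmetic of the hunk above with plain ints:

#include <algorithm>
#include <cstdio>

int main() {
  const int processor_count = 64;  // std::thread::hardware_concurrency()
  const int device_count = 8;      // accelerator cards on the machine
  const int num_gc_threads = 1, num_prepare_threads = 0,
            num_device_threads = 1;
  const int max_host_threads = 4;  // mirrors kHostNumThreads

  // Budget half the processors across the per-card interpretercores, then
  // subtract the fixed helper threads to size the host-kernel pool.
  int num = processor_count / device_count / 2 -
            (num_gc_threads + num_prepare_threads + num_device_threads);
  int num_host_threads = num > 0 ? std::min(num, max_host_threads) : 1;
  std::printf("num_host_threads = %d\n", num_host_threads);  // prints 2
  return 0;
}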
24 changes: 21 additions & 3 deletions paddle/fluid/framework/new_executor/workqueue/workqueue.cc
@@ -121,8 +121,13 @@ WorkQueueGroupImpl::WorkQueueGroupImpl(
queues_.resize(num_queues);
void* buffer = malloc(sizeof(NonblockingThreadPool) * num_queues);
queues_storage_ = reinterpret_cast<NonblockingThreadPool*>(buffer);

for (size_t idx = 0; idx < num_queues; ++idx) {
const auto& options = queues_options_[idx];
if (options.num_threads == 0) {
queues_[idx] = nullptr;
continue;
}
if (options.track_task && tracker_ == nullptr &&
options.events_waiter != nullptr) {
empty_notifier_ = options.events_waiter->RegisterEvent(kQueueEmptyEvent);
@@ -144,7 +149,9 @@

WorkQueueGroupImpl::~WorkQueueGroupImpl() {
for (auto queue : queues_) {
queue->~NonblockingThreadPool();
if (queue) {
queue->~NonblockingThreadPool();
}
}
if (tracker_ != nullptr) {
tracker_->~TaskTracker();
@@ -161,6 +168,10 @@ void WorkQueueGroupImpl::AddTask(size_t queue_idx, std::function<void()> fn) {
platform::TracerEventType::UserDefined,
10 /*level*/);
assert(queue_idx < queues_.size());
PADDLE_ENFORCE_NOT_NULL(
queues_.at(queue_idx),
platform::errors::NotFound("Workqueue of index %d is not initialized.",
queue_idx));
if (queues_options_.at(queue_idx).track_task) {
fn = [task = std::move(fn),
raii = CounterGuard<TaskTracker>(tracker_)]() mutable { task(); };
@@ -170,6 +181,9 @@

size_t WorkQueueGroupImpl::QueueNumThreads(size_t queue_idx) const {
assert(queue_idx < queues_.size());
if (!queues_.at(queue_idx)) {
return 0;
}
return queues_.at(queue_idx)->NumThreads();
}

@@ -183,10 +197,14 @@ size_t WorkQueueGroupImpl::QueueGroupNumThreads() const {

void WorkQueueGroupImpl::Cancel() {
for (auto queue : queues_) {
queue->Cancel();
if (queue) {
queue->Cancel();
}
}
for (auto queue : queues_) {
queue->WaitThreadsExit();
if (queue) {
queue->WaitThreadsExit();
}
}
}

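Taken together, these workqueue.cc hunks establish a contract: a queue configured with zero threads is never constructed, and every accessor must tolerate the resulting null slot. A self-contained sketch of that contract with simplified stand-ins (Pool and PoolGroup are illustrative, not the real NonblockingThreadPool types):

#include <cassert>
#include <cstddef>
#include <cstdio>
#include <functional>
#include <memory>
#include <vector>

struct Pool {
  explicit Pool(size_t n) : num_threads(n) {}
  void Run(const std::function<void()>& fn) { fn(); }  // stand-in dispatch
  size_t num_threads;
};

class PoolGroup {
 public:
  explicit PoolGroup(const std::vector<size_t>& thread_counts) {
    for (size_t n : thread_counts) {
      // Zero threads: leave the slot empty instead of creating a pool.
      if (n == 0) {
        pools_.push_back(nullptr);
      } else {
        pools_.push_back(std::make_unique<Pool>(n));
      }
    }
  }

  // Mirrors WorkQueueGroupImpl::QueueNumThreads: a null slot reports 0.
  size_t QueueNumThreads(size_t idx) const {
    return pools_.at(idx) ? pools_.at(idx)->num_threads : 0;
  }

  // Mirrors the AddTask guard: dispatching to an uninitialized queue is an
  // error, so callers should check QueueNumThreads first.
  void AddTask(size_t idx, std::function<void()> fn) {
    assert(pools_.at(idx) && "workqueue of this index is not initialized");
    pools_.at(idx)->Run(fn);
  }

 private:
  std::vector<std::unique_ptr<Pool>> pools_;
};

int main() {
  PoolGroup group({4, 1, 0});  // host, device, prepare (prepare disabled)
  std::printf("prepare threads: %zu\n", group.QueueNumThreads(2));  // 0
  group.AddTask(0, [] { std::printf("host task\n"); });
  return 0;
}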