Skip to content
Merged
72 changes: 66 additions & 6 deletions sycl/source/detail/scheduler/graph_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,13 +234,7 @@ MemObjRecord *Scheduler::GraphBuilder::getOrInsertMemObjRecord(

void Scheduler::GraphBuilder::updateLeaves(const std::set<Command *> &Cmds,
MemObjRecord *Record,
access::mode AccessMode,
std::vector<Command *> &ToCleanUp) {

const bool ReadOnlyReq = AccessMode == access::mode::read;
if (ReadOnlyReq)
return;

for (Command *Cmd : Cmds) {
bool WasLeaf = Cmd->MLeafCounter > 0;
Cmd->MLeafCounter -= Record->MReadLeaves.remove(Cmd);
Expand All @@ -252,6 +246,18 @@ void Scheduler::GraphBuilder::updateLeaves(const std::set<Command *> &Cmds,
}
}

/// Access-mode-aware variant of updateLeaves.
///
/// When \p AccessMode is access::mode::read nothing is done. For every other
/// access mode the call is forwarded to the overload that takes no access
/// mode, with \p Cmds, \p Record and \p ToCleanUp passed through unchanged.
void Scheduler::GraphBuilder::updateLeaves(const std::set<Command *> &Cmds,
                                           MemObjRecord *Record,
                                           access::mode AccessMode,
                                           std::vector<Command *> &ToCleanUp) {
  if (AccessMode != access::mode::read)
    updateLeaves(Cmds, Record, ToCleanUp);
}

void Scheduler::GraphBuilder::addNodeToLeaves(
MemObjRecord *Record, Command *Cmd, access::mode AccessMode,
std::vector<Command *> &ToEnqueue) {
Expand Down Expand Up @@ -1253,6 +1259,60 @@ void Scheduler::GraphBuilder::cleanupFinishedCommands(
handleVisitedNodes(MVisitedCmds);
}

/// Replaces \p FailedCmd in the graph with a newly allocated EmptyCommand and
/// then deletes \p FailedCmd.
///
/// If \p FailedCmd has neither dependencies nor users the function returns
/// immediately; in that case \p FailedCmd is neither replaced nor deleted
/// here.
///
/// \param FailedCmd the command to be replaced and deleted.
/// \param StreamsToDeallocate receives the stream objects of \p FailedCmd
///        (appended at the end) when it is a RUN_CG command.
/// \param ToCleanUp forwarded to updateLeaves while \p FailedCmd is removed
///        from the leaves of each memory object record it depends on.
void Scheduler::GraphBuilder::cleanupFailedCommand(
Command *FailedCmd,
std::vector<std::shared_ptr<cl::sycl::detail::stream_impl>>
&StreamsToDeallocate,
std::vector<Command *> &ToCleanUp) {

// If the failed command has no users and no dependencies, there is no reason
// to replace it with an empty command.
if (FailedCmd->MDeps.size() == 0 && FailedCmd->MUsers.size() == 0)
return;

// Create empty command that is "ready" for enqueuing. It is created on the
// same queue as the failed command.
EmptyCommand *EmptyCmd = new EmptyCommand(FailedCmd->getQueue());
// NOTE(review): a plain new-expression reports allocation failure by throwing
// std::bad_alloc instead of returning null, so this check looks unreachable
// unless a non-throwing allocation function is in use - confirm.
if (!EmptyCmd)
throw runtime_error("Out of host memory", PI_OUT_OF_HOST_MEMORY);
EmptyCmd->MEnqueueStatus = EnqueueResultT::SyclEnqueueReady;

// Collect stream objects for the failed command. They are appended to
// StreamsToDeallocate and cleared from the command itself.
if (FailedCmd->getType() == Command::CommandType::RUN_CG) {
auto ExecCmd = static_cast<ExecCGCommand *>(FailedCmd);
std::vector<std::shared_ptr<stream_impl>> Streams = ExecCmd->getStreams();
ExecCmd->clearStreams();
StreamsToDeallocate.insert(StreamsToDeallocate.end(), Streams.begin(),
Streams.end());
Comment on lines +1280 to +1285
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we do similar handling for reduction resources here once #5653 lands?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, absolutely! I will make sure it is included into whichever is merged last.

}

// Walk the dependencies of the failed command and put the empty command in
// its place, both in the leaves of the memory object records and in the user
// sets of the commands it depended on.
for (DepDesc &Dep : FailedCmd->MDeps) {
// Replace failed command in dependency records.
const Requirement *Req = Dep.MDepRequirement;
MemObjRecord *Record = getMemObjRecord(Req->MSYCLMemObj);
// The overload without an access mode is used here, so the failed command is
// removed from the leaves regardless of the requirement's access mode.
updateLeaves({FailedCmd}, Record, ToCleanUp);
std::vector<Command *> ToEnqueue;
addNodeToLeaves(Record, EmptyCmd, Req->MAccessMode, ToEnqueue);
// Adding the empty command as a leaf is expected not to schedule anything
// for enqueue.
assert(ToEnqueue.empty());

// Replace failed command as a user. The dependency is copied to the empty
// command only when the failed command was actually registered as a user of
// the dependency command.
if (Dep.MDepCommand->MUsers.erase(FailedCmd)) {
Dep.MDepCommand->MUsers.insert(EmptyCmd);
EmptyCmd->MDeps.push_back(Dep);
}
}
FailedCmd->MDeps.clear();

// Redirect every dependency of the users that pointed at the failed command
// to the empty command, then exchange the user sets of the two commands.
for (Command *UserCmd : FailedCmd->MUsers)
for (DepDesc &Dep : UserCmd->MDeps)
if (Dep.MDepCommand == FailedCmd)
Dep.MDepCommand = EmptyCmd;
std::swap(FailedCmd->MUsers, EmptyCmd->MUsers);

// Make the event of the failed command refer to the empty command before the
// failed command is destroyed.
FailedCmd->getEvent()->setCommand(EmptyCmd);
// The failed command must no longer be a leaf of any record at this point.
assert(FailedCmd->MLeafCounter == 0);
delete FailedCmd;
}

void Scheduler::GraphBuilder::removeRecordForMemObj(SYCLMemObjI *MemObject) {
const auto It = std::find_if(
MMemObjs.begin(), MMemObjs.end(),
Expand Down
1 change: 1 addition & 0 deletions sycl/source/detail/scheduler/leaves_collection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ static inline bool doOverlap(const Requirement *LHS, const Requirement *RHS) {

/// Tells whether \p Cmd is an empty task that is currently blocked with a
/// host accessor as its block reason.
///
/// All three conditions must hold: the command type is EMPTY_TASK, its
/// enqueue status is SyclEnqueueBlocked, and its block reason is HostAccessor.
static inline bool isHostAccessorCmd(Command *Cmd) {
  if (Cmd->getType() != Command::EMPTY_TASK)
    return false;

  if (Cmd->MEnqueueStatus != EnqueueResultT::SyclEnqueueBlocked)
    return false;

  return Cmd->MBlockReason == Command::BlockReason::HostAccessor;
}

Expand Down
77 changes: 35 additions & 42 deletions sycl/source/detail/scheduler/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,16 @@ void Scheduler::waitForRecordToFinish(MemObjRecord *Record,
}
}

/// Deallocates the buffers of every stream object in \p StreamsToDeallocate.
///
/// The vector is walked from its last element to its first because that
/// reverse order is the order of commands execution.
static void deallocateStreams(
    std::vector<std::shared_ptr<stream_impl>> &StreamsToDeallocate) {
  auto Cursor = StreamsToDeallocate.rbegin();
  while (Cursor != StreamsToDeallocate.rend()) {
    auto *Impl = (*Cursor).get();
    detail::Scheduler::getInstance().deallocateStreamBuffers(Impl);
    ++Cursor;
  }
}

EventImplPtr Scheduler::addCG(std::unique_ptr<detail::CG> CommandGroup,
QueueImplPtr Queue) {
EventImplPtr NewEvent = nullptr;
Expand Down Expand Up @@ -111,58 +121,51 @@ EventImplPtr Scheduler::addCG(std::unique_ptr<detail::CG> CommandGroup,
}

std::vector<Command *> ToCleanUp;
{
try {
ReadLockT Lock(MGraphLock);

Command *NewCmd = static_cast<Command *>(NewEvent->getCommand());

EnqueueResultT Res;
bool Enqueued;

auto CleanUp = [&]() {
if (NewCmd && (NewCmd->MDeps.size() == 0 && NewCmd->MUsers.size() == 0)) {
if (Type == CG::RunOnHostIntel)
static_cast<ExecCGCommand *>(NewCmd)->releaseCG();

NewEvent->setCommand(nullptr);
delete NewCmd;
}
};

for (Command *Cmd : AuxiliaryCmds) {
Enqueued = GraphProcessor::enqueueCommand(Cmd, Res, ToCleanUp);
try {
if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult)
throw runtime_error("Auxiliary enqueue process failed.",
PI_INVALID_OPERATION);
} catch (...) {
// enqueueCommand() func and if statement above may throw an exception,
// so destroy required resources to avoid memory leak
CleanUp();
std::rethrow_exception(std::current_exception());
}
if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult)
throw runtime_error("Auxiliary enqueue process failed.",
PI_INVALID_OPERATION);
}

if (NewCmd) {
// TODO: Check if lazy mode.
EnqueueResultT Res;
try {
bool Enqueued = GraphProcessor::enqueueCommand(NewCmd, Res, ToCleanUp);
if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult)
throw runtime_error("Enqueue process failed.", PI_INVALID_OPERATION);
} catch (...) {
// enqueueCommand() func and if statement above may throw an exception,
// so destroy required resources to avoid memory leak
CleanUp();
std::rethrow_exception(std::current_exception());
}
bool Enqueued = GraphProcessor::enqueueCommand(NewCmd, Res, ToCleanUp);
if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult)
throw runtime_error("Enqueue process failed.", PI_INVALID_OPERATION);

// If there are no memory dependencies decouple and free the command.
// Though, dismiss ownership of native kernel command group as its
// resources may be in use by backend and synchronization point here is
// at native kernel execution finish.
CleanUp();
if (NewCmd && (NewCmd->MDeps.size() == 0 && NewCmd->MUsers.size() == 0)) {
if (Type == CG::RunOnHostIntel)
static_cast<ExecCGCommand *>(NewCmd)->releaseCG();

NewEvent->setCommand(nullptr);
delete NewCmd;
}
}
} catch (...) {
std::vector<StreamImplPtr> StreamsToDeallocate;
Command *NewCmd = static_cast<Command *>(NewEvent->getCommand());
if (NewCmd) {
WriteLockT Lock(MGraphLock, std::defer_lock);
MGraphBuilder.cleanupFailedCommand(NewCmd, StreamsToDeallocate,
ToCleanUp);
}
deallocateStreams(StreamsToDeallocate);
cleanupCommands(ToCleanUp);
std::rethrow_exception(std::current_exception());
}
cleanupCommands(ToCleanUp);

Expand Down Expand Up @@ -223,16 +226,6 @@ void Scheduler::waitForEvent(EventImplPtr Event) {
cleanupCommands(ToCleanUp);
}

/// Deallocates the buffers of every stream object in \p StreamsToDeallocate.
///
/// The vector is walked from its last element to its first because that
/// reverse order is the order of commands execution.
static void deallocateStreams(
    std::vector<std::shared_ptr<stream_impl>> &StreamsToDeallocate) {
  auto Cursor = StreamsToDeallocate.rbegin();
  while (Cursor != StreamsToDeallocate.rend()) {
    auto *Impl = (*Cursor).get();
    detail::Scheduler::getInstance().deallocateStreamBuffers(Impl);
    ++Cursor;
  }
}

void Scheduler::cleanupFinishedCommands(EventImplPtr FinishedEvent) {
// We are going to traverse a graph of finished commands. Gather stream
// objects from these commands if any and deallocate buffers for these stream
Expand Down
9 changes: 9 additions & 0 deletions sycl/source/detail/scheduler/scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,13 @@ class Scheduler {
Command *FinishedCmd,
std::vector<std::shared_ptr<cl::sycl::detail::stream_impl>> &);

/// Replaces a failed command in the subgraph with an empty command and
/// deletes the failed command.
///
/// If the failed command has neither dependencies nor users, nothing is
/// replaced and the command is not deleted by this function.
///
/// \param FailedCmd the command to replace and delete.
/// The second (unnamed) parameter is a vector to which the stream objects of
/// the failed command are appended when it is a RUN_CG command.
/// \param ToCleanUp forwarded to updateLeaves while the failed command is
///        removed from the leaves of the memory object records it depends on.
void cleanupFailedCommand(
Command *FailedCmd,
std::vector<std::shared_ptr<cl::sycl::detail::stream_impl>> &,
std::vector<Command *> &ToCleanUp);

/// Reschedules the command passed using Queue provided.
///
/// This can lead to rescheduling of all dependent commands. This can be
Expand Down Expand Up @@ -551,6 +558,8 @@ class Scheduler {
std::vector<Command *> &ToEnqueue);

/// Removes commands from leaves.
///
/// This overload takes no access mode and therefore performs the removal
/// unconditionally.
void updateLeaves(const std::set<Command *> &Cmds, MemObjRecord *Record,
std::vector<Command *> &ToCleanUp);
/// Removes commands from leaves unless \p AccessMode is access::mode::read,
/// in which case nothing is done; otherwise forwards to the overload above.
void updateLeaves(const std::set<Command *> &Cmds, MemObjRecord *Record,
access::mode AccessMode,
std::vector<Command *> &ToCleanUp);
Expand Down
Loading