Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions sycl/source/detail/scheduler/commands.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -821,9 +821,11 @@ const char *Command::getBlockReason() const {
return "A Buffer is locked by the host accessor";
case BlockReason::HostTask:
return "Blocked by host task";
case BlockReason::EnqueueFailed:
return "Failed to enqueue a preceding command";
default:
return "Unknown block reason";
}

return "Unknown block reason";
}

AllocaCommandBase::AllocaCommandBase(CommandType Type, QueueImplPtr Queue,
Expand Down
2 changes: 1 addition & 1 deletion sycl/source/detail/scheduler/commands.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ class Command {
/// Used for marking the node during graph traversal.
Marks MMarks;

enum class BlockReason : int { HostAccessor = 0, HostTask };
enum class BlockReason : int { HostAccessor = 0, HostTask, EnqueueFailed };

// Only have reasonable value while MIsBlockable is true
BlockReason MBlockReason;
Expand Down
73 changes: 67 additions & 6 deletions sycl/source/detail/scheduler/graph_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,13 +235,7 @@ MemObjRecord *Scheduler::GraphBuilder::getOrInsertMemObjRecord(

void Scheduler::GraphBuilder::updateLeaves(const std::set<Command *> &Cmds,
MemObjRecord *Record,
access::mode AccessMode,
std::vector<Command *> &ToCleanUp) {

const bool ReadOnlyReq = AccessMode == access::mode::read;
if (ReadOnlyReq)
return;

for (Command *Cmd : Cmds) {
bool WasLeaf = Cmd->MLeafCounter > 0;
Cmd->MLeafCounter -= Record->MReadLeaves.remove(Cmd);
Expand All @@ -253,6 +247,18 @@ void Scheduler::GraphBuilder::updateLeaves(const std::set<Command *> &Cmds,
}
}

void Scheduler::GraphBuilder::updateLeaves(const std::set<Command *> &Cmds,
MemObjRecord *Record,
access::mode AccessMode,
std::vector<Command *> &ToCleanUp) {

const bool ReadOnlyReq = AccessMode == access::mode::read;
if (ReadOnlyReq)
return;

updateLeaves(Cmds, Record, ToCleanUp);
}

void Scheduler::GraphBuilder::addNodeToLeaves(
MemObjRecord *Record, Command *Cmd, access::mode AccessMode,
std::vector<Command *> &ToEnqueue) {
Expand Down Expand Up @@ -1279,6 +1285,61 @@ void Scheduler::GraphBuilder::cleanupFinishedCommands(
handleVisitedNodes(MVisitedCmds);
}

void Scheduler::GraphBuilder::cleanupFailedCommand(
Command *FailedCmd,
std::vector<std::shared_ptr<sycl::detail::stream_impl>>
&StreamsToDeallocate,
std::vector<Command *> &ToCleanUp) {

// If the failed command has no users and no dependencies, there is no reason
// to replace it with an empty command.
if (FailedCmd->MDeps.size() == 0 && FailedCmd->MUsers.size() == 0)
return;

// Create empty command that is "ready" for enqueuing.
EmptyCommand *EmptyCmd = new EmptyCommand(FailedCmd->getQueue());
if (!EmptyCmd)
throw runtime_error("Out of host memory", PI_ERROR_OUT_OF_HOST_MEMORY);
EmptyCmd->MEnqueueStatus = EnqueueResultT::SyclEnqueueReady;
EmptyCmd->MBlockReason = Command::BlockReason::EnqueueFailed;

// Collect stream objects for the failed command.
if (FailedCmd->getType() == Command::CommandType::RUN_CG) {
auto ExecCmd = static_cast<ExecCGCommand *>(FailedCmd);
std::vector<std::shared_ptr<stream_impl>> Streams = ExecCmd->getStreams();
ExecCmd->clearStreams();
StreamsToDeallocate.insert(StreamsToDeallocate.end(), Streams.begin(),
Streams.end());
}

for (DepDesc &Dep : FailedCmd->MDeps) {
// Replace failed command in dependency records.
const Requirement *Req = Dep.MDepRequirement;
MemObjRecord *Record = getMemObjRecord(Req->MSYCLMemObj);
updateLeaves({FailedCmd}, Record, ToCleanUp);
std::vector<Command *> ToEnqueue;
addNodeToLeaves(Record, EmptyCmd, Req->MAccessMode, ToEnqueue);
assert(ToEnqueue.empty());

// Replace failed command as a user.
if (Dep.MDepCommand->MUsers.erase(FailedCmd)) {
Dep.MDepCommand->MUsers.insert(EmptyCmd);
EmptyCmd->MDeps.push_back(Dep);
}
}
FailedCmd->MDeps.clear();

for (Command *UserCmd : FailedCmd->MUsers)
for (DepDesc &Dep : UserCmd->MDeps)
if (Dep.MDepCommand == FailedCmd)
Dep.MDepCommand = EmptyCmd;
std::swap(FailedCmd->MUsers, EmptyCmd->MUsers);

FailedCmd->getEvent()->setCommand(EmptyCmd);
assert(FailedCmd->MLeafCounter == 0);
delete FailedCmd;
}

void Scheduler::GraphBuilder::removeRecordForMemObj(SYCLMemObjI *MemObject) {
const auto It = std::find_if(
MMemObjs.begin(), MMemObjs.end(),
Expand Down
68 changes: 29 additions & 39 deletions sycl/source/detail/scheduler/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,16 @@ void Scheduler::waitForRecordToFinish(MemObjRecord *Record,
}
}

static void deallocateStreams(
std::vector<std::shared_ptr<stream_impl>> &StreamsToDeallocate) {
// Deallocate buffers for stream objects of the finished commands. Iterate in
// reverse order because it is the order of commands execution.
for (auto StreamImplPtr = StreamsToDeallocate.rbegin();
StreamImplPtr != StreamsToDeallocate.rend(); ++StreamImplPtr)
detail::Scheduler::getInstance().deallocateStreamBuffers(
StreamImplPtr->get());
}

EventImplPtr Scheduler::addCG(std::unique_ptr<detail::CG> CommandGroup,
const QueueImplPtr &Queue) {
EventImplPtr NewEvent = nullptr;
Expand Down Expand Up @@ -113,50 +123,40 @@ EventImplPtr Scheduler::addCG(std::unique_ptr<detail::CG> CommandGroup,
}

std::vector<Command *> ToCleanUp;
{
try {
ReadLockT Lock = acquireReadLock();

Command *NewCmd = static_cast<Command *>(NewEvent->getCommand());

EnqueueResultT Res;
bool Enqueued;

auto CleanUp = [&]() {
if (NewCmd && (NewCmd->MDeps.size() == 0 && NewCmd->MUsers.size() == 0)) {
NewEvent->setCommand(nullptr);
delete NewCmd;
}
};

for (Command *Cmd : AuxiliaryCmds) {
Enqueued = GraphProcessor::enqueueCommand(Cmd, Res, ToCleanUp);
try {
if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult)
throw runtime_error("Auxiliary enqueue process failed.",
PI_ERROR_INVALID_OPERATION);
} catch (...) {
// enqueueCommand() func and if statement above may throw an exception,
// so destroy required resources to avoid memory leak
CleanUp();
std::rethrow_exception(std::current_exception());
}
if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult)
throw runtime_error("Auxiliary enqueue process failed.",
PI_ERROR_INVALID_OPERATION);
}

if (NewCmd) {
// TODO: Check if lazy mode.
EnqueueResultT Res;
try {
bool Enqueued = GraphProcessor::enqueueCommand(NewCmd, Res, ToCleanUp);
if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult)
throw runtime_error("Enqueue process failed.",
PI_ERROR_INVALID_OPERATION);
} catch (...) {
// enqueueCommand() func and if statement above may throw an exception,
// so destroy required resources to avoid memory leak
CleanUp();
std::rethrow_exception(std::current_exception());
}
bool Enqueued = GraphProcessor::enqueueCommand(NewCmd, Res, ToCleanUp);
if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult)
throw runtime_error("Enqueue process failed.",
PI_ERROR_INVALID_OPERATION);
}
} catch (...) {
std::vector<StreamImplPtr> StreamsToDeallocate;
Command *NewCmd = static_cast<Command *>(NewEvent->getCommand());
if (NewCmd) {
WriteLockT Lock(MGraphLock, std::defer_lock);
MGraphBuilder.cleanupFailedCommand(NewCmd, StreamsToDeallocate,
ToCleanUp);
}
deallocateStreams(StreamsToDeallocate);
cleanupCommands(ToCleanUp);
std::rethrow_exception(std::current_exception());
}
cleanupCommands(ToCleanUp);

Expand Down Expand Up @@ -218,16 +218,6 @@ void Scheduler::waitForEvent(const EventImplPtr &Event) {
cleanupCommands(ToCleanUp);
}

static void deallocateStreams(
std::vector<std::shared_ptr<stream_impl>> &StreamsToDeallocate) {
// Deallocate buffers for stream objects of the finished commands. Iterate in
// reverse order because it is the order of commands execution.
for (auto StreamImplPtr = StreamsToDeallocate.rbegin();
StreamImplPtr != StreamsToDeallocate.rend(); ++StreamImplPtr)
detail::Scheduler::getInstance().deallocateStreamBuffers(
StreamImplPtr->get());
}

void Scheduler::cleanupFinishedCommands(const EventImplPtr &FinishedEvent) {
// We are going to traverse a graph of finished commands. Gather stream
// objects from these commands if any and deallocate buffers for these stream
Expand Down
9 changes: 9 additions & 0 deletions sycl/source/detail/scheduler/scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,13 @@ class Scheduler {
std::vector<std::shared_ptr<sycl::detail::stream_impl>> &,
std::vector<std::shared_ptr<const void>> &);

/// Replaces a failed command in the subgraph with an empty command and
/// deletes the failed command.
void cleanupFailedCommand(
Command *FailedCmd,
std::vector<std::shared_ptr<sycl::detail::stream_impl>> &,
std::vector<Command *> &ToCleanUp);

/// Reschedules the command passed using Queue provided.
///
/// This can lead to rescheduling of all dependent commands. This can be
Expand Down Expand Up @@ -573,6 +580,8 @@ class Scheduler {
std::vector<Command *> &ToEnqueue);

/// Removes commands from leaves.
void updateLeaves(const std::set<Command *> &Cmds, MemObjRecord *Record,
std::vector<Command *> &ToCleanUp);
void updateLeaves(const std::set<Command *> &Cmds, MemObjRecord *Record,
access::mode AccessMode,
std::vector<Command *> &ToCleanUp);
Expand Down
108 changes: 108 additions & 0 deletions sycl/unittests/scheduler/FailedCommands.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
#include "SchedulerTest.hpp"
#include "SchedulerTestUtils.hpp"

#include <helpers/PiImage.hpp>
#include <helpers/PiMock.hpp>
#include <helpers/TestKernel.hpp>

using namespace sycl;

Expand Down Expand Up @@ -42,3 +44,109 @@ TEST_F(SchedulerTest, FailedDependency) {
ASSERT_EQ(MDep.MEnqueueStatus, detail::EnqueueResultT::SyclEnqueueFailed)
<< "MDep should be marked as failed\n";
}

pi_result redefinedFailingEnqueueKernelLaunch(pi_queue, pi_kernel, pi_uint32,
const size_t *, const size_t *,
const size_t *, pi_uint32,
const pi_event *, pi_event *) {
throw sycl::runtime_error(
"Exception from redefinedFailingEnqueueKernelLaunch.",
PI_ERROR_INVALID_OPERATION);
return PI_ERROR_INVALID_OPERATION;
}

size_t MemBufRefCount = 0u;

pi_result redefinedMemBufferCreate(pi_context, pi_mem_flags, size_t, void *,
pi_mem *ret_mem, const pi_mem_properties *) {
*ret_mem = reinterpret_cast<pi_mem>(0x1);
++MemBufRefCount;
return PI_SUCCESS;
}

pi_result redefinedMemBufferPartition(pi_mem, pi_mem_flags,
pi_buffer_create_type, void *,
pi_mem *ret_mem) {
*ret_mem = reinterpret_cast<pi_mem>(0x1);
++MemBufRefCount;
return PI_SUCCESS;
}

pi_result redefinedMemRetain(pi_mem) {
++MemBufRefCount;
return PI_SUCCESS;
}

pi_result redefinedMemRelease(pi_mem) {
--MemBufRefCount;
return PI_SUCCESS;
}

TEST_F(SchedulerTest, FailedCommandAccessorCleanup) {
unittest::PiMock Mock;
platform Plt = Mock.getPlatform();
MemBufRefCount = 0u;
Mock.redefine<detail::PiApiKind::piEnqueueKernelLaunch>(
redefinedFailingEnqueueKernelLaunch);
Mock.redefine<detail::PiApiKind::piMemBufferCreate>(redefinedMemBufferCreate);
Mock.redefine<detail::PiApiKind::piMemRetain>(redefinedMemRetain);
Mock.redefine<detail::PiApiKind::piMemRelease>(redefinedMemRelease);

{
context Ctx{Plt};
queue Q{Ctx, default_selector_v};

kernel_bundle KernelBundle =
sycl::get_kernel_bundle<sycl::bundle_state::input>(Ctx);
auto ExecBundle = sycl::build(KernelBundle);

buffer<int, 1> Buff{sycl::range<1>(1)};

try {
Q.submit([&](sycl::handler &CGH) {
auto Acc = Buff.get_access<sycl::access::mode::read_write>(CGH);
CGH.use_kernel_bundle(ExecBundle);
CGH.single_task<TestKernel<>>([=] {});
});
FAIL() << "No exception was thrown.";
} catch (...) {
}
}

ASSERT_EQ(MemBufRefCount, 0u) << "Memory leak detected.";
}

TEST_F(SchedulerTest, FailedCommandStreamCleanup) {
unittest::PiMock Mock;
platform Plt = Mock.getPlatform();
MemBufRefCount = 0u;
Mock.redefine<detail::PiApiKind::piEnqueueKernelLaunch>(
redefinedFailingEnqueueKernelLaunch);
Mock.redefine<detail::PiApiKind::piMemBufferCreate>(redefinedMemBufferCreate);
Mock.redefine<detail::PiApiKind::piMemBufferPartition>(
redefinedMemBufferPartition);
Mock.redefine<detail::PiApiKind::piMemRetain>(redefinedMemRetain);
Mock.redefine<detail::PiApiKind::piMemRelease>(redefinedMemRelease);

{
context Ctx{Plt};
queue Q{Ctx, default_selector_v};

kernel_bundle KernelBundle =
sycl::get_kernel_bundle<sycl::bundle_state::input>(Ctx);
auto ExecBundle = sycl::build(KernelBundle);

try {
Q.submit([&](sycl::handler &CGH) {
sycl::stream KernelStream(108 * 64 + 128, 64, CGH);
CGH.use_kernel_bundle(ExecBundle);
CGH.single_task<TestKernel<>>([=] {});
});
FAIL() << "No exception was thrown.";
} catch (...) {
}
Q.wait();
}

ASSERT_EQ(MemBufRefCount, 0u) << "Memory leak detected.";
}
1 change: 1 addition & 0 deletions sycl/unittests/scheduler/LeavesCollection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ createEmptyCommand(const std::shared_ptr<queue_impl> &Q,
EmptyCommand *Cmd = new EmptyCommand(Q);
Cmd->addRequirement(/* DepCmd = */ nullptr, /* AllocaCmd = */ nullptr, &Req);
Cmd->MBlockReason = Command::BlockReason::HostAccessor;
Cmd->MEnqueueStatus = EnqueueResultT::SyclEnqueueBlocked;
return std::shared_ptr<Command>{Cmd};
}

Expand Down