diff --git a/sycl/source/detail/scheduler/commands.cpp b/sycl/source/detail/scheduler/commands.cpp index 49a904e60ea93..558e10cf10ca0 100644 --- a/sycl/source/detail/scheduler/commands.cpp +++ b/sycl/source/detail/scheduler/commands.cpp @@ -821,9 +821,11 @@ const char *Command::getBlockReason() const { return "A Buffer is locked by the host accessor"; case BlockReason::HostTask: return "Blocked by host task"; + case BlockReason::EnqueueFailed: + return "Failed to enqueue a preceding command"; + default: + return "Unknown block reason"; } - - return "Unknown block reason"; } AllocaCommandBase::AllocaCommandBase(CommandType Type, QueueImplPtr Queue, diff --git a/sycl/source/detail/scheduler/commands.hpp b/sycl/source/detail/scheduler/commands.hpp index d9ad883346ba8..46715ffcf7efc 100644 --- a/sycl/source/detail/scheduler/commands.hpp +++ b/sycl/source/detail/scheduler/commands.hpp @@ -279,7 +279,7 @@ class Command { /// Used for marking the node during graph traversal. Marks MMarks; - enum class BlockReason : int { HostAccessor = 0, HostTask }; + enum class BlockReason : int { HostAccessor = 0, HostTask, EnqueueFailed }; // Only have reasonable value while MIsBlockable is true BlockReason MBlockReason; diff --git a/sycl/source/detail/scheduler/graph_builder.cpp b/sycl/source/detail/scheduler/graph_builder.cpp index 531c416642b03..6b6390573c027 100644 --- a/sycl/source/detail/scheduler/graph_builder.cpp +++ b/sycl/source/detail/scheduler/graph_builder.cpp @@ -235,13 +235,7 @@ MemObjRecord *Scheduler::GraphBuilder::getOrInsertMemObjRecord( void Scheduler::GraphBuilder::updateLeaves(const std::set &Cmds, MemObjRecord *Record, - access::mode AccessMode, std::vector &ToCleanUp) { - - const bool ReadOnlyReq = AccessMode == access::mode::read; - if (ReadOnlyReq) - return; - for (Command *Cmd : Cmds) { bool WasLeaf = Cmd->MLeafCounter > 0; Cmd->MLeafCounter -= Record->MReadLeaves.remove(Cmd); @@ -253,6 +247,18 @@ void Scheduler::GraphBuilder::updateLeaves(const std::set &Cmds, } } +void Scheduler::GraphBuilder::updateLeaves(const std::set &Cmds, + MemObjRecord *Record, + access::mode AccessMode, + std::vector &ToCleanUp) { + + const bool ReadOnlyReq = AccessMode == access::mode::read; + if (ReadOnlyReq) + return; + + updateLeaves(Cmds, Record, ToCleanUp); +} + void Scheduler::GraphBuilder::addNodeToLeaves( MemObjRecord *Record, Command *Cmd, access::mode AccessMode, std::vector &ToEnqueue) { @@ -1279,6 +1285,61 @@ void Scheduler::GraphBuilder::cleanupFinishedCommands( handleVisitedNodes(MVisitedCmds); } +void Scheduler::GraphBuilder::cleanupFailedCommand( + Command *FailedCmd, + std::vector> + &StreamsToDeallocate, + std::vector &ToCleanUp) { + + // If the failed command has no users and no dependencies, there is no reason + // to replace it with an empty command. + if (FailedCmd->MDeps.size() == 0 && FailedCmd->MUsers.size() == 0) + return; + + // Create empty command that is "ready" for enqueuing. + EmptyCommand *EmptyCmd = new EmptyCommand(FailedCmd->getQueue()); + if (!EmptyCmd) + throw runtime_error("Out of host memory", PI_ERROR_OUT_OF_HOST_MEMORY); + EmptyCmd->MEnqueueStatus = EnqueueResultT::SyclEnqueueReady; + EmptyCmd->MBlockReason = Command::BlockReason::EnqueueFailed; + + // Collect stream objects for the failed command. + if (FailedCmd->getType() == Command::CommandType::RUN_CG) { + auto ExecCmd = static_cast(FailedCmd); + std::vector> Streams = ExecCmd->getStreams(); + ExecCmd->clearStreams(); + StreamsToDeallocate.insert(StreamsToDeallocate.end(), Streams.begin(), + Streams.end()); + } + + for (DepDesc &Dep : FailedCmd->MDeps) { + // Replace failed command in dependency records. + const Requirement *Req = Dep.MDepRequirement; + MemObjRecord *Record = getMemObjRecord(Req->MSYCLMemObj); + updateLeaves({FailedCmd}, Record, ToCleanUp); + std::vector ToEnqueue; + addNodeToLeaves(Record, EmptyCmd, Req->MAccessMode, ToEnqueue); + assert(ToEnqueue.empty()); + + // Replace failed command as a user. + if (Dep.MDepCommand->MUsers.erase(FailedCmd)) { + Dep.MDepCommand->MUsers.insert(EmptyCmd); + EmptyCmd->MDeps.push_back(Dep); + } + } + FailedCmd->MDeps.clear(); + + for (Command *UserCmd : FailedCmd->MUsers) + for (DepDesc &Dep : UserCmd->MDeps) + if (Dep.MDepCommand == FailedCmd) + Dep.MDepCommand = EmptyCmd; + std::swap(FailedCmd->MUsers, EmptyCmd->MUsers); + + FailedCmd->getEvent()->setCommand(EmptyCmd); + assert(FailedCmd->MLeafCounter == 0); + delete FailedCmd; +} + void Scheduler::GraphBuilder::removeRecordForMemObj(SYCLMemObjI *MemObject) { const auto It = std::find_if( MMemObjs.begin(), MMemObjs.end(), diff --git a/sycl/source/detail/scheduler/scheduler.cpp b/sycl/source/detail/scheduler/scheduler.cpp index 336835d6198be..784f399d102e3 100644 --- a/sycl/source/detail/scheduler/scheduler.cpp +++ b/sycl/source/detail/scheduler/scheduler.cpp @@ -73,6 +73,16 @@ void Scheduler::waitForRecordToFinish(MemObjRecord *Record, } } +static void deallocateStreams( + std::vector> &StreamsToDeallocate) { + // Deallocate buffers for stream objects of the finished commands. Iterate in + // reverse order because it is the order of commands execution. + for (auto StreamImplPtr = StreamsToDeallocate.rbegin(); + StreamImplPtr != StreamsToDeallocate.rend(); ++StreamImplPtr) + detail::Scheduler::getInstance().deallocateStreamBuffers( + StreamImplPtr->get()); +} + EventImplPtr Scheduler::addCG(std::unique_ptr CommandGroup, const QueueImplPtr &Queue) { EventImplPtr NewEvent = nullptr; @@ -113,7 +123,7 @@ EventImplPtr Scheduler::addCG(std::unique_ptr CommandGroup, } std::vector ToCleanUp; - { + try { ReadLockT Lock = acquireReadLock(); Command *NewCmd = static_cast(NewEvent->getCommand()); @@ -121,42 +131,32 @@ EventImplPtr Scheduler::addCG(std::unique_ptr CommandGroup, EnqueueResultT Res; bool Enqueued; - auto CleanUp = [&]() { - if (NewCmd && (NewCmd->MDeps.size() == 0 && NewCmd->MUsers.size() == 0)) { - NewEvent->setCommand(nullptr); - delete NewCmd; - } - }; - for (Command *Cmd : AuxiliaryCmds) { Enqueued = GraphProcessor::enqueueCommand(Cmd, Res, ToCleanUp); - try { - if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult) - throw runtime_error("Auxiliary enqueue process failed.", - PI_ERROR_INVALID_OPERATION); - } catch (...) { - // enqueueCommand() func and if statement above may throw an exception, - // so destroy required resources to avoid memory leak - CleanUp(); - std::rethrow_exception(std::current_exception()); - } + if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult) + throw runtime_error("Auxiliary enqueue process failed.", + PI_ERROR_INVALID_OPERATION); } if (NewCmd) { // TODO: Check if lazy mode. EnqueueResultT Res; - try { - bool Enqueued = GraphProcessor::enqueueCommand(NewCmd, Res, ToCleanUp); - if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult) - throw runtime_error("Enqueue process failed.", - PI_ERROR_INVALID_OPERATION); - } catch (...) { - // enqueueCommand() func and if statement above may throw an exception, - // so destroy required resources to avoid memory leak - CleanUp(); - std::rethrow_exception(std::current_exception()); - } + bool Enqueued = GraphProcessor::enqueueCommand(NewCmd, Res, ToCleanUp); + if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult) + throw runtime_error("Enqueue process failed.", + PI_ERROR_INVALID_OPERATION); } + } catch (...) { + std::vector StreamsToDeallocate; + Command *NewCmd = static_cast(NewEvent->getCommand()); + if (NewCmd) { + WriteLockT Lock(MGraphLock, std::defer_lock); + MGraphBuilder.cleanupFailedCommand(NewCmd, StreamsToDeallocate, + ToCleanUp); + } + deallocateStreams(StreamsToDeallocate); + cleanupCommands(ToCleanUp); + std::rethrow_exception(std::current_exception()); } cleanupCommands(ToCleanUp); @@ -218,16 +218,6 @@ void Scheduler::waitForEvent(const EventImplPtr &Event) { cleanupCommands(ToCleanUp); } -static void deallocateStreams( - std::vector> &StreamsToDeallocate) { - // Deallocate buffers for stream objects of the finished commands. Iterate in - // reverse order because it is the order of commands execution. - for (auto StreamImplPtr = StreamsToDeallocate.rbegin(); - StreamImplPtr != StreamsToDeallocate.rend(); ++StreamImplPtr) - detail::Scheduler::getInstance().deallocateStreamBuffers( - StreamImplPtr->get()); -} - void Scheduler::cleanupFinishedCommands(const EventImplPtr &FinishedEvent) { // We are going to traverse a graph of finished commands. Gather stream // objects from these commands if any and deallocate buffers for these stream diff --git a/sycl/source/detail/scheduler/scheduler.hpp b/sycl/source/detail/scheduler/scheduler.hpp index a09a79c846f88..3a5d37da534c0 100644 --- a/sycl/source/detail/scheduler/scheduler.hpp +++ b/sycl/source/detail/scheduler/scheduler.hpp @@ -537,6 +537,13 @@ class Scheduler { std::vector> &, std::vector> &); + /// Replaces a failed command in the subgraph with an empty command and + /// deletes the failed command. + void cleanupFailedCommand( + Command *FailedCmd, + std::vector> &, + std::vector &ToCleanUp); + /// Reschedules the command passed using Queue provided. /// /// This can lead to rescheduling of all dependent commands. This can be @@ -573,6 +580,8 @@ class Scheduler { std::vector &ToEnqueue); /// Removes commands from leaves. + void updateLeaves(const std::set &Cmds, MemObjRecord *Record, + std::vector &ToCleanUp); void updateLeaves(const std::set &Cmds, MemObjRecord *Record, access::mode AccessMode, std::vector &ToCleanUp); diff --git a/sycl/unittests/scheduler/FailedCommands.cpp b/sycl/unittests/scheduler/FailedCommands.cpp index 6e3014ce79179..4d4104604c2f7 100644 --- a/sycl/unittests/scheduler/FailedCommands.cpp +++ b/sycl/unittests/scheduler/FailedCommands.cpp @@ -9,7 +9,9 @@ #include "SchedulerTest.hpp" #include "SchedulerTestUtils.hpp" +#include #include +#include using namespace sycl; @@ -42,3 +44,109 @@ TEST_F(SchedulerTest, FailedDependency) { ASSERT_EQ(MDep.MEnqueueStatus, detail::EnqueueResultT::SyclEnqueueFailed) << "MDep should be marked as failed\n"; } + +pi_result redefinedFailingEnqueueKernelLaunch(pi_queue, pi_kernel, pi_uint32, + const size_t *, const size_t *, + const size_t *, pi_uint32, + const pi_event *, pi_event *) { + throw sycl::runtime_error( + "Exception from redefinedFailingEnqueueKernelLaunch.", + PI_ERROR_INVALID_OPERATION); + return PI_ERROR_INVALID_OPERATION; +} + +size_t MemBufRefCount = 0u; + +pi_result redefinedMemBufferCreate(pi_context, pi_mem_flags, size_t, void *, + pi_mem *ret_mem, const pi_mem_properties *) { + *ret_mem = reinterpret_cast(0x1); + ++MemBufRefCount; + return PI_SUCCESS; +} + +pi_result redefinedMemBufferPartition(pi_mem, pi_mem_flags, + pi_buffer_create_type, void *, + pi_mem *ret_mem) { + *ret_mem = reinterpret_cast(0x1); + ++MemBufRefCount; + return PI_SUCCESS; +} + +pi_result redefinedMemRetain(pi_mem) { + ++MemBufRefCount; + return PI_SUCCESS; +} + +pi_result redefinedMemRelease(pi_mem) { + --MemBufRefCount; + return PI_SUCCESS; +} + +TEST_F(SchedulerTest, FailedCommandAccessorCleanup) { + unittest::PiMock Mock; + platform Plt = Mock.getPlatform(); + MemBufRefCount = 0u; + Mock.redefine( + redefinedFailingEnqueueKernelLaunch); + Mock.redefine(redefinedMemBufferCreate); + Mock.redefine(redefinedMemRetain); + Mock.redefine(redefinedMemRelease); + + { + context Ctx{Plt}; + queue Q{Ctx, default_selector_v}; + + kernel_bundle KernelBundle = + sycl::get_kernel_bundle(Ctx); + auto ExecBundle = sycl::build(KernelBundle); + + buffer Buff{sycl::range<1>(1)}; + + try { + Q.submit([&](sycl::handler &CGH) { + auto Acc = Buff.get_access(CGH); + CGH.use_kernel_bundle(ExecBundle); + CGH.single_task>([=] {}); + }); + FAIL() << "No exception was thrown."; + } catch (...) { + } + } + + ASSERT_EQ(MemBufRefCount, 0u) << "Memory leak detected."; +} + +TEST_F(SchedulerTest, FailedCommandStreamCleanup) { + unittest::PiMock Mock; + platform Plt = Mock.getPlatform(); + MemBufRefCount = 0u; + Mock.redefine( + redefinedFailingEnqueueKernelLaunch); + Mock.redefine(redefinedMemBufferCreate); + Mock.redefine( + redefinedMemBufferPartition); + Mock.redefine(redefinedMemRetain); + Mock.redefine(redefinedMemRelease); + + { + context Ctx{Plt}; + queue Q{Ctx, default_selector_v}; + + kernel_bundle KernelBundle = + sycl::get_kernel_bundle(Ctx); + auto ExecBundle = sycl::build(KernelBundle); + + try { + Q.submit([&](sycl::handler &CGH) { + sycl::stream KernelStream(108 * 64 + 128, 64, CGH); + CGH.use_kernel_bundle(ExecBundle); + CGH.single_task>([=] {}); + }); + FAIL() << "No exception was thrown."; + } catch (...) { + } + Q.wait(); + } + + ASSERT_EQ(MemBufRefCount, 0u) << "Memory leak detected."; +} diff --git a/sycl/unittests/scheduler/LeavesCollection.cpp b/sycl/unittests/scheduler/LeavesCollection.cpp index ea883041add66..45e93bb67ab1a 100644 --- a/sycl/unittests/scheduler/LeavesCollection.cpp +++ b/sycl/unittests/scheduler/LeavesCollection.cpp @@ -42,6 +42,7 @@ createEmptyCommand(const std::shared_ptr &Q, EmptyCommand *Cmd = new EmptyCommand(Q); Cmd->addRequirement(/* DepCmd = */ nullptr, /* AllocaCmd = */ nullptr, &Req); Cmd->MBlockReason = Command::BlockReason::HostAccessor; + Cmd->MEnqueueStatus = EnqueueResultT::SyclEnqueueBlocked; return std::shared_ptr{Cmd}; }