diff --git a/sycl/source/detail/scheduler/graph_builder.cpp b/sycl/source/detail/scheduler/graph_builder.cpp
index ed2ee3e6f78dc..d0bc43c3e6078 100644
--- a/sycl/source/detail/scheduler/graph_builder.cpp
+++ b/sycl/source/detail/scheduler/graph_builder.cpp
@@ -234,13 +234,7 @@ MemObjRecord *Scheduler::GraphBuilder::getOrInsertMemObjRecord(
 
 void Scheduler::GraphBuilder::updateLeaves(const std::set &Cmds,
                                            MemObjRecord *Record,
-                                           access::mode AccessMode,
                                            std::vector &ToCleanUp) {
-
-  const bool ReadOnlyReq = AccessMode == access::mode::read;
-  if (ReadOnlyReq)
-    return;
-
   for (Command *Cmd : Cmds) {
     bool WasLeaf = Cmd->MLeafCounter > 0;
     Cmd->MLeafCounter -= Record->MReadLeaves.remove(Cmd);
@@ -252,6 +246,18 @@ void Scheduler::GraphBuilder::updateLeaves(const std::set &Cmds,
   }
 }
 
+void Scheduler::GraphBuilder::updateLeaves(const std::set &Cmds,
+                                           MemObjRecord *Record,
+                                           access::mode AccessMode,
+                                           std::vector &ToCleanUp) {
+
+  const bool ReadOnlyReq = AccessMode == access::mode::read;
+  if (ReadOnlyReq)
+    return;
+
+  updateLeaves(Cmds, Record, ToCleanUp);
+}
+
 void Scheduler::GraphBuilder::addNodeToLeaves(
     MemObjRecord *Record, Command *Cmd, access::mode AccessMode,
     std::vector &ToEnqueue) {
@@ -1253,6 +1259,60 @@ void Scheduler::GraphBuilder::cleanupFinishedCommands(
   handleVisitedNodes(MVisitedCmds);
 }
 
+void Scheduler::GraphBuilder::cleanupFailedCommand(
+    Command *FailedCmd,
+    std::vector>
+        &StreamsToDeallocate,
+    std::vector &ToCleanUp) {
+
+  // If the failed command has no users and no dependencies, there is no reason
+  // to replace it with an empty command.
+  if (FailedCmd->MDeps.size() == 0 && FailedCmd->MUsers.size() == 0)
+    return;
+
+  // Create empty command that is "ready" for enqueuing. A plain new-expression
+  // reports allocation failure by throwing std::bad_alloc and never yields
+  // null, so there is nothing to check here.
+  EmptyCommand *EmptyCmd = new EmptyCommand(FailedCmd->getQueue());
+  EmptyCmd->MEnqueueStatus = EnqueueResultT::SyclEnqueueReady;
+
+  // Collect stream objects for the failed command.
+  if (FailedCmd->getType() == Command::CommandType::RUN_CG) {
+    auto ExecCmd = static_cast(FailedCmd);
+    std::vector> Streams = ExecCmd->getStreams();
+    ExecCmd->clearStreams();
+    StreamsToDeallocate.insert(StreamsToDeallocate.end(), Streams.begin(),
+                               Streams.end());
+  }
+
+  for (DepDesc &Dep : FailedCmd->MDeps) {
+    // Replace failed command in dependency records.
+    const Requirement *Req = Dep.MDepRequirement;
+    MemObjRecord *Record = getMemObjRecord(Req->MSYCLMemObj);
+    updateLeaves({FailedCmd}, Record, ToCleanUp);
+    std::vector ToEnqueue;
+    addNodeToLeaves(Record, EmptyCmd, Req->MAccessMode, ToEnqueue);
+    assert(ToEnqueue.empty());
+
+    // Replace failed command as a user. Several deps may name the same
+    // dependency command, so every dep is carried over unconditionally.
+    Dep.MDepCommand->MUsers.erase(FailedCmd);
+    Dep.MDepCommand->MUsers.insert(EmptyCmd);
+    EmptyCmd->MDeps.push_back(Dep);
+  }
+  FailedCmd->MDeps.clear();
+
+  for (Command *UserCmd : FailedCmd->MUsers)
+    for (DepDesc &Dep : UserCmd->MDeps)
+      if (Dep.MDepCommand == FailedCmd)
+        Dep.MDepCommand = EmptyCmd;
+  std::swap(FailedCmd->MUsers, EmptyCmd->MUsers);
+
+  FailedCmd->getEvent()->setCommand(EmptyCmd);
+  assert(FailedCmd->MLeafCounter == 0);
+  delete FailedCmd;
+}
+
 void Scheduler::GraphBuilder::removeRecordForMemObj(SYCLMemObjI *MemObject) {
   const auto It = std::find_if(
       MMemObjs.begin(), MMemObjs.end(),
diff --git a/sycl/source/detail/scheduler/leaves_collection.cpp b/sycl/source/detail/scheduler/leaves_collection.cpp
index 0ae0bcfbb9c0e..8bac1b95daeed 100644
--- a/sycl/source/detail/scheduler/leaves_collection.cpp
+++ b/sycl/source/detail/scheduler/leaves_collection.cpp
@@ -32,6 +32,7 @@ static inline bool doOverlap(const Requirement *LHS, const Requirement *RHS) {
 
 static inline bool isHostAccessorCmd(Command *Cmd) {
   return Cmd->getType() == Command::EMPTY_TASK &&
+         Cmd->MEnqueueStatus == EnqueueResultT::SyclEnqueueBlocked &&
          Cmd->MBlockReason == Command::BlockReason::HostAccessor;
 }
 
diff --git a/sycl/source/detail/scheduler/scheduler.cpp b/sycl/source/detail/scheduler/scheduler.cpp
index 232ee0a5d6e47..7be417bb48c8c 100644
--- a/sycl/source/detail/scheduler/scheduler.cpp
+++ b/sycl/source/detail/scheduler/scheduler.cpp
@@ -70,6 +70,16 @@ void Scheduler::waitForRecordToFinish(MemObjRecord *Record,
   }
 }
 
+static void deallocateStreams(
+    std::vector> &StreamsToDeallocate) {
+  // Deallocate buffers for stream objects of the finished commands. Iterate in
+  // reverse order because it is the order of commands execution.
+  for (auto StreamImplPtr = StreamsToDeallocate.rbegin();
+       StreamImplPtr != StreamsToDeallocate.rend(); ++StreamImplPtr)
+    detail::Scheduler::getInstance().deallocateStreamBuffers(
+        StreamImplPtr->get());
+}
+
 EventImplPtr Scheduler::addCG(std::unique_ptr CommandGroup,
                               QueueImplPtr Queue) {
   EventImplPtr NewEvent = nullptr;
@@ -111,7 +121,7 @@ EventImplPtr Scheduler::addCG(std::unique_ptr CommandGroup,
   }
 
   std::vector ToCleanUp;
-  {
+  try {
     ReadLockT Lock(MGraphLock);
 
     Command *NewCmd = static_cast(NewEvent->getCommand());
@@ -119,50 +129,45 @@ EventImplPtr Scheduler::addCG(std::unique_ptr CommandGroup,
     EnqueueResultT Res;
     bool Enqueued;
 
-    auto CleanUp = [&]() {
-      if (NewCmd && (NewCmd->MDeps.size() == 0 && NewCmd->MUsers.size() == 0)) {
-        if (Type == CG::RunOnHostIntel)
-          static_cast(NewCmd)->releaseCG();
-
-        NewEvent->setCommand(nullptr);
-        delete NewCmd;
-      }
-    };
-
     for (Command *Cmd : AuxiliaryCmds) {
       Enqueued = GraphProcessor::enqueueCommand(Cmd, Res, ToCleanUp);
-      try {
-        if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult)
-          throw runtime_error("Auxiliary enqueue process failed.",
-                              PI_INVALID_OPERATION);
-      } catch (...) {
-        // enqueueCommand() func and if statement above may throw an exception,
-        // so destroy required resources to avoid memory leak
-        CleanUp();
-        std::rethrow_exception(std::current_exception());
-      }
+      if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult)
+        throw runtime_error("Auxiliary enqueue process failed.",
+                            PI_INVALID_OPERATION);
     }
 
     if (NewCmd) {
       // TODO: Check if lazy mode.
       EnqueueResultT Res;
-      try {
-        bool Enqueued = GraphProcessor::enqueueCommand(NewCmd, Res, ToCleanUp);
-        if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult)
-          throw runtime_error("Enqueue process failed.", PI_INVALID_OPERATION);
-      } catch (...) {
-        // enqueueCommand() func and if statement above may throw an exception,
-        // so destroy required resources to avoid memory leak
-        CleanUp();
-        std::rethrow_exception(std::current_exception());
-      }
+      bool Enqueued = GraphProcessor::enqueueCommand(NewCmd, Res, ToCleanUp);
+      if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult)
+        throw runtime_error("Enqueue process failed.", PI_INVALID_OPERATION);
 
       // If there are no memory dependencies decouple and free the command.
       // Though, dismiss ownership of native kernel command group as it's
       // resources may be in use by backend and synchronization point here is
       // at native kernel execution finish.
-      CleanUp();
+      if (NewCmd && (NewCmd->MDeps.size() == 0 && NewCmd->MUsers.size() == 0)) {
+        if (Type == CG::RunOnHostIntel)
+          static_cast(NewCmd)->releaseCG();
+
+        NewEvent->setCommand(nullptr);
+        delete NewCmd;
+      }
     }
+  } catch (...) {
+    std::vector StreamsToDeallocate;
+    Command *NewCmd = static_cast(NewEvent->getCommand());
+    if (NewCmd) {
+      // cleanupFailedCommand() rewires the graph, so hold the write lock for
+      // it. The try block's ReadLockT is destroyed before this handler runs.
+      WriteLockT Lock(MGraphLock);
+      MGraphBuilder.cleanupFailedCommand(NewCmd, StreamsToDeallocate,
+                                         ToCleanUp);
+    }
+    deallocateStreams(StreamsToDeallocate);
+    cleanupCommands(ToCleanUp);
+    throw;
   }
   cleanupCommands(ToCleanUp);
 
@@ -223,16 +228,6 @@ void Scheduler::waitForEvent(EventImplPtr Event) {
   cleanupCommands(ToCleanUp);
 }
 
-static void deallocateStreams(
-    std::vector> &StreamsToDeallocate) {
-  // Deallocate buffers for stream objects of the finished commands. Iterate in
-  // reverse order because it is the order of commands execution.
-  for (auto StreamImplPtr = StreamsToDeallocate.rbegin();
-       StreamImplPtr != StreamsToDeallocate.rend(); ++StreamImplPtr)
-    detail::Scheduler::getInstance().deallocateStreamBuffers(
-        StreamImplPtr->get());
-}
-
 void Scheduler::cleanupFinishedCommands(EventImplPtr FinishedEvent) {
   // We are going to traverse a graph of finished commands. Gather stream
   // objects from these commands if any and deallocate buffers for these stream
diff --git a/sycl/source/detail/scheduler/scheduler.hpp b/sycl/source/detail/scheduler/scheduler.hpp
index 18ed2f5004c06..9ae0386cd5f48 100644
--- a/sycl/source/detail/scheduler/scheduler.hpp
+++ b/sycl/source/detail/scheduler/scheduler.hpp
@@ -516,6 +516,13 @@ class Scheduler {
         Command *FinishedCmd,
         std::vector> &);
 
+    /// Replaces a failed command in the subgraph with an empty command and
+    /// deletes the failed command.
+    void cleanupFailedCommand(
+        Command *FailedCmd,
+        std::vector> &,
+        std::vector &ToCleanUp);
+
     /// Reschedules the command passed using Queue provided.
     ///
     /// This can lead to rescheduling of all dependent commands. This can be
@@ -551,6 +558,8 @@ class Scheduler {
                          std::vector &ToEnqueue);
 
     /// Removes commands from leaves.
+    void updateLeaves(const std::set &Cmds, MemObjRecord *Record,
+                      std::vector &ToCleanUp);
     void updateLeaves(const std::set &Cmds, MemObjRecord *Record,
                       access::mode AccessMode,
                       std::vector &ToCleanUp);
diff --git a/sycl/unittests/scheduler/FailedCommands.cpp b/sycl/unittests/scheduler/FailedCommands.cpp
index 4d294b8eff741..5fd9523994d80 100644
--- a/sycl/unittests/scheduler/FailedCommands.cpp
+++ b/sycl/unittests/scheduler/FailedCommands.cpp
@@ -9,8 +9,58 @@
 #include "SchedulerTest.hpp"
 #include "SchedulerTestUtils.hpp"
 
+#include
+#include
+#include
+
 using namespace cl::sycl;
 
+class TestKernel;
+
+__SYCL_INLINE_NAMESPACE(cl) {
+namespace sycl {
+namespace detail {
+template <> struct KernelInfo {
+  static constexpr unsigned getNumParams() { return 0; }
+  static const kernel_param_desc_t &getParamDesc(int) {
+    static kernel_param_desc_t Dummy;
+    return Dummy;
+  }
+  static constexpr const char *getName() { return "TestKernel"; }
+  static constexpr bool isESIMD() { return false; }
+  static constexpr bool callsThisItem() { return false; }
+  static constexpr bool callsAnyThisFreeFunction() { return false; }
+};
+
+} // namespace detail
+} // namespace sycl
+} // __SYCL_INLINE_NAMESPACE(cl)
+
+static sycl::unittest::PiImage generateDefaultImage() {
+  using namespace sycl::unittest;
+
+  PiPropertySet PropSet;
+
+  std::vector Bin{0, 1, 2, 3, 4, 5}; // Random data
+
+  PiArray Entries = makeEmptyKernels({"TestKernel"});
+
+  PiImage Img{PI_DEVICE_BINARY_TYPE_SPIRV,            // Format
+              __SYCL_PI_DEVICE_BINARY_TARGET_SPIRV64, // DeviceTargetSpec
+              "",                                     // Compile options
+              "",                                     // Link options
+              std::move(Bin),
+              std::move(Entries),
+              std::move(PropSet)};
+
+  return Img;
+}
+
+static sycl::unittest::PiImage Img = generateDefaultImage();
+static sycl::unittest::PiImageArray<1> ImgArray{&Img};
+
+using namespace sycl;
+
 TEST_F(SchedulerTest, FailedDependency) {
   detail::Requirement MockReq = getMockRequirement();
   MockCommand MDep(detail::getSyclObjImpl(MQueue));
@@ -36,3 +86,139 @@ TEST_F(SchedulerTest, FailedDependency) {
   ASSERT_EQ(MDep.MEnqueueStatus, detail::EnqueueResultT::SyclEnqueueFailed)
       << "MDep should be marked as failed\n";
 }
+
+// Kernel launch replacement that always throws, emulating a failed enqueue.
+pi_result redefinedFailingEnqueueKernelLaunch(pi_queue, pi_kernel, pi_uint32,
+                                              const size_t *, const size_t *,
+                                              const size_t *, pi_uint32,
+                                              const pi_event *, pi_event *) {
+  throw sycl::runtime_error(
+      "Exception from redefinedFailingEnqueueKernelLaunch.",
+      PI_INVALID_OPERATION);
+}
+
+// Balance of mock memory objects: bumped by the create/partition/retain
+// replacements below and dropped by the release one.
+size_t MemBufRefCount = 0u;
+
+pi_result redefinedMemBufferCreate(pi_context, pi_mem_flags, size_t, void *,
+                                   pi_mem *ret_mem, const pi_mem_properties *) {
+  *ret_mem = reinterpret_cast(0x1);
+  ++MemBufRefCount;
+  return PI_SUCCESS;
+}
+
+pi_result redefinedMemBufferPartition(pi_mem, pi_mem_flags,
+                                      pi_buffer_create_type, void *,
+                                      pi_mem *ret_mem) {
+  *ret_mem = reinterpret_cast(0x1);
+  ++MemBufRefCount;
+  return PI_SUCCESS;
+}
+
+pi_result redefinedMemRetain(pi_mem) {
+  ++MemBufRefCount;
+  return PI_SUCCESS;
+}
+
+pi_result redefinedMemRelease(pi_mem) {
+  --MemBufRefCount;
+  return PI_SUCCESS;
+}
+
+// A failed submission that uses a buffer accessor must leave no mock memory
+// object references behind once the buffer and the queue go out of scope.
+TEST_F(SchedulerTest, FailedCommandAccessorCleanup) {
+  default_selector Selector;
+  platform Plt{default_selector()};
+  if (Plt.is_host()) {
+    std::cout << "Not run due to host-only environment\n";
+    return;
+  }
+  if (Plt.get_backend() == sycl::backend::ext_oneapi_cuda ||
+      Plt.get_backend() == sycl::backend::ext_oneapi_hip) {
+    std::cout << "CUDA and HIP backends do not currently support this test\n";
+    return;
+  }
+
+  unittest::PiMock Mock{Plt};
+  setupDefaultMockAPIs(Mock);
+  MemBufRefCount = 0u;
+  Mock.redefine(
+      redefinedFailingEnqueueKernelLaunch);
+  Mock.redefine(redefinedMemBufferCreate);
+  Mock.redefine(redefinedMemRetain);
+  Mock.redefine(redefinedMemRelease);
+
+  {
+    context Ctx{Plt};
+    queue Q{Ctx, Selector};
+
+    kernel_bundle KernelBundle =
+        sycl::get_kernel_bundle(Ctx);
+    auto ExecBundle = sycl::build(KernelBundle);
+
+    buffer Buff{cl::sycl::range<1>(1)};
+
+    try {
+      Q.submit([&](sycl::handler &CGH) {
+        auto Acc = Buff.get_access(CGH);
+        CGH.use_kernel_bundle(ExecBundle);
+        CGH.single_task([=] {});
+      });
+      FAIL() << "No exception was thrown.";
+    } catch (...) {
+    }
+  }
+
+  ASSERT_EQ(MemBufRefCount, 0u) << "Memory leak detected.";
+}
+
+// Same check for a failed submission whose command group creates a
+// sycl::stream.
+TEST_F(SchedulerTest, FailedCommandStreamCleanup) {
+  default_selector Selector;
+  platform Plt{default_selector()};
+  if (Plt.is_host()) {
+    std::cout << "Not run due to host-only environment\n";
+    return;
+  }
+  if (Plt.get_backend() == sycl::backend::ext_oneapi_cuda ||
+      Plt.get_backend() == sycl::backend::ext_oneapi_hip) {
+    std::cout << "CUDA and HIP backends do not currently support this test\n";
+    return;
+  }
+
+  unittest::PiMock Mock{Plt};
+  setupDefaultMockAPIs(Mock);
+  MemBufRefCount = 0u;
+  Mock.redefine(
+      redefinedFailingEnqueueKernelLaunch);
+  Mock.redefine(redefinedMemBufferCreate);
+  Mock.redefine(
+      redefinedMemBufferPartition);
+  Mock.redefine(redefinedMemRetain);
+  Mock.redefine(redefinedMemRelease);
+
+  {
+    context Ctx{Plt};
+    queue Q{Ctx, Selector};
+
+    kernel_bundle KernelBundle =
+        sycl::get_kernel_bundle(Ctx);
+    auto ExecBundle = sycl::build(KernelBundle);
+
+    try {
+      Q.submit([&](sycl::handler &CGH) {
+        sycl::stream KernelStream(108 * 64 + 128, 64, CGH);
+        CGH.use_kernel_bundle(ExecBundle);
+        CGH.single_task([=] {});
+      });
+      FAIL() << "No exception was thrown.";
+    } catch (...) {
+    }
+    Q.wait();
+  }
+
+  ASSERT_EQ(MemBufRefCount, 0u) << "Memory leak detected.";
+}
diff --git a/sycl/unittests/scheduler/LeavesCollection.cpp b/sycl/unittests/scheduler/LeavesCollection.cpp
index 19d243388d198..ee62fc89c8277 100644
--- a/sycl/unittests/scheduler/LeavesCollection.cpp
+++ b/sycl/unittests/scheduler/LeavesCollection.cpp
@@ -43,6 +43,7 @@ createEmptyCommand(const std::shared_ptr &Q,
   EmptyCommand *Cmd = new EmptyCommand(Q);
   Cmd->addRequirement(/* DepCmd = */ nullptr, /* AllocaCmd = */ nullptr, &Req);
   Cmd->MBlockReason = Command::BlockReason::HostAccessor;
+  Cmd->MEnqueueStatus = EnqueueResultT::SyclEnqueueBlocked;
   return std::shared_ptr{Cmd};
 }
 