diff --git a/sycl/plugins/hip/pi_hip.cpp b/sycl/plugins/hip/pi_hip.cpp index 3be580aca8bde..01d76ac0702e3 100644 --- a/sycl/plugins/hip/pi_hip.cpp +++ b/sycl/plugins/hip/pi_hip.cpp @@ -152,20 +152,20 @@ pi_result forLatestEvents(const pi_event *event_wait_list, std::sort(events.begin(), events.end(), [](pi_event e0, pi_event e1) { // Tiered sort creating sublists of streams (smallest value first) in which // the corresponding events are sorted into a sequence of newest first. - return e0->get_queue()->stream_ < e1->get_queue()->stream_ || - (e0->get_queue()->stream_ == e1->get_queue()->stream_ && + return e0->get_stream() < e1->get_stream() || + (e0->get_stream() == e1->get_stream() && e0->get_event_id() > e1->get_event_id()); }); bool first = true; hipStream_t lastSeenStream = 0; for (pi_event event : events) { - if (!event || (!first && event->get_queue()->stream_ == lastSeenStream)) { + if (!event || (!first && event->get_stream() == lastSeenStream)) { continue; } first = false; - lastSeenStream = event->get_queue()->stream_; + lastSeenStream = event->get_stream(); auto result = f(event); if (result != PI_SUCCESS) { @@ -342,6 +342,36 @@ void simpleGuessLocalWorkSize(size_t *threadsPerBlock, } } +pi_result enqueueEventsWait(pi_queue command_queue, hipStream_t stream, + pi_uint32 num_events_in_wait_list, + const pi_event *event_wait_list) { + if (!event_wait_list) { + return PI_SUCCESS; + } + try { + ScopedContext active(command_queue->get_context()); + + auto result = forLatestEvents( + event_wait_list, num_events_in_wait_list, + [stream](pi_event event) -> pi_result { + if (event->get_stream() == stream) { + return PI_SUCCESS; + } else { + return PI_CHECK_ERROR(hipStreamWaitEvent(stream, event->get(), 0)); + } + }); + + if (result != PI_SUCCESS) { + return result; + } + return PI_SUCCESS; + } catch (pi_result err) { + return err; + } catch (...) { + return PI_ERROR_UNKNOWN; + } +} + } // anonymous namespace /// ------ Error handling, matching OpenCL plugin semantics. @@ -395,10 +425,82 @@ pi_result hip_piEventRetain(pi_event event); /// \endcond -_pi_event::_pi_event(pi_command_type type, pi_context context, pi_queue queue) - : commandType_{type}, refCount_{1}, isCompleted_{false}, isRecorded_{false}, - isStarted_{false}, evEnd_{nullptr}, evStart_{nullptr}, evQueued_{nullptr}, - queue_{queue}, context_{context} { +hipStream_t _pi_queue::get_next_compute_stream(pi_uint32 *stream_token) { + pi_uint32 stream_i; + while (true) { + if (num_compute_streams_ < compute_streams_.size()) { + // the check above is for performance - so as not to lock mutex every time + std::lock_guard guard(compute_stream_mutex_); + // The second check is done after mutex is locked so other threads can not + // change num_compute_streams_ after that + if (num_compute_streams_ < compute_streams_.size()) { + PI_CHECK_ERROR(hipStreamCreateWithFlags( + &compute_streams_[num_compute_streams_++], flags_)); + } + } + stream_i = compute_stream_idx_++; + // if a stream has been reused before it was next selected round-robin + // fashion, we want to delay its next use and instead select another one + // that is more likely to have completed all the enqueued work. + if (delay_compute_[stream_i % compute_streams_.size()]) { + delay_compute_[stream_i % compute_streams_.size()] = false; + } else { + break; + } + } + if (stream_token) { + *stream_token = stream_i; + } + return compute_streams_[stream_i % compute_streams_.size()]; +} + +hipStream_t _pi_queue::get_next_compute_stream( + pi_uint32 num_events_in_wait_list, const pi_event *event_wait_list, + _pi_stream_guard &guard, pi_uint32 *stream_token) { + for (pi_uint32 i = 0; i < num_events_in_wait_list; i++) { + pi_uint32 token = event_wait_list[i]->get_stream_token(); + if (event_wait_list[i]->get_queue() == this && can_reuse_stream(token)) { + std::unique_lock compute_sync_guard( + compute_stream_sync_mutex_); + // redo the check after lock to avoid data races on + // last_sync_compute_streams_ + if (can_reuse_stream(token)) { + delay_compute_[token % delay_compute_.size()] = true; + if (stream_token) { + *stream_token = token; + } + guard = _pi_stream_guard{std::move(compute_sync_guard)}; + return event_wait_list[i]->get_stream(); + } + } + } + guard = {}; + return get_next_compute_stream(stream_token); +} + +hipStream_t _pi_queue::get_next_transfer_stream() { + if (transfer_streams_.empty()) { // for example in in-order queue + return get_next_compute_stream(); + } + if (num_transfer_streams_ < transfer_streams_.size()) { + // the check above is for performance - so as not to lock mutex every time + std::lock_guard guard(transfer_stream_mutex_); + // The second check is done after mutex is locked so other threads can not + // change num_transfer_streams_ after that + if (num_transfer_streams_ < transfer_streams_.size()) { + PI_CHECK_ERROR(hipStreamCreateWithFlags( + &transfer_streams_[num_transfer_streams_++], flags_)); + } + } + return transfer_streams_[transfer_stream_idx_++ % transfer_streams_.size()]; +} + +_pi_event::_pi_event(pi_command_type type, pi_context context, pi_queue queue, + hipStream_t stream, pi_uint32 stream_token) + : commandType_{type}, refCount_{1}, hasBeenWaitedOn_{false}, + isRecorded_{false}, isStarted_{false}, + streamToken_{stream_token}, evEnd_{nullptr}, evStart_{nullptr}, + evQueued_{nullptr}, queue_{queue}, stream_{stream}, context_{context} { assert(type != PI_COMMAND_TYPE_USER); @@ -447,7 +549,7 @@ bool _pi_event::is_completed() const noexcept { if (!isRecorded_) { return false; } - if (!isCompleted_) { + if (!hasBeenWaitedOn_) { const hipError_t ret = hipEventQuery(evEnd_); if (ret != hipSuccess && ret != hipErrorNotReady) { PI_CHECK_ERROR(ret); @@ -497,15 +599,13 @@ pi_result _pi_event::record() { return PI_ERROR_INVALID_QUEUE; } - hipStream_t hipStream = queue_->get(); - try { eventId_ = queue_->get_next_event_id(); if (eventId_ == 0) { cl::sycl::detail::pi::die( "Unrecoverable program state reached in event identifier overflow"); } - result = PI_CHECK_ERROR(hipEventRecord(evEnd_, hipStream)); + result = PI_CHECK_ERROR(hipEventRecord(evEnd_, stream_)); } catch (pi_result error) { result = error; } @@ -521,7 +621,7 @@ pi_result _pi_event::wait() { pi_result retErr; try { retErr = PI_CHECK_ERROR(hipEventSynchronize(evEnd_)); - isCompleted_ = true; + hasBeenWaitedOn_ = true; } catch (pi_result error) { retErr = error; } @@ -546,7 +646,10 @@ pi_result enqueueEventWait(pi_queue queue, pi_event event) { // for native events, the hipStreamWaitEvent call is used. // This makes all future work submitted to stream wait for all // work captured in event. - return PI_CHECK_ERROR(hipStreamWaitEvent(queue->get(), event->get(), 0)); + queue->for_each_stream([e = event->get()](hipStream_t s) { + PI_CHECK_ERROR(hipStreamWaitEvent(s, e, 0)); + }); + return PI_SUCCESS; } _pi_program::_pi_program(pi_context ctxt) @@ -2190,8 +2293,6 @@ pi_result hip_piextMemCreateWithNativeHandle(pi_native_handle nativeHandle, pi_result hip_piQueueCreate(pi_context context, pi_device device, pi_queue_properties properties, pi_queue *queue) { try { - pi_result err = PI_SUCCESS; - std::unique_ptr<_pi_queue> queueImpl{nullptr}; if (context->get_device() != device) { @@ -2199,17 +2300,19 @@ pi_result hip_piQueueCreate(pi_context context, pi_device device, return PI_ERROR_INVALID_DEVICE; } - ScopedContext active(context); + unsigned int flags = 0; - hipStream_t hipStream; + const bool is_out_of_order = + properties & PI_QUEUE_OUT_OF_ORDER_EXEC_MODE_ENABLE; - err = PI_CHECK_ERROR(hipStreamCreate(&hipStream)); - if (err != PI_SUCCESS) { - return err; - } + std::vector computeHipStreams( + is_out_of_order ? _pi_queue::default_num_compute_streams : 1); + std::vector transferHipStreams( + is_out_of_order ? _pi_queue::default_num_transfer_streams : 0); - queueImpl = std::unique_ptr<_pi_queue>( - new _pi_queue{hipStream, context, device, properties}); + queueImpl = std::unique_ptr<_pi_queue>(new _pi_queue{ + std::move(computeHipStreams), std::move(transferHipStreams), context, + device, properties, flags}); *queue = queueImpl.release(); @@ -2269,9 +2372,10 @@ pi_result hip_piQueueRelease(pi_queue command_queue) { ScopedContext active(command_queue->get_context()); - auto stream = queueImpl->stream_; - PI_CHECK_ERROR(hipStreamSynchronize(stream)); - PI_CHECK_ERROR(hipStreamDestroy(stream)); + command_queue->for_each_stream([](hipStream_t s) { + PI_CHECK_ERROR(hipStreamSynchronize(s)); + PI_CHECK_ERROR(hipStreamDestroy(s)); + }); return PI_SUCCESS; } catch (pi_result err) { @@ -2291,7 +2395,10 @@ pi_result hip_piQueueFinish(pi_queue command_queue) { assert(command_queue != nullptr); // need PI_ERROR_INVALID_EXTERNAL_HANDLE error code ScopedContext active(command_queue->get_context()); - result = PI_CHECK_ERROR(hipStreamSynchronize(command_queue->stream_)); + + command_queue->sync_streams([&result](hipStream_t s) { + result = PI_CHECK_ERROR(hipStreamSynchronize(s)); + }); } catch (pi_result err) { @@ -2321,7 +2428,9 @@ pi_result hip_piQueueFlush(pi_queue command_queue) { /// \return PI_SUCCESS pi_result hip_piextQueueGetNativeHandle(pi_queue queue, pi_native_handle *nativeHandle) { - *nativeHandle = reinterpret_cast(queue->get()); + ScopedContext active(queue->get_context()); + *nativeHandle = + reinterpret_cast(queue->get_next_compute_stream()); return PI_SUCCESS; } @@ -2362,18 +2471,17 @@ pi_result hip_piEnqueueMemBufferWrite(pi_queue command_queue, pi_mem buffer, assert(buffer != nullptr); assert(command_queue != nullptr); pi_result retErr = PI_SUCCESS; - hipStream_t hipStream = command_queue->get(); std::unique_ptr<_pi_event> retImplEv{nullptr}; try { ScopedContext active(command_queue->get_context()); - - retErr = hip_piEnqueueEventsWait(command_queue, num_events_in_wait_list, - event_wait_list, nullptr); + hipStream_t hipStream = command_queue->get_next_transfer_stream(); + retErr = enqueueEventsWait(command_queue, hipStream, + num_events_in_wait_list, event_wait_list); if (event) { retImplEv = std::unique_ptr<_pi_event>(_pi_event::make_native( - PI_COMMAND_TYPE_MEM_BUFFER_WRITE, command_queue)); + PI_COMMAND_TYPE_MEM_BUFFER_WRITE, command_queue, hipStream)); retImplEv->start(); } @@ -2408,18 +2516,17 @@ pi_result hip_piEnqueueMemBufferRead(pi_queue command_queue, pi_mem buffer, assert(buffer != nullptr); assert(command_queue != nullptr); pi_result retErr = PI_SUCCESS; - hipStream_t hipStream = command_queue->get(); std::unique_ptr<_pi_event> retImplEv{nullptr}; try { ScopedContext active(command_queue->get_context()); - - retErr = hip_piEnqueueEventsWait(command_queue, num_events_in_wait_list, - event_wait_list, nullptr); + hipStream_t hipStream = command_queue->get_next_transfer_stream(); + retErr = enqueueEventsWait(command_queue, hipStream, + num_events_in_wait_list, event_wait_list); if (event) { retImplEv = std::unique_ptr<_pi_event>(_pi_event::make_native( - PI_COMMAND_TYPE_MEM_BUFFER_READ, command_queue)); + PI_COMMAND_TYPE_MEM_BUFFER_READ, command_queue, hipStream)); retImplEv->start(); } @@ -2667,11 +2774,15 @@ pi_result hip_piEnqueueKernelLaunch( try { ScopedContext active(command_queue->get_context()); - hipStream_t hipStream = command_queue->get(); + + pi_uint32 stream_token; + _pi_stream_guard guard; + hipStream_t hipStream = command_queue->get_next_compute_stream( + num_events_in_wait_list, event_wait_list, guard, &stream_token); hipFunction_t hipFunc = kernel->get(); - retError = hip_piEnqueueEventsWait(command_queue, num_events_in_wait_list, - event_wait_list, nullptr); + retError = enqueueEventsWait(command_queue, hipStream, + num_events_in_wait_list, event_wait_list); // Set the implicit global offset parameter if kernel has offset variant if (kernel->get_with_offset_parameter()) { @@ -2692,8 +2803,9 @@ pi_result hip_piEnqueueKernelLaunch( auto argIndices = kernel->get_arg_indices(); if (event) { - retImplEv = std::unique_ptr<_pi_event>(_pi_event::make_native( - PI_COMMAND_TYPE_NDRANGE_KERNEL, command_queue)); + retImplEv = std::unique_ptr<_pi_event>( + _pi_event::make_native(PI_COMMAND_TYPE_NDRANGE_KERNEL, command_queue, + hipStream, stream_token)); retImplEv->start(); } @@ -2703,11 +2815,9 @@ pi_result hip_piEnqueueKernelLaunch( kernel->get_local_size(), hipStream, argIndices.data(), nullptr)); kernel->clear_local_size(); - if (event) { - retError = retImplEv->record(); - } if (event) { + retError = retImplEv->record(); *event = retImplEv.release(); } } catch (pi_result err) { @@ -3593,7 +3703,12 @@ pi_result hip_piEnqueueEventsWaitWithBarrier(pi_queue command_queue, } if (event) { - *event = _pi_event::make_native(PI_COMMAND_TYPE_MARKER, command_queue); + pi_uint32 stream_token; + _pi_stream_guard guard; + hipStream_t hipStream = command_queue->get_next_compute_stream( + num_events_in_wait_list, event_wait_list, guard, &stream_token); + *event = _pi_event::make_native(PI_COMMAND_TYPE_MARKER, command_queue, + hipStream, stream_token); (*event)->start(); (*event)->record(); } @@ -3849,19 +3964,19 @@ pi_result hip_piEnqueueMemBufferReadRect( assert(command_queue != nullptr); pi_result retErr = PI_SUCCESS; - hipStream_t hipStream = command_queue->get(); void *devPtr = buffer->mem_.buffer_mem_.get_void(); std::unique_ptr<_pi_event> retImplEv{nullptr}; try { ScopedContext active(command_queue->get_context()); + hipStream_t hipStream = command_queue->get_next_transfer_stream(); - retErr = hip_piEnqueueEventsWait(command_queue, num_events_in_wait_list, - event_wait_list, nullptr); + retErr = enqueueEventsWait(command_queue, hipStream, + num_events_in_wait_list, event_wait_list); if (event) { retImplEv = std::unique_ptr<_pi_event>(_pi_event::make_native( - PI_COMMAND_TYPE_MEM_BUFFER_READ_RECT, command_queue)); + PI_COMMAND_TYPE_MEM_BUFFER_READ_RECT, command_queue, hipStream)); retImplEv->start(); } @@ -3900,19 +4015,18 @@ pi_result hip_piEnqueueMemBufferWriteRect( assert(command_queue != nullptr); pi_result retErr = PI_SUCCESS; - hipStream_t hipStream = command_queue->get(); void *devPtr = buffer->mem_.buffer_mem_.get_void(); std::unique_ptr<_pi_event> retImplEv{nullptr}; try { ScopedContext active(command_queue->get_context()); - - retErr = hip_piEnqueueEventsWait(command_queue, num_events_in_wait_list, - event_wait_list, nullptr); + hipStream_t hipStream = command_queue->get_next_transfer_stream(); + retErr = enqueueEventsWait(command_queue, hipStream, + num_events_in_wait_list, event_wait_list); if (event) { retImplEv = std::unique_ptr<_pi_event>(_pi_event::make_native( - PI_COMMAND_TYPE_MEM_BUFFER_WRITE_RECT, command_queue)); + PI_COMMAND_TYPE_MEM_BUFFER_WRITE_RECT, command_queue, hipStream)); retImplEv->start(); } @@ -3953,21 +4067,20 @@ pi_result hip_piEnqueueMemBufferCopy(pi_queue command_queue, pi_mem src_buffer, try { ScopedContext active(command_queue->get_context()); + pi_result result; + auto stream = command_queue->get_next_transfer_stream(); if (event_wait_list) { - hip_piEnqueueEventsWait(command_queue, num_events_in_wait_list, - event_wait_list, nullptr); + result = enqueueEventsWait(command_queue, stream, num_events_in_wait_list, + event_wait_list); } - pi_result result; - if (event) { retImplEv = std::unique_ptr<_pi_event>(_pi_event::make_native( - PI_COMMAND_TYPE_MEM_BUFFER_COPY, command_queue)); + PI_COMMAND_TYPE_MEM_BUFFER_COPY, command_queue, stream)); result = retImplEv->start(); } - auto stream = command_queue->get(); auto src = src_buffer->mem_.buffer_mem_.get_with_offset(src_offset); auto dst = dst_buffer->mem_.buffer_mem_.get_with_offset(dst_offset); @@ -3999,20 +4112,19 @@ pi_result hip_piEnqueueMemBufferCopyRect( assert(command_queue != nullptr); pi_result retErr = PI_SUCCESS; - hipStream_t hipStream = command_queue->get(); void *srcPtr = src_buffer->mem_.buffer_mem_.get_void(); void *dstPtr = dst_buffer->mem_.buffer_mem_.get_void(); std::unique_ptr<_pi_event> retImplEv{nullptr}; try { ScopedContext active(command_queue->get_context()); - - retErr = hip_piEnqueueEventsWait(command_queue, num_events_in_wait_list, - event_wait_list, nullptr); + hipStream_t hipStream = command_queue->get_next_transfer_stream(); + retErr = enqueueEventsWait(command_queue, hipStream, + num_events_in_wait_list, event_wait_list); if (event) { retImplEv = std::unique_ptr<_pi_event>(_pi_event::make_native( - PI_COMMAND_TYPE_MEM_BUFFER_COPY_RECT, command_queue)); + PI_COMMAND_TYPE_MEM_BUFFER_COPY_RECT, command_queue, hipStream)); retImplEv->start(); } @@ -4060,21 +4172,20 @@ pi_result hip_piEnqueueMemBufferFill(pi_queue command_queue, pi_mem buffer, try { ScopedContext active(command_queue->get_context()); + auto stream = command_queue->get_next_transfer_stream(); + pi_result result; if (event_wait_list) { - hip_piEnqueueEventsWait(command_queue, num_events_in_wait_list, - event_wait_list, nullptr); + result = enqueueEventsWait(command_queue, stream, num_events_in_wait_list, + event_wait_list); } - pi_result result; - if (event) { retImplEv = std::unique_ptr<_pi_event>(_pi_event::make_native( - PI_COMMAND_TYPE_MEM_BUFFER_FILL, command_queue)); + PI_COMMAND_TYPE_MEM_BUFFER_FILL, command_queue, stream)); result = retImplEv->start(); } auto dstDevice = buffer->mem_.buffer_mem_.get_with_offset(offset); - auto stream = command_queue->get(); auto N = size / pattern_size; // pattern size in bytes @@ -4254,14 +4365,14 @@ pi_result hip_piEnqueueMemImageRead(pi_queue command_queue, pi_mem image, assert(image->mem_type_ == _pi_mem::mem_type::surface); pi_result retErr = PI_SUCCESS; - hipStream_t hipStream = command_queue->get(); try { ScopedContext active(command_queue->get_context()); + hipStream_t hipStream = command_queue->get_next_transfer_stream(); if (event_wait_list) { - hip_piEnqueueEventsWait(command_queue, num_events_in_wait_list, - event_wait_list, nullptr); + retErr = enqueueEventsWait(command_queue, hipStream, + num_events_in_wait_list, event_wait_list); } hipArray *array = image->mem_.surface_mem_.get_array(); @@ -4289,8 +4400,8 @@ pi_result hip_piEnqueueMemImageRead(pi_queue command_queue, pi_mem image, } if (event) { - auto new_event = - _pi_event::make_native(PI_COMMAND_TYPE_IMAGE_READ, command_queue); + auto new_event = _pi_event::make_native(PI_COMMAND_TYPE_IMAGE_READ, + command_queue, hipStream); new_event->record(); *event = new_event; } @@ -4323,14 +4434,14 @@ pi_result hip_piEnqueueMemImageWrite(pi_queue command_queue, pi_mem image, assert(image->mem_type_ == _pi_mem::mem_type::surface); pi_result retErr = PI_SUCCESS; - hipStream_t hipStream = command_queue->get(); try { ScopedContext active(command_queue->get_context()); + hipStream_t hipStream = command_queue->get_next_transfer_stream(); if (event_wait_list) { - hip_piEnqueueEventsWait(command_queue, num_events_in_wait_list, - event_wait_list, nullptr); + retErr = enqueueEventsWait(command_queue, hipStream, + num_events_in_wait_list, event_wait_list); } hipArray *array = image->mem_.surface_mem_.get_array(); @@ -4358,8 +4469,8 @@ pi_result hip_piEnqueueMemImageWrite(pi_queue command_queue, pi_mem image, } if (event) { - auto new_event = - _pi_event::make_native(PI_COMMAND_TYPE_IMAGE_WRITE, command_queue); + auto new_event = _pi_event::make_native(PI_COMMAND_TYPE_IMAGE_WRITE, + command_queue, hipStream); new_event->record(); *event = new_event; } @@ -4388,14 +4499,13 @@ pi_result hip_piEnqueueMemImageCopy(pi_queue command_queue, pi_mem src_image, dst_image->mem_.surface_mem_.get_image_type()); pi_result retErr = PI_SUCCESS; - hipStream_t hipStream = command_queue->get(); try { ScopedContext active(command_queue->get_context()); - + hipStream_t hipStream = command_queue->get_next_transfer_stream(); if (event_wait_list) { - hip_piEnqueueEventsWait(command_queue, num_events_in_wait_list, - event_wait_list, nullptr); + retErr = enqueueEventsWait(command_queue, hipStream, + num_events_in_wait_list, event_wait_list); } hipArray *srcArray = src_image->mem_.surface_mem_.get_array(); @@ -4432,8 +4542,8 @@ pi_result hip_piEnqueueMemImageCopy(pi_queue command_queue, pi_mem src_image, } if (event) { - auto new_event = - _pi_event::make_native(PI_COMMAND_TYPE_IMAGE_COPY, command_queue); + auto new_event = _pi_event::make_native(PI_COMMAND_TYPE_IMAGE_COPY, + command_queue, hipStream); new_event->record(); *event = new_event; } @@ -4515,8 +4625,9 @@ pi_result hip_piEnqueueMemBufferMap(pi_queue command_queue, pi_mem buffer, if (event) { try { - *event = _pi_event::make_native(PI_COMMAND_TYPE_MEM_BUFFER_MAP, - command_queue); + *event = _pi_event::make_native( + PI_COMMAND_TYPE_MEM_BUFFER_MAP, command_queue, + command_queue->get_next_transfer_stream()); (*event)->start(); (*event)->record(); } catch (pi_result error) { @@ -4569,8 +4680,9 @@ pi_result hip_piEnqueueMemUnmap(pi_queue command_queue, pi_mem memobj, if (event) { try { - *event = _pi_event::make_native(PI_COMMAND_TYPE_MEM_BUFFER_UNMAP, - command_queue); + *event = _pi_event::make_native( + PI_COMMAND_TYPE_MEM_BUFFER_UNMAP, command_queue, + command_queue->get_next_transfer_stream()); (*event)->start(); (*event)->record(); } catch (pi_result error) { @@ -4688,17 +4800,20 @@ pi_result hip_piextUSMEnqueueMemset(pi_queue queue, void *ptr, pi_int32 value, assert(queue != nullptr); assert(ptr != nullptr); - hipStream_t hipStream = queue->get(); pi_result result = PI_SUCCESS; std::unique_ptr<_pi_event> event_ptr{nullptr}; try { ScopedContext active(queue->get_context()); - result = hip_piEnqueueEventsWait(queue, num_events_in_waitlist, - events_waitlist, nullptr); + pi_uint32 stream_token; + _pi_stream_guard guard; + hipStream_t hipStream = queue->get_next_compute_stream( + num_events_in_waitlist, events_waitlist, guard, &stream_token); + result = enqueueEventsWait(queue, hipStream, num_events_in_waitlist, + events_waitlist); if (event) { - event_ptr = std::unique_ptr<_pi_event>( - _pi_event::make_native(PI_COMMAND_TYPE_MEM_BUFFER_FILL, queue)); + event_ptr = std::unique_ptr<_pi_event>(_pi_event::make_native( + PI_COMMAND_TYPE_MEM_BUFFER_FILL, queue, hipStream, stream_token)); event_ptr->start(); } result = PI_CHECK_ERROR( @@ -4721,21 +4836,21 @@ pi_result hip_piextUSMEnqueueMemcpy(pi_queue queue, pi_bool blocking, pi_uint32 num_events_in_waitlist, const pi_event *events_waitlist, pi_event *event) { - assert(queue != nullptr); assert(dst_ptr != nullptr); assert(src_ptr != nullptr); - hipStream_t hipStream = queue->get(); pi_result result = PI_SUCCESS; + std::unique_ptr<_pi_event> event_ptr{nullptr}; try { ScopedContext active(queue->get_context()); - result = hip_piEnqueueEventsWait(queue, num_events_in_waitlist, - events_waitlist, nullptr); + hipStream_t hipStream = queue->get_next_transfer_stream(); + result = enqueueEventsWait(queue, hipStream, num_events_in_waitlist, + events_waitlist); if (event) { - event_ptr = std::unique_ptr<_pi_event>( - _pi_event::make_native(PI_COMMAND_TYPE_MEM_BUFFER_COPY, queue)); + event_ptr = std::unique_ptr<_pi_event>(_pi_event::make_native( + PI_COMMAND_TYPE_MEM_BUFFER_COPY, queue, hipStream)); event_ptr->start(); } result = PI_CHECK_ERROR( @@ -4752,7 +4867,6 @@ pi_result hip_piextUSMEnqueueMemcpy(pi_queue queue, pi_bool blocking, } catch (pi_result err) { result = err; } - return result; } @@ -4767,17 +4881,17 @@ pi_result hip_piextUSMEnqueuePrefetch(pi_queue queue, const void *ptr, return PI_ERROR_INVALID_VALUE; assert(queue != nullptr); assert(ptr != nullptr); - hipStream_t hipStream = queue->get(); pi_result result = PI_SUCCESS; std::unique_ptr<_pi_event> event_ptr{nullptr}; try { ScopedContext active(queue->get_context()); - result = hip_piEnqueueEventsWait(queue, num_events_in_waitlist, - events_waitlist, nullptr); + hipStream_t hipStream = queue->get_next_transfer_stream(); + result = enqueueEventsWait(queue, hipStream, num_events_in_waitlist, + events_waitlist); if (event) { - event_ptr = std::unique_ptr<_pi_event>( - _pi_event::make_native(PI_COMMAND_TYPE_MEM_BUFFER_COPY, queue)); + event_ptr = std::unique_ptr<_pi_event>(_pi_event::make_native( + PI_COMMAND_TYPE_MEM_BUFFER_COPY, queue, hipStream)); event_ptr->start(); } result = PI_CHECK_ERROR(hipMemPrefetchAsync( diff --git a/sycl/plugins/hip/pi_hip.hpp b/sycl/plugins/hip/pi_hip.hpp index 1519492f67598..96d2d8721d199 100644 --- a/sycl/plugins/hip/pi_hip.hpp +++ b/sycl/plugins/hip/pi_hip.hpp @@ -57,6 +57,8 @@ pi_result hip_piKernelRelease(pi_kernel); /// \endcond } +using _pi_stream_guard = std::unique_lock; + /// A PI platform stores all known PI devices, /// in the HIP plugin this is just a vector of /// available devices since initialization is done @@ -370,18 +372,47 @@ struct _pi_mem { /// struct _pi_queue { using native_type = hipStream_t; - - native_type stream_; + static constexpr int default_num_compute_streams = 64; + static constexpr int default_num_transfer_streams = 16; + + std::vector compute_streams_; + std::vector transfer_streams_; + // delay_compute_ keeps track of which streams have been recently reused and + // their next use should be delayed. If a stream has been recently reused it + // will be skipped the next time it would be selected round-robin style. When + // skipped, its delay flag is cleared. + std::vector delay_compute_; _pi_context *context_; _pi_device *device_; pi_queue_properties properties_; std::atomic_uint32_t refCount_; std::atomic_uint32_t eventCount_; - - _pi_queue(hipStream_t stream, _pi_context *context, _pi_device *device, - pi_queue_properties properties) - : stream_{stream}, context_{context}, device_{device}, - properties_{properties}, refCount_{1}, eventCount_{0} { + std::atomic_uint32_t compute_stream_idx_; + std::atomic_uint32_t transfer_stream_idx_; + unsigned int num_compute_streams_; + unsigned int num_transfer_streams_; + unsigned int last_sync_compute_streams_; + unsigned int last_sync_transfer_streams_; + unsigned int flags_; + // When compute_stream_sync_mutex_ and compute_stream_mutex_ both need to be + // locked at the same time, compute_stream_sync_mutex_ should be locked first + // to avoid deadlocks + std::mutex compute_stream_sync_mutex_; + std::mutex compute_stream_mutex_; + std::mutex transfer_stream_mutex_; + + _pi_queue(std::vector &&compute_streams, + std::vector &&transfer_streams, _pi_context *context, + _pi_device *device, pi_queue_properties properties, + unsigned int flags) + : compute_streams_{std::move(compute_streams)}, + transfer_streams_{std::move(transfer_streams)}, + delay_compute_(compute_streams_.size(), false), context_{context}, + device_{device}, properties_{properties}, refCount_{1}, eventCount_{0}, + compute_stream_idx_{0}, transfer_stream_idx_{0}, + num_compute_streams_{0}, num_transfer_streams_{0}, + last_sync_compute_streams_{0}, last_sync_transfer_streams_{0}, + flags_(flags) { hip_piContextRetain(context_); hip_piDeviceRetain(device_); } @@ -391,10 +422,136 @@ struct _pi_queue { hip_piDeviceRelease(device_); } - native_type get() const noexcept { return stream_; }; + // get_next_compute/transfer_stream() functions return streams from + // appropriate pools in round-robin fashion + native_type get_next_compute_stream(pi_uint32 *stream_token = nullptr); + // this overload tries select a stream that was used by one of dependancies. + // If that is not possible returns a new stream. If a stream is reused it + // returns a lock that needs to remain locked as long as the stream is in use + native_type get_next_compute_stream(pi_uint32 num_events_in_wait_list, + const pi_event *event_wait_list, + _pi_stream_guard &guard, + pi_uint32 *stream_token = nullptr); + native_type get_next_transfer_stream(); + native_type get() { return get_next_compute_stream(); }; + + bool has_been_synchronized(pi_uint32 stream_token) { + // stream token not associated with one of the compute streams + if (stream_token == std::numeric_limits::max()) { + return false; + } + return last_sync_compute_streams_ >= stream_token; + } + + bool can_reuse_stream(pi_uint32 stream_token) { + // stream token not associated with one of the compute streams + if (stream_token == std::numeric_limits::max()) { + return false; + } + // If the command represented by the stream token was not the last command + // enqueued to the stream we can not reuse the stream - we need to allow for + // commands enqueued after it and the one we are about to enqueue to run + // concurrently + bool is_last_command = + (compute_stream_idx_ - stream_token) <= compute_streams_.size(); + // If there was a barrier enqueued to the queue after the command + // represented by the stream token we should not reuse the stream, as we can + // not take that stream into account for the bookkeeping for the next + // barrier - such a stream would not be synchronized with. Performance-wise + // it does not matter that we do not reuse the stream, as the work + // represented by the stream token is guaranteed to be complete by the + // barrier before any work we are about to enqueue to the stream will start, + // so the event does not need to be synchronized with. + return is_last_command && !has_been_synchronized(stream_token); + } + + template void for_each_stream(T &&f) { + { + std::lock_guard compute_guard(compute_stream_mutex_); + unsigned int end = + std::min(static_cast(compute_streams_.size()), + num_compute_streams_); + for (unsigned int i = 0; i < end; i++) { + f(compute_streams_[i]); + } + } + { + std::lock_guard transfer_guard(transfer_stream_mutex_); + unsigned int end = + std::min(static_cast(transfer_streams_.size()), + num_transfer_streams_); + for (unsigned int i = 0; i < end; i++) { + f(transfer_streams_[i]); + } + } + } + + template void sync_streams(T &&f) { + auto sync_compute = [&f, &streams = compute_streams_, + &delay = delay_compute_](unsigned int start, + unsigned int stop) { + for (unsigned int i = start; i < stop; i++) { + f(streams[i]); + delay[i] = false; + } + }; + auto sync_transfer = [&f, &streams = transfer_streams_](unsigned int start, + unsigned int stop) { + for (unsigned int i = start; i < stop; i++) { + f(streams[i]); + } + }; + { + unsigned int size = static_cast(compute_streams_.size()); + std::lock_guard compute_sync_guard(compute_stream_sync_mutex_); + std::lock_guard compute_guard(compute_stream_mutex_); + unsigned int start = last_sync_compute_streams_; + unsigned int end = num_compute_streams_ < size + ? num_compute_streams_ + : compute_stream_idx_.load(); + last_sync_compute_streams_ = end; + if (end - start >= size) { + sync_compute(0, size); + } else { + start %= size; + end %= size; + if (start < end) { + sync_compute(start, end); + } else { + sync_compute(start, size); + sync_compute(0, end); + } + } + } + { + unsigned int size = static_cast(transfer_streams_.size()); + if (size > 0) { + std::lock_guard transfer_guard(transfer_stream_mutex_); + unsigned int start = last_sync_transfer_streams_; + unsigned int end = num_transfer_streams_ < size + ? num_transfer_streams_ + : transfer_stream_idx_.load(); + last_sync_transfer_streams_ = end; + if (end - start >= size) { + sync_transfer(0, size); + } else { + start %= size; + end %= size; + if (start < end) { + sync_transfer(start, end); + } else { + sync_transfer(start, size); + sync_transfer(0, end); + } + } + } + } + } _pi_context *get_context() const { return context_; }; + _pi_device *get_device() const { return device_; }; + pi_uint32 increment_reference_count() noexcept { return ++refCount_; } pi_uint32 decrement_reference_count() noexcept { return --refCount_; } @@ -422,6 +579,10 @@ struct _pi_event { pi_queue get_queue() const noexcept { return queue_; } + hipStream_t get_stream() const noexcept { return stream_; } + + pi_uint32 get_stream_token() const noexcept { return streamToken_; } + pi_command_type get_command_type() const noexcept { return commandType_; } pi_uint32 get_reference_count() const noexcept { return refCount_; } @@ -465,8 +626,11 @@ struct _pi_event { pi_uint64 get_end_time() const; // construct a native HIP. This maps closely to the underlying HIP event. - static pi_event make_native(pi_command_type type, pi_queue queue) { - return new _pi_event(type, queue->get_context(), queue); + static pi_event + make_native(pi_command_type type, pi_queue queue, hipStream_t stream, + pi_uint32 stream_token = std::numeric_limits::max()) { + return new _pi_event(type, queue->get_context(), queue, stream, + stream_token); } pi_result release(); @@ -476,14 +640,16 @@ struct _pi_event { private: // This constructor is private to force programmers to use the make_native / // make_user static members in order to create a pi_event for HIP. - _pi_event(pi_command_type type, pi_context context, pi_queue queue); + _pi_event(pi_command_type type, pi_context context, pi_queue queue, + hipStream_t stream, pi_uint32 stream_token); pi_command_type commandType_; // The type of command associated with event. std::atomic_uint32_t refCount_; // Event reference count. - bool isCompleted_; // Signifies whether the operations have completed - // + bool hasBeenWaitedOn_; // Signifies whether the event has been waited + // on through a call to wait(), which implies + // that it has completed. bool isRecorded_; // Signifies wether a native HIP event has been recorded // yet. @@ -491,6 +657,7 @@ struct _pi_event { // PI event has started or not // + pi_uint32 streamToken_; pi_uint32 eventId_; // Queue identifier of the event. native_type evEnd_; // HIP event handle. If this _pi_event represents a user @@ -504,6 +671,9 @@ struct _pi_event { pi_queue queue_; // pi_queue associated with the event. If this is a user // event, this will be nullptr. + hipStream_t stream_; // hipStream_t associated with the event. If this is a + // user event, this will be uninitialized. + pi_context context_; // pi_context associated with the event. If this is a // native event, this will be the same context associated // with the queue_ member.