99 changes: 80 additions & 19 deletions sycl/plugins/cuda/pi_cuda.cpp
@@ -300,7 +300,11 @@ pi_result enqueueEventsWait(pi_queue command_queue, CUstream stream,
auto result = forLatestEvents(
event_wait_list, num_events_in_wait_list,
[stream](pi_event event) -> pi_result {
return PI_CHECK_ERROR(cuStreamWaitEvent(stream, event->get(), 0));
if (event->get_stream() == stream) {
return PI_SUCCESS;
} else {
return PI_CHECK_ERROR(cuStreamWaitEvent(stream, event->get(), 0));
}
});

if (result != PI_SUCCESS) {
@@ -367,18 +371,58 @@ pi_result cuda_piEventRetain(pi_event event);

/// \endcond

CUstream _pi_queue::get_next_compute_stream() {
if (num_compute_streams_ < compute_streams_.size()) {
// the check above is for performance - so as not to lock mutex every time
std::lock_guard<std::mutex> guard(compute_stream_mutex_);
// The second check is done after mutex is locked so other threads can not
// change num_compute_streams_ after that
CUstream _pi_queue::get_next_compute_stream(pi_uint32 *stream_token) {
pi_uint32 stream_i;
while (true) {
if (num_compute_streams_ < compute_streams_.size()) {
PI_CHECK_ERROR(
cuStreamCreate(&compute_streams_[num_compute_streams_++], flags_));
// the check above is for performance - so as not to lock mutex every time
std::lock_guard<std::mutex> guard(compute_stream_mutex_);
// The second check is done after mutex is locked so other threads can not
// change num_compute_streams_ after that
if (num_compute_streams_ < compute_streams_.size()) {
PI_CHECK_ERROR(
cuStreamCreate(&compute_streams_[num_compute_streams_++], flags_));
}
}
stream_i = compute_stream_idx_++;
// if a stream has been reused before it was next selected in round-robin
// fashion, we want to delay its next use and instead select another one
// that is more likely to have completed all the enqueued work.
if (delay_compute_[stream_i % compute_streams_.size()]) {
delay_compute_[stream_i % compute_streams_.size()] = false;
} else {
break;
}
}
return compute_streams_[compute_stream_idx_++ % compute_streams_.size()];
if (stream_token) {
*stream_token = stream_i;
}
return compute_streams_[stream_i % compute_streams_.size()];
}

CUstream _pi_queue::get_next_compute_stream(pi_uint32 num_events_in_wait_list,
const pi_event *event_wait_list,
_pi_stream_guard &guard,
pi_uint32 *stream_token) {
for (pi_uint32 i = 0; i < num_events_in_wait_list; i++) {
pi_uint32 token = event_wait_list[i]->get_stream_token();
if (event_wait_list[i]->get_queue() == this && can_reuse_stream(token)) {
std::unique_lock<std::mutex> compute_sync_guard(
compute_stream_sync_mutex_);
// redo the check after lock to avoid data races on
// last_sync_compute_streams_
if (can_reuse_stream(token)) {
delay_compute_[token % delay_compute_.size()] = true;
if (stream_token) {
*stream_token = token;
}
guard = _pi_stream_guard{std::move(compute_sync_guard)};
return event_wait_list[i]->get_stream();
}
}
}
guard = {};
return get_next_compute_stream(stream_token);
}

CUstream _pi_queue::get_next_transfer_stream() {
@@ -399,9 +443,10 @@ CUstream _pi_queue::get_next_transfer_stream() {
}

_pi_event::_pi_event(pi_command_type type, pi_context context, pi_queue queue,
CUstream stream)
CUstream stream, pi_uint32 stream_token)
: commandType_{type}, refCount_{1}, hasBeenWaitedOn_{false},
isRecorded_{false}, isStarted_{false}, evEnd_{nullptr}, evStart_{nullptr},
isRecorded_{false}, isStarted_{false},
streamToken_{stream_token}, evEnd_{nullptr}, evStart_{nullptr},
evQueued_{nullptr}, queue_{queue}, stream_{stream}, context_{context} {

bool profilingEnabled = queue_->properties_ & PI_QUEUE_PROFILING_ENABLE;
@@ -2838,7 +2883,10 @@ pi_result cuda_piEnqueueKernelLaunch(

std::unique_ptr<_pi_event> retImplEv{nullptr};

CUstream cuStream = command_queue->get_next_compute_stream();
pi_uint32 stream_token;
_pi_stream_guard guard;
CUstream cuStream = command_queue->get_next_compute_stream(
num_events_in_wait_list, event_wait_list, guard, &stream_token);
CUfunction cuFunc = kernel->get();

retError = enqueueEventsWait(command_queue, cuStream,
@@ -2863,8 +2911,9 @@ pi_result cuda_piEnqueueKernelLaunch(
auto &argIndices = kernel->get_arg_indices();

if (event) {
retImplEv = std::unique_ptr<_pi_event>(_pi_event::make_native(
PI_COMMAND_TYPE_NDRANGE_KERNEL, command_queue, cuStream));
retImplEv = std::unique_ptr<_pi_event>(
_pi_event::make_native(PI_COMMAND_TYPE_NDRANGE_KERNEL, command_queue,
cuStream, stream_token));
retImplEv->start();
}

@@ -3699,7 +3748,12 @@ pi_result cuda_piEnqueueEventsWaitWithBarrier(pi_queue command_queue,
auto result =
forLatestEvents(event_wait_list, num_events_in_wait_list,
[command_queue](pi_event event) -> pi_result {
return enqueueEventWait(command_queue, event);
if (event->get_queue()->has_been_synchronized(
event->get_stream_token())) {
return PI_SUCCESS;
} else {
return enqueueEventWait(command_queue, event);
}
});

if (result != PI_SUCCESS) {
@@ -3708,8 +3762,12 @@ }
}

if (event) {
pi_uint32 stream_token;
_pi_stream_guard guard;
CUstream cuStream = command_queue->get_next_compute_stream(
num_events_in_wait_list, event_wait_list, guard, &stream_token);
*event = _pi_event::make_native(PI_COMMAND_TYPE_MARKER, command_queue,
command_queue->get_next_compute_stream());
cuStream, stream_token);
(*event)->start();
(*event)->record();
}
Expand Down Expand Up @@ -4767,12 +4825,15 @@ pi_result cuda_piextUSMEnqueueMemset(pi_queue queue, void *ptr, pi_int32 value,

try {
ScopedContext active(queue->get_context());
CUstream cuStream = queue->get_next_compute_stream();
pi_uint32 stream_token;
_pi_stream_guard guard;
CUstream cuStream = queue->get_next_compute_stream(
num_events_in_waitlist, events_waitlist, guard, &stream_token);
result = enqueueEventsWait(queue, cuStream, num_events_in_waitlist,
events_waitlist);
if (event) {
event_ptr = std::unique_ptr<_pi_event>(_pi_event::make_native(
PI_COMMAND_TYPE_MEM_BUFFER_FILL, queue, cuStream));
PI_COMMAND_TYPE_MEM_BUFFER_FILL, queue, cuStream, stream_token));
event_ptr->start();
}
result = PI_CHECK_ERROR(cuMemsetD8Async(
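Across the enqueue entry points changed above (kernel launch, barrier, USM memset) the same sequence appears: obtain a compute stream that may be reused from one of the dependencies, wait only on events recorded on other streams, and attach the stream token to the native event. The condensed sketch below shows that flow in one place; enqueue_with_stream_reuse is a hypothetical helper, not part of the patch, and argument setup, error handling, and the actual asynchronous work are omitted.

// Illustrative only - a condensed view of the pattern used by the enqueue
// paths in this diff. Not part of the plugin sources.
static pi_result enqueue_with_stream_reuse(pi_queue queue, pi_command_type type,
                                           pi_uint32 num_events_in_wait_list,
                                           const pi_event *event_wait_list,
                                           pi_event *event) {
  pi_uint32 stream_token;
  _pi_stream_guard guard;
  // Prefer a stream already used by one of the dependencies; the guard stays
  // locked for as long as the reused stream is in use.
  CUstream cuStream = queue->get_next_compute_stream(
      num_events_in_wait_list, event_wait_list, guard, &stream_token);

  // Events recorded on the chosen stream itself are skipped inside
  // enqueueEventsWait, so only cross-stream dependencies cost a
  // cuStreamWaitEvent call.
  pi_result result = enqueueEventsWait(queue, cuStream, num_events_in_wait_list,
                                       event_wait_list);
  if (result != PI_SUCCESS)
    return result;

  if (event) {
    // The stream token travels with the event so later calls can tell whether
    // the stream may be reused or has already been synchronized.
    *event = _pi_event::make_native(type, queue, cuStream, stream_token);
    (*event)->start();
  }

  // ... enqueue the actual asynchronous work on cuStream here ...

  if (event)
    (*event)->record();
  return PI_SUCCESS;
}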
95 changes: 79 additions & 16 deletions sycl/plugins/cuda/pi_cuda.hpp
@@ -55,6 +55,8 @@ pi_result cuda_piKernelGetGroupInfo(pi_kernel kernel, pi_device device,
/// \endcond
}

using _pi_stream_guard = std::unique_lock<std::mutex>;

/// A PI platform stores all known PI devices,
/// in the CUDA plugin this is just a vector of
/// available devices since initialization is done
@@ -382,6 +384,11 @@ struct _pi_queue {

std::vector<native_type> compute_streams_;
std::vector<native_type> transfer_streams_;
// delay_compute_ keeps track of which streams have been recently reused so
// that their next use can be delayed. If a stream has been recently reused it
// will be skipped the next time it would be selected round-robin style. When
// skipped, its delay flag is cleared.
std::vector<bool> delay_compute_;
_pi_context *context_;
_pi_device *device_;
pi_queue_properties properties_;
@@ -394,6 +401,10 @@
unsigned int last_sync_compute_streams_;
unsigned int last_sync_transfer_streams_;
unsigned int flags_;
// When compute_stream_sync_mutex_ and compute_stream_mutex_ both need to be
// locked at the same time, compute_stream_sync_mutex_ should be locked first
// to avoid deadlocks
std::mutex compute_stream_sync_mutex_;
std::mutex compute_stream_mutex_;
std::mutex transfer_stream_mutex_;

@@ -402,7 +413,8 @@
_pi_device *device, pi_queue_properties properties,
unsigned int flags)
: compute_streams_{std::move(compute_streams)},
transfer_streams_{std::move(transfer_streams)}, context_{context},
transfer_streams_{std::move(transfer_streams)},
delay_compute_(compute_streams_.size(), false), context_{context},
device_{device}, properties_{properties}, refCount_{1}, eventCount_{0},
compute_stream_idx_{0}, transfer_stream_idx_{0},
num_compute_streams_{0}, num_transfer_streams_{0},
Expand All @@ -419,10 +431,47 @@ struct _pi_queue {

// get_next_compute/transfer_stream() functions return streams from
// appropriate pools in round-robin fashion
native_type get_next_compute_stream();
native_type get_next_compute_stream(pi_uint32 *stream_token = nullptr);
// This overload tries to select a stream that was used by one of the
// dependencies. If that is not possible, it returns a new stream. If a stream
// is reused, it also returns a lock that needs to remain held as long as the
// stream is in use.
native_type get_next_compute_stream(pi_uint32 num_events_in_wait_list,
const pi_event *event_wait_list,
_pi_stream_guard &guard,
pi_uint32 *stream_token = nullptr);
native_type get_next_transfer_stream();
native_type get() { return get_next_compute_stream(); };

bool has_been_synchronized(pi_uint32 stream_token) {
// stream token not associated with one of the compute streams
if (stream_token == std::numeric_limits<pi_uint32>::max()) {
return false;
}
return last_sync_compute_streams_ >= stream_token;
}

bool can_reuse_stream(pi_uint32 stream_token) {
// stream token not associated with one of the compute streams
if (stream_token == std::numeric_limits<pi_uint32>::max()) {
return true;
}
// If the command represented by the stream token was not the last command
// enqueued to the stream, we cannot reuse the stream - we need to allow for
// commands enqueued after it and the one we are about to enqueue to run
// concurrently.
bool is_last_command =
(compute_stream_idx_ - stream_token) <= compute_streams_.size();
// If there was a barrier enqueued to the queue after the command
// represented by the stream token, we should not reuse the stream, as we
// cannot take that stream into account in the bookkeeping for the next
// barrier - such a stream would not be synchronized with. Performance-wise
// it does not matter that we do not reuse the stream, as the work
// represented by the stream token is guaranteed to be complete by the
// barrier before any work we are about to enqueue to the stream will start,
// so the event does not need to be synchronized with.
return is_last_command && !has_been_synchronized(stream_token);
}

template <typename T> void for_each_stream(T &&f) {
{
std::lock_guard<std::mutex> compute_guard(compute_stream_mutex_);
@@ -445,30 +494,39 @@
}

template <typename T> void sync_streams(T &&f) {
auto sync = [&f](const std::vector<CUstream> &streams, unsigned int start,
unsigned int stop) {
auto sync_compute = [&f, &streams = compute_streams_,
&delay = delay_compute_](unsigned int start,
unsigned int stop) {
for (unsigned int i = start; i < stop; i++) {
f(streams[i]);
delay[i] = false;
}
};
auto sync_transfer = [&f, &streams = transfer_streams_](unsigned int start,
unsigned int stop) {
for (unsigned int i = start; i < stop; i++) {
f(streams[i]);
}
};
{
unsigned int size = static_cast<unsigned int>(compute_streams_.size());
std::lock_guard compute_sync_guard(compute_stream_sync_mutex_);
std::lock_guard<std::mutex> compute_guard(compute_stream_mutex_);
unsigned int start = last_sync_compute_streams_;
unsigned int end = num_compute_streams_ < size
? num_compute_streams_
: compute_stream_idx_.load();
last_sync_compute_streams_ = end;
if (end - start >= size) {
sync(compute_streams_, 0, size);
sync_compute(0, size);
} else {
start %= size;
end %= size;
if (start < end) {
sync(compute_streams_, start, end);
sync_compute(start, end);
} else {
sync(compute_streams_, start, size);
sync(compute_streams_, 0, end);
sync_compute(start, size);
sync_compute(0, end);
}
}
}
@@ -482,15 +540,15 @@
: transfer_stream_idx_.load();
last_sync_transfer_streams_ = end;
if (end - start >= size) {
sync(transfer_streams_, 0, size);
sync_transfer(0, size);
} else {
start %= size;
end %= size;
if (start < end) {
sync(transfer_streams_, start, end);
sync_transfer(start, end);
} else {
sync(transfer_streams_, start, size);
sync(transfer_streams_, 0, end);
sync_transfer(start, size);
sync_transfer(0, end);
}
}
}
@@ -530,6 +588,8 @@ struct _pi_event {

CUstream get_stream() const noexcept { return stream_; }

pi_uint32 get_stream_token() const noexcept { return streamToken_; }

pi_command_type get_command_type() const noexcept { return commandType_; }

pi_uint32 get_reference_count() const noexcept { return refCount_; }
@@ -573,9 +633,11 @@
pi_uint64 get_end_time() const;

// construct a native CUDA event. This maps closely to the underlying CUDA event.
static pi_event make_native(pi_command_type type, pi_queue queue,
CUstream stream) {
return new _pi_event(type, queue->get_context(), queue, stream);
static pi_event
make_native(pi_command_type type, pi_queue queue, CUstream stream,
pi_uint32 stream_token = std::numeric_limits<pi_uint32>::max()) {
return new _pi_event(type, queue->get_context(), queue, stream,
stream_token);
}

pi_result release();
@@ -586,7 +648,7 @@
// This constructor is private to force programmers to use the make_native /
// make_user static members in order to create a pi_event for CUDA.
_pi_event(pi_command_type type, pi_context context, pi_queue queue,
CUstream stream);
CUstream stream, pi_uint32 stream_token);

pi_command_type commandType_; // The type of command associated with event.

@@ -602,6 +664,7 @@
// PI event has started or not
//

pi_uint32 streamToken_;
pi_uint32 eventId_; // Queue identifier of the event.

native_type evEnd_; // CUDA event handle. If this _pi_event represents a user
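Taken together, the header changes amount to a small piece of bookkeeping: the monotonically increasing compute_stream_idx_ doubles as the stream token, delay_compute_ marks a reused stream so the next round-robin pass skips it once, and can_reuse_stream only allows reuse while the token is still the last command assigned to its physical stream and no barrier has synchronized it since. The standalone model below illustrates that bookkeeping with standard C++ only; StreamPoolModel and its members are hypothetical names, stream creation is omitted, and the model is single-threaded, whereas the plugin guards the same state with compute_stream_mutex_ and compute_stream_sync_mutex_.

// Standalone, single-threaded model of the selection/reuse bookkeeping in
// this patch. Not part of the plugin sources.
#include <cstdint>
#include <iostream>
#include <limits>
#include <vector>

struct StreamPoolModel {
  static constexpr std::uint32_t invalid_token =
      std::numeric_limits<std::uint32_t>::max();

  std::vector<bool> delay;     // mirrors delay_compute_
  std::uint32_t next_idx = 0;  // mirrors compute_stream_idx_
  std::uint32_t last_sync = 0; // mirrors last_sync_compute_streams_

  explicit StreamPoolModel(std::size_t num_streams)
      : delay(num_streams, false) {}

  // Round-robin pick, skipping (and clearing) streams whose delay flag is set.
  std::uint32_t pick(std::uint32_t *token = nullptr) {
    std::uint32_t i;
    while (true) {
      i = next_idx++;
      if (delay[i % delay.size()])
        delay[i % delay.size()] = false; // skip once, clear the flag
      else
        break;
    }
    if (token)
      *token = i;
    return i % delay.size();
  }

  bool has_been_synchronized(std::uint32_t token) const {
    return token != invalid_token && last_sync >= token;
  }

  // A dependency's stream can be reused only if nothing else has been assigned
  // to that physical stream since (the token is still within one round-robin
  // lap) and no barrier has synchronized it in the meantime.
  bool can_reuse(std::uint32_t token) const {
    if (token == invalid_token)
      return true;
    bool is_last_command = (next_idx - token) <= delay.size();
    return is_last_command && !has_been_synchronized(token);
  }

  // Reusing a stream marks it so the next round-robin pass skips it once.
  void reuse(std::uint32_t token) { delay[token % delay.size()] = true; }
};

int main() {
  StreamPoolModel pool(2);
  std::uint32_t a, b;
  pool.pick(&a); // command A -> stream 0
  pool.pick(&b); // command B -> stream 1
  pool.reuse(b); // command C depends on B and reuses stream 1
  // Stream 1 is now delayed: the next two picks both give stream 0, and the
  // delay flag on stream 1 is cleared when it is skipped.
  std::cout << pool.pick() << ' ' << pool.pick() << '\n'; // prints "0 0"
}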