Skip to content

Commit 6cee031

Browse files
Merge pull request #202 from maciekpac/release/ccl_2021.15.9-arc
Intel(R) oneAPI Collective Communications Library (oneCCL) 2021.15.9
2 parents c0bfc31 + adaa35b commit 6cee031

13 files changed

Lines changed: 155 additions & 40 deletions

File tree

CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,7 @@ endif()
343343

344344
set(CCL_MAJOR_VERSION "2021")
345345
set(CCL_MINOR_VERSION "15")
346-
set(CCL_UPDATE_VERSION "8")
346+
set(CCL_UPDATE_VERSION "9")
347347
set(CCL_PRODUCT_STATUS "Gold")
348348
string(TIMESTAMP CCL_PRODUCT_BUILD_DATE "%Y-%m-%dT %H:%M:%SZ")
349349
get_vcs_properties("git")

man/doxconfig

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
PROJECT_NAME = "Intel® oneAPI Collective Communications Library"
2-
PROJECT_NUMBER = "2021.15.8"
2+
PROJECT_NUMBER = "2021.15.9"
33

44
INPUT = ../src/common/env/vars.hpp ../src/common/env/vars_experimental.hpp
55

src/atl/ofi/atl_ofi_comm.cpp

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,14 +59,26 @@ atl_status_t atl_ofi_comm::barrier(size_t ep_idx, atl_req_t& req) {
5959
int tag_comm_id = (comm_id != atl_comm_id_storage::invalid_comm_id)
6060
? comm_id
6161
: atl_comm_id_storage::max_comm_id;
62-
int tagc = tag_counter++;
62+
int tagc = tag_counter_barrier++;
63+
64+
LOG_DEBUG("ofi_barrier: comm_rank: ",
65+
rank,
66+
", comm_size: ",
67+
size,
68+
", comm_id: ",
69+
comm_id,
70+
", tag_comm_id: ",
71+
tag_comm_id,
72+
", tag_counter: ",
73+
tagc);
74+
6375
int src, dst;
6476
const int len = 1;
6577
char sendbuf[len], recvbuf[len];
6678
int mask = 0x1;
6779
while (mask < size) {
6880
dst = (rank + mask) % size;
69-
src = (rank - mask + size) % size;
81+
src = (rank + size - mask) % size;
7082
atl_req send_req, recv_req;
7183
uint64_t op_tag = tag_creator->create(rank, tag_comm_id, tagc, 1);
7284
do {
@@ -98,6 +110,17 @@ atl_status_t atl_ofi_comm::barrier(size_t ep_idx, atl_req_t& req) {
98110
mask <<= 1;
99111
}
100112

113+
LOG_DEBUG("ofi_barrier done: comm_rank: ",
114+
rank,
115+
", comm_size: ",
116+
size,
117+
", comm_id: ",
118+
comm_id,
119+
", tag_comm_id: ",
120+
tag_comm_id,
121+
", tag_counter: ",
122+
tagc);
123+
101124
ofi_req->comp_state = ATL_OFI_COMP_COMPLETED;
102125
return ATL_STATUS_SUCCESS;
103126
}
@@ -205,6 +228,19 @@ atl_status_t atl_ofi_comm::allgatherv(size_t ep_idx,
205228
atl_ofi_req_t* ofi_req = ((atl_ofi_req_t*)req.internal);
206229
ofi_req->comp_state = ATL_OFI_COMP_COMPLETED;
207230

231+
LOG_DEBUG("ofi_allgatherv done: comm_rank: ",
232+
rank,
233+
", comm_size: ",
234+
size,
235+
", send_len: ",
236+
send_len,
237+
", comm_id: ",
238+
comm_id,
239+
", tag_comm_id: ",
240+
tag_comm_id,
241+
", tag_counter: ",
242+
tag_counter);
243+
208244
tag_counter++;
209245

210246
return ATL_STATUS_SUCCESS;

src/atl/ofi/atl_ofi_comm.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,4 +185,5 @@ class atl_ofi_comm : public atl_base_comm {
185185
atl_status_t init_transport(bool is_new);
186186

187187
uint64_t tag_counter = 0;
188+
uint64_t tag_counter_barrier = 0;
188189
};

src/coll/algorithms/allgatherv/sycl/allgatherv_large_sycl.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,16 @@ ccl::event allgatherv_large(sycl::queue& q,
3333
std::shared_ptr<ccl_comm> pair_comm = comm->get_pair_comm();
3434
std::shared_ptr<ccl_comm> even_comm = comm->get_even_comm();
3535

36+
bool is_arc = is_arc_card(ccl::ze::get_device_family(global_stream->get_ze_device()));
37+
3638
const size_t dsize = ccl::global_data::get().dtypes->get(dtype).size();
3739
// use full vector (>= 8 bytes) if buffers and data size are 4 byte aligned
3840
bool use_full_vector = can_use_full_vector(send_buf, recv_buf, send_count * dsize);
3941
// TODO : generalize constraints for different hardware.
4042
// kernels with remote access perform best at 64-byte alignment (sycl_kernels_line_size/2) on PVC
41-
const size_t align_size = ccl::global_data::env().sycl_kernels_line_size / 2;
43+
// for BMG, relax the alignment check so that we don't change the path
44+
// tmp_buf = 1 path is not implemented in this branch
45+
const size_t align_size = is_arc ? dsize : ccl::global_data::env().sycl_kernels_line_size / 2;
4246
const bool is_aligned = (send_count * dsize) % align_size == 0;
4347
// use tmp buf for types < 4 byte size with odd count or non 4 byte aligned data
4448
// use tmp buf when data count bytes is not 64 byte aligned
@@ -47,6 +51,7 @@ ccl::event allgatherv_large(sycl::queue& q,
4751
((!use_full_vector || !is_aligned) && ccl::global_data::env().sycl_auto_use_tmp_buf);
4852

4953
if (is_tmp_used) {
54+
CCL_THROW_IF_NOT(!is_arc);
5055
CCL_THROW_IF_NOT(!ccl::global_data::env().sycl_copy_engine,
5156
"allgatherv using copy engines not supported with tmp buffer");
5257

@@ -72,7 +77,7 @@ ccl::event allgatherv_large(sycl::queue& q,
7277
}
7378
}
7479

75-
if (is_arc_card(ccl::ze::get_device_family(global_stream->get_ze_device()))) {
80+
if (is_arc) {
7681
// only need output buffer
7782
std::vector<void*> ptrs{ recv_buf }; // index 0
7883
auto [sched, exchange_entry] = do_ipc_exchange(comm, global_stream, ptrs);

src/coll/algorithms/allreduce/sycl/allreduce_large_sycl_ring.hpp

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -332,16 +332,18 @@ void allreduce_large_su_ring_write_multi_kernel(const void *send_buf,
332332
std::vector<sycl::event> dep_events = get_sycl_events(deps);
333333
sycl::event work_event;
334334

335-
const int pipeline_size = N;
335+
const int pipeline_size = std::max(N, ccl_large_tmp_bufs::buf_count);
336336

337337
const size_t my_offset_count = count_per_rank * rank;
338338
const size_t recv_count = count_per_rank;
339339
const size_t recv_bytes = recv_count * dsize;
340340
void *recv_buf_rank_offset = (T *)recv_buf + my_offset_count;
341341
void *peer_recv_buf_rank_offset = (T *)peer_recv + my_offset_count;
342-
const size_t chunk_size = ccl::global_data::env().sycl_tmp_buf_size / pipeline_size;
343-
const size_t rem_chunk_size = recv_bytes % chunk_size;
344-
const size_t num_chunks = recv_bytes / chunk_size + (rem_chunk_size != 0);
342+
const size_t chunk_size = ccl::global_data::env().sycl_tmp_buf_size / pipeline_size /
343+
sizeof(message_t) * sizeof(message_t);
344+
const size_t chunk_count = chunk_size / dsize;
345+
const size_t rem_chunk_count = recv_count % chunk_count;
346+
const size_t num_chunks = recv_count / chunk_count + (rem_chunk_count != 0);
345347

346348
std::array<void *, ARC_MAX_NUM> work_bufs;
347349
std::array<void *, ARC_MAX_NUM> remote_work_bufs;
@@ -356,13 +358,12 @@ void allreduce_large_su_ring_write_multi_kernel(const void *send_buf,
356358

357359
int slot = 0;
358360
for (size_t nc = 0; nc < num_chunks; nc++) {
359-
const size_t chunk_offset = nc * chunk_size;
360-
const size_t data_count =
361-
((nc < recv_bytes / chunk_size) ? chunk_size : rem_chunk_size) / dsize;
361+
const size_t chunk_offset = nc * chunk_count * dsize;
362+
const size_t data_count = (nc < recv_count / chunk_count) ? chunk_count : rem_chunk_count;
362363

363364
// starting indexes
364-
int s = (rank - 1 + N) % N;
365-
int r = (s - 1 + N) % N;
365+
int s = (rank + N - 1) % N;
366+
int r = (s + N - 1) % N;
366367

367368
for (int i = 0; i < N - 1; i++) {
368369
int next_slot = (slot + 1) % pipeline_size;
@@ -413,7 +414,7 @@ void allreduce_large_su_ring_write_multi_kernel(const void *send_buf,
413414
}
414415

415416
s = r;
416-
r = (s - 1 + N) % N;
417+
r = (s + N - 1) % N;
417418
slot = next_slot;
418419
}
419420
}

src/coll/algorithms/recv/sycl/recv_pcie.cpp

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,26 +34,33 @@ ccl::event recv_ll(const void *recv_buf,
3434
const int comm_size = node_comm->size();
3535
const int comm_rank = node_comm->rank();
3636

37-
// coll_init(comm, global_stream);
38-
3937
auto ccl_dtype = ccl::global_data::get().dtypes->get(dtype);
4038
size_t dt_sz = ccl_dtype.size();
4139
size_t recv_size = recv_count * ccl_dtype.size();
4240

4341
bool p2p = node_comm->get_topo_manager().has_p2p_access();
4442
uint32_t pattern = node_comm->get_rt_pattern(pattern_type::recv, peer_rank);
4543

44+
LOG_DEBUG("recv_ll recv_count: ", recv_count, " peer rank: ", peer_rank);
45+
4646
std::vector<sycl::event> dep_events = get_sycl_events(deps);
4747

4848
auto lambda = [&]<typename T, template <typename, int> class Proto>(int NRanks) {
4949
T *peerbuf0[NRanks];
5050
T *peerbuf1[NRanks];
51+
T *ipcbuf0;
52+
T *ipcbuf1;
53+
// use small tmp buffer. Can not use large tmp buffer because
54+
// there is only a single copy of the buffer.
55+
// Otherwise a barrier is needed here
56+
auto [local_tmp_buf, remote_ptrs] = node_comm->get_all_tmp_bufs(true);
5157
for (int i = 0; i < NRanks; i++) {
52-
peerbuf0[i] = (T *)get_remote_node_tmp_buf(0, comm)[i];
53-
peerbuf1[i] = (T *)get_remote_node_tmp_buf(1, comm)[i];
58+
peerbuf0[i] = (T *)remote_ptrs[i];
59+
peerbuf1[i] = (T *)((char *)remote_ptrs[i] + ccl_tmp_bufs::buf_size / 2);
5460
}
55-
T *ipcbuf0 = (T *)get_tmp_buf(0, comm);
56-
T *ipcbuf1 = (T *)get_tmp_buf(1, comm);
61+
ipcbuf0 = (T *)local_tmp_buf;
62+
ipcbuf1 = (T *)((char *)local_tmp_buf + ccl_tmp_bufs::buf_size / 2);
63+
5764
sycl::event e = Recv<T, Proto, RingTransmit>::launch(NRanks,
5865
(T *)recv_buf,
5966
ipcbuf0,

src/coll/algorithms/send/sycl/send_pcie.cpp

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,26 +34,33 @@ ccl::event send_ll(const void *send_buf,
3434
const int comm_size = node_comm->size();
3535
const int comm_rank = node_comm->rank();
3636

37-
// coll_init(comm, global_stream);
38-
3937
auto ccl_dtype = ccl::global_data::get().dtypes->get(dtype);
4038
size_t dt_sz = ccl_dtype.size();
4139
size_t send_size = send_count * ccl_dtype.size();
4240

4341
bool p2p = node_comm->get_topo_manager().has_p2p_access();
4442
uint32_t pattern = node_comm->get_rt_pattern(pattern_type::send, peer_rank);
4543

44+
LOG_DEBUG("send_ll send_count: ", send_count, " peer rank: ", peer_rank);
45+
4646
std::vector<sycl::event> dep_events = get_sycl_events(deps);
4747

4848
auto lambda = [&]<typename T, template <typename, int> class Proto>(int NRanks) {
4949
T *peerbuf0[NRanks];
5050
T *peerbuf1[NRanks];
51+
T *ipcbuf0;
52+
T *ipcbuf1;
53+
// use small tmp buffer. Can not use large tmp buffer because
54+
// there is only a single copy of the buffer.
55+
// Otherwise a barrier is needed here
56+
auto [local_tmp_buf, remote_ptrs] = node_comm->get_all_tmp_bufs(true);
5157
for (int i = 0; i < NRanks; i++) {
52-
peerbuf0[i] = (T *)get_remote_node_tmp_buf(0, comm)[i];
53-
peerbuf1[i] = (T *)get_remote_node_tmp_buf(1, comm)[i];
58+
peerbuf0[i] = (T *)remote_ptrs[i];
59+
peerbuf1[i] = (T *)((char *)remote_ptrs[i] + ccl_tmp_bufs::buf_size / 2);
5460
}
55-
T *ipcbuf0 = (T *)get_tmp_buf(0, comm);
56-
T *ipcbuf1 = (T *)get_tmp_buf(1, comm);
61+
ipcbuf0 = (T *)local_tmp_buf;
62+
ipcbuf1 = (T *)((char *)local_tmp_buf + ccl_tmp_bufs::buf_size / 2);
63+
5764
sycl::event e = Send<T, Proto, RingTransmit>::launch(NRanks,
5865
(T *)send_buf,
5966
ipcbuf0,

src/coll/algorithms/utils/sycl_coll_base.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ void coll_init(ccl_comm *comm, ccl_stream *global_stream) {
331331

332332
//set up temp buf to be used for large collectives
333333
// WA : use smaller tmp buffer for client GPUs
334-
if (is_arc &&
334+
if (is_arc && ccl::global_data::env().sycl_ll_buffer_global &&
335335
ccl::global_data::env().sycl_tmp_buf_size == tmp_bufs_count * 128 * 1024 * 1024) {
336336
ccl::global_data::env().sycl_tmp_buf_size =
337337
tmp_bufs_count * calculate_ll_buf_size(q);
@@ -607,7 +607,8 @@ void coll_initExt(ccl_comm *comm,
607607
//set up temp buf to be used for large collectives
608608
// WA : use smaller tmp buffer for client GPUs
609609
if (is_arc_card(ccl::ze::get_device_family(global_stream->get_ze_device())) &&
610-
ccl::global_data::env().sycl_tmp_buf_size == 3 * 128 * 1024 * 1024) {
610+
ccl::global_data::env().sycl_ll_buffer_global &&
611+
ccl::global_data::env().sycl_tmp_buf_size == tmp_bufs_count * 128 * 1024 * 1024) {
611612
ccl::global_data::env().sycl_tmp_buf_size =
612613
tmp_bufs_count * calculate_ll_buf_size(q);
613614
LOG_DEBUG("MT: Allocate LL ring buffer of size: ",

src/coll/algorithms/utils/transmit/ring_transmit.hpp

Lines changed: 42 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -181,8 +181,20 @@ class RingTransmit : public Proto<T, SubGroupSize> {
181181
retry |= recvMessages(messages, localScatterSink[peer][slot][wireId], flag);
182182
} while (sycl::any_of_group(sycl::ext::oneapi::this_work_item::get_sub_group(), retry));
183183

184-
shuffleData(v);
185-
accumMessages(v, messages);
184+
if constexpr (sizeof(T) > sizeof(flag)) {
185+
// restore data, accumulate, shuffle
186+
restoreData(messages);
187+
accumMessages(v, messages);
188+
shuffleData(v);
189+
}
190+
else {
191+
// datasize is smaller than flag,
192+
// no overlap between datatype and flag
193+
// can accumulate shuffled data:
194+
// less ops to perform, faster
195+
shuffleData(v);
196+
accumMessages(v, messages);
197+
}
186198
insertFlags(v, flag);
187199

188200
sendMessages(scatterSink[peer][slot][wireId], v);
@@ -208,8 +220,20 @@ class RingTransmit : public Proto<T, SubGroupSize> {
208220
retry |= recvMessages(messages, localScatterSink[peer][slot][wireId], flag);
209221
} while (sycl::any_of_group(sycl::ext::oneapi::this_work_item::get_sub_group(), retry));
210222

211-
shuffleData(v);
212-
accumMessages(v, messages);
223+
if constexpr (sizeof(T) > sizeof(flag)) {
224+
// restore data, accumulate, shuffle
225+
restoreData(messages);
226+
accumMessages(v, messages);
227+
shuffleData(v);
228+
}
229+
else {
230+
// datasize is smaller than flag,
231+
// no overlap between datatype and flag
232+
// can accumulate shuffled data:
233+
// less ops to perform, faster
234+
shuffleData(v);
235+
accumMessages(v, messages);
236+
}
213237

214238
insertFlags(v, flag);
215239
sendMessages(gatherSink[peer][slot][wireId], v);
@@ -294,8 +318,20 @@ class RingTransmit : public Proto<T, SubGroupSize> {
294318
retry |= recvMessages(messages, localScatterSink[peer][slot][wireId], flag);
295319
} while (sycl::any_of_group(sycl::ext::oneapi::this_work_item::get_sub_group(), retry));
296320

297-
shuffleData(v);
298-
accumMessages(v, messages);
321+
if constexpr (sizeof(T) > sizeof(flag)) {
322+
// restore data, accumulate, shuffle
323+
restoreData(messages);
324+
accumMessages(v, messages);
325+
shuffleData(v);
326+
}
327+
else {
328+
// datasize is smaller than flag,
329+
// no overlap between datatype and flag
330+
// can accumulate shuffled data:
331+
// less ops to perform, faster
332+
shuffleData(v);
333+
accumMessages(v, messages);
334+
}
299335

300336
insertFlags(v, flag);
301337
restoreData(v);

0 commit comments

Comments
 (0)