Commit cc7e39c

ch4/ofi: fix thread critical sections in rndv algorithms
MPIX async progress is invoked without the VCI critical section. Make sure to enter the VCI CS when we need to access the genq private pool, call into libfabric, or free the request.
1 parent 1078363 commit cc7e39c
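
All three files apply the same shape of fix. The sketch below illustrates that pattern using the MPICH-internal macros that appear in the diff; the function name example_poll and the elided chunk-issue step are placeholders for this illustration, not code from the commit, and the snippet is not a standalone compilable unit. The poll callback takes the per-VCI lock on entry, every early return becomes goto fn_exit, and the lock is released on the single exit path before returning.

static int example_poll(MPIX_Async_thing thing)
{
    int ret = MPIX_ASYNC_NOPROGRESS;
    MPIR_Request *req = MPIR_Async_thing_get_state(thing);
    MPIDI_OFI_pipeline_t *p = &MPIDI_OFI_AMREQ_PIPELINE(req);

    /* MPIX async progress runs outside any VCI critical section, so take
     * the per-VCI lock before touching the genq pool or calling libfabric */
    MPID_THREAD_CS_ENTER(VCI, MPIDI_VCI_LOCK(p->vci_local));

    void *chunk_buf;
    MPIDU_genq_private_pool_alloc_cell(MPIDI_OFI_global.per_vci[p->vci_local].pipeline_pool,
                                       &chunk_buf);
    if (!chunk_buf) {
        /* pool exhausted: report no progress, but still release the lock */
        goto fn_exit;
    }

    /* ... issue the copy or rdma operation for this chunk (elided) ... */
    ret = MPIX_ASYNC_DONE;

  fn_exit:
    /* single exit path so every early return drops the critical section */
    MPID_THREAD_CS_EXIT(VCI, MPIDI_VCI_LOCK(p->vci_local));
    return ret;
}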

File tree

3 files changed: +53 additions, -16 deletions

src/mpid/ch4/netmod/ofi/ofi_pipeline.c

Lines changed: 23 additions & 7 deletions
@@ -148,13 +148,17 @@ int MPIDI_OFI_pipeline_recv_chunk_event(struct fi_cq_tagged_entry *wc, MPIR_Requ
 /* async send chunks until done */
 static int pipeline_send_poll(MPIX_Async_thing thing)
 {
+    int ret = MPIX_ASYNC_NOPROGRESS;
     MPIR_Request *sreq = MPIR_Async_thing_get_state(thing);
     MPIDI_OFI_pipeline_t *p = &MPIDI_OFI_AMREQ_PIPELINE(sreq);
 
+    /* CS required for genq pool and gpu imemcpy */
+    MPID_THREAD_CS_ENTER(VCI, MPIDI_VCI_LOCK(p->vci_local));
+
     while (p->u.send.copy_offset < p->remote_data_sz) {
         /* limit copy_infly so it doesn't overwhelm async progress */
         if (p->u.send.copy_infly >= MPIDI_OFI_PIPILINE_INFLY_CHUNKS) {
-            return MPIX_ASYNC_NOPROGRESS;
+            goto fn_exit;
         }
 
         void *chunk_buf;
@@ -167,7 +171,7 @@ static int pipeline_send_poll(MPIX_Async_thing thing)
         MPIDU_genq_private_pool_alloc_cell(MPIDI_OFI_global.per_vci[p->vci_local].pipeline_pool,
                                            &chunk_buf);
         if (!chunk_buf) {
-            return MPIX_ASYNC_NOPROGRESS;
+            goto fn_exit;
         }
 
         /* async copy */
@@ -186,7 +190,10 @@ static int pipeline_send_poll(MPIX_Async_thing thing)
         p->u.send.copy_infly++;
     }
 
-    return MPIX_ASYNC_DONE;
+    ret = MPIX_ASYNC_DONE;
+  fn_exit:
+    MPID_THREAD_CS_EXIT(VCI, MPIDI_VCI_LOCK(p->vci_local));
+    return ret;
 }
 
 /* ---- send_copy ---- */
@@ -325,10 +332,14 @@ static int pipeline_recv_poll(MPIX_Async_thing thing)
         return MPIX_ASYNC_NOPROGRESS;
     }
 
+    int ret = MPIX_ASYNC_NOPROGRESS;
+    /* CS required for genq pool and gpu imemcpy */
+    MPID_THREAD_CS_ENTER(VCI, MPIDI_VCI_LOCK(p->vci_local));
+
     while (p->u.recv.recv_offset < p->remote_data_sz) {
         /* only need issue enough recv_infly to match send_infly */
         if (p->u.recv.recv_infly >= MPIDI_OFI_PIPILINE_INFLY_CHUNKS) {
-            return MPIX_ASYNC_NOPROGRESS;
+            goto fn_exit;
         }
 
         void *chunk_buf;
@@ -341,7 +352,7 @@ static int pipeline_recv_poll(MPIX_Async_thing thing)
         MPIDU_genq_private_pool_alloc_cell(MPIDI_OFI_global.per_vci[p->vci_local].pipeline_pool,
                                            &chunk_buf);
         if (!chunk_buf) {
-            return MPIX_ASYNC_NOPROGRESS;
+            goto fn_exit;
         }
 
         struct recv_chunk_req *chunk_req =
@@ -367,10 +378,13 @@ static int pipeline_recv_poll(MPIX_Async_thing thing)
         p->u.recv.recv_infly++;
     }
 
-    return MPIX_ASYNC_DONE;
+    ret = MPIX_ASYNC_DONE;
+  fn_exit:
+    MPID_THREAD_CS_EXIT(VCI, MPIDI_VCI_LOCK(p->vci_local));
+    return ret;
   fn_fail:
     MPIR_Assert(0);
-    return MPIX_ASYNC_NOPROGRESS;
+    goto fn_exit;
 }
 
 static void recv_chunk_copy(MPIR_Request * rreq, void *chunk_buf, MPI_Aint chunk_sz,
@@ -433,6 +447,7 @@ static int recv_copy_poll(MPIX_Async_thing thing)
 static void recv_copy_complete(MPIR_Request * rreq, void *chunk_buf, MPI_Aint chunk_sz)
 {
     MPIDI_OFI_pipeline_t *p = &MPIDI_OFI_AMREQ_PIPELINE(rreq);
+    MPID_THREAD_CS_ENTER(VCI, MPIDI_VCI_LOCK(p->vci_local));
 
     MPIDU_genq_private_pool_free_cell(MPIDI_OFI_global.per_vci[p->vci_local].pipeline_pool,
                                       chunk_buf);
@@ -442,4 +457,5 @@ static void recv_copy_complete(MPIR_Request * rreq, void *chunk_buf, MPI_Aint ch
         MPIR_Datatype_release_if_not_builtin(p->datatype);
         MPIDI_Request_complete_fast(rreq);
     }
+    MPID_THREAD_CS_EXIT(VCI, MPIDI_VCI_LOCK(p->vci_local));
 }

src/mpid/ch4/netmod/ofi/ofi_rndv_read.c

Lines changed: 15 additions & 4 deletions
@@ -158,14 +158,18 @@ int MPIDI_OFI_rndvread_recv_mrs_event(struct fi_cq_tagged_entry *wc, MPIR_Reques
 
 static int rndvread_read_poll(MPIX_Async_thing thing)
 {
+    int ret = MPIX_ASYNC_NOPROGRESS;
     int mpi_errno = MPI_SUCCESS;
     MPIR_Request *rreq = MPIR_Async_thing_get_state(thing);
     MPIDI_OFI_rndvread_t *p = &MPIDI_OFI_AMREQ_READ(rreq);
 
+    /* CS required for genq pool and gpu imemcpy */
+    MPID_THREAD_CS_ENTER(VCI, MPIDI_VCI_LOCK(p->vci_local));
+
     int num_nics = MPIDI_OFI_global.num_nics;
     while (p->u.recv.cur_chunk_index < p->u.recv.chunks_per_nic * num_nics) {
         if (p->u.recv.num_infly >= MPIDI_OFI_RNDVREAD_INFLY_CHUNKS) {
-            return MPIX_ASYNC_NOPROGRESS;
+            goto fn_exit;
         }
         int nic;
         MPI_Aint total_offset, nic_offset, chunk_sz;
@@ -180,7 +184,7 @@ static int rndvread_read_poll(MPIX_Async_thing thing)
             MPIDU_genq_private_pool_alloc_cell(MPIDI_OFI_global.
                                                per_vci[p->vci_local].pipeline_pool, &read_buf);
             if (!read_buf) {
-                return MPIX_ASYNC_NOPROGRESS;
+                goto fn_exit;
             }
         } else {
             read_buf = (char *) p->u.recv.u.data + total_offset;
@@ -202,9 +206,14 @@ static int rndvread_read_poll(MPIX_Async_thing thing)
     }
 
     p->u.recv.all_issued = true;
-    return MPIX_ASYNC_DONE;
+    ret = MPIX_ASYNC_DONE;
+
+  fn_exit:
+    MPID_THREAD_CS_EXIT(VCI, MPIDI_VCI_LOCK(p->vci_local));
+    return ret;
   fn_fail:
-    return MPIX_ASYNC_NOPROGRESS;
+    ret = MPIX_ASYNC_NOPROGRESS;
+    goto fn_exit;
 }
 
 struct read_req {
@@ -325,11 +334,13 @@ static void recv_copy_complete(MPIR_Request * rreq, void *chunk_buf, MPI_Aint ch
 {
     MPIDI_OFI_rndvread_t *p = &MPIDI_OFI_AMREQ_READ(rreq);
 
+    MPID_THREAD_CS_ENTER(VCI, MPIDI_VCI_LOCK(p->vci_local));
     MPIDU_genq_private_pool_free_cell(MPIDI_OFI_global.per_vci[p->vci_local].pipeline_pool,
                                       chunk_buf);
 
     p->u.recv.u.copy_infly--;
     check_recv_complete(rreq);
+    MPID_THREAD_CS_EXIT(VCI, MPIDI_VCI_LOCK(p->vci_local));
 }
 
 static int check_recv_complete(MPIR_Request * rreq)

src/mpid/ch4/netmod/ofi/ofi_rndv_write.c

Lines changed: 15 additions & 5 deletions
@@ -91,10 +91,14 @@ int MPIDI_OFI_rndvwrite_recv_mrs_event(struct fi_cq_tagged_entry *wc, MPIR_Reque
 
 static int rndvwrite_write_poll(MPIX_Async_thing thing)
 {
+    int ret = MPIX_ASYNC_NOPROGRESS;
     int mpi_errno = MPI_SUCCESS;
     MPIR_Request *sreq = MPIR_Async_thing_get_state(thing);
     MPIDI_OFI_rndvwrite_t *p = &MPIDI_OFI_AMREQ_WRITE(sreq);
 
+    /* CS required for genq pool and gpu imemcpy */
+    MPID_THREAD_CS_ENTER(VCI, MPIDI_VCI_LOCK(p->vci_local));
+
     int num_nics = MPIDI_OFI_global.num_nics;
     while (p->u.send.cur_chunk_index < p->u.send.chunks_per_nic * num_nics) {
         int nic;
@@ -118,23 +122,23 @@ static int rndvwrite_write_poll(MPIX_Async_thing thing)
 
         if (p->need_pack) {
             if (p->u.send.u.copy_infly >= MPIDI_OFI_RNDVWRITE_INFLY_CHUNKS) {
-                return MPIX_ASYNC_NOPROGRESS;
+                goto fn_exit;
             }
 
             /* alloc a chunk */
             void *chunk_buf;
             MPIDU_genq_private_pool_alloc_cell(MPIDI_OFI_global.per_vci[p->vci_local].pipeline_pool,
                                                &chunk_buf);
             if (!chunk_buf) {
-                return MPIX_ASYNC_NOPROGRESS;
+                goto fn_exit;
             }
             /* issue async copy */
             mpi_errno = async_send_copy(sreq, nic, disp, chunk_buf, chunk_sz,
                                         p->buf, p->count, p->datatype, total_offset, &p->attr);
             MPIR_ERR_CHECK(mpi_errno);
         } else {
             if (p->u.send.write_infly >= MPIDI_OFI_RNDVWRITE_INFLY_CHUNKS) {
-                return MPIX_ASYNC_NOPROGRESS;
+                goto fn_exit;
             }
             void *write_buf = (char *) p->u.send.u.data + total_offset;
             /* issue rdma write */
@@ -144,9 +148,13 @@ static int rndvwrite_write_poll(MPIX_Async_thing thing)
         p->u.send.cur_chunk_index++;
     }
 
-    return MPIX_ASYNC_DONE;
+    ret = MPIX_ASYNC_DONE;
+
+  fn_exit:
+    MPID_THREAD_CS_EXIT(VCI, MPIDI_VCI_LOCK(p->vci_local));
+    return ret;
   fn_fail:
-    return MPIX_ASYNC_NOPROGRESS;
+    goto fn_exit;
 }
 
 struct send_copy {
@@ -248,9 +256,11 @@ static int send_issue_write(MPIR_Request * sreq, void *buf, MPI_Aint data_sz,
     fi_addr_t addr = MPIDI_OFI_av_to_phys(p->av, p->vci_local, nic, p->vci_remote, nic);
     uint64_t rkey = p->u.send.rkeys[nic];
 
+    MPID_THREAD_CS_ENTER(VCI, MPIDI_VCI_LOCK(p->vci_local));
     MPIDI_OFI_CALL_RETRY(fi_write(MPIDI_OFI_global.ctx[ctx_idx].tx,
                                   buf, data_sz, NULL, addr, disp, rkey, (void *) &t->context),
                          p->vci_local, rdma_write);
+    MPID_THREAD_CS_EXIT(VCI, MPIDI_VCI_LOCK(p->vci_local));
     p->u.send.write_infly++;
 
   fn_exit:
