From cbadc9aecb1fda6c8cbc58f329b3afd8ffa04840 Mon Sep 17 00:00:00 2001 From: Hui Zhou Date: Tue, 28 Oct 2025 18:29:43 -0500 Subject: [PATCH 01/12] async: allow blocking MPI calls in async poll functions Since we need to enter the thread CS before calling async poll functions, we may have a recursive situation when the callback makes blocking MPI calls and invokes MPI progress within the poll function. To allow that, we need to skip async progress when re-entering the progress. --- src/util/mpir_async_things.c | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/util/mpir_async_things.c b/src/util/mpir_async_things.c index 0657139e27b..5f1c10332e8 100644 --- a/src/util/mpir_async_things.c +++ b/src/util/mpir_async_things.c @@ -73,8 +73,18 @@ int MPIR_Async_things_add(int (*poll_fn) (struct MPIR_Async_thing * entry), void return MPI_SUCCESS; } +/* If we allow async callbacks to call MPI progress, e.g. make blocking MPI calls, + * we need prevent recursively entering async progress or invoking recursive CS lock. + */ +static MPL_TLS int in_async_progress = 0; + int MPIR_Async_things_progress(int vci, int *made_progress) { + if (in_async_progress) { + goto fn_exit; + } + in_async_progress = 1; + if (vci == -1) { vci = MPIR_MAX_VCIS; } @@ -97,5 +107,8 @@ int MPIR_Async_things_progress(int vci, int *made_progress) } } MPID_THREAD_CS_EXIT(VCI, async_things_mutex[vci]); + + in_async_progress = 0; + fn_exit: return MPI_SUCCESS; } From c4d081c7171deb84288ce7b3b7b94a7211f3bccb Mon Sep 17 00:00:00 2001 From: Hui Zhou Date: Tue, 28 Oct 2025 10:10:14 -0500 Subject: [PATCH 02/12] comm: re-implement nonblocking contextid allocation Re-implement the nonblocking contextid allocation algorithm using MPIX Async. 
--- src/mpi/comm/contextid.c | 317 ++++++++++++++++++++++----------------- 1 file changed, 178 insertions(+), 139 deletions(-) diff --git a/src/mpi/comm/contextid.c b/src/mpi/comm/contextid.c index 66dd26e7678..646ed850271 100644 --- a/src/mpi/comm/contextid.c +++ b/src/mpi/comm/contextid.c @@ -291,11 +291,22 @@ struct gcn_state { uint64_t tag; MPIR_Comm *comm_ptr; MPIR_Comm *comm_ptr_inter; - MPIR_Sched_t s; + MPIR_Request *req_ptr; MPIR_Comm *new_comm; - MPIR_Comm_kind_t gcn_cid_kind; + bool is_inter_comm; uint32_t local_mask[MPIR_MAX_CONTEXT_MASK + 1]; struct gcn_state *next; + enum { + MPIR_GCN__COPYMASK, + MPIR_GCN__ALLREDUCE, + MPIR_GCN__INTERCOMM_SENDRECV, + MPIR_GCN__INTERCOMM_BCAST, + } stage; + union { + MPIR_Request *allreduce_request; + MPIR_Request *sendrecv_reqs[2]; + MPIR_Request *bcast_request; + } u; }; struct gcn_state *next_gcn = NULL; @@ -613,55 +624,12 @@ int MPIR_Get_contextid_sparse_group(MPIR_Comm * comm_ptr, MPIR_Group * group_ptr -static int sched_cb_gcn_copy_mask(MPIR_Comm * comm, int tag, void *state); -static int sched_cb_gcn_allocate_cid(MPIR_Comm * comm, int tag, void *state); -static int sched_cb_gcn_bcast(MPIR_Comm * comm, int tag, void *state); -static int sched_cb_commit_comm(MPIR_Comm * comm, int tag, void *state) -{ - int mpi_errno = MPI_SUCCESS; - struct gcn_state *st = state; - - mpi_errno = MPIR_Comm_commit(st->new_comm); - MPIR_ERR_CHECK(mpi_errno); - - fn_fail: - return mpi_errno; - -} - -static int sched_cb_gcn_bcast(MPIR_Comm * comm, int tag, void *state) -{ - int mpi_errno = MPI_SUCCESS; - struct gcn_state *st = state; - - if (st->gcn_cid_kind == MPIR_COMM_KIND__INTERCOMM) { - if (st->comm_ptr_inter->rank == 0) { - mpi_errno = - MPIR_Sched_recv(st->ctx1, 1, MPIR_CONTEXT_ID_T_DATATYPE, 0, st->comm_ptr_inter, - st->s); - MPIR_ERR_CHECK(mpi_errno); - mpi_errno = - MPIR_Sched_send(st->ctx0, 1, MPIR_CONTEXT_ID_T_DATATYPE, 0, st->comm_ptr_inter, - st->s); - MPIR_ERR_CHECK(mpi_errno); - MPIR_SCHED_BARRIER(st->s); - } - - 
mpi_errno = MPIR_Ibcast_intra_sched_auto(st->ctx1, 1, - MPIR_CONTEXT_ID_T_DATATYPE, 0, st->comm_ptr, - st->s); - MPIR_ERR_CHECK(mpi_errno); - MPIR_SCHED_BARRIER(st->s); - } - - mpi_errno = MPIR_Sched_cb(&sched_cb_commit_comm, st, st->s); - MPIR_ERR_CHECK(mpi_errno); - mpi_errno = MPIR_Sched_cb(&MPIR_Sched_cb_free_buf, st, st->s); - MPIR_ERR_CHECK(mpi_errno); - - fn_fail: - return mpi_errno; -} +static int async_gcn_poll(MPIX_Async_thing thing); +static int gcn_copy_mask(struct gcn_state *st); +static int gcn_allocate_cid(struct gcn_state *st); +static int gcn_allreduce(struct gcn_state *st); +static int gcn_intercomm_sendrecv(struct gcn_state *st); +static int gcn_intercomm_bcast(struct gcn_state *st); /* Try to find a valid context id. * @@ -676,13 +644,11 @@ static int sched_cb_gcn_bcast(MPIR_Comm * comm, int tag, void *state) * execute and insert wrong order of entries to the nonblocking schedule and * cause errors. */ -static int sched_cb_gcn_allocate_cid(MPIR_Comm * comm, int tag, void *state) +static int gcn_allocate_cid(struct gcn_state *st) { int mpi_errno = MPI_SUCCESS; - struct gcn_state *st = state, *tmp; - int newctxid; if (st->own_eager_mask) { - newctxid = find_and_allocate_context_id(st->local_mask); + int newctxid = find_and_allocate_context_id(st->local_mask); if (st->ctx0) *st->ctx0 = newctxid; if (st->ctx1) @@ -691,7 +657,7 @@ static int sched_cb_gcn_allocate_cid(MPIR_Comm * comm, int tag, void *state) st->own_eager_mask = 0; eager_in_use = 0; } else if (st->own_mask) { - newctxid = find_and_allocate_context_id(st->local_mask); + int newctxid = find_and_allocate_context_id(st->local_mask); if (st->ctx0) *st->ctx0 = newctxid; if (st->ctx1) @@ -704,6 +670,7 @@ static int sched_cb_gcn_allocate_cid(MPIR_Comm * comm, int tag, void *state) if (next_gcn == st) { next_gcn = st->next; } else { + struct gcn_state *tmp; for (tmp = next_gcn; tmp->next != st; tmp = tmp->next); tmp->next = st->next; } @@ -744,31 +711,9 @@ static int 
sched_cb_gcn_allocate_cid(MPIR_Comm * comm, int tag, void *state) /* do not own mask, try again */ if (st->first_iter == 1) { st->first_iter = 0; - /* Set the Tag for the idup-operations. We have two problems here: - * 1.) The tag should not be used by another (blocking) context_id allocation. - * Therefore, we set tag_up as lower bound for the operation. tag_ub is used by - * most of the other blocking operations, but tag is always >0, so this - * should be fine. - * 2.) We need ordering between multiple idup operations on the same communicator. - * The problem here is that the iallreduce operations of the first iteration - * are not necessarily completed in the same order as they are issued, also on the - * same communicator. To avoid deadlocks, we cannot add the elements to the - * list bevfore the first iallreduce is completed. The "tag" is created for the - * scheduling - by calling MPIR_Sched_next_tag(comm_ptr, &tag) - and the same - * for a idup operation on all processes. So we use it here. 
*/ - /* FIXME I'm not sure if there can be an overflows for this tag */ - st->tag = (uint64_t) tag + MPIR_Process.attrs.tag_ub; add_gcn_to_list(st); } - mpi_errno = MPIR_Sched_cb(&sched_cb_gcn_copy_mask, st, st->s); - MPIR_ERR_CHECK(mpi_errno); - MPIR_SCHED_BARRIER(st->s); } - } else { - /* Successfully allocated a context id */ - mpi_errno = MPIR_Sched_cb(&sched_cb_gcn_bcast, st, st->s); - MPIR_ERR_CHECK(mpi_errno); - MPIR_SCHED_BARRIER(st->s); } fn_exit: @@ -779,6 +724,7 @@ static int sched_cb_gcn_allocate_cid(MPIR_Comm * comm, int tag, void *state) if (next_gcn == st) { next_gcn = st->next; } else { + struct gcn_state *tmp; for (tmp = next_gcn; tmp && tmp->next != st; tmp = tmp->next); tmp->next = st->next; } @@ -791,10 +737,9 @@ static int sched_cb_gcn_allocate_cid(MPIR_Comm * comm, int tag, void *state) goto fn_exit; } -static int sched_cb_gcn_copy_mask(MPIR_Comm * comm, int tag, void *state) +static int gcn_copy_mask(struct gcn_state *st) { int mpi_errno = MPI_SUCCESS; - struct gcn_state *st = state; if (st->first_iter) { memset(st->local_mask, 0, (MPIR_MAX_CONTEXT_MASK + 1) * sizeof(int)); @@ -827,22 +772,146 @@ static int sched_cb_gcn_copy_mask(MPIR_Comm * comm, int tag, void *state) st->local_mask[ALL_OWN_MASK_FLAG] = 1; } } + return mpi_errno; +} + +static int gcn_allreduce(struct gcn_state *st) +{ + return MPIR_Iallreduce_impl(MPI_IN_PLACE, st->local_mask, MPIR_MAX_CONTEXT_MASK + 1, + MPIR_UINT32_T_INTERNAL, MPI_BAND, st->comm_ptr, + &st->u.allreduce_request); +} + +static int gcn_intercomm_sendrecv(struct gcn_state *st) +{ + int mpi_errno = MPI_SUCCESS; - mpi_errno = MPIR_Iallreduce_intra_sched_auto(MPI_IN_PLACE, st->local_mask, - MPIR_MAX_CONTEXT_MASK + 1, - MPIR_UINT32_T_INTERNAL, MPI_BAND, - st->comm_ptr, st->s); + mpi_errno = MPIC_Irecv(st->ctx1, 1, MPIR_CONTEXT_ID_T_DATATYPE, + 0, st->tag, st->comm_ptr_inter, &st->u.sendrecv_reqs[0]); + MPIR_ERR_CHECK(mpi_errno); + mpi_errno = MPIC_Isend(st->ctx0, 1, MPIR_CONTEXT_ID_T_DATATYPE, + 0, st->tag, 
st->comm_ptr_inter, &st->u.sendrecv_reqs[1], 0); MPIR_ERR_CHECK(mpi_errno); - MPIR_SCHED_BARRIER(st->s); - mpi_errno = MPIR_Sched_cb(&sched_cb_gcn_allocate_cid, st, st->s); + fn_fail: + return mpi_errno; +} + +static int gcn_intercomm_bcast(struct gcn_state *st) +{ + return MPIR_Ibcast_impl(st->ctx1, 1, MPIR_CONTEXT_ID_T_DATATYPE, 0, st->comm_ptr, + &st->u.bcast_request); +} + +static int gcn_complete(struct gcn_state *st) +{ + int mpi_errno = MPI_SUCCESS; + + mpi_errno = MPIR_Comm_commit(st->new_comm); MPIR_ERR_CHECK(mpi_errno); - MPIR_SCHED_BARRIER(st->s); + + MPIR_Grequest_complete_impl(st->req_ptr); + + MPL_free(st); fn_fail: return mpi_errno; } +static int async_gcn_poll(MPIX_Async_thing thing) +{ + int mpi_errno = MPI_SUCCESS; + struct gcn_state *st = MPIR_Async_thing_get_state(thing); + + switch (st->stage) { + case MPIR_GCN__COPYMASK: + st->error = gcn_copy_mask(st); + MPIR_Assert(st->error == MPI_SUCCESS); + + st->error = gcn_allreduce(st); + MPIR_Assert(st->error == MPI_SUCCESS); + + st->stage = MPIR_GCN__ALLREDUCE; + return MPIX_ASYNC_NOPROGRESS; + + case MPIR_GCN__ALLREDUCE: + if (!MPIR_Request_is_complete(st->u.allreduce_request)) { + return MPIX_ASYNC_NOPROGRESS; + } else { + MPIR_Request_free(st->u.allreduce_request); + + mpi_errno = gcn_allocate_cid(st); + MPIR_Assert(mpi_errno == MPI_SUCCESS); + + if (*st->ctx0 == 0) { + /* TODO: error check */ + /* retry */ + st->stage = MPIR_GCN__COPYMASK; + return MPIX_ASYNC_NOPROGRESS; + } else if (!st->is_inter_comm) { + /* done */ + gcn_complete(st); + return MPIX_ASYNC_DONE; + } else if (st->comm_ptr_inter->rank == 0) { + st->error = gcn_intercomm_sendrecv(st); + MPIR_Assert(st->error == MPI_SUCCESS); + + st->stage = MPIR_GCN__INTERCOMM_SENDRECV; + return MPIX_ASYNC_NOPROGRESS; + } else { + st->error = gcn_intercomm_bcast(st); + MPIR_Assert(st->error == MPI_SUCCESS); + + st->stage = MPIR_GCN__INTERCOMM_BCAST; + return MPIX_ASYNC_NOPROGRESS; + } + } + + case MPIR_GCN__INTERCOMM_SENDRECV: + if 
(!MPIR_Request_is_complete(st->u.sendrecv_reqs[0]) || + !MPIR_Request_is_complete(st->u.sendrecv_reqs[1])) { + return MPIX_ASYNC_NOPROGRESS; + } else { + MPIR_Request_free(st->u.sendrecv_reqs[0]); + MPIR_Request_free(st->u.sendrecv_reqs[1]); + + st->error = gcn_intercomm_bcast(st); + MPIR_Assert(st->error == MPI_SUCCESS); + + st->stage = MPIR_GCN__INTERCOMM_BCAST; + return MPIX_ASYNC_NOPROGRESS; + } + + case MPIR_GCN__INTERCOMM_BCAST: + if (!MPIR_Request_is_complete(st->u.bcast_request)) { + return MPIX_ASYNC_NOPROGRESS; + } else { + MPIR_Request_free(st->u.bcast_request); + + gcn_complete(st); + return MPIX_ASYNC_DONE; + } + } + + MPIR_Assert(0); + return MPIX_ASYNC_NOPROGRESS; +} + +static int query_fn(void *extra_state, MPI_Status * status) +{ + /* status points to request->status */ + return status->MPI_ERROR; +} + +static int free_fn(void *extra_state) +{ + return MPI_SUCCESS; +} + +static int cancel_fn(void *extra_state, int complete) +{ + return MPI_SUCCESS; +} /** Allocating a new context ID collectively over the given communicator in a * nonblocking way. @@ -880,31 +949,40 @@ static int sched_cb_gcn_copy_mask(MPIR_Comm * comm, int tag, void *state) * To avoid deadlock or livelock, it uses the same eager protocol as * multi-threaded MPIR_Get_contextid_sparse_group. 
*/ -static int sched_get_cid_nonblock(MPIR_Comm * comm_ptr, MPIR_Comm * newcomm, - int *ctx0, int *ctx1, - MPIR_Sched_t s, MPIR_Comm_kind_t gcn_cid_kind) +static int async_get_cid(MPIR_Comm * comm_ptr, MPIR_Comm * newcomm, + int *ctx0, int *ctx1, bool is_inter_comm, MPIR_Request ** req) { int mpi_errno = MPI_SUCCESS; - struct gcn_state *st = NULL; MPIR_CHKPMEM_DECL(); + mpi_errno = MPIR_Grequest_start_impl(query_fn, free_fn, cancel_fn, NULL, req); + MPIR_ERR_CHECK(mpi_errno); + + struct gcn_state *st = NULL; MPIR_CHKPMEM_MALLOC(st, sizeof(struct gcn_state), MPL_MEM_COMM); + + int tag; + mpi_errno = MPIR_Sched_next_tag(comm_ptr, &tag); + MPIR_ERR_CHECK(mpi_errno); + st->tag = tag; + st->ctx0 = ctx0; st->ctx1 = ctx1; - if (gcn_cid_kind == MPIR_COMM_KIND__INTRACOMM) { + if (!is_inter_comm) { st->comm_ptr = comm_ptr; st->comm_ptr_inter = NULL; } else { st->comm_ptr = comm_ptr->local_comm; st->comm_ptr_inter = comm_ptr; } - st->s = s; - st->gcn_cid_kind = gcn_cid_kind; + st->req_ptr = *req; + st->is_inter_comm = is_inter_comm; *(st->ctx0) = 0; st->own_eager_mask = 0; st->first_iter = 1; st->new_comm = newcomm; st->own_mask = 0; + st->stage = MPIR_GCN__COPYMASK; if (eager_nelem < 0) { /* Ensure that at least one word of deadlock-free context IDs is * always set aside for the base protocol */ @@ -912,61 +990,37 @@ static int sched_get_cid_nonblock(MPIR_Comm * comm_ptr, MPIR_Comm * newcomm, MPIR_CVAR_CTXID_EAGER_SIZE < MPIR_MAX_CONTEXT_MASK - 1); eager_nelem = MPIR_CVAR_CTXID_EAGER_SIZE; } - mpi_errno = MPIR_Sched_cb(&sched_cb_gcn_copy_mask, st, s); + + mpi_errno = MPIR_Async_things_add(async_gcn_poll, st, NULL); MPIR_ERR_CHECK(mpi_errno); - MPIR_SCHED_BARRIER(s); fn_exit: return mpi_errno; - /* --BEGIN ERROR HANDLING-- */ fn_fail: MPIR_CHKPMEM_REAP(); goto fn_exit; - /* --END ERROR HANDLING-- */ } int MPIR_Get_contextid_nonblock(MPIR_Comm * comm_ptr, MPIR_Comm * newcommp, MPIR_Request ** req) { int mpi_errno = MPI_SUCCESS; - int tag; - MPIR_Sched_t s; - - MPIR_FUNC_ENTER; 
- /* now create a schedule */ - mpi_errno = MPIR_Sched_next_tag(comm_ptr, &tag); - MPIR_ERR_CHECK(mpi_errno); - mpi_errno = MPIR_Sched_create(&s, MPIR_SCHED_KIND_GENERALIZED); - MPIR_ERR_CHECK(mpi_errno); - MPIR_Sched_set_tag(s, tag); - - /* add some entries to it */ - mpi_errno = - sched_get_cid_nonblock(comm_ptr, newcommp, &newcommp->context_id, &newcommp->recvcontext_id, - s, MPIR_COMM_KIND__INTRACOMM); - MPIR_ERR_CHECK(mpi_errno); - - /* finally, kick off the schedule and give the caller a request */ - mpi_errno = MPIR_Sched_start(s, comm_ptr, req); + mpi_errno = async_get_cid(comm_ptr, newcommp, &newcommp->context_id, &newcommp->recvcontext_id, + false, req); MPIR_ERR_CHECK(mpi_errno); fn_exit: MPIR_FUNC_EXIT; return mpi_errno; - /* --BEGIN ERROR HANDLING-- */ fn_fail: goto fn_exit; - /* --END ERROR HANDLING-- */ } int MPIR_Get_intercomm_contextid_nonblock(MPIR_Comm * comm_ptr, MPIR_Comm * newcommp, MPIR_Request ** req) { int mpi_errno = MPI_SUCCESS; - int tag; - MPIR_Sched_t s; - MPIR_FUNC_ENTER; /* do as much local setup as possible */ @@ -975,23 +1029,8 @@ int MPIR_Get_intercomm_contextid_nonblock(MPIR_Comm * comm_ptr, MPIR_Comm * newc MPIR_ERR_CHECK(mpi_errno); } - /* now create a schedule */ - mpi_errno = MPIR_Sched_next_tag(comm_ptr, &tag); - MPIR_ERR_CHECK(mpi_errno); - mpi_errno = MPIR_Sched_create(&s, MPIR_SCHED_KIND_GENERALIZED); - MPIR_ERR_CHECK(mpi_errno); - MPIR_Sched_set_tag(s, tag); - - /* add some entries to it */ - - /* first get a context ID over the local comm */ - mpi_errno = - sched_get_cid_nonblock(comm_ptr, newcommp, &newcommp->recvcontext_id, &newcommp->context_id, - s, MPIR_COMM_KIND__INTERCOMM); - MPIR_ERR_CHECK(mpi_errno); - - /* finally, kick off the schedule and give the caller a request */ - mpi_errno = MPIR_Sched_start(s, comm_ptr, req); + mpi_errno = async_get_cid(comm_ptr, newcommp, &newcommp->recvcontext_id, &newcommp->context_id, + true, req); MPIR_ERR_CHECK(mpi_errno); fn_fail: From 2ce9b3a24e08aa1071a109fccb6cdbbb0c7ca93d 
Mon Sep 17 00:00:00 2001 From: Hui Zhou Date: Tue, 28 Oct 2025 12:04:29 -0500 Subject: [PATCH 03/12] coll: add tagged version of nonblocking collectives Add internal nonblocking collective interfaces that accept an explicit tag. This allows asynchronous algorithms to internally call nonblocking collectives but not tied to a specific schedule framework. Specifically, it allows nonblocking algorithms using the MPIX Async interface. --- maint/gen_coll.py | 45 ++++++++++++++++++++++++++++---- src/mpi/coll/include/coll_impl.h | 7 ++--- 2 files changed, 44 insertions(+), 8 deletions(-) diff --git a/maint/gen_coll.py b/maint/gen_coll.py index de4fbcbdfd6..d1c6b5e9b42 100644 --- a/maint/gen_coll.py +++ b/maint/gen_coll.py @@ -62,6 +62,7 @@ def dump_coll(name, blocking_type): dump_allcomm_sched_auto(name) dump_sched_impl(name) dump_mpir_impl_nonblocking(name) + dump_mpir_nonblocking_tag(name) elif blocking_type == "persistent": dump_mpir_impl_persistent(name) else: @@ -426,6 +427,36 @@ def dump_mpir_impl_nonblocking(name): G.out.append("goto fn_exit;") dump_close("}") +def dump_mpir_nonblocking_tag(name): + blocking_type = "nonblocking" + func = G.FUNCS["mpi_" + name] + params, args = get_params_and_args(func) + func_params = get_func_params(params, name, "tag") + + func_name = get_func_name(name, blocking_type) + Name = func_name.capitalize() + NAME = func_name.upper() + + G.out.append("") + add_prototype("int MPIR_%s_tag(%s)" % (Name, func_params)) + dump_split(0, "int MPIR_%s_tag(%s)" % (Name, func_params)) + dump_open('{') + G.out.append("int mpi_errno = MPI_SUCCESS;") + G.out.append("enum MPIR_sched_type sched_type;") + G.out.append("void *sched;") + G.out.append("") + G.out.append("*request = NULL;") + func_args = get_func_args(args, name, "mpir_impl_tag") + dump_split(1, "mpi_errno = MPIR_%s_sched_impl(%s);" % (Name, func_args)) + G.out.append("MPIR_ERR_CHECK(mpi_errno);") + G.out.append("MPII_SCHED_START(sched_type, sched, comm_ptr, request);") + G.out.append("") + 
G.out.append("fn_exit:") + G.out.append("return mpi_errno;") + G.out.append("fn_fail:") + G.out.append("goto fn_exit;") + dump_close("}") + def dump_mpir_impl_persistent(name): blocking_type = "persistent" func = G.FUNCS["mpi_" + name] @@ -693,14 +724,16 @@ def get_func_params(params, name, kind): func_params += ", int coll_attr" elif kind == "nonblocking": func_params += ", MPIR_Request ** request" + elif kind == "tag": + func_params += ", int tag, MPIR_Request ** request" elif kind == "persistent": func_params += ", MPIR_Info * info_ptr, MPIR_Request ** request" elif kind == "sched_auto": func_params += ", MPIR_Sched_t s" elif kind == "allcomm_sched_auto": - func_params += ", bool is_persistent, void **sched_p, enum MPIR_sched_type *sched_type_p" + func_params += ", int tag, bool is_persistent, void **sched_p, enum MPIR_sched_type *sched_type_p" elif kind == "sched_impl": - func_params += ", bool is_persistent, void **sched_p, enum MPIR_sched_type *sched_type_p" + func_params += ", int tag, bool is_persistent, void **sched_p, enum MPIR_sched_type *sched_type_p" else: raise Exception("get_func_params - unexpected kind = %s" % kind) @@ -716,11 +749,13 @@ def get_func_args(args, name, kind): elif kind == "persistent": func_args += ", info_ptr, request" elif kind == "allcomm_sched_auto": - func_args += ", is_persistent, sched_p, sched_type_p" + func_args += ", tag, is_persistent, sched_p, sched_type_p" elif kind == "mpir_impl_nonblocking": - func_args += ", false, &sched, &sched_type" + func_args += ", 0, false, &sched, &sched_type" + elif kind == "mpir_impl_tag": + func_args += ", tag, false, &sched, &sched_type" elif kind == "mpir_impl_persistent": - func_args += ", true, &req->u.persist_coll.sched, &req->u.persist_coll.sched_type" + func_args += ", 0, true, &req->u.persist_coll.sched, &req->u.persist_coll.sched_type" else: raise Exception("get_func_args - unexpected kind = %s" % kind) diff --git a/src/mpi/coll/include/coll_impl.h b/src/mpi/coll/include/coll_impl.h 
index 75eaeab6e94..259ab7a176a 100644 --- a/src/mpi/coll/include/coll_impl.h +++ b/src/mpi/coll/include/coll_impl.h @@ -76,9 +76,10 @@ int MPII_Coll_finalize(void); } \ mpi_errno = MPIR_Sched_create(&s, sched_kind); \ MPIR_ERR_CHECK(mpi_errno); \ - int tag = -1; \ - mpi_errno = MPIR_Sched_next_tag(comm_ptr, &tag); \ - MPIR_ERR_CHECK(mpi_errno); \ + if (!tag) { \ + mpi_errno = MPIR_Sched_next_tag(comm_ptr, &tag); \ + MPIR_ERR_CHECK(mpi_errno); \ + } \ MPIR_Sched_set_tag(s, tag); \ *sched_type_p = MPIR_SCHED_NORMAL; \ *sched_p = s; \ From ad870b544d34ca805a803d73f8b6a181df23fc2b Mon Sep 17 00:00:00 2001 From: Hui Zhou Date: Tue, 28 Oct 2025 18:14:39 -0500 Subject: [PATCH 04/12] comm/contextid: use tag nonblocking collectives Use the nonblocking collective interface with an explicit tag to in the nonblocking context_id allocation algorithm. --- src/mpi/comm/contextid.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/mpi/comm/contextid.c b/src/mpi/comm/contextid.c index 646ed850271..1dc0f23fa36 100644 --- a/src/mpi/comm/contextid.c +++ b/src/mpi/comm/contextid.c @@ -777,9 +777,9 @@ static int gcn_copy_mask(struct gcn_state *st) static int gcn_allreduce(struct gcn_state *st) { - return MPIR_Iallreduce_impl(MPI_IN_PLACE, st->local_mask, MPIR_MAX_CONTEXT_MASK + 1, - MPIR_UINT32_T_INTERNAL, MPI_BAND, st->comm_ptr, - &st->u.allreduce_request); + return MPIR_Iallreduce_tag(MPI_IN_PLACE, st->local_mask, MPIR_MAX_CONTEXT_MASK + 1, + MPIR_UINT32_T_INTERNAL, MPI_BAND, st->comm_ptr, + st->tag, &st->u.allreduce_request); } static int gcn_intercomm_sendrecv(struct gcn_state *st) @@ -799,8 +799,8 @@ static int gcn_intercomm_sendrecv(struct gcn_state *st) static int gcn_intercomm_bcast(struct gcn_state *st) { - return MPIR_Ibcast_impl(st->ctx1, 1, MPIR_CONTEXT_ID_T_DATATYPE, 0, st->comm_ptr, - &st->u.bcast_request); + return MPIR_Ibcast_tag(st->ctx1, 1, MPIR_CONTEXT_ID_T_DATATYPE, 0, st->comm_ptr, + st->tag, &st->u.bcast_request); } static int 
gcn_complete(struct gcn_state *st) From e0c1c7be0d30adb2f332edb7951449efa8adb9ec Mon Sep 17 00:00:00 2001 From: Hui Zhou Date: Tue, 28 Oct 2025 18:18:08 -0500 Subject: [PATCH 05/12] request: remove assertions on grequest in waitall The basic general request relies on external progress mechanism to complete the request rather than on the extension with wait_fn. We can create generalized request using MPIX Async mechanism and MPID_Progress_wait will complete the request. --- src/mpi/request/request_impl.c | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/mpi/request/request_impl.c b/src/mpi/request/request_impl.c index 6458b79075a..25d8de17d4d 100644 --- a/src/mpi/request/request_impl.c +++ b/src/mpi/request/request_impl.c @@ -774,9 +774,6 @@ int MPIR_Waitall_state(int count, MPIR_Request * request_ptrs[], MPI_Status arra /* wait for ith request to complete */ DEBUG_PROGRESS_START; while (!MPIR_Request_is_complete(request_ptrs[i])) { - /* generalized requests should already be finished */ - MPIR_Assert(request_ptrs[i]->kind != MPIR_REQUEST_KIND__GREQUEST); - mpi_errno = MPID_Progress_wait(state); MPIR_ERR_CHECK(mpi_errno); DEBUG_PROGRESS_CHECK; From 6c989ed6452244744a293bc7d74ce76d4c870de5 Mon Sep 17 00:00:00 2001 From: Hui Zhou Date: Tue, 28 Oct 2025 18:22:06 -0500 Subject: [PATCH 06/12] test: remove idup xfails --- test/mpi/maint/jenkins/xfail.conf | 5 ----- 1 file changed, 5 deletions(-) diff --git a/test/mpi/maint/jenkins/xfail.conf b/test/mpi/maint/jenkins/xfail.conf index 5184dbd2be0..3c66d464da0 100644 --- a/test/mpi/maint/jenkins/xfail.conf +++ b/test/mpi/maint/jenkins/xfail.conf @@ -26,11 +26,6 @@ ################################################################################ -# xfail ch4 bugs -* * * ch4:ofi * /^idup_comm_gen/ xfail=ticket3794 threads/comm/testlist -* * * ch4:ofi * /^idup_nb/ xfail=ticket3794 threads/comm/testlist -* * * ch4:ucx * /^idup_comm_gen/ xfail=ticket3794 threads/comm/testlist -* * * ch4:ucx * /^idup_nb/ xfail=ticket3794 
threads/comm/testlist ################################################################################ # misc special build * * nofast * * /^large_acc_flush_local/ xfail=issue4663 rma/testlist From 650cb8fea7c4dbc233d399bd036c9e7905598a9f Mon Sep 17 00:00:00 2001 From: Hui Zhou Date: Tue, 28 Oct 2025 21:30:08 -0500 Subject: [PATCH 07/12] sched: remove MPIR_SCHED_KIND_GENERALIZED MPIR_SCHED_KIND_GENERALIZED no longer needed. --- src/mpid/common/sched/mpidu_sched.c | 78 +++++++---------------------- src/mpid/common/sched/mpidu_sched.h | 1 - 2 files changed, 18 insertions(+), 61 deletions(-) diff --git a/src/mpid/common/sched/mpidu_sched.c b/src/mpid/common/sched/mpidu_sched.c index 753c5fe0410..da78ec050da 100644 --- a/src/mpid/common/sched/mpidu_sched.c +++ b/src/mpid/common/sched/mpidu_sched.c @@ -329,12 +329,7 @@ static int MPIDU_Sched_start_entry(struct MPIDU_Sched *s, size_t idx, struct MPI MPL_DBG_MSG_D(MPIR_DBG_COMM, VERBOSE, "starting CB entry %d\n", (int) idx); if (e->u.cb.cb_type == MPIDU_SCHED_CB_TYPE_1) { ret_errno = e->u.cb.u.cb_p(r->comm, s->tag, e->u.cb.cb_state); - if (s->kind == MPIR_SCHED_KIND_GENERALIZED) { - /* Sched entries list can be reallocated inside callback */ - e = &s->entries[idx]; - } else { - MPIR_Assert(e == &s->entries[idx]); - } + MPIR_Assert(e == &s->entries[idx]); if (unlikely(ret_errno)) { if (0 == r->u.nbc.coll_attr) { if (MPIX_ERR_PROC_FAILED == MPIR_ERR_GET_CLASS(ret_errno)) { @@ -352,12 +347,7 @@ static int MPIDU_Sched_start_entry(struct MPIDU_Sched *s, size_t idx, struct MPI } } else if (e->u.cb.cb_type == MPIDU_SCHED_CB_TYPE_2) { ret_errno = e->u.cb.u.cb2_p(r->comm, s->tag, e->u.cb.cb_state, e->u.cb.cb_state2); - if (s->kind == MPIR_SCHED_KIND_GENERALIZED) { - /* Sched entries list can be reallocated inside callback */ - e = &s->entries[idx]; - } else { - MPIR_Assert(e == &s->entries[idx]); - } + MPIR_Assert(e == &s->entries[idx]); if (unlikely(ret_errno)) { if (0 == r->u.nbc.coll_attr) { if (MPIX_ERR_PROC_FAILED == 
MPIR_ERR_GET_CLASS(ret_errno)) { @@ -679,10 +669,8 @@ int MPIDU_Sched_send(const void *buf, MPI_Aint count, MPI_Datatype datatype, int * release it at entry completion time */ MPIR_Comm_add_ref(comm); MPIR_Datatype_add_ref_if_not_builtin(datatype); - if (s->kind != MPIR_SCHED_KIND_GENERALIZED) { - sched_add_ref(s, comm->handle); - sched_add_ref(s, datatype); - } + sched_add_ref(s, comm->handle); + sched_add_ref(s, datatype); fn_exit: return mpi_errno; @@ -717,10 +705,8 @@ int MPIDU_Sched_pt2pt_send(const void *buf, MPI_Aint count, MPI_Datatype datatyp * release it at entry completion time */ MPIR_Comm_add_ref(comm); MPIR_Datatype_add_ref_if_not_builtin(datatype); - if (s->kind != MPIR_SCHED_KIND_GENERALIZED) { - sched_add_ref(s, comm->handle); - sched_add_ref(s, datatype); - } + sched_add_ref(s, comm->handle); + sched_add_ref(s, datatype); fn_exit: return mpi_errno; @@ -754,10 +740,8 @@ int MPIDU_Sched_send_defer(const void *buf, const MPI_Aint * count, MPI_Datatype * release it at entry completion time */ MPIR_Comm_add_ref(comm); MPIR_Datatype_add_ref_if_not_builtin(datatype); - if (s->kind != MPIR_SCHED_KIND_GENERALIZED) { - sched_add_ref(s, comm->handle); - sched_add_ref(s, datatype); - } + sched_add_ref(s, comm->handle); + sched_add_ref(s, datatype); fn_exit: return mpi_errno; @@ -788,10 +772,8 @@ int MPIDU_Sched_recv_status(void *buf, MPI_Aint count, MPI_Datatype datatype, in status->MPI_ERROR = MPI_SUCCESS; MPIR_Comm_add_ref(comm); MPIR_Datatype_add_ref_if_not_builtin(datatype); - if (s->kind != MPIR_SCHED_KIND_GENERALIZED) { - sched_add_ref(s, comm->handle); - sched_add_ref(s, datatype); - } + sched_add_ref(s, comm->handle); + sched_add_ref(s, datatype); fn_exit: return mpi_errno; @@ -822,10 +804,8 @@ int MPIDU_Sched_recv(void *buf, MPI_Aint count, MPI_Datatype datatype, int src, MPIR_Comm_add_ref(comm); MPIR_Datatype_add_ref_if_not_builtin(datatype); - if (s->kind != MPIR_SCHED_KIND_GENERALIZED) { - sched_add_ref(s, comm->handle); - sched_add_ref(s, 
datatype); - } + sched_add_ref(s, comm->handle); + sched_add_ref(s, datatype); fn_exit: return mpi_errno; @@ -857,10 +837,8 @@ int MPIDU_Sched_pt2pt_recv(void *buf, MPI_Aint count, MPI_Datatype datatype, MPIR_Comm_add_ref(comm); MPIR_Datatype_add_ref_if_not_builtin(datatype); - if (s->kind != MPIR_SCHED_KIND_GENERALIZED) { - sched_add_ref(s, comm->handle); - sched_add_ref(s, datatype); - } + sched_add_ref(s, comm->handle); + sched_add_ref(s, datatype); fn_exit: return mpi_errno; @@ -891,10 +869,8 @@ int MPIDU_Sched_reduce(const void *inbuf, void *inoutbuf, MPI_Aint count, MPI_Da MPIR_Datatype_add_ref_if_not_builtin(datatype); MPIR_Op_add_ref_if_not_builtin(op); - if (s->kind != MPIR_SCHED_KIND_GENERALIZED) { - sched_add_ref(s, datatype); - sched_add_ref(s, op); - } + sched_add_ref(s, datatype); + sched_add_ref(s, op); fn_exit: return mpi_errno; @@ -933,10 +909,8 @@ int MPIDU_Sched_copy(const void *inbuf, MPI_Aint incount, MPI_Datatype intype, MPIR_Datatype_add_ref_if_not_builtin(intype); MPIR_Datatype_add_ref_if_not_builtin(outtype); - if (s->kind != MPIR_SCHED_KIND_GENERALIZED) { - sched_add_ref(s, intype); - sched_add_ref(s, outtype); - } + sched_add_ref(s, intype); + sched_add_ref(s, outtype); /* some sanity checking up front */ #if defined(HAVE_ERROR_CHECKING) && !defined(NDEBUG) @@ -1055,10 +1029,6 @@ static int MPIDU_Sched_progress_state(struct MPIDU_Sched_state *state, int *made e->status = MPIDU_SCHED_ENTRY_STATUS_COMPLETE; MPIR_Request_free(e->u.send.sreq); e->u.send.sreq = NULL; - if (s->kind == MPIR_SCHED_KIND_GENERALIZED) { - MPIR_Comm_release(e->u.send.comm); - MPIR_Datatype_release_if_not_builtin(e->u.send.datatype); - } } break; case MPIDU_SCHED_ENTRY_RECV: @@ -1080,10 +1050,6 @@ static int MPIDU_Sched_progress_state(struct MPIDU_Sched_state *state, int *made e->status = MPIDU_SCHED_ENTRY_STATUS_COMPLETE; MPIR_Request_free(e->u.recv.rreq); e->u.recv.rreq = NULL; - if (s->kind == MPIR_SCHED_KIND_GENERALIZED) { - MPIR_Comm_release(e->u.recv.comm); - 
MPIR_Datatype_release_if_not_builtin(e->u.recv.datatype); - } } break; case MPIDU_SCHED_ENTRY_PT2PT_SEND: @@ -1095,10 +1061,6 @@ static int MPIDU_Sched_progress_state(struct MPIDU_Sched_state *state, int *made } MPIR_Request_free(e->u.send.sreq); e->u.send.sreq = NULL; - if (s->kind == MPIR_SCHED_KIND_GENERALIZED) { - MPIR_Comm_release(e->u.send.comm); - MPIR_Datatype_release_if_not_builtin(e->u.send.datatype); - } } break; case MPIDU_SCHED_ENTRY_PT2PT_RECV: @@ -1110,10 +1072,6 @@ static int MPIDU_Sched_progress_state(struct MPIDU_Sched_state *state, int *made } MPIR_Request_free(e->u.recv.rreq); e->u.recv.rreq = NULL; - if (s->kind == MPIR_SCHED_KIND_GENERALIZED) { - MPIR_Comm_release(e->u.recv.comm); - MPIR_Datatype_release_if_not_builtin(e->u.recv.datatype); - } } break; default: diff --git a/src/mpid/common/sched/mpidu_sched.h b/src/mpid/common/sched/mpidu_sched.h index 2454ad60c84..43df00df9f4 100644 --- a/src/mpid/common/sched/mpidu_sched.h +++ b/src/mpid/common/sched/mpidu_sched.h @@ -17,7 +17,6 @@ enum MPIR_Sched_kind { MPIR_SCHED_KIND_REGULAR = 0, MPIR_SCHED_KIND_PERSISTENT, /* used by persistent collectives, do not free on completion */ - MPIR_SCHED_KIND_GENERALIZED, /* used by contextid code, callbacks may alter entries */ }; enum MPIDU_Sched_entry_type { From 0d70f66c6318b38f494f61b3974d98d88f0d9f4a Mon Sep 17 00:00:00 2001 From: Hui Zhou Date: Wed, 29 Oct 2025 21:36:44 -0500 Subject: [PATCH 08/12] comm/contextid: replace first_iter with iter It's easier for debugging when we can track the iteration number between retries. 
--- src/mpi/comm/contextid.c | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/src/mpi/comm/contextid.c b/src/mpi/comm/contextid.c index 1dc0f23fa36..8b830d0aea8 100644 --- a/src/mpi/comm/contextid.c +++ b/src/mpi/comm/contextid.c @@ -287,7 +287,7 @@ struct gcn_state { int *ctx1; int own_mask; int own_eager_mask; - int first_iter; + int iter; uint64_t tag; MPIR_Comm *comm_ptr; MPIR_Comm *comm_ptr_inter; @@ -362,7 +362,7 @@ int MPIR_Get_contextid_sparse_group(MPIR_Comm * comm_ptr, MPIR_Group * group_ptr MPIR_FUNC_ENTER; - st.first_iter = 1; + st.iter = 0; st.comm_ptr = comm_ptr; st.tag = tag; st.own_mask = 0; @@ -402,7 +402,7 @@ int MPIR_Get_contextid_sparse_group(MPIR_Comm * comm_ptr, MPIR_Group * group_ptr * processes have called this routine. On the first iteration, use the * "eager" allocation protocol. */ - else if (st.first_iter) { + else if (st.iter == 0) { memset(st.local_mask, 0, MPIR_MAX_CONTEXT_MASK * sizeof(int)); st.own_eager_mask = 0; /* Attempt to reserve the eager mask segment */ @@ -581,8 +581,7 @@ int MPIR_Get_contextid_sparse_group(MPIR_Comm * comm_ptr, MPIR_Group * group_ptr } /* --END ERROR HANDLING-- */ } - if (st.first_iter == 1) { - st.first_iter = 0; + if (st.iter == 0) { /* to avoid deadlocks, the element is not added to the list before the first iteration */ if (!ignore_id && *context_id == 0) { MPID_THREAD_CS_ENTER(VCI, MPIR_THREAD_VCI_CTX_MUTEX); @@ -590,6 +589,7 @@ int MPIR_Get_contextid_sparse_group(MPIR_Comm * comm_ptr, MPIR_Group * group_ptr MPID_THREAD_CS_EXIT(VCI, MPIR_THREAD_VCI_CTX_MUTEX); } } + st.iter++; } fn_exit: @@ -607,7 +607,7 @@ int MPIR_Get_contextid_sparse_group(MPIR_Comm * comm_ptr, MPIR_Group * group_ptr mask_in_use = 0; } /*If in list, remove it */ - if (!st.first_iter && !ignore_id) { + if (st.iter > 0 && !ignore_id) { if (next_gcn == &st) { next_gcn = st.next; } else { @@ -709,18 +709,18 @@ static int gcn_allocate_cid(struct gcn_state *st) /* --END ERROR HANDLING-- */ } 
else { /* do not own mask, try again */ - if (st->first_iter == 1) { - st->first_iter = 0; + if (st->iter == 0) { add_gcn_to_list(st); } } } + st->iter++; fn_exit: return mpi_errno; fn_fail: /* make sure that the pending allocations are scheduled */ - if (!st->first_iter) { + if (st->iter > 0) { if (next_gcn == st) { next_gcn = st->next; } else { @@ -741,7 +741,7 @@ static int gcn_copy_mask(struct gcn_state *st) { int mpi_errno = MPI_SUCCESS; - if (st->first_iter) { + if (st->iter == 0) { memset(st->local_mask, 0, (MPIR_MAX_CONTEXT_MASK + 1) * sizeof(int)); st->own_eager_mask = 0; @@ -893,8 +893,8 @@ static int async_gcn_poll(MPIX_Async_thing thing) } } - MPIR_Assert(0); - return MPIX_ASYNC_NOPROGRESS; + fn_fail: + return mpi_errno; } static int query_fn(void *extra_state, MPI_Status * status) @@ -979,7 +979,7 @@ static int async_get_cid(MPIR_Comm * comm_ptr, MPIR_Comm * newcomm, st->is_inter_comm = is_inter_comm; *(st->ctx0) = 0; st->own_eager_mask = 0; - st->first_iter = 1; + st->iter = 0; st->new_comm = newcomm; st->own_mask = 0; st->stage = MPIR_GCN__COPYMASK; From c0f51d20aa692c2b45156ecf6eb4af396c9214b2 Mon Sep 17 00:00:00 2001 From: Hui Zhou Date: Wed, 29 Oct 2025 21:38:54 -0500 Subject: [PATCH 09/12] comm/contextid: refactor context_id allocation code Refactor between the blocking and nonblocking algorithm to avoid duplications and inconsistencies. Fix the potential missed thread-safety in the nonblocking code. --- src/mpi/comm/contextid.c | 642 +++++++++++++++------------------------ 1 file changed, 252 insertions(+), 390 deletions(-) diff --git a/src/mpi/comm/contextid.c b/src/mpi/comm/contextid.c index 8b830d0aea8..956087be955 100644 --- a/src/mpi/comm/contextid.c +++ b/src/mpi/comm/contextid.c @@ -238,21 +238,6 @@ static int allocate_context_bit(uint32_t mask[], int id) return id; } -/* Allocates the first available context ID from context_mask based on the available - * bits given in local_mask. 
This function will clear the corresponding bit in - * context_mask if allocation was successful. - * - * Returns 0 on failure. Returns the allocated context ID on success. */ -static int find_and_allocate_context_id(uint32_t local_mask[]) -{ - int context_id; - context_id = locate_context_bit(local_mask); - if (context_id != 0) { - context_id = allocate_context_bit(context_mask, context_id); - } - return context_id; -} - /* EAGER CONTEXT ID ALLOCATION: Attempt to allocate the context ID during the * initial synchronization step. If eager protocol fails, threads fall back to * the base algorithm. @@ -293,7 +278,11 @@ struct gcn_state { MPIR_Comm *comm_ptr_inter; MPIR_Request *req_ptr; MPIR_Comm *new_comm; + MPIR_Group *group_ptr; + bool ignore_id; bool is_inter_comm; + bool gcn_in_list; + int error; uint32_t local_mask[MPIR_MAX_CONTEXT_MASK + 1]; struct gcn_state *next; enum { @@ -310,32 +299,82 @@ struct gcn_state { }; struct gcn_state *next_gcn = NULL; +static void gcn_init(struct gcn_state *st, MPIR_Comm * comm_ptr, int tag, + bool ignore_id, MPIR_Group * group_ptr); +static void add_gcn_to_list(struct gcn_state *st); +static void remove_gcn_from_list(struct gcn_state *st); +static int gcn_copy_mask(struct gcn_state *st); +static int gcn_allocate_cid(struct gcn_state *st); +static int gcn_allreduce(struct gcn_state *st); +static int gcn_check_id_exhaustion(struct gcn_state *st); +static int gcn_intercomm_sendrecv(struct gcn_state *st); +static int gcn_intercomm_bcast(struct gcn_state *st); + +static void gcn_init(struct gcn_state *st, MPIR_Comm * comm_ptr, int tag, bool ignore_id, + MPIR_Group * group_ptr) +{ + if (eager_nelem < 0) { + /* Ensure that at least one word of deadlock-free context IDs is + * always set aside for the base protocol */ + MPIR_Assert(MPIR_CVAR_CTXID_EAGER_SIZE >= 0 && + MPIR_CVAR_CTXID_EAGER_SIZE < MPIR_MAX_CONTEXT_MASK - 1); + eager_nelem = MPIR_CVAR_CTXID_EAGER_SIZE; + } + + MPIR_Assert(comm_ptr->comm_kind == MPIR_COMM_KIND__INTRACOMM); 
+ + st->iter = 0; + st->comm_ptr = comm_ptr; + st->tag = tag; + st->ignore_id = ignore_id; + st->group_ptr = group_ptr; + st->own_mask = 0; + st->own_eager_mask = 0; + st->req_ptr = NULL; + st->error = 0; + st->gcn_in_list = false; +} + /* All pending context_id allocations are added to a list. The context_id allocations are ordered * according to the context_id of of parent communicator and the tag, wherby blocking context_id * allocations can have the same tag, while nonblocking operations cannot. In the non-blocking * case, the user is responsible for the right tags if "comm_create_group" is used */ -static void add_gcn_to_list(struct gcn_state *new_state) +static void add_gcn_to_list(struct gcn_state *st) { + /* we'll only call this from gcn_allocate_cid within CS */ struct gcn_state *tmp = NULL; if (next_gcn == NULL) { - next_gcn = new_state; - new_state->next = NULL; - } else if (next_gcn->comm_ptr->context_id > new_state->comm_ptr->context_id || - (next_gcn->comm_ptr->context_id == new_state->comm_ptr->context_id && - next_gcn->tag > new_state->tag)) { - new_state->next = next_gcn; - next_gcn = new_state; + next_gcn = st; + st->next = NULL; + } else if (next_gcn->comm_ptr->context_id > st->comm_ptr->context_id || + (next_gcn->comm_ptr->context_id == st->comm_ptr->context_id && + next_gcn->tag > st->tag)) { + st->next = next_gcn; + next_gcn = st; } else { for (tmp = next_gcn; tmp->next != NULL && - ((new_state->comm_ptr->context_id > tmp->next->comm_ptr->context_id) || - ((new_state->comm_ptr->context_id == tmp->next->comm_ptr->context_id) && - (new_state->tag >= tmp->next->tag))); tmp = tmp->next); + ((st->comm_ptr->context_id > tmp->next->comm_ptr->context_id) || + ((st->comm_ptr->context_id == tmp->next->comm_ptr->context_id) && + (st->tag >= tmp->next->tag))); tmp = tmp->next); - new_state->next = tmp->next; - tmp->next = new_state; + st->next = tmp->next; + tmp->next = st; + } + st->gcn_in_list = true; +} +static void remove_gcn_from_list(struct 
gcn_state *st) +{ + /* we'll only call this from gcn_allocate_cid within CS */ + if (next_gcn == st) { + next_gcn = st->next; + } else { + struct gcn_state *tmp; + for (tmp = next_gcn; tmp->next != st; tmp = tmp->next); + tmp->next = st->next; } + st->gcn_in_list = false; } /* Allocates a new context ID collectively over the given communicator. This @@ -357,19 +396,14 @@ int MPIR_Get_contextid_sparse_group(MPIR_Comm * comm_ptr, MPIR_Group * group_ptr int *context_id, int ignore_id) { int mpi_errno = MPI_SUCCESS; - struct gcn_state st; - struct gcn_state *tmp; - MPIR_FUNC_ENTER; - st.iter = 0; - st.comm_ptr = comm_ptr; - st.tag = tag; - st.own_mask = 0; - st.own_eager_mask = 0; /* Group-collective and ignore_id should never be combined */ MPIR_Assert(!(group_ptr != NULL && ignore_id)); + struct gcn_state st; + gcn_init(&st, comm_ptr, tag, ignore_id, group_ptr); + *context_id = 0; MPL_DBG_MSG_FMT(MPIR_DBG_COMM, VERBOSE, (MPL_DBG_FDEST, @@ -377,219 +411,24 @@ int MPIR_Get_contextid_sparse_group(MPIR_Comm * comm_ptr, MPIR_Group * group_ptr mask_in_use, eager_in_use, comm_ptr->context_id, tag)); while (*context_id == 0) { - /* We lock only around access to the mask (except in the global locking - * case). If another thread is using the mask, we take a mask of zero. */ - MPID_THREAD_CS_ENTER(VCI, MPIR_THREAD_VCI_CTX_MUTEX); - - if (eager_nelem < 0) { - /* Ensure that at least one word of deadlock-free context IDs is - * always set aside for the base protocol */ - MPIR_Assert(MPIR_CVAR_CTXID_EAGER_SIZE >= 0 && - MPIR_CVAR_CTXID_EAGER_SIZE < MPIR_MAX_CONTEXT_MASK - 1); - eager_nelem = MPIR_CVAR_CTXID_EAGER_SIZE; - } - - if (ignore_id) { - /* We are not participating in the resulting communicator, so our - * context ID space doesn't matter. Set the mask to "all available". 
*/ - memset(st.local_mask, 0xff, MPIR_MAX_CONTEXT_MASK * sizeof(int)); - st.own_mask = 0; - /* don't need to touch mask_in_use/lowest_context_id b/c our thread - * doesn't ever need to "win" the mask */ - } - - /* Deadlock avoidance: Only participate in context id loop when all - * processes have called this routine. On the first iteration, use the - * "eager" allocation protocol. - */ - else if (st.iter == 0) { - memset(st.local_mask, 0, MPIR_MAX_CONTEXT_MASK * sizeof(int)); - st.own_eager_mask = 0; - /* Attempt to reserve the eager mask segment */ - if (!eager_in_use && eager_nelem > 0) { - int i; - for (i = 0; i < eager_nelem; i++) - st.local_mask[i] = context_mask[i]; - - eager_in_use = 1; - st.own_eager_mask = 1; - } - } - - else { - MPIR_Assert(next_gcn != NULL); - /*If we are here, at least one element must be in the list, at least myself */ - - /* only the first element in the list can own the mask. However, maybe the mask is used - * by another thread, which added another allocation to the list before. So we have to check, - * if the mask is used and mark, if we own it */ - if (mask_in_use || &st != next_gcn) { - memset(st.local_mask, 0, MPIR_MAX_CONTEXT_MASK * sizeof(int)); - st.own_mask = 0; - MPL_DBG_MSG_FMT(MPIR_DBG_COMM, VERBOSE, (MPL_DBG_FDEST, - "Mask is in use, my context_id is %d, owner context id is %d", - st.comm_ptr->context_id, - next_gcn->comm_ptr->context_id)); - } else { - int i; - /* Copy safe mask segment to local_mask */ - for (i = 0; i < eager_nelem; i++) - st.local_mask[i] = 0; - for (i = eager_nelem; i < MPIR_MAX_CONTEXT_MASK; i++) - st.local_mask[i] = context_mask[i]; - - mask_in_use = 1; - st.own_mask = 1; - MPL_DBG_MSG(MPIR_DBG_COMM, VERBOSE, "Copied local_mask"); - } - } - MPID_THREAD_CS_EXIT(VCI, MPIR_THREAD_VCI_CTX_MUTEX); - - /* Note: MPIR_MAX_CONTEXT_MASK elements of local_mask are used by the - * context ID allocation algorithm. 
The additional element is ignored - * by the context ID mask access routines and is used as a flag for - * detecting context ID exhaustion (explained below). */ - if (st.own_mask || ignore_id) - st.local_mask[ALL_OWN_MASK_FLAG] = 1; - else - st.local_mask[ALL_OWN_MASK_FLAG] = 0; - - /* Now, try to get a context id */ - MPIR_Assert(comm_ptr->comm_kind == MPIR_COMM_KIND__INTRACOMM); - /* In the global and brief-global cases, note that this routine will - * release that global lock when it needs to wait. That will allow - * other processes to enter the global or brief global critical section. - */ - if (group_ptr != NULL) { - int coll_tag = tag | MPIR_TAG_COLL_BIT; /* Shift tag into the tagged coll space */ - mpi_errno = MPII_Allreduce_group(MPI_IN_PLACE, st.local_mask, MPIR_MAX_CONTEXT_MASK + 1, - MPIR_INT_INTERNAL, MPI_BAND, comm_ptr, group_ptr, - coll_tag, MPIR_COLL_ATTR_SYNC); - } else { - mpi_errno = MPIR_Allreduce_impl(MPI_IN_PLACE, st.local_mask, MPIR_MAX_CONTEXT_MASK + 1, - MPIR_INT_INTERNAL, MPI_BAND, comm_ptr, - MPIR_COLL_ATTR_SYNC); - } + mpi_errno = gcn_copy_mask(&st); MPIR_ERR_CHECK(mpi_errno); - /* MT FIXME 2/3 cases don't seem to need the CONTEXTID CS, check and - * narrow this region */ - MPID_THREAD_CS_ENTER(VCI, MPIR_THREAD_VCI_CTX_MUTEX); - if (ignore_id) { - /* we don't care what the value was, but make sure that everyone - * who did care agreed on a value */ - *context_id = locate_context_bit(st.local_mask); - /* used later in out-of-context ids check and outer while loop condition */ - } else if (st.own_eager_mask) { - /* There is a chance that we've found a context id */ - /* Find_and_allocate_context_id updates the context_mask if it finds a match */ - *context_id = find_and_allocate_context_id(st.local_mask); - MPL_DBG_MSG_D(MPIR_DBG_COMM, VERBOSE, "Context id is now %d", *context_id); - - st.own_eager_mask = 0; - eager_in_use = 0; - if (*context_id <= 0) { - /* else we did not find a context id. 
Give up the mask in case - * there is another thread (with a lower input context id) - * waiting for it. We need to ensure that any other threads - * have the opportunity to run, hence yielding */ - /* FIXME: Do we need to do an GLOBAL yield here? - * When we do a collective operation, we anyway yield - * for other others */ - MPID_THREAD_CS_YIELD(GLOBAL, MPIR_THREAD_GLOBAL_ALLFUNC_MUTEX); - MPID_THREAD_CS_YIELD(VCI, MPIR_THREAD_VCI_CTX_MUTEX); - } - } else if (st.own_mask) { - /* There is a chance that we've found a context id */ - /* Find_and_allocate_context_id updates the context_mask if it finds a match */ - *context_id = find_and_allocate_context_id(st.local_mask); - MPL_DBG_MSG_D(MPIR_DBG_COMM, VERBOSE, "Context id is now %d", *context_id); - - mask_in_use = 0; - - if (*context_id > 0) { - /* If we found a new context id, we have to remove the element from the list, so the - * next allocation can own the mask */ - if (next_gcn == &st) { - next_gcn = st.next; - } else { - for (tmp = next_gcn; tmp->next != &st; tmp = tmp->next); /* avoid compiler warnings */ - tmp->next = st.next; - } - } else { - /* else we did not find a context id. Give up the mask in case - * there is another thread in the gcn_next_list - * waiting for it. We need to ensure that any other threads - * have the opportunity to run, hence yielding */ - /* FIXME: Do we need to do an GLOBAL yield here? - * When we do a collective operation, we anyway yield - * for other others */ - MPID_THREAD_CS_YIELD(GLOBAL, MPIR_THREAD_GLOBAL_ALLFUNC_MUTEX); - MPID_THREAD_CS_YIELD(VCI, MPIR_THREAD_VCI_CTX_MUTEX); - } - } else { - /* As above, force this thread to yield */ - /* FIXME: Do we need to do an GLOBAL yield here? 
When we - * do a collective operation, we anyway yield for other - * others */ - MPID_THREAD_CS_YIELD(GLOBAL, MPIR_THREAD_GLOBAL_ALLFUNC_MUTEX); - MPID_THREAD_CS_YIELD(VCI, MPIR_THREAD_VCI_CTX_MUTEX); - } - MPID_THREAD_CS_EXIT(VCI, MPIR_THREAD_VCI_CTX_MUTEX); - - /* Test for context ID exhaustion: All threads that will participate in - * the new communicator owned the mask and could not allocate a context - * ID. This indicates that either some process has no context IDs - * available, or that some are available, but the allocation cannot - * succeed because there is no common context ID. */ - if (*context_id == 0 && st.local_mask[ALL_OWN_MASK_FLAG] == 1) { - /* --BEGIN ERROR HANDLING-- */ - int nfree = 0; - int ntotal = 0; - int minfree; - - if (st.own_mask) { - MPID_THREAD_CS_ENTER(VCI, MPIR_THREAD_VCI_CTX_MUTEX); - mask_in_use = 0; - MPID_THREAD_CS_EXIT(VCI, MPIR_THREAD_VCI_CTX_MUTEX); - } + mpi_errno = gcn_allreduce(&st); + MPIR_ERR_CHECK(mpi_errno); - context_mask_stats(&nfree, &ntotal); - if (ignore_id) - minfree = INT_MAX; - else - minfree = nfree; + *context_id = gcn_allocate_cid(&st); - if (group_ptr != NULL) { - int coll_tag = tag | MPIR_TAG_COLL_BIT; /* Shift tag into the tagged coll space */ - mpi_errno = MPII_Allreduce_group(MPI_IN_PLACE, &minfree, 1, MPIR_INT_INTERNAL, - MPI_MIN, comm_ptr, group_ptr, coll_tag, - MPIR_COLL_ATTR_SYNC); - } else { - mpi_errno = MPIR_Allreduce_impl(MPI_IN_PLACE, &minfree, 1, MPIR_INT_INTERNAL, - MPI_MIN, comm_ptr, MPIR_COLL_ATTR_SYNC); - } + if (*context_id == 0) { + mpi_errno = st.error; + MPIR_ERR_CHECK(mpi_errno); - if (minfree > 0) { - MPIR_ERR_SETANDJUMP3(mpi_errno, MPI_ERR_OTHER, - "**toomanycommfrag", "**toomanycommfrag %d %d %d", - nfree, ntotal, ignore_id); - } else { - MPIR_ERR_SETANDJUMP3(mpi_errno, MPI_ERR_OTHER, - "**toomanycomm", "**toomanycomm %d %d %d", - nfree, ntotal, ignore_id); - } - /* --END ERROR HANDLING-- */ - } - if (st.iter == 0) { - /* to avoid deadlocks, the element is not added to the list 
before the first iteration */ - if (!ignore_id && *context_id == 0) { - MPID_THREAD_CS_ENTER(VCI, MPIR_THREAD_VCI_CTX_MUTEX); - add_gcn_to_list(&st); - MPID_THREAD_CS_EXIT(VCI, MPIR_THREAD_VCI_CTX_MUTEX); - } + /* we did not find a context id. Give up the mask in case + * there is another thread in the gcn_next_list + * waiting for it. We need to ensure that any other threads + * have the opportunity to run, hence yielding */ + MPID_THREAD_CS_YIELD(GLOBAL, MPIR_THREAD_GLOBAL_ALLFUNC_MUTEX); } - st.iter++; } fn_exit: @@ -599,38 +438,12 @@ int MPIR_Get_contextid_sparse_group(MPIR_Comm * comm_ptr, MPIR_Group * group_ptr MPIR_FUNC_EXIT; return mpi_errno; - /* --BEGIN ERROR HANDLING-- */ fn_fail: - /* Release the masks */ - MPID_THREAD_CS_ENTER(VCI, MPIR_THREAD_VCI_CTX_MUTEX); - if (st.own_mask) { - mask_in_use = 0; - } - /*If in list, remove it */ - if (st.iter > 0 && !ignore_id) { - if (next_gcn == &st) { - next_gcn = st.next; - } else { - for (tmp = next_gcn; tmp->next != &st; tmp = tmp->next); - tmp->next = st.next; - } - } - MPID_THREAD_CS_EXIT(VCI, MPIR_THREAD_VCI_CTX_MUTEX); - - goto fn_exit; - /* --END ERROR HANDLING-- */ } -static int async_gcn_poll(MPIX_Async_thing thing); -static int gcn_copy_mask(struct gcn_state *st); -static int gcn_allocate_cid(struct gcn_state *st); -static int gcn_allreduce(struct gcn_state *st); -static int gcn_intercomm_sendrecv(struct gcn_state *st); -static int gcn_intercomm_bcast(struct gcn_state *st); - /* Try to find a valid context id. 
* * If the context id is found, then broadcast it; if not, then retry the @@ -646,102 +459,59 @@ static int gcn_intercomm_bcast(struct gcn_state *st); */ static int gcn_allocate_cid(struct gcn_state *st) { - int mpi_errno = MPI_SUCCESS; - if (st->own_eager_mask) { - int newctxid = find_and_allocate_context_id(st->local_mask); - if (st->ctx0) - *st->ctx0 = newctxid; - if (st->ctx1) - *st->ctx1 = newctxid; + int context_id = 0; + + MPID_THREAD_CS_ENTER(VCI, MPIR_THREAD_VCI_CTX_MUTEX); + context_id = locate_context_bit(st->local_mask); + if (st->own_eager_mask) { st->own_eager_mask = 0; eager_in_use = 0; } else if (st->own_mask) { - int newctxid = find_and_allocate_context_id(st->local_mask); - if (st->ctx0) - *st->ctx0 = newctxid; - if (st->ctx1) - *st->ctx1 = newctxid; - - /* reset flag for the next try */ + st->own_mask = 0; mask_in_use = 0; - /* If we found a ctx, remove element form list */ - if (newctxid > 0) { - if (next_gcn == st) { - next_gcn = st->next; - } else { - struct gcn_state *tmp; - for (tmp = next_gcn; tmp->next != st; tmp = tmp->next); - tmp->next = st->next; - } - } } - if (*st->ctx0 == 0) { - if (st->local_mask[ALL_OWN_MASK_FLAG] == 1) { - /* --BEGIN ERROR HANDLING-- */ - int nfree = 0; - int ntotal = 0; - int minfree; - context_mask_stats(&nfree, &ntotal); - minfree = nfree; - /* hzhou: we are inside a MPID_Progress_wait/test and running MPIR_Allreduce here - * rellay complicates things up. I am commenting off the code below as it seems all it - * does is to have more detail in the error message, which I don't think it worth the - * complication. - * If we are to do this, we need explain exactly how things will work out. Otherwise, - * we are just placing a mine field here. 
- */ - /* FIXME: study and resolve */ - /* - * mpi_errno = MPIR_Allreduce(MPI_IN_PLACE, &minfree, 1, MPIR_INT_INTERNAL, MPI_MIN, st->comm_ptr, 0); - * MPIR_ERR_CHECK(mpi_errno); - */ - if (minfree > 0) { - MPIR_ERR_SETANDJUMP3(mpi_errno, MPI_ERR_OTHER, - "**toomanycommfrag", "**toomanycommfrag %d %d %d", - nfree, ntotal, minfree); - } else { - MPIR_ERR_SETANDJUMP3(mpi_errno, MPI_ERR_OTHER, - "**toomanycomm", "**toomanycomm %d %d %d", - nfree, ntotal, minfree); - } - /* --END ERROR HANDLING-- */ - } else { - /* do not own mask, try again */ - if (st->iter == 0) { - add_gcn_to_list(st); - } + if (context_id > 0) { + if (!st->ignore_id) { + int id = allocate_context_bit(context_mask, context_id); + MPIR_Assertp(id == context_id); + } + if (st->gcn_in_list) { + remove_gcn_from_list(st); + } + } else { + st->error = gcn_check_id_exhaustion(st); + if (!st->error && !st->gcn_in_list && !st->ignore_id) { + add_gcn_to_list(st); + } else if (st->error && st->gcn_in_list) { + remove_gcn_from_list(st); } } + MPID_THREAD_CS_EXIT(VCI, MPIR_THREAD_VCI_CTX_MUTEX); + st->iter++; - fn_exit: - return mpi_errno; - fn_fail: - /* make sure that the pending allocations are scheduled */ - if (st->iter > 0) { - if (next_gcn == st) { - next_gcn = st->next; - } else { - struct gcn_state *tmp; - for (tmp = next_gcn; tmp && tmp->next != st; tmp = tmp->next); - tmp->next = st->next; - } - } - /* In the case of failure, the new communicator was half created. - * So we need to clean the memory allocated for it. */ - MPII_COMML_FORGET(st->new_comm); - MPIR_Handle_obj_free(&MPIR_Comm_mem, st->new_comm); - MPL_free(st); - goto fn_exit; + return context_id; } static int gcn_copy_mask(struct gcn_state *st) { int mpi_errno = MPI_SUCCESS; + MPID_THREAD_CS_ENTER(VCI, MPIR_THREAD_VCI_CTX_MUTEX); - if (st->iter == 0) { + if (st->ignore_id) { + /* We are not participating in the resulting communicator, so our + * context ID space doesn't matter. Set the mask to "all available". 
*/ + memset(st->local_mask, 0xff, MPIR_MAX_CONTEXT_MASK * sizeof(int)); + st->own_mask = 0; + /* don't need to touch mask_in_use/lowest_context_id b/c our thread + * doesn't ever need to "win" the mask */ + } else if (st->iter == 0) { + /* Deadlock avoidance: Only participate in context id loop when all + * processes have called this routine. On the first iteration, use the + * "eager" allocation protocol. + */ memset(st->local_mask, 0, (MPIR_MAX_CONTEXT_MASK + 1) * sizeof(int)); st->own_eager_mask = 0; @@ -755,11 +525,15 @@ static int gcn_copy_mask(struct gcn_state *st) st->own_eager_mask = 1; } } else { - /* Same rules as for the blocking case */ + MPIR_Assert(next_gcn != NULL); + /*If we are here, at least one element must be in the list, at least myself */ + + /* only the first element in the list can own the mask. However, maybe the mask is used + * by another thread, which added another allocation to the list before. So we have to check, + * if the mask is used and mark, if we own it */ if (mask_in_use || st != next_gcn) { memset(st->local_mask, 0, MPIR_MAX_CONTEXT_MASK * sizeof(int)); st->own_mask = 0; - st->local_mask[ALL_OWN_MASK_FLAG] = 0; } else { /* Copy safe mask segment to local_mask */ int i; @@ -769,17 +543,96 @@ static int gcn_copy_mask(struct gcn_state *st) st->local_mask[i] = context_mask[i]; mask_in_use = 1; st->own_mask = 1; - st->local_mask[ALL_OWN_MASK_FLAG] = 1; } } + + /* Note: MPIR_MAX_CONTEXT_MASK elements of local_mask are used by the + * context ID allocation algorithm. The additional element is ignored + * by the context ID mask access routines and is used as a flag for + * detecting context ID exhaustion (explained below). 
*/ + if (st->own_mask || st->ignore_id) + st->local_mask[ALL_OWN_MASK_FLAG] = 1; + else + st->local_mask[ALL_OWN_MASK_FLAG] = 0; + + MPID_THREAD_CS_EXIT(VCI, MPIR_THREAD_VCI_CTX_MUTEX); + return mpi_errno; +} + +static int gcn_check_id_exhaustion(struct gcn_state *st) +{ + int mpi_errno = MPI_SUCCESS; + + /* Test for context ID exhaustion: All threads that will participate in + * the new communicator owned the mask and could not allocate a context + * ID. This indicates that either some process has no context IDs + * available, or that some are available, but the allocation cannot + * succeed because there is no common context ID. */ + if (st->local_mask[ALL_OWN_MASK_FLAG] == 1) { + int nfree = 0; + int ntotal = 0; + int minfree; + + /* NOTE: we only call this with CS from gcn_allocate_cid, so it's safe */ + context_mask_stats(&nfree, &ntotal); + + if (st->ignore_id) + minfree = INT_MAX; + else + minfree = nfree; + + if (!st->req_ptr) { + /* only run Allreduce if we are in a blocking call */ + if (st->group_ptr != NULL) { + int coll_tag = st->tag | MPIR_TAG_COLL_BIT; /* Shift tag into the tagged coll space */ + mpi_errno = MPII_Allreduce_group(MPI_IN_PLACE, &minfree, 1, MPIR_INT_INTERNAL, + MPI_MIN, st->comm_ptr, st->group_ptr, coll_tag, + MPIR_COLL_ATTR_SYNC); + } else { + mpi_errno = MPIR_Allreduce_impl(MPI_IN_PLACE, &minfree, 1, MPIR_INT_INTERNAL, + MPI_MIN, st->comm_ptr, MPIR_COLL_ATTR_SYNC); + } + } + + if (minfree > 0) { + MPIR_ERR_SETANDJUMP3(mpi_errno, MPI_ERR_OTHER, + "**toomanycommfrag", "**toomanycommfrag %d %d %d", + nfree, ntotal, st->ignore_id); + } else { + MPIR_ERR_SETANDJUMP3(mpi_errno, MPI_ERR_OTHER, + "**toomanycomm", "**toomanycomm %d %d %d", + nfree, ntotal, st->ignore_id); + } + } + + fn_fail: return mpi_errno; } static int gcn_allreduce(struct gcn_state *st) { - return MPIR_Iallreduce_tag(MPI_IN_PLACE, st->local_mask, MPIR_MAX_CONTEXT_MASK + 1, - MPIR_UINT32_T_INTERNAL, MPI_BAND, st->comm_ptr, - st->tag, &st->u.allreduce_request); + int 
mpi_errno; + + int count = MPIR_MAX_CONTEXT_MASK + 1; + MPI_Datatype datatype = MPIR_UINT32_T_INTERNAL; + MPI_Op op = MPI_BAND; + + if (st->req_ptr) { + mpi_errno = MPIR_Iallreduce_tag(MPI_IN_PLACE, st->local_mask, count, datatype, op, + st->comm_ptr, st->tag, &st->u.allreduce_request); + } else { + if (st->group_ptr != NULL) { + int coll_tag = st->tag | MPIR_TAG_COLL_BIT; /* Shift tag into the tagged coll space */ + mpi_errno = MPII_Allreduce_group(MPI_IN_PLACE, st->local_mask, count, datatype, op, + st->comm_ptr, st->group_ptr, + coll_tag, MPIR_COLL_ATTR_SYNC); + } else { + mpi_errno = MPIR_Allreduce_impl(MPI_IN_PLACE, st->local_mask, count, datatype, op, + st->comm_ptr, MPIR_COLL_ATTR_SYNC); + } + } + + return mpi_errno; } static int gcn_intercomm_sendrecv(struct gcn_state *st) @@ -807,8 +660,16 @@ static int gcn_complete(struct gcn_state *st) { int mpi_errno = MPI_SUCCESS; - mpi_errno = MPIR_Comm_commit(st->new_comm); - MPIR_ERR_CHECK(mpi_errno); + if (st->error) { + /* In the case of failure, the new communicator was half created. + * So we need to clean the memory allocated for it. 
*/ + MPII_COMML_FORGET(st->new_comm); + MPIR_Handle_obj_free(&MPIR_Comm_mem, st->new_comm); + st->req_ptr->status.MPI_ERROR = st->error; + } else { + mpi_errno = MPIR_Comm_commit(st->new_comm); + MPIR_ERR_CHECK(mpi_errno); + } MPIR_Grequest_complete_impl(st->req_ptr); @@ -820,7 +681,6 @@ static int gcn_complete(struct gcn_state *st) static int async_gcn_poll(MPIX_Async_thing thing) { - int mpi_errno = MPI_SUCCESS; struct gcn_state *st = MPIR_Async_thing_get_state(thing); switch (st->stage) { @@ -840,11 +700,18 @@ static int async_gcn_poll(MPIX_Async_thing thing) } else { MPIR_Request_free(st->u.allreduce_request); - mpi_errno = gcn_allocate_cid(st); - MPIR_Assert(mpi_errno == MPI_SUCCESS); + int newctxid = gcn_allocate_cid(st); + if (newctxid) { + if (st->ctx0) + *st->ctx0 = newctxid; + if (st->ctx1) + *st->ctx1 = newctxid; + } - if (*st->ctx0 == 0) { - /* TODO: error check */ + if (st->error) { + gcn_complete(st); + return MPIX_ASYNC_DONE; + } else if (*st->ctx0 == 0) { /* retry */ st->stage = MPIR_GCN__COPYMASK; return MPIX_ASYNC_NOPROGRESS; @@ -893,8 +760,8 @@ static int async_gcn_poll(MPIX_Async_thing thing) } } - fn_fail: - return mpi_errno; + MPIR_Assert(0); + return MPIX_ASYNC_NOPROGRESS; } static int query_fn(void *extra_state, MPI_Status * status) @@ -949,8 +816,8 @@ static int cancel_fn(void *extra_state, int complete) * To avoid deadlock or livelock, it uses the same eager protocol as * multi-threaded MPIR_Get_contextid_sparse_group. 
*/ -static int async_get_cid(MPIR_Comm * comm_ptr, MPIR_Comm * newcomm, - int *ctx0, int *ctx1, bool is_inter_comm, MPIR_Request ** req) +static int async_get_cid(MPIR_Comm * comm_ptr, MPIR_Comm * newcommp, MPIR_Comm * comm_inter, + MPIR_Request ** req) { int mpi_errno = MPI_SUCCESS; MPIR_CHKPMEM_DECL(); @@ -962,34 +829,31 @@ static int async_get_cid(MPIR_Comm * comm_ptr, MPIR_Comm * newcomm, MPIR_CHKPMEM_MALLOC(st, sizeof(struct gcn_state), MPL_MEM_COMM); int tag; - mpi_errno = MPIR_Sched_next_tag(comm_ptr, &tag); + if (comm_inter) { + /* We need get the tag from the parent intercomm because the localcomm (comm_ptr) shares + * the same context_id as the parent. */ + mpi_errno = MPIR_Sched_next_tag(comm_inter, &tag); + } else { + mpi_errno = MPIR_Sched_next_tag(comm_ptr, &tag); + } MPIR_ERR_CHECK(mpi_errno); - st->tag = tag; - st->ctx0 = ctx0; - st->ctx1 = ctx1; - if (!is_inter_comm) { - st->comm_ptr = comm_ptr; - st->comm_ptr_inter = NULL; + gcn_init(st, comm_ptr, tag, false /* ignore_id */ , NULL /* group_ptr */); + + st->new_comm = newcommp; + st->comm_ptr_inter = comm_inter; + st->ctx0 = &newcommp->recvcontext_id; + st->ctx1 = &newcommp->context_id; + if (comm_inter) { + st->is_inter_comm = true; } else { - st->comm_ptr = comm_ptr->local_comm; - st->comm_ptr_inter = comm_ptr; + st->is_inter_comm = false; } + st->req_ptr = *req; - st->is_inter_comm = is_inter_comm; + *(st->ctx0) = 0; - st->own_eager_mask = 0; - st->iter = 0; - st->new_comm = newcomm; - st->own_mask = 0; st->stage = MPIR_GCN__COPYMASK; - if (eager_nelem < 0) { - /* Ensure that at least one word of deadlock-free context IDs is - * always set aside for the base protocol */ - MPIR_Assert(MPIR_CVAR_CTXID_EAGER_SIZE >= 0 && - MPIR_CVAR_CTXID_EAGER_SIZE < MPIR_MAX_CONTEXT_MASK - 1); - eager_nelem = MPIR_CVAR_CTXID_EAGER_SIZE; - } mpi_errno = MPIR_Async_things_add(async_gcn_poll, st, NULL); MPIR_ERR_CHECK(mpi_errno); @@ -1006,8 +870,7 @@ int MPIR_Get_contextid_nonblock(MPIR_Comm * comm_ptr, MPIR_Comm * 
newcommp, MPIR int mpi_errno = MPI_SUCCESS; MPIR_FUNC_ENTER; - mpi_errno = async_get_cid(comm_ptr, newcommp, &newcommp->context_id, &newcommp->recvcontext_id, - false, req); + mpi_errno = async_get_cid(comm_ptr, newcommp, NULL /* comm_inter */ , req); MPIR_ERR_CHECK(mpi_errno); fn_exit: @@ -1029,8 +892,7 @@ int MPIR_Get_intercomm_contextid_nonblock(MPIR_Comm * comm_ptr, MPIR_Comm * newc MPIR_ERR_CHECK(mpi_errno); } - mpi_errno = async_get_cid(comm_ptr, newcommp, &newcommp->recvcontext_id, &newcommp->context_id, - true, req); + mpi_errno = async_get_cid(comm_ptr->local_comm, newcommp, comm_ptr, req); MPIR_ERR_CHECK(mpi_errno); fn_fail: From 81f24dbe7fc14b0c9c69ae0f2977bcd7df80efe4 Mon Sep 17 00:00:00 2001 From: Hui Zhou Date: Thu, 30 Oct 2025 20:50:32 -0500 Subject: [PATCH 10/12] async_things: add MPII_async_things_pending Ch3 need be informed whether it can enter a blocking receive during progress or does it need continuously poll the progress. --- src/mpid/ch3/channels/nemesis/include/mpid_nem_inline.h | 3 +++ src/mpid/ch3/channels/sock/src/ch3_progress.c | 4 ++++ src/util/mpir_async_things.c | 4 ++++ 3 files changed, 11 insertions(+) diff --git a/src/mpid/ch3/channels/nemesis/include/mpid_nem_inline.h b/src/mpid/ch3/channels/nemesis/include/mpid_nem_inline.h index 00a9a876675..85bdeebf57a 100644 --- a/src/mpid/ch3/channels/nemesis/include/mpid_nem_inline.h +++ b/src/mpid/ch3/channels/nemesis/include/mpid_nem_inline.h @@ -84,12 +84,15 @@ static inline void MPID_nem_mpich_send_seg (void *buf, MPI_Aint count, MPI_Datat /* evaluates to TRUE if it is safe to block on recv operations in the progress * loop, FALSE otherwise */ +extern int MPII_async_things_pending; + #define MPID_nem_safe_to_block_recv() \ (!MPID_nem_local_lmt_pending && \ !MPIDI_CH3I_shm_active_send && \ !MPIDI_CH3I_Sendq_head(MPIDI_CH3I_shm_sendq) && \ !MPIDU_Sched_are_pending() && \ MPIR_Coll_safe_to_block() && \ + !MPII_async_things_pending && \ !MPIDI_RMA_Win_active_list_head) static inline int 
diff --git a/src/mpid/ch3/channels/sock/src/ch3_progress.c b/src/mpid/ch3/channels/sock/src/ch3_progress.c index 737d6a302f0..c893e3feb74 100644 --- a/src/mpid/ch3/channels/sock/src/ch3_progress.c +++ b/src/mpid/ch3/channels/sock/src/ch3_progress.c @@ -801,9 +801,13 @@ static int ReadMoreData(MPIDI_CH3I_Connection_t * conn, MPIR_Request * rreq) * The dynamic-library interface requires a unified Progress routine. * This is that routine. */ +extern int MPII_async_things_pending; + int MPIDI_CH3I_Progress(int blocking, MPID_Progress_state * state) { int mpi_errno; + if (MPII_async_things_pending) + blocking = 0; if (blocking) mpi_errno = MPIDI_CH3i_Progress_wait(state); else diff --git a/src/util/mpir_async_things.c b/src/util/mpir_async_things.c index 5f1c10332e8..2f539ddde4e 100644 --- a/src/util/mpir_async_things.c +++ b/src/util/mpir_async_things.c @@ -10,6 +10,8 @@ static struct MPIR_Async_thing *async_things_list[MPIR_MAX_VCIS + 1]; static MPID_Thread_mutex_t async_things_mutex[MPIR_MAX_VCIS + 1]; static int async_things_progress_hook_id[MPIR_MAX_VCIS + 1]; +int MPII_async_things_pending = 0; + int MPIR_Async_things_init(void) { int mpi_errno = MPI_SUCCESS; @@ -68,6 +70,7 @@ int MPIR_Async_things_add(int (*poll_fn) (struct MPIR_Async_thing * entry), void if (was_empty) { MPIR_Progress_hook_activate(async_things_progress_hook_id[vci]); + MPII_async_things_pending++; } return MPI_SUCCESS; @@ -103,6 +106,7 @@ int MPIR_Async_things_progress(int vci, int *made_progress) MPL_free(entry); if (async_things_list[vci] == NULL) { MPIR_Progress_hook_deactivate(async_things_progress_hook_id[vci]); + MPII_async_things_pending--; } } } From 495e529cbe5792a3097b695964e4029a22d85e87 Mon Sep 17 00:00:00 2001 From: Hui Zhou Date: Fri, 31 Oct 2025 13:51:13 -0500 Subject: [PATCH 11/12] comm/context_id: organize code and better comments Re-organize code for better readability. Re-do the comments to remove stale parts and reflect the current code. 
--- src/include/mpir_tags.h | 3 +- src/mpi/comm/contextid.c | 986 ++++++++++++++++++++------------------- 2 files changed, 500 insertions(+), 489 deletions(-) diff --git a/src/include/mpir_tags.h b/src/include/mpir_tags.h index 40564cfeb9c..f8671156ec6 100644 --- a/src/include/mpir_tags.h +++ b/src/include/mpir_tags.h @@ -38,7 +38,8 @@ #define MPIR_REDUCE_SCATTER_BLOCK_TAG 28 #define MPIR_SHRINK_TAG 29 #define MPIR_AGREE_TAG 30 -#define MPIR_FIRST_HCOLL_TAG 31 +#define MPIR_CTXID_TAG 31 +#define MPIR_FIRST_HCOLL_TAG 32 #define MPIR_LAST_HCOLL_TAG (MPIR_FIRST_HCOLL_TAG + 255) #define MPIR_FIRST_NBC_TAG (MPIR_LAST_HCOLL_TAG + 1) diff --git a/src/mpi/comm/contextid.c b/src/mpi/comm/contextid.c index 956087be955..5528e5307f5 100644 --- a/src/mpi/comm/contextid.c +++ b/src/mpi/comm/contextid.c @@ -238,34 +238,56 @@ static int allocate_context_bit(uint32_t mask[], int id) return id; } +/* The CONTEXT ID allocation algorithms: + * A. The basic serial blocking allocation algorithm: + * 1) Each process copy the local mask (a map for all context id availability) + * 2) Run Allreduce using MPI_BAND, this results in a common mask + * 3) Call allocate_context_bit() to allocate the context_id + * B. Concurrent allocation (either from multiple threads or multiple nonblocking allocations) - + * 1) Only one allocation can use the mask at a time. Because there is no guarantee that + * all processes with the same allocation grab their masks at the same time, all need + * participate in Allreduce whether or not it owns the mask. + * 2) If an allreduce collects all the mask, it allocates an id and finish. Otherwise, it + * need retry until succeeded. + * 3) We collectively order the concurrent allocations by parentcomm's context_id and the tag + * to ensure the retry will eventually complete. + * C. 
Potential deadlock - user may block some allocations within a thread and inconsistently + * across processes, shown in the following diagram: + * ┌────────────────────────────────────────────────┬─────────────────────────────────────────────────┐ + * │ Process 0 │ Process 1 │ + * ├───────────────────────┬────────────────────────┼────────────────────────┬────────────────────────┤ + * │ Thread 0 │ Thread 1 │ Thread 0 │ Thread 1 │ + * │ │ │ │ │ + * │ │ +--------------------+ │ +--------------------+ │ │ + * │ │ | Dup(Comm_1) | │ | Dup(Comm_0) | │ │ + * │ +--------------------+│ | (holds mask) | │ | (holds mask) | │ +--------------------+ │ + * │ | Dup(Comm_2) |│ +--------------------+ │ +--------------------+ │ | Dup(Comm_2) | │ + * │ | (mask unavailable) |│ │ │ | (mask unavailable) | │ + * │ +--------------------+│ │ │ +--------------------+ │ + * │ │ │ │ │ + * │ [Dup(Comm_0)] │ │ │ [Dup(Comm_1)] │ + * │ (unreachable) │ │ │ (unreachable) │ + * │ │ │ │ │ + * └───────────────────────┴────────────────────────┴────────────────────────┴────────────────────────┘ + * 1) The solution is to perform a barrier before each allocation can hold mask. In the above diagram, + * the barrier would let Comm_2 complete before Comm_1 or Comm_0 can hold mask. It is guaranteed + * by the collective semantics that barriers will not deadlock. + * 2) Instead of a barrier, alternatively we could split a portion for eager mask and try the first + * allreduce using eager mask. Allreduce will achieve the barrier synchronizatiion with an + * opportunity to complete id allocation for better performance. + * + */ + /* EAGER CONTEXT ID ALLOCATION: Attempt to allocate the context ID during the * initial synchronization step. If eager protocol fails, threads fall back to * the base algorithm. - * - * They are used to avoid deadlock in multi-threaded case. In single-threaded - * case, they are not used. 
*/ static volatile int eager_nelem = -1; static volatile int eager_in_use = 0; -/* In multi-threaded case, mask_in_use is used to maintain thread safety. In - * single-threaded case, it is always 0. */ +/* Only one allocation can own the mask at a time. */ static volatile int mask_in_use = 0; -/* In multi-threaded case, lowest_context_id is used to prioritize access when - * multiple threads are contending for the mask, lowest_tag is used to break - * ties when MPI_Comm_create_group is invoked my multiple threads on the same - * parent communicator. In single-threaded case, lowest_context_id is always - * set to parent context id in sched_cb_gcn_copy_mask and lowest_tag is not - * used. - */ - -int MPIR_Get_contextid_sparse(MPIR_Comm * comm_ptr, int *context_id, int ignore_id) -{ - return MPIR_Get_contextid_sparse_group(comm_ptr, NULL /*group_ptr */ , - MPIR_Process.attrs.tag_ub /*tag */ , - context_id, ignore_id); -} struct gcn_state { int *ctx0; @@ -285,6 +307,7 @@ struct gcn_state { int error; uint32_t local_mask[MPIR_MAX_CONTEXT_MASK + 1]; struct gcn_state *next; + /* fields needed by the nonblocking algorithm */ enum { MPIR_GCN__COPYMASK, MPIR_GCN__ALLREDUCE, @@ -299,87 +322,33 @@ struct gcn_state { }; struct gcn_state *next_gcn = NULL; +/* routines used by both blocking and nonblocking algorithms */ static void gcn_init(struct gcn_state *st, MPIR_Comm * comm_ptr, int tag, bool ignore_id, MPIR_Group * group_ptr); static void add_gcn_to_list(struct gcn_state *st); static void remove_gcn_from_list(struct gcn_state *st); static int gcn_copy_mask(struct gcn_state *st); static int gcn_allocate_cid(struct gcn_state *st); -static int gcn_allreduce(struct gcn_state *st); static int gcn_check_id_exhaustion(struct gcn_state *st); +/* routines used by the nonblocking algorithm */ +static int async_get_cid(MPIR_Comm * comm_ptr, MPIR_Comm * newcommp, MPIR_Comm * comm_inter, + MPIR_Request ** req); +static int async_gcn_poll(MPIX_Async_thing thing); +static int 
gcn_allreduce(struct gcn_state *st); static int gcn_intercomm_sendrecv(struct gcn_state *st); static int gcn_intercomm_bcast(struct gcn_state *st); +static int gcn_complete(struct gcn_state *st); -static void gcn_init(struct gcn_state *st, MPIR_Comm * comm_ptr, int tag, bool ignore_id, - MPIR_Group * group_ptr) -{ - if (eager_nelem < 0) { - /* Ensure that at least one word of deadlock-free context IDs is - * always set aside for the base protocol */ - MPIR_Assert(MPIR_CVAR_CTXID_EAGER_SIZE >= 0 && - MPIR_CVAR_CTXID_EAGER_SIZE < MPIR_MAX_CONTEXT_MASK - 1); - eager_nelem = MPIR_CVAR_CTXID_EAGER_SIZE; - } - - MPIR_Assert(comm_ptr->comm_kind == MPIR_COMM_KIND__INTRACOMM); - - st->iter = 0; - st->comm_ptr = comm_ptr; - st->tag = tag; - st->ignore_id = ignore_id; - st->group_ptr = group_ptr; - st->own_mask = 0; - st->own_eager_mask = 0; - st->req_ptr = NULL; - st->error = 0; - st->gcn_in_list = false; -} - -/* All pending context_id allocations are added to a list. The context_id allocations are ordered - * according to the context_id of of parent communicator and the tag, wherby blocking context_id - * allocations can have the same tag, while nonblocking operations cannot. 
In the non-blocking - * case, the user is responsible for the right tags if "comm_create_group" is used */ -static void add_gcn_to_list(struct gcn_state *st) -{ - /* we'll only call this from gcn_allocate_cid within CS */ - struct gcn_state *tmp = NULL; - if (next_gcn == NULL) { - next_gcn = st; - st->next = NULL; - } else if (next_gcn->comm_ptr->context_id > st->comm_ptr->context_id || - (next_gcn->comm_ptr->context_id == st->comm_ptr->context_id && - next_gcn->tag > st->tag)) { - st->next = next_gcn; - next_gcn = st; - } else { - for (tmp = next_gcn; - tmp->next != NULL && - ((st->comm_ptr->context_id > tmp->next->comm_ptr->context_id) || - ((st->comm_ptr->context_id == tmp->next->comm_ptr->context_id) && - (st->tag >= tmp->next->tag))); tmp = tmp->next); - - st->next = tmp->next; - tmp->next = st; - } - st->gcn_in_list = true; -} - -static void remove_gcn_from_list(struct gcn_state *st) +int MPIR_Get_contextid_sparse(MPIR_Comm * comm_ptr, int *context_id, int ignore_id) { - /* we'll only call this from gcn_allocate_cid within CS */ - if (next_gcn == st) { - next_gcn = st->next; - } else { - struct gcn_state *tmp; - for (tmp = next_gcn; tmp->next != st; tmp = tmp->next); - tmp->next = st->next; - } - st->gcn_in_list = false; + return MPIR_Get_contextid_sparse_group(comm_ptr, NULL /*group_ptr */ , + MPIR_Process.attrs.tag_ub /*tag */ , + context_id, ignore_id); } /* Allocates a new context ID collectively over the given communicator. This * routine is "sparse" in the sense that while it is collective, some processes - * may not care about the value selected context ID. + * may not care about the value selected context ID (ignore_id==TRUE). * * One example of this case is processes who pass MPI_UNDEFINED as the color * value to MPI_Comm_split. 
Such processes should pass ignore_id==TRUE to @@ -406,10 +375,6 @@ int MPIR_Get_contextid_sparse_group(MPIR_Comm * comm_ptr, MPIR_Group * group_ptr *context_id = 0; - MPL_DBG_MSG_FMT(MPIR_DBG_COMM, VERBOSE, (MPL_DBG_FDEST, - "Entering; shared state is %d:%d, my ctx id is %d, tag=%d", - mask_in_use, eager_in_use, comm_ptr->context_id, tag)); - while (*context_id == 0) { mpi_errno = gcn_copy_mask(&st); MPIR_ERR_CHECK(mpi_errno); @@ -423,10 +388,6 @@ int MPIR_Get_contextid_sparse_group(MPIR_Comm * comm_ptr, MPIR_Group * group_ptr mpi_errno = st.error; MPIR_ERR_CHECK(mpi_errno); - /* we did not find a context id. Give up the mask in case - * there is another thread in the gcn_next_list - * waiting for it. We need to ensure that any other threads - * have the opportunity to run, hence yielding */ MPID_THREAD_CS_YIELD(GLOBAL, MPIR_THREAD_GLOBAL_ALLFUNC_MUTEX); } } @@ -434,7 +395,6 @@ int MPIR_Get_contextid_sparse_group(MPIR_Comm * comm_ptr, MPIR_Group * group_ptr fn_exit: if (ignore_id) *context_id = MPIR_INVALID_CONTEXT_ID; - MPL_DBG_MSG_S(MPIR_DBG_COMM, VERBOSE, "Context mask = %s", context_mask_to_str()); MPIR_FUNC_EXIT; return mpi_errno; @@ -442,328 +402,397 @@ int MPIR_Get_contextid_sparse_group(MPIR_Comm * comm_ptr, MPIR_Group * group_ptr goto fn_exit; } - - -/* Try to find a valid context id. - * - * If the context id is found, then broadcast it; if not, then retry the - * nonblocking context id allocation algorithm again. - * - * Note the subtle difference on thread handling between the nonblocking - * algorithm (sched_cb_gcn_allocate_cid) and the blocking algorithm - * (MPIR_Get_contextid_sparse_group). In nonblocking algorithm, there is no - * need to yield to another thread because this thread will not block the - * progress. On the contrary, unnecessary yield will allow other threads to - * execute and insert wrong order of entries to the nonblocking schedule and - * cause errors. 
- */ -static int gcn_allocate_cid(struct gcn_state *st) +/* Get a context for a new intercomm. + (a) Each local group gets a context_id using local_comm. + (b) Exchange context_id using parent intercomm using MPIC_Sendrecv. +*/ +int MPIR_Get_intercomm_contextid(MPIR_Comm * comm_ptr, int *context_id, int *recvcontext_id) { - int context_id = 0; + int mycontext_id, remote_context_id; + int mpi_errno = MPI_SUCCESS; + int tag = MPIR_CTXID_TAG; - MPID_THREAD_CS_ENTER(VCI, MPIR_THREAD_VCI_CTX_MUTEX); + MPIR_FUNC_ENTER; - context_id = locate_context_bit(st->local_mask); - if (st->own_eager_mask) { - st->own_eager_mask = 0; - eager_in_use = 0; - } else if (st->own_mask) { - st->own_mask = 0; - mask_in_use = 0; + if (!comm_ptr->local_comm) { + /* Manufacture the local communicator */ + mpi_errno = MPII_Setup_intercomm_localcomm(comm_ptr); + MPIR_ERR_CHECK(mpi_errno); } - if (context_id > 0) { - if (!st->ignore_id) { - int id = allocate_context_bit(context_mask, context_id); - MPIR_Assertp(id == context_id); - } - if (st->gcn_in_list) { - remove_gcn_from_list(st); - } - } else { - st->error = gcn_check_id_exhaustion(st); - if (!st->error && !st->gcn_in_list && !st->ignore_id) { - add_gcn_to_list(st); - } else if (st->error && st->gcn_in_list) { - remove_gcn_from_list(st); - } + mpi_errno = MPIR_Get_contextid_sparse(comm_ptr->local_comm, &mycontext_id, FALSE); + MPIR_ERR_CHECK(mpi_errno); + MPIR_Assert(mycontext_id != 0); + + /* MPIC routine uses an internal context id. 
The local leads (process 0) + * exchange data */ + remote_context_id = MPIR_INVALID_CONTEXT_ID; + if (comm_ptr->rank == 0) { + mpi_errno = MPIC_Sendrecv(&mycontext_id, 1, MPIR_CONTEXT_ID_T_DATATYPE, 0, tag, + &remote_context_id, 1, MPIR_CONTEXT_ID_T_DATATYPE, 0, tag, + comm_ptr, MPI_STATUS_IGNORE, MPIR_COLL_ATTR_SYNC); + MPIR_ERR_CHECK(mpi_errno); } - MPID_THREAD_CS_EXIT(VCI, MPIR_THREAD_VCI_CTX_MUTEX); - st->iter++; + /* Make sure that all of the local processes now have this + * id */ + mpi_errno = MPIR_Bcast_impl(&remote_context_id, 1, MPIR_CONTEXT_ID_T_DATATYPE, + 0, comm_ptr->local_comm, MPIR_COLL_ATTR_SYNC); + MPIR_ERR_CHECK(mpi_errno); + /* The recvcontext_id must be the one that was allocated out of the local + * group, not the remote group. Otherwise we could end up posting two + * MPI_ANY_SOURCE,MPI_ANY_TAG recvs on the same context IDs even though we + * are attempting to post them for two separate communicators. */ + *context_id = remote_context_id; + *recvcontext_id = mycontext_id; - return context_id; + fn_fail: + MPIR_FUNC_EXIT; + return mpi_errno; } -static int gcn_copy_mask(struct gcn_state *st) +int MPIR_Get_contextid_nonblock(MPIR_Comm * comm_ptr, MPIR_Comm * newcommp, MPIR_Request ** req) { int mpi_errno = MPI_SUCCESS; - MPID_THREAD_CS_ENTER(VCI, MPIR_THREAD_VCI_CTX_MUTEX); + MPIR_FUNC_ENTER; - if (st->ignore_id) { - /* We are not participating in the resulting communicator, so our - * context ID space doesn't matter. Set the mask to "all available". */ - memset(st->local_mask, 0xff, MPIR_MAX_CONTEXT_MASK * sizeof(int)); - st->own_mask = 0; - /* don't need to touch mask_in_use/lowest_context_id b/c our thread - * doesn't ever need to "win" the mask */ - } else if (st->iter == 0) { - /* Deadlock avoidance: Only participate in context id loop when all - * processes have called this routine. On the first iteration, use the - * "eager" allocation protocol. 
- */ - memset(st->local_mask, 0, (MPIR_MAX_CONTEXT_MASK + 1) * sizeof(int)); - st->own_eager_mask = 0; + mpi_errno = async_get_cid(comm_ptr, newcommp, NULL /* comm_inter */ , req); + MPIR_ERR_CHECK(mpi_errno); - /* Attempt to reserve the eager mask segment */ - if (!eager_in_use && eager_nelem > 0) { - int i; - for (i = 0; i < eager_nelem; i++) - st->local_mask[i] = context_mask[i]; + fn_exit: + MPIR_FUNC_EXIT; + return mpi_errno; + fn_fail: + goto fn_exit; +} - eager_in_use = 1; - st->own_eager_mask = 1; - } - } else { - MPIR_Assert(next_gcn != NULL); - /*If we are here, at least one element must be in the list, at least myself */ +int MPIR_Get_intercomm_contextid_nonblock(MPIR_Comm * comm_ptr, MPIR_Comm * newcommp, + MPIR_Request ** req) +{ + int mpi_errno = MPI_SUCCESS; + MPIR_FUNC_ENTER; - /* only the first element in the list can own the mask. However, maybe the mask is used - * by another thread, which added another allocation to the list before. So we have to check, - * if the mask is used and mark, if we own it */ - if (mask_in_use || st != next_gcn) { - memset(st->local_mask, 0, MPIR_MAX_CONTEXT_MASK * sizeof(int)); - st->own_mask = 0; - } else { - /* Copy safe mask segment to local_mask */ - int i; - for (i = 0; i < eager_nelem; i++) - st->local_mask[i] = 0; - for (i = eager_nelem; i < MPIR_MAX_CONTEXT_MASK; i++) - st->local_mask[i] = context_mask[i]; - mask_in_use = 1; - st->own_mask = 1; - } + /* do as much local setup as possible */ + if (!comm_ptr->local_comm) { + mpi_errno = MPII_Setup_intercomm_localcomm(comm_ptr); + MPIR_ERR_CHECK(mpi_errno); } - /* Note: MPIR_MAX_CONTEXT_MASK elements of local_mask are used by the - * context ID allocation algorithm. The additional element is ignored - * by the context ID mask access routines and is used as a flag for - * detecting context ID exhaustion (explained below). 
*/ - if (st->own_mask || st->ignore_id) - st->local_mask[ALL_OWN_MASK_FLAG] = 1; - else - st->local_mask[ALL_OWN_MASK_FLAG] = 0; + mpi_errno = async_get_cid(comm_ptr->local_comm, newcommp, comm_ptr, req); + MPIR_ERR_CHECK(mpi_errno); - MPID_THREAD_CS_EXIT(VCI, MPIR_THREAD_VCI_CTX_MUTEX); + fn_fail: + MPIR_FUNC_EXIT; return mpi_errno; } -static int gcn_check_id_exhaustion(struct gcn_state *st) +void MPIR_Free_contextid(int context_id) { - int mpi_errno = MPI_SUCCESS; - - /* Test for context ID exhaustion: All threads that will participate in - * the new communicator owned the mask and could not allocate a context - * ID. This indicates that either some process has no context IDs - * available, or that some are available, but the allocation cannot - * succeed because there is no common context ID. */ - if (st->local_mask[ALL_OWN_MASK_FLAG] == 1) { - int nfree = 0; - int ntotal = 0; - int minfree; + int idx, bitpos, raw_prefix; - /* NOTE: we only call this with CS from gcn_allocate_cid, so it's safe */ - context_mask_stats(&nfree, &ntotal); + MPIR_FUNC_ENTER; - if (st->ignore_id) - minfree = INT_MAX; - else - minfree = nfree; + /* Convert the context id to the bit position */ + raw_prefix = MPIR_CONTEXT_READ_FIELD(PREFIX, context_id); + idx = raw_prefix / MPIR_CONTEXT_INT_BITS; + bitpos = raw_prefix % MPIR_CONTEXT_INT_BITS; - if (!st->req_ptr) { - /* only run Allreduce if we are in a blocking call */ - if (st->group_ptr != NULL) { - int coll_tag = st->tag | MPIR_TAG_COLL_BIT; /* Shift tag into the tagged coll space */ - mpi_errno = MPII_Allreduce_group(MPI_IN_PLACE, &minfree, 1, MPIR_INT_INTERNAL, - MPI_MIN, st->comm_ptr, st->group_ptr, coll_tag, - MPIR_COLL_ATTR_SYNC); - } else { - mpi_errno = MPIR_Allreduce_impl(MPI_IN_PLACE, &minfree, 1, MPIR_INT_INTERNAL, - MPI_MIN, st->comm_ptr, MPIR_COLL_ATTR_SYNC); - } - } + /* --BEGIN ERROR HANDLING-- */ + if (idx < 0 || idx >= MPIR_MAX_CONTEXT_MASK) { + MPID_Abort(0, MPI_ERR_INTERN, 1, "In MPIR_Free_contextid, idx is out of 
range"); + } + /* --END ERROR HANDLING-- */ - if (minfree > 0) { - MPIR_ERR_SETANDJUMP3(mpi_errno, MPI_ERR_OTHER, - "**toomanycommfrag", "**toomanycommfrag %d %d %d", - nfree, ntotal, st->ignore_id); - } else { - MPIR_ERR_SETANDJUMP3(mpi_errno, MPI_ERR_OTHER, - "**toomanycomm", "**toomanycomm %d %d %d", - nfree, ntotal, st->ignore_id); + /* The low order bits for dynamic context IDs don't have meaning the + * same way that low bits of non-dynamic ctx IDs do. So we have to + * check the dynamic case first. */ + if (MPIR_CONTEXT_READ_FIELD(DYNAMIC_PROC, context_id)) { + MPL_DBG_MSG_D(MPIR_DBG_COMM, VERBOSE, "skipping dynamic process ctx id, context_id=%d", + context_id); + goto fn_exit; + } else { /* non-dynamic context ID */ + /* In terms of the context ID bit vector, intercomms and their constituent + * localcomms have the same value. To avoid a double-free situation we just + * don't free the context ID for localcomms and assume it will be cleaned up + * when the parent intercomm is itself completely freed. 
*/ + if (MPIR_CONTEXT_READ_FIELD(IS_LOCALCOMM, context_id)) { +#ifdef MPL_USE_DBG_LOGGING + char dump_str[1024]; + dump_context_id(context_id, dump_str, sizeof(dump_str)); + MPL_DBG_MSG_S(MPIR_DBG_COMM, VERBOSE, "skipping localcomm id: %s", dump_str); +#endif + goto fn_exit; + } else if (MPIR_CONTEXT_READ_FIELD(SUBCOMM, context_id)) { + MPL_DBG_MSG_D(MPIR_DBG_COMM, VERBOSE, + "skipping non-parent communicator ctx id, context_id=%d", context_id); + goto fn_exit; } } - fn_fail: - return mpi_errno; -} + /* --BEGIN ERROR HANDLING-- */ + /* Check that this context id has been allocated */ + if ((context_mask[idx] & (0x1U << bitpos)) != 0) { +#ifdef MPL_USE_DBG_LOGGING + char dump_str[1024]; + dump_context_id(context_id, dump_str, sizeof(dump_str)); + MPL_DBG_MSG_S(MPIR_DBG_COMM, VERBOSE, "context dump: %s", dump_str); + MPL_DBG_MSG_S(MPIR_DBG_COMM, VERBOSE, "context mask = %s", context_mask_to_str()); +#endif + MPID_Abort(0, MPI_ERR_INTERN, 1, "In MPIR_Free_contextid, the context id is not in use"); + } + /* --END ERROR HANDLING-- */ -static int gcn_allreduce(struct gcn_state *st) -{ - int mpi_errno; + MPID_THREAD_CS_ENTER(VCI, MPIR_THREAD_VCI_CTX_MUTEX); + /* MT: Note that this update must be done atomically in the multithreaedd + * case. In the "one, single lock" implementation, that lock is indeed + * held when this operation is called. 
*/ + context_mask[idx] |= (0x1U << bitpos); + MPID_THREAD_CS_EXIT(VCI, MPIR_THREAD_VCI_CTX_MUTEX); - int count = MPIR_MAX_CONTEXT_MASK + 1; - MPI_Datatype datatype = MPIR_UINT32_T_INTERNAL; - MPI_Op op = MPI_BAND; + MPL_DBG_MSG_FMT(MPIR_DBG_COMM, VERBOSE, + (MPL_DBG_FDEST, + "Freed context %d, mask[%d] bit %d (prefix=%#x)", + context_id, idx, bitpos, raw_prefix)); + fn_exit: + MPIR_FUNC_EXIT; +} - if (st->req_ptr) { - mpi_errno = MPIR_Iallreduce_tag(MPI_IN_PLACE, st->local_mask, count, datatype, op, - st->comm_ptr, st->tag, &st->u.allreduce_request); - } else { - if (st->group_ptr != NULL) { - int coll_tag = st->tag | MPIR_TAG_COLL_BIT; /* Shift tag into the tagged coll space */ - mpi_errno = MPII_Allreduce_group(MPI_IN_PLACE, st->local_mask, count, datatype, op, - st->comm_ptr, st->group_ptr, - coll_tag, MPIR_COLL_ATTR_SYNC); - } else { - mpi_errno = MPIR_Allreduce_impl(MPI_IN_PLACE, st->local_mask, count, datatype, op, - st->comm_ptr, MPIR_COLL_ATTR_SYNC); - } +/* internal routines used by both blocking and nonblocking algorithms */ + +static void gcn_init(struct gcn_state *st, MPIR_Comm * comm_ptr, int tag, bool ignore_id, + MPIR_Group * group_ptr) +{ + if (eager_nelem < 0) { + /* Ensure that at least one word of deadlock-free context IDs is + * always set aside for the base protocol */ + MPIR_Assert(MPIR_CVAR_CTXID_EAGER_SIZE >= 0 && + MPIR_CVAR_CTXID_EAGER_SIZE < MPIR_MAX_CONTEXT_MASK - 1); + eager_nelem = MPIR_CVAR_CTXID_EAGER_SIZE; } - return mpi_errno; + MPIR_Assert(comm_ptr->comm_kind == MPIR_COMM_KIND__INTRACOMM); + + st->iter = 0; + st->comm_ptr = comm_ptr; + st->tag = tag; + st->ignore_id = ignore_id; + st->group_ptr = group_ptr; + st->own_mask = 0; + st->own_eager_mask = 0; + st->req_ptr = NULL; + st->error = 0; + st->gcn_in_list = false; } -static int gcn_intercomm_sendrecv(struct gcn_state *st) +/* All pending context_id allocations are added to a list. 
The context_id allocations are ordered + * according to the context_id of the parent communicator and the tag, whereby blocking context_id + * allocations can have the same tag, while nonblocking operations cannot. In the non-blocking + * case, the user is responsible for the right tags if "comm_create_group" is used */ +static void add_gcn_to_list(struct gcn_state *st) { - int mpi_errno = MPI_SUCCESS; - - mpi_errno = MPIC_Irecv(st->ctx1, 1, MPIR_CONTEXT_ID_T_DATATYPE, - 0, st->tag, st->comm_ptr_inter, &st->u.sendrecv_reqs[0]); - MPIR_ERR_CHECK(mpi_errno); - mpi_errno = MPIC_Isend(st->ctx0, 1, MPIR_CONTEXT_ID_T_DATATYPE, - 0, st->tag, st->comm_ptr_inter, &st->u.sendrecv_reqs[1], 0); - MPIR_ERR_CHECK(mpi_errno); + /* we'll only call this from gcn_allocate_cid within CS */ + struct gcn_state *tmp = NULL; + if (next_gcn == NULL) { + next_gcn = st; + st->next = NULL; + } else if (next_gcn->comm_ptr->context_id > st->comm_ptr->context_id || + (next_gcn->comm_ptr->context_id == st->comm_ptr->context_id && + next_gcn->tag > st->tag)) { + st->next = next_gcn; + next_gcn = st; + } else { + for (tmp = next_gcn; + tmp->next != NULL && + ((st->comm_ptr->context_id > tmp->next->comm_ptr->context_id) || + ((st->comm_ptr->context_id == tmp->next->comm_ptr->context_id) && + (st->tag >= tmp->next->tag))); tmp = tmp->next); - fn_fail: - return mpi_errno; + st->next = tmp->next; + tmp->next = st; + } + st->gcn_in_list = true; } -static int gcn_intercomm_bcast(struct gcn_state *st) +static void remove_gcn_from_list(struct gcn_state *st) { - return MPIR_Ibcast_tag(st->ctx1, 1, MPIR_CONTEXT_ID_T_DATATYPE, 0, st->comm_ptr, - st->tag, &st->u.bcast_request); + /* we'll only call this from gcn_allocate_cid within CS */ + if (next_gcn == st) { + next_gcn = st->next; + } else { + struct gcn_state *tmp; + for (tmp = next_gcn; tmp->next != st; tmp = tmp->next); + tmp->next = st->next; + } + st->gcn_in_list = false; } -static int gcn_complete(struct gcn_state *st) +static int gcn_copy_mask(struct 
gcn_state *st) { int mpi_errno = MPI_SUCCESS; + MPID_THREAD_CS_ENTER(VCI, MPIR_THREAD_VCI_CTX_MUTEX); - if (st->error) { - /* In the case of failure, the new communicator was half created. - * So we need to clean the memory allocated for it. */ - MPII_COMML_FORGET(st->new_comm); - MPIR_Handle_obj_free(&MPIR_Comm_mem, st->new_comm); - st->req_ptr->status.MPI_ERROR = st->error; + if (st->ignore_id) { + /* We are not participating in the resulting communicator, so our + * context ID space doesn't matter. Set the mask to "all available". */ + memset(st->local_mask, 0xff, MPIR_MAX_CONTEXT_MASK * sizeof(int)); + st->own_mask = 0; + /* don't need to touch mask_in_use/lowest_context_id b/c our thread + * doesn't ever need to "win" the mask */ + } else if (st->iter == 0) { + /* Deadlock avoidance: Only participate in context id loop when all + * processes have called this routine. On the first iteration, use the + * "eager" allocation protocol. + */ + memset(st->local_mask, 0, (MPIR_MAX_CONTEXT_MASK + 1) * sizeof(int)); + st->own_eager_mask = 0; + + /* Attempt to reserve the eager mask segment */ + if (!eager_in_use && eager_nelem > 0) { + int i; + for (i = 0; i < eager_nelem; i++) + st->local_mask[i] = context_mask[i]; + + eager_in_use = 1; + st->own_eager_mask = 1; + } } else { - mpi_errno = MPIR_Comm_commit(st->new_comm); - MPIR_ERR_CHECK(mpi_errno); - } + MPIR_Assert(next_gcn != NULL); + /*If we are here, at least one element must be in the list, at least myself */ - MPIR_Grequest_complete_impl(st->req_ptr); + /* only the first element in the list can own the mask. However, maybe the mask is used + * by another thread, which added another allocation to the list before. 
So we have to check, + * if the mask is used and mark, if we own it */ + if (mask_in_use || st != next_gcn) { + memset(st->local_mask, 0, MPIR_MAX_CONTEXT_MASK * sizeof(int)); + st->own_mask = 0; + } else { + /* Copy safe mask segment to local_mask */ + int i; + for (i = 0; i < eager_nelem; i++) + st->local_mask[i] = 0; + for (i = eager_nelem; i < MPIR_MAX_CONTEXT_MASK; i++) + st->local_mask[i] = context_mask[i]; + mask_in_use = 1; + st->own_mask = 1; + } + } - MPL_free(st); + /* Note: MPIR_MAX_CONTEXT_MASK elements of local_mask are used by the + * context ID allocation algorithm. The additional element is ignored + * by the context ID mask access routines and is used as a flag for + * detecting context ID exhaustion (explained below). */ + if (st->own_mask || st->ignore_id) + st->local_mask[ALL_OWN_MASK_FLAG] = 1; + else + st->local_mask[ALL_OWN_MASK_FLAG] = 0; - fn_fail: + MPID_THREAD_CS_EXIT(VCI, MPIR_THREAD_VCI_CTX_MUTEX); return mpi_errno; } -static int async_gcn_poll(MPIX_Async_thing thing) +/* Try to find a valid context id. + * + * If the context id is found, then broadcast it; if not, then retry the + * nonblocking context id allocation algorithm again. + * + * Note the subtle difference on thread handling between the nonblocking + * algorithm (sched_cb_gcn_allocate_cid) and the blocking algorithm + * (MPIR_Get_contextid_sparse_group). In nonblocking algorithm, there is no + * need to yield to another thread because this thread will not block the + * progress. On the contrary, unnecessary yield will allow other threads to + * execute and insert wrong order of entries to the nonblocking schedule and + * cause errors. 
+ */ +static int gcn_allocate_cid(struct gcn_state *st) { - struct gcn_state *st = MPIR_Async_thing_get_state(thing); - - switch (st->stage) { - case MPIR_GCN__COPYMASK: - st->error = gcn_copy_mask(st); - MPIR_Assert(st->error == MPI_SUCCESS); - - st->error = gcn_allreduce(st); - MPIR_Assert(st->error == MPI_SUCCESS); + int context_id = 0; - st->stage = MPIR_GCN__ALLREDUCE; - return MPIX_ASYNC_NOPROGRESS; + MPID_THREAD_CS_ENTER(VCI, MPIR_THREAD_VCI_CTX_MUTEX); - case MPIR_GCN__ALLREDUCE: - if (!MPIR_Request_is_complete(st->u.allreduce_request)) { - return MPIX_ASYNC_NOPROGRESS; - } else { - MPIR_Request_free(st->u.allreduce_request); + context_id = locate_context_bit(st->local_mask); + if (st->own_eager_mask) { + st->own_eager_mask = 0; + eager_in_use = 0; + } else if (st->own_mask) { + st->own_mask = 0; + mask_in_use = 0; + } - int newctxid = gcn_allocate_cid(st); - if (newctxid) { - if (st->ctx0) - *st->ctx0 = newctxid; - if (st->ctx1) - *st->ctx1 = newctxid; - } + if (context_id > 0) { + if (!st->ignore_id) { + int id = allocate_context_bit(context_mask, context_id); + MPIR_Assertp(id == context_id); + } + if (st->gcn_in_list) { + remove_gcn_from_list(st); + } + } else { + st->error = gcn_check_id_exhaustion(st); + if (!st->error && !st->gcn_in_list && !st->ignore_id) { + add_gcn_to_list(st); + } else if (st->error && st->gcn_in_list) { + remove_gcn_from_list(st); + } + } + MPID_THREAD_CS_EXIT(VCI, MPIR_THREAD_VCI_CTX_MUTEX); - if (st->error) { - gcn_complete(st); - return MPIX_ASYNC_DONE; - } else if (*st->ctx0 == 0) { - /* retry */ - st->stage = MPIR_GCN__COPYMASK; - return MPIX_ASYNC_NOPROGRESS; - } else if (!st->is_inter_comm) { - /* done */ - gcn_complete(st); - return MPIX_ASYNC_DONE; - } else if (st->comm_ptr_inter->rank == 0) { - st->error = gcn_intercomm_sendrecv(st); - MPIR_Assert(st->error == MPI_SUCCESS); + st->iter++; - st->stage = MPIR_GCN__INTERCOMM_SENDRECV; - return MPIX_ASYNC_NOPROGRESS; - } else { - st->error = gcn_intercomm_bcast(st); - 
MPIR_Assert(st->error == MPI_SUCCESS); + return context_id; +} - st->stage = MPIR_GCN__INTERCOMM_BCAST; - return MPIX_ASYNC_NOPROGRESS; - } - } +static int gcn_check_id_exhaustion(struct gcn_state *st) +{ + int mpi_errno = MPI_SUCCESS; - case MPIR_GCN__INTERCOMM_SENDRECV: - if (!MPIR_Request_is_complete(st->u.sendrecv_reqs[0]) || - !MPIR_Request_is_complete(st->u.sendrecv_reqs[1])) { - return MPIX_ASYNC_NOPROGRESS; - } else { - MPIR_Request_free(st->u.sendrecv_reqs[0]); - MPIR_Request_free(st->u.sendrecv_reqs[1]); + /* Test for context ID exhaustion: All threads that will participate in + * the new communicator owned the mask and could not allocate a context + * ID. This indicates that either some process has no context IDs + * available, or that some are available, but the allocation cannot + * succeed because there is no common context ID. */ + if (st->local_mask[ALL_OWN_MASK_FLAG] == 1) { + int nfree = 0; + int ntotal = 0; + int minfree; - st->error = gcn_intercomm_bcast(st); - MPIR_Assert(st->error == MPI_SUCCESS); + /* NOTE: we only call this with CS from gcn_allocate_cid, so it's safe */ + context_mask_stats(&nfree, &ntotal); - st->stage = MPIR_GCN__INTERCOMM_BCAST; - return MPIX_ASYNC_NOPROGRESS; - } + if (st->ignore_id) + minfree = INT_MAX; + else + minfree = nfree; - case MPIR_GCN__INTERCOMM_BCAST: - if (!MPIR_Request_is_complete(st->u.bcast_request)) { - return MPIX_ASYNC_NOPROGRESS; + if (!st->req_ptr) { + /* only run Allreduce if we are in a blocking call */ + if (st->group_ptr != NULL) { + int coll_tag = st->tag | MPIR_TAG_COLL_BIT; /* Shift tag into the tagged coll space */ + mpi_errno = MPII_Allreduce_group(MPI_IN_PLACE, &minfree, 1, MPIR_INT_INTERNAL, + MPI_MIN, st->comm_ptr, st->group_ptr, coll_tag, + MPIR_COLL_ATTR_SYNC); } else { - MPIR_Request_free(st->u.bcast_request); - - gcn_complete(st); - return MPIX_ASYNC_DONE; + mpi_errno = MPIR_Allreduce_impl(MPI_IN_PLACE, &minfree, 1, MPIR_INT_INTERNAL, + MPI_MIN, st->comm_ptr, MPIR_COLL_ATTR_SYNC); } + 
} + + if (minfree > 0) { + MPIR_ERR_SETANDJUMP3(mpi_errno, MPI_ERR_OTHER, + "**toomanycommfrag", "**toomanycommfrag %d %d %d", + nfree, ntotal, st->ignore_id); + } else { + MPIR_ERR_SETANDJUMP3(mpi_errno, MPI_ERR_OTHER, + "**toomanycomm", "**toomanycomm %d %d %d", + nfree, ntotal, st->ignore_id); + } } - MPIR_Assert(0); - return MPIX_ASYNC_NOPROGRESS; + fn_fail: + return mpi_errno; } +/* internal routines used by the nonblocking algorithm */ + static int query_fn(void *extra_state, MPI_Status * status) { /* status points to request->status */ @@ -865,176 +894,157 @@ static int async_get_cid(MPIR_Comm * comm_ptr, MPIR_Comm * newcommp, MPIR_Comm * goto fn_exit; } -int MPIR_Get_contextid_nonblock(MPIR_Comm * comm_ptr, MPIR_Comm * newcommp, MPIR_Request ** req) +static int async_gcn_poll(MPIX_Async_thing thing) { - int mpi_errno = MPI_SUCCESS; - MPIR_FUNC_ENTER; + struct gcn_state *st = MPIR_Async_thing_get_state(thing); - mpi_errno = async_get_cid(comm_ptr, newcommp, NULL /* comm_inter */ , req); - MPIR_ERR_CHECK(mpi_errno); + switch (st->stage) { + case MPIR_GCN__COPYMASK: + st->error = gcn_copy_mask(st); + MPIR_Assert(st->error == MPI_SUCCESS); - fn_exit: - MPIR_FUNC_EXIT; - return mpi_errno; - fn_fail: - goto fn_exit; -} + st->error = gcn_allreduce(st); + MPIR_Assert(st->error == MPI_SUCCESS); -int MPIR_Get_intercomm_contextid_nonblock(MPIR_Comm * comm_ptr, MPIR_Comm * newcommp, - MPIR_Request ** req) -{ - int mpi_errno = MPI_SUCCESS; - MPIR_FUNC_ENTER; + st->stage = MPIR_GCN__ALLREDUCE; + return MPIX_ASYNC_NOPROGRESS; - /* do as much local setup as possible */ - if (!comm_ptr->local_comm) { - mpi_errno = MPII_Setup_intercomm_localcomm(comm_ptr); - MPIR_ERR_CHECK(mpi_errno); - } + case MPIR_GCN__ALLREDUCE: + if (!MPIR_Request_is_complete(st->u.allreduce_request)) { + return MPIX_ASYNC_NOPROGRESS; + } else { + MPIR_Request_free(st->u.allreduce_request); - mpi_errno = async_get_cid(comm_ptr->local_comm, newcommp, comm_ptr, req); - MPIR_ERR_CHECK(mpi_errno); + int 
newctxid = gcn_allocate_cid(st); + if (newctxid) { + if (st->ctx0) + *st->ctx0 = newctxid; + if (st->ctx1) + *st->ctx1 = newctxid; + } - fn_fail: - MPIR_FUNC_EXIT; - return mpi_errno; -} + if (st->error) { + gcn_complete(st); + return MPIX_ASYNC_DONE; + } else if (*st->ctx0 == 0) { + /* retry */ + st->stage = MPIR_GCN__COPYMASK; + return MPIX_ASYNC_NOPROGRESS; + } else if (!st->is_inter_comm) { + /* done */ + gcn_complete(st); + return MPIX_ASYNC_DONE; + } else if (st->comm_ptr_inter->rank == 0) { + st->error = gcn_intercomm_sendrecv(st); + MPIR_Assert(st->error == MPI_SUCCESS); + st->stage = MPIR_GCN__INTERCOMM_SENDRECV; + return MPIX_ASYNC_NOPROGRESS; + } else { + st->error = gcn_intercomm_bcast(st); + MPIR_Assert(st->error == MPI_SUCCESS); -/* Get a context for a new intercomm. There are two approaches - here (for MPI-1 codes only) - (a) Each local group gets a context; the groups exchange, and - the low value is accepted and the high one returned. This - works because the context ids are taken from the same pool. - (b) Form a temporary intracomm over all processes and use that - with the regular algorithm. + st->stage = MPIR_GCN__INTERCOMM_BCAST; + return MPIX_ASYNC_NOPROGRESS; + } + } - In some ways, (a) is the better approach because it is the one that - extends to MPI-2 (where the last step, returning the context, is - not used and instead separate send and receive context id value - are kept). For this reason, we'll use (a). + case MPIR_GCN__INTERCOMM_SENDRECV: + if (!MPIR_Request_is_complete(st->u.sendrecv_reqs[0]) || + !MPIR_Request_is_complete(st->u.sendrecv_reqs[1])) { + return MPIX_ASYNC_NOPROGRESS; + } else { + MPIR_Request_free(st->u.sendrecv_reqs[0]); + MPIR_Request_free(st->u.sendrecv_reqs[1]); - Even better is to separate the local and remote context ids. Then - each group of processes can manage their context ids separately. 
-*/ -/* - * This uses the thread-safe (if necessary) routine to get a context id - * and does not need its own thread-safe version. - */ -int MPIR_Get_intercomm_contextid(MPIR_Comm * comm_ptr, int *context_id, int *recvcontext_id) -{ - int mycontext_id, remote_context_id; - int mpi_errno = MPI_SUCCESS; - int tag = 31567; /* FIXME - we need an internal tag or - * communication channel. Can we use a different - * context instead?. Or can we use the tag - * provided in the intercomm routine? (not on a dup, - * but in that case it can use the collective context) */ - MPIR_FUNC_ENTER; + st->error = gcn_intercomm_bcast(st); + MPIR_Assert(st->error == MPI_SUCCESS); - if (!comm_ptr->local_comm) { - /* Manufacture the local communicator */ - mpi_errno = MPII_Setup_intercomm_localcomm(comm_ptr); - MPIR_ERR_CHECK(mpi_errno); + st->stage = MPIR_GCN__INTERCOMM_BCAST; + return MPIX_ASYNC_NOPROGRESS; + } + + case MPIR_GCN__INTERCOMM_BCAST: + if (!MPIR_Request_is_complete(st->u.bcast_request)) { + return MPIX_ASYNC_NOPROGRESS; + } else { + MPIR_Request_free(st->u.bcast_request); + + gcn_complete(st); + return MPIX_ASYNC_DONE; + } } - mpi_errno = MPIR_Get_contextid_sparse(comm_ptr->local_comm, &mycontext_id, FALSE); - MPIR_ERR_CHECK(mpi_errno); - MPIR_Assert(mycontext_id != 0); + MPIR_Assert(0); + return MPIX_ASYNC_NOPROGRESS; +} - /* MPIC routine uses an internal context id. 
The local leads (process 0) - * exchange data */ - remote_context_id = MPIR_INVALID_CONTEXT_ID; - if (comm_ptr->rank == 0) { - mpi_errno = MPIC_Sendrecv(&mycontext_id, 1, MPIR_CONTEXT_ID_T_DATATYPE, 0, tag, - &remote_context_id, 1, MPIR_CONTEXT_ID_T_DATATYPE, 0, tag, - comm_ptr, MPI_STATUS_IGNORE, MPIR_COLL_ATTR_SYNC); - MPIR_ERR_CHECK(mpi_errno); +static int gcn_allreduce(struct gcn_state *st) +{ + int mpi_errno; + + int count = MPIR_MAX_CONTEXT_MASK + 1; + MPI_Datatype datatype = MPIR_UINT32_T_INTERNAL; + MPI_Op op = MPI_BAND; + + if (st->req_ptr) { + mpi_errno = MPIR_Iallreduce_tag(MPI_IN_PLACE, st->local_mask, count, datatype, op, + st->comm_ptr, st->tag, &st->u.allreduce_request); + } else { + if (st->group_ptr != NULL) { + int coll_tag = st->tag | MPIR_TAG_COLL_BIT; /* Shift tag into the tagged coll space */ + mpi_errno = MPII_Allreduce_group(MPI_IN_PLACE, st->local_mask, count, datatype, op, + st->comm_ptr, st->group_ptr, + coll_tag, MPIR_COLL_ATTR_SYNC); + } else { + mpi_errno = MPIR_Allreduce_impl(MPI_IN_PLACE, st->local_mask, count, datatype, op, + st->comm_ptr, MPIR_COLL_ATTR_SYNC); + } } - /* Make sure that all of the local processes now have this - * id */ - mpi_errno = MPIR_Bcast_impl(&remote_context_id, 1, MPIR_CONTEXT_ID_T_DATATYPE, - 0, comm_ptr->local_comm, MPIR_COLL_ATTR_SYNC); - MPIR_ERR_CHECK(mpi_errno); - /* The recvcontext_id must be the one that was allocated out of the local - * group, not the remote group. Otherwise we could end up posting two - * MPI_ANY_SOURCE,MPI_ANY_TAG recvs on the same context IDs even though we - * are attempting to post them for two separate communicators. 
*/ - *context_id = remote_context_id; - *recvcontext_id = mycontext_id; - fn_fail: - MPIR_FUNC_EXIT; return mpi_errno; } -void MPIR_Free_contextid(int context_id) +static int gcn_intercomm_sendrecv(struct gcn_state *st) { - int idx, bitpos, raw_prefix; + int mpi_errno = MPI_SUCCESS; - MPIR_FUNC_ENTER; + mpi_errno = MPIC_Irecv(st->ctx1, 1, MPIR_CONTEXT_ID_T_DATATYPE, + 0, st->tag, st->comm_ptr_inter, &st->u.sendrecv_reqs[0]); + MPIR_ERR_CHECK(mpi_errno); + mpi_errno = MPIC_Isend(st->ctx0, 1, MPIR_CONTEXT_ID_T_DATATYPE, + 0, st->tag, st->comm_ptr_inter, &st->u.sendrecv_reqs[1], 0); + MPIR_ERR_CHECK(mpi_errno); - /* Convert the context id to the bit position */ - raw_prefix = MPIR_CONTEXT_READ_FIELD(PREFIX, context_id); - idx = raw_prefix / MPIR_CONTEXT_INT_BITS; - bitpos = raw_prefix % MPIR_CONTEXT_INT_BITS; + fn_fail: + return mpi_errno; +} - /* --BEGIN ERROR HANDLING-- */ - if (idx < 0 || idx >= MPIR_MAX_CONTEXT_MASK) { - MPID_Abort(0, MPI_ERR_INTERN, 1, "In MPIR_Free_contextid, idx is out of range"); - } - /* --END ERROR HANDLING-- */ +static int gcn_intercomm_bcast(struct gcn_state *st) +{ + return MPIR_Ibcast_tag(st->ctx1, 1, MPIR_CONTEXT_ID_T_DATATYPE, 0, st->comm_ptr, + st->tag, &st->u.bcast_request); +} - /* The low order bits for dynamic context IDs don't have meaning the - * same way that low bits of non-dynamic ctx IDs do. So we have to - * check the dynamic case first. */ - if (MPIR_CONTEXT_READ_FIELD(DYNAMIC_PROC, context_id)) { - MPL_DBG_MSG_D(MPIR_DBG_COMM, VERBOSE, "skipping dynamic process ctx id, context_id=%d", - context_id); - goto fn_exit; - } else { /* non-dynamic context ID */ - /* In terms of the context ID bit vector, intercomms and their constituent - * localcomms have the same value. To avoid a double-free situation we just - * don't free the context ID for localcomms and assume it will be cleaned up - * when the parent intercomm is itself completely freed. 
*/ - if (MPIR_CONTEXT_READ_FIELD(IS_LOCALCOMM, context_id)) { -#ifdef MPL_USE_DBG_LOGGING - char dump_str[1024]; - dump_context_id(context_id, dump_str, sizeof(dump_str)); - MPL_DBG_MSG_S(MPIR_DBG_COMM, VERBOSE, "skipping localcomm id: %s", dump_str); -#endif - goto fn_exit; - } else if (MPIR_CONTEXT_READ_FIELD(SUBCOMM, context_id)) { - MPL_DBG_MSG_D(MPIR_DBG_COMM, VERBOSE, - "skipping non-parent communicator ctx id, context_id=%d", context_id); - goto fn_exit; - } - } +static int gcn_complete(struct gcn_state *st) +{ + int mpi_errno = MPI_SUCCESS; - /* --BEGIN ERROR HANDLING-- */ - /* Check that this context id has been allocated */ - if ((context_mask[idx] & (0x1U << bitpos)) != 0) { -#ifdef MPL_USE_DBG_LOGGING - char dump_str[1024]; - dump_context_id(context_id, dump_str, sizeof(dump_str)); - MPL_DBG_MSG_S(MPIR_DBG_COMM, VERBOSE, "context dump: %s", dump_str); - MPL_DBG_MSG_S(MPIR_DBG_COMM, VERBOSE, "context mask = %s", context_mask_to_str()); -#endif - MPID_Abort(0, MPI_ERR_INTERN, 1, "In MPIR_Free_contextid, the context id is not in use"); + if (st->error) { + /* In the case of failure, the new communicator was half created. + * So we need to clean the memory allocated for it. */ + MPII_COMML_FORGET(st->new_comm); + MPIR_Handle_obj_free(&MPIR_Comm_mem, st->new_comm); + st->req_ptr->status.MPI_ERROR = st->error; + } else { + mpi_errno = MPIR_Comm_commit(st->new_comm); + MPIR_ERR_CHECK(mpi_errno); } - /* --END ERROR HANDLING-- */ - MPID_THREAD_CS_ENTER(VCI, MPIR_THREAD_VCI_CTX_MUTEX); - /* MT: Note that this update must be done atomically in the multithreaedd - * case. In the "one, single lock" implementation, that lock is indeed - * held when this operation is called. 
*/ - context_mask[idx] |= (0x1U << bitpos); - MPID_THREAD_CS_EXIT(VCI, MPIR_THREAD_VCI_CTX_MUTEX); + MPIR_Grequest_complete_impl(st->req_ptr); - MPL_DBG_MSG_FMT(MPIR_DBG_COMM, VERBOSE, - (MPL_DBG_FDEST, - "Freed context %d, mask[%d] bit %d (prefix=%#x)", - context_id, idx, bitpos, raw_prefix)); - fn_exit: - MPIR_FUNC_EXIT; + MPL_free(st); + + fn_fail: + return mpi_errno; } From e9507f579209bc10b1e870744f7b00ace260e43d Mon Sep 17 00:00:00 2001 From: Hui Zhou Date: Sun, 2 Nov 2025 18:03:27 -0600 Subject: [PATCH 12/12] ch4: add CS_YIELD in netmod dynamic_sendrecv The dynamic_sendrecv is used in MPI_Intercomm_create. The mismatching between threads are protected by the user provided tag, thus it is okay to yield during the blocking progress. Without the yield, MPI_Intercomm_create may block another thread's progress when the remote processes are not present (blocked by other communications). In the dynamic process accept/connect path, we force peer_comm's context id to 0. This is okay because the leader exchange is established with a specific pair of addresses and there is no other communications yet during leader_exchange. 
--- src/mpid/ch4/ch4_api.txt | 3 ++- src/mpid/ch4/netmod/ofi/ofi_spawn.c | 23 ++++++++++++++++------- src/mpid/ch4/netmod/ucx/ucx_spawn.c | 14 ++++++++++++-- src/mpid/ch4/src/ch4_comm.c | 14 +++++++++----- src/mpid/ch4/src/ch4_spawn.c | 5 +++++ 5 files changed, 44 insertions(+), 15 deletions(-) diff --git a/src/mpid/ch4/ch4_api.txt b/src/mpid/ch4/ch4_api.txt index 631144bd573..a31034fe32e 100644 --- a/src/mpid/ch4/ch4_api.txt +++ b/src/mpid/ch4/ch4_api.txt @@ -91,7 +91,7 @@ Non Native API: dynamic_recv : int NM : tag, buf-2, size, timeout dynamic_sendrecv : int - NM : remote_lpid, tag, send_buf, send_size, recv_buf, recv_size, timeout + NM : remote_lpid, peer_comm, tag, send_buf, send_size, recv_buf, recv_size, timeout mpi_comm_commit_pre_hook : int NM : comm SHM : comm @@ -489,6 +489,7 @@ PARAM: origin_count: MPI_Aint origin_datatype: MPI_Datatype partner: MPIR_Request * + peer_comm: MPIR_Comm * port_name: const char * port_name-2: char * ptr: void * diff --git a/src/mpid/ch4/netmod/ofi/ofi_spawn.c b/src/mpid/ch4/netmod/ofi/ofi_spawn.c index ddd619ec297..170dbf6d362 100644 --- a/src/mpid/ch4/netmod/ofi/ofi_spawn.c +++ b/src/mpid/ch4/netmod/ofi/ofi_spawn.c @@ -17,7 +17,7 @@ static int cancel_dynamic_request(MPIDI_OFI_dynamic_process_request_t * dynamic_req, bool is_send); static uint64_t get_dynamic_connection_match_bits(int tag); -static uint64_t get_dynamic_match_bits(MPIR_Lpid lpid, int tag); +static uint64_t get_dynamic_match_bits(MPIR_Lpid lpid, int context_id, int tag); int MPIDI_OFI_dynamic_send(MPIR_Lpid remote_lpid, int tag, const void *buf, int size, int timeout) { @@ -111,7 +111,7 @@ int MPIDI_OFI_dynamic_recv(int tag, void *buf, int size, int timeout) goto fn_exit; } -int MPIDI_OFI_dynamic_sendrecv(MPIR_Lpid remote_lpid, int tag, +int MPIDI_OFI_dynamic_sendrecv(MPIR_Lpid remote_lpid, MPIR_Comm * peer_comm, int tag, const void *send_buf, int send_size, void *recv_buf, int recv_size, int timeout) { @@ -132,7 +132,7 @@ int 
MPIDI_OFI_dynamic_sendrecv(MPIR_Lpid remote_lpid, int tag, send_req.event_id = MPIDI_OFI_EVENT_DYNPROC_DONE; if (send_size > 0) { - uint64_t match_bits = get_dynamic_match_bits(MPIR_Process.rank, tag); + uint64_t match_bits = get_dynamic_match_bits(MPIR_Process.rank, peer_comm->context_id, tag); MPIDI_OFI_CALL_RETRY(fi_tsend(MPIDI_OFI_global.ctx[ctx_idx].tx, send_buf, send_size, NULL, remote_addr, match_bits, (void *) &send_req.context), @@ -147,7 +147,7 @@ int MPIDI_OFI_dynamic_sendrecv(MPIR_Lpid remote_lpid, int tag, if (recv_size > 0) { uint64_t mask_bits = 0; - uint64_t match_bits = get_dynamic_match_bits(remote_lpid, tag); + uint64_t match_bits = get_dynamic_match_bits(remote_lpid, peer_comm->recvcontext_id, tag); MPIDI_OFI_CALL_RETRY(fi_trecv(MPIDI_OFI_global.ctx[ctx_idx].rx, recv_buf, recv_size, NULL, remote_addr, match_bits, mask_bits, &recv_req.context), @@ -182,6 +182,8 @@ int MPIDI_OFI_dynamic_sendrecv(MPIR_Lpid remote_lpid, int tag, break; } } + MPID_THREAD_CS_YIELD(GLOBAL, MPIR_THREAD_GLOBAL_ALLFUNC_MUTEX); + MPID_THREAD_CS_YIELD(VCI, MPIDI_VCI_LOCK(vci)); } fn_exit: @@ -295,12 +297,13 @@ int MPIDI_OFI_insert_upid(MPIR_Lpid lpid, const char *upid, int upid_len) /* -- internal static routines */ /* NOTE: used by MPIDI_OFI_dynamic_sendrecv, exact source match */ -static uint64_t get_dynamic_match_bits(MPIR_Lpid lpid, int tag) +static uint64_t get_dynamic_match_bits(MPIR_Lpid lpid, int context_id, int tag) { /* normalize tag within (MPIDI_OFI_TAG_BITS - 1) bits, reserve 1 bit for dynamic connect/accept */ tag &= (1 << (MPIDI_OFI_TAG_BITS - 1)) - 1; - uint64_t match_bits = MPIDI_OFI_DYNPROC_SEND | tag; + uint64_t match_bits; + match_bits = context_id; if (!MPIDI_OFI_ENABLE_DATA) { /* FI_DIRECTED_RECV is not enabled, we have to embed source in the match_bits */ @@ -314,9 +317,15 @@ static uint64_t get_dynamic_match_bits(MPIR_Lpid lpid, int tag) HASH_VALUE(upid, sz, upid_hash); upid_hash &= (1 << MPIDI_OFI_SOURCE_BITS) - 1; - match_bits |= (upid_hash << 
MPIDI_OFI_TAG_BITS); + match_bits <<= MPIDI_OFI_SOURCE_BITS; + match_bits |= upid_hash; } + match_bits <<= MPIDI_OFI_TAG_BITS; + match_bits |= tag; + + match_bits |= MPIDI_OFI_DYNPROC_SEND; + return match_bits; } diff --git a/src/mpid/ch4/netmod/ucx/ucx_spawn.c b/src/mpid/ch4/netmod/ucx/ucx_spawn.c index 5fceaa3c27a..4974b8eae54 100644 --- a/src/mpid/ch4/netmod/ucx/ucx_spawn.c +++ b/src/mpid/ch4/netmod/ucx/ucx_spawn.c @@ -120,7 +120,12 @@ int MPIDI_UCX_dynamic_recv(int tag, void *buf, int size, int timeout) return mpi_errno; } -int MPIDI_UCX_dynamic_sendrecv(MPIR_Lpid remote_lpid, int tag, +static uint64_t get_dynamic_match_bits(int context_id, int tag) +{ + return MPIDI_UCX_DYNPROC_MASK | ((uint64_t) context_id << MPIDI_UCX_TAG_BITS) | tag; +} + +int MPIDI_UCX_dynamic_sendrecv(MPIR_Lpid remote_lpid, MPIR_Comm * peer_comm, int tag, const void *send_buf, int send_size, void *recv_buf, int recv_size, int timeout) { @@ -132,7 +137,6 @@ int MPIDI_UCX_dynamic_sendrecv(MPIR_Lpid remote_lpid, int tag, MPID_THREAD_ASSERT_IN_CS(VCI, MPIDI_VCI_LOCK(vci)); #endif - uint64_t ucx_tag = MPIDI_UCX_DYNPROC_MASK + tag; uint64_t tag_mask = 0xffffffffffffffff; /* for recv */ MPIDI_av_entry_t *av = MPIDIU_lpid_to_av_slow(remote_lpid); ucp_ep_h ep = MPIDI_UCX_AV_TO_EP(av, vci, vci); @@ -142,6 +146,8 @@ int MPIDI_UCX_dynamic_sendrecv(MPIR_Lpid remote_lpid, int tag, /* send */ bool send_done = false; if (send_size > 0) { + uint64_t ucx_tag = get_dynamic_match_bits(peer_comm->context_id, tag); + ucp_request_param_t send_param = { .op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK | UCP_OP_ATTR_FIELD_USER_DATA, .cb.send = dynamic_send_cb, @@ -163,6 +169,8 @@ int MPIDI_UCX_dynamic_sendrecv(MPIR_Lpid remote_lpid, int tag, /* recv */ bool recv_done = false; if (recv_size > 0) { + uint64_t ucx_tag = get_dynamic_match_bits(peer_comm->recvcontext_id, tag); + ucp_request_param_t recv_param = { .op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK | UCP_OP_ATTR_FIELD_USER_DATA, .cb.recv = dynamic_recv_cb, @@ -198,6 
+206,8 @@ int MPIDI_UCX_dynamic_sendrecv(MPIR_Lpid remote_lpid, int tag, break; } } + MPID_THREAD_CS_YIELD(GLOBAL, MPIR_THREAD_GLOBAL_ALLFUNC_MUTEX); + MPID_THREAD_CS_YIELD(VCI, MPIDI_VCI_LOCK(vci)); } fn_exit: diff --git a/src/mpid/ch4/src/ch4_comm.c b/src/mpid/ch4/src/ch4_comm.c index da7d7a9d3f9..b5e69c2ffca 100644 --- a/src/mpid/ch4/src/ch4_comm.c +++ b/src/mpid/ch4/src/ch4_comm.c @@ -323,7 +323,8 @@ int MPID_Comm_set_hints(MPIR_Comm * comm_ptr, MPIR_Info * info_ptr) * 1. leader exchange data. * 2. leader broadcast over local_comm. */ -static int leader_exchange(MPIR_Comm * local_comm, MPIR_Lpid remote_lpid, int tag, +static int leader_exchange(MPIR_Comm * local_comm, MPIR_Lpid remote_lpid, + MPIR_Comm * peer_comm, int tag, int context_id, int *remote_data_size_out, void **remote_data_out, int timeout); static int prepare_local_lpids(MPIR_Comm * local_comm, MPIR_Lpid ** lpids_out, @@ -356,7 +357,7 @@ int MPID_Intercomm_exchange(MPIR_Comm * local_comm, int local_leader, void *remote_data = NULL; if (is_local_leader) { MPIR_Lpid remote_lpid = MPIR_comm_rank_to_lpid(peer_comm, remote_leader); - mpi_errno = leader_exchange(local_comm, remote_lpid, tag, context_id, + mpi_errno = leader_exchange(local_comm, remote_lpid, peer_comm, tag, context_id, &remote_data_size, &remote_data, timeout); } @@ -624,7 +625,8 @@ static int extract_remote_data(void *remote_data, int *remote_size_out, } /* exchange data between leaders */ -static int leader_exchange(MPIR_Comm * local_comm, MPIR_Lpid remote_lpid, int tag, int context_id, +static int leader_exchange(MPIR_Comm * local_comm, MPIR_Lpid remote_lpid, + MPIR_Comm * peer_comm, int tag, int context_id, int *remote_data_size_out, void **remote_data_out, int timeout) { int mpi_errno = MPI_SUCCESS; @@ -672,14 +674,16 @@ static int leader_exchange(MPIR_Comm * local_comm, MPIR_Lpid remote_lpid, int ta /* exchange */ int remote_data_size; void *remote_data; - mpi_errno = MPIDI_NM_dynamic_sendrecv(remote_lpid, tag, &local_data_size, 
sizeof(int), + mpi_errno = MPIDI_NM_dynamic_sendrecv(remote_lpid, peer_comm, tag, + &local_data_size, sizeof(int), &remote_data_size, sizeof(int), timeout); MPIR_ERR_CHECK(mpi_errno); remote_data = MPL_malloc(remote_data_size, MPL_MEM_OTHER); MPIR_ERR_CHKANDJUMP(!remote_data, mpi_errno, MPI_ERR_OTHER, "**nomem"); - mpi_errno = MPIDI_NM_dynamic_sendrecv(remote_lpid, tag, local_data, local_data_size, + mpi_errno = MPIDI_NM_dynamic_sendrecv(remote_lpid, peer_comm, tag, + local_data, local_data_size, remote_data, remote_data_size, timeout); MPIR_ERR_CHECK(mpi_errno); diff --git a/src/mpid/ch4/src/ch4_spawn.c b/src/mpid/ch4/src/ch4_spawn.c index 489815720b8..05b0fde7b02 100644 --- a/src/mpid/ch4/src/ch4_spawn.c +++ b/src/mpid/ch4/src/ch4_spawn.c @@ -373,6 +373,11 @@ static int dynamic_intercomm_create(const char *port_name, MPIR_Info * info, int peer_comm->local_size = 1; peer_comm->rank = 0; peer_comm->local_group = NULL; + /* We have not exchanged context_id yet, set them to 0. This is okay since + * the dynamic exchange is established between a pair of addresses (lpids) that + * no other communications can happen yet. */ + peer_comm->context_id = 0; + peer_comm->recvcontext_id = 0; MPIR_Group_create_stride(1, 0, NULL, remote_lpid, 1, &peer_comm->remote_group);