Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion docs/src/reference/pointtopoint.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

```@docs
MPI.Request
MPI.RequestSet
MPI.Status
```

Expand Down
8 changes: 4 additions & 4 deletions src/collective.jl
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ If `comm` is an intercommunicator, then it blocks until all members of the other
$(_doc_external("MPI_Ibarrier"))
"""
function Ibarrier(comm::Comm)
req = Request()
rreq = Ref{MPI_Request}()
# int MPI_Ibarrier(MPI_Comm comm, MPI_Req req)
@mpichk ccall((:MPI_Ibarrier, libmpi), Cint, (MPI_Comm, Ptr{MPI_Request}), comm, req)
return req
@mpichk ccall((:MPI_Ibarrier, libmpi), Cint, (MPI_Comm, Ptr{MPI_Request}), comm, rreq)
return Request(rreq[])
end


Expand Down Expand Up @@ -911,7 +911,7 @@ function Neighbor_allgather!(sendbuf::Buffer, recvbuf::UBuffer, graph_comm::Comm
(MPIPtr, Cint, MPI_Datatype, MPIPtr, Cint, MPI_Datatype, MPI_Comm),
sendbuf.data, sendbuf.count, sendbuf.datatype,
recvbuf.data, recvbuf.count, recvbuf.datatype, graph_comm) v"3.0"

return recvbuf.data
end
Neighbor_allgather!(sendbuf, recvbuf::UBuffer, graph_comm::Comm) =
Expand Down
120 changes: 26 additions & 94 deletions src/nonblocking.jl
Original file line number Diff line number Diff line change
Expand Up @@ -53,31 +53,31 @@ checked by other means.

See also [`Cancel!`](@ref).
"""
mutable struct Request
struct Request
val::MPI_Request
buffer
end
Base.:(==)(a::Request, b::Request) = a.val == b.val
Base.cconvert(::Type{MPI_Request}, request::Request) = request
Base.unsafe_convert(::Type{MPI_Request}, request::Request) = request.val
Base.unsafe_convert(::Type{Ptr{MPI_Request}}, request::Request) = convert(Ptr{MPI_Request}, pointer_from_objref(request))
Base.cconvert(::Type{Ptr{MPI_Request}}, request::Request) = Ref(request.val)

const REQUEST_NULL = Request(Consts.MPI_REQUEST_NULL[], nothing)
add_load_time_hook!(() -> REQUEST_NULL.val = Consts.MPI_REQUEST_NULL[])
@static if VERSION < v"1.8"
REQUEST_NULL = Request(Consts.MPI_REQUEST_NULL[])
else
REQUEST_NULL::Request = Request(Consts.MPI_REQUEST_NULL[])
end
add_load_time_hook!(() -> global REQUEST_NULL = Request(Consts.MPI_REQUEST_NULL[]))

Request() = Request(REQUEST_NULL.val, nothing)
isnull(req::Request) = req == REQUEST_NULL

function free(req::Request)
if req != REQUEST_NULL && !MPI.Finalized()
# int MPI_Request_free(MPI_Request *req)
@mpichk ccall((:MPI_Request_free, libmpi), Cint, (Ptr{MPI_Request},), req)
end
req.buffer = nothing
return nothing
end


"""
status = Probe(src::Integer, tag::Integer, comm::Comm)

Expand Down Expand Up @@ -153,9 +153,6 @@ function Wait(req::Request, status::Union{Ref{Status}, Nothing}=nothing)
@mpichk ccall((:MPI_Wait, libmpi), Cint,
(Ptr{MPI_Request}, MPIPtr),
req, something(status, Consts.MPI_STATUS_IGNORE[]))
if isnull(req) # only clear the buffer for non-persistent requests
req.buffer = nothing
end
return nothing
end
function Wait(req::Request, ::Type{Status})
Expand All @@ -181,9 +178,6 @@ function Test(req::Request, status::Union{Ref{Status}, Nothing}=nothing)
@mpichk ccall((:MPI_Test, libmpi), Cint,
(Ptr{MPI_Request}, Ptr{Cint}, Ptr{Status}),
req, flag, something(status, Consts.MPI_STATUS_IGNORE[]))
if isnull(req)
req.buffer = nothing
end
return flag[] != 0
end
function Test(req::Request, ::Type{Status})
Expand All @@ -193,56 +187,6 @@ function Test(req::Request, ::Type{Status})
end


"""
RequestSet(requests::Vector{Request})

A wrapper for an array of `Request`s that can be used to reduce intermediate memory
allocations in [`Waitall`](@ref), [`Testall`](@ref), [`Waitany`](@ref), [`Testany`](@ref),
[`Waitsome`](@ref) or [`Testsome`](@ref).

"""
mutable struct RequestSet <: AbstractVector{Request}
requests::Vector{Request}
vals::Vector{MPI_Request}
end

function RequestSet(requests::Vector{Request})
n = length(requests)
vals = Vector{MPI_Request}(undef, n)
for i = 1:n
vals[i] = requests[i].val
end
return RequestSet(requests, vals)
end

Base.length(reqs::RequestSet) = length(reqs.requests)
Base.getindex(reqs::RequestSet, i::Integer) = reqs.requests[i]
function Base.setindex!(reqs::RequestSet, req::Request, i::Integer)
reqs.vals[i] = req.val
reqs.requests[i] = req
end

function Base.push!(reqs::RequestSet, req::Request)
push!(reqs.vals, req.val)
push!(reqs.requests, req)
end


function update!(reqs::RequestSet, i::Integer)
req = reqs[i]
req.val = reqs.vals[i]
if isnull(req)
req.buffer = nothing
end
end
function update!(reqs::RequestSet)
n = length(reqs)
for i = 1:n
update!(reqs, i)
end
end


"""
Waitall(reqs::AbstractVector{Request}[, statuses::Vector{Status}])
statuses = Waitall(reqs::AbstractVector{Request}, Status)
Expand All @@ -255,24 +199,22 @@ each request.
# External links
$(_doc_external("MPI_Waitall"))
"""
function Waitall(reqs::RequestSet, statuses::Union{AbstractVector{Status},Nothing}=nothing)
function Waitall(reqs::AbstractVector{Request}, statuses::Union{AbstractVector{Status},Nothing}=nothing)
n = length(reqs)
n == 0 && return nothing
@assert isnothing(statuses) || length(statuses) >= n
# int MPI_Waitall(int count, MPI_Request array_of_requests[],
# MPI_Status array_of_statuses[])
@mpichk ccall((:MPI_Waitall, libmpi), Cint,
(Cint, Ptr{MPI_Request}, Ptr{Status}),
n, reqs.vals, something(statuses, Consts.MPI_STATUSES_IGNORE[]))
update!(reqs)
n, reqs, something(statuses, Consts.MPI_STATUSES_IGNORE[]))
return nothing
end
function Waitall(reqs::RequestSet, ::Type{Status})
function Waitall(reqs::AbstractVector{Request}, ::Type{Status})
statuses = Array{Status}(undef, length(reqs))
Waitall(reqs, statuses)
return statuses
end
Waitall(reqs::AbstractVector{Request}, args...) = Waitall(RequestSet(reqs), args...)


"""
Expand All @@ -289,24 +231,22 @@ each request.
# External links
$(_doc_external("MPI_Testall"))
"""
function Testall(reqs::RequestSet, statuses::Union{AbstractVector{Status},Nothing}=nothing)
function Testall(reqs::AbstractVector{Request}, statuses::Union{AbstractVector{Status},Nothing}=nothing)
n = length(reqs)
flag = Ref{Cint}()
@assert isnothing(statuses) || length(statuses) >= n
# int MPI_Testall(int count, MPI_Request array_of_requests[], int *flag,
# MPI_Status array_of_statuses[])
@mpichk ccall((:MPI_Testall, libmpi), Cint,
(Cint, Ptr{MPI_Request}, Ptr{Cint}, MPIPtr),
n, reqs.vals, flag, something(statuses, Consts.MPI_STATUSES_IGNORE[]))
update!(reqs)
n, reqs, flag, something(statuses, Consts.MPI_STATUSES_IGNORE[]))
return flag[] != 0
end
function Testall(reqs::RequestSet, ::Type{Status})
function Testall(reqs::AbstractVector{Request}, ::Type{Status})
statuses = Array{Status}(undef, length(reqs))
flag = Testall(reqs, statuses)
return flag, statuses
end
Testall(reqs::Vector{Request}, args...) = Testall(RequestSet(reqs), args...)

"""
i = Waitany(reqs::AbstractVector{Request}[, status::Ref{Status}])
Expand All @@ -323,28 +263,26 @@ The optional `status` argument can be used to obtain the return `Status` of the
# External links
$(_doc_external("MPI_Waitany"))
"""
function Waitany(reqs::RequestSet, status::Union{Ref{Status}, Nothing}=nothing)
function Waitany(reqs::AbstractVector{Request}, status::Union{Ref{Status}, Nothing}=nothing)
ref_idx = Ref{Cint}()
n = length(reqs)
# int MPI_Waitany(int count, MPI_Request array_of_requests[], int *index,
# MPI_Status *status)
@mpichk ccall((:MPI_Waitany, libmpi), Cint,
(Cint, Ptr{MPI_Request}, Ptr{Cint}, MPIPtr),
n, reqs.vals, ref_idx, something(status, Consts.MPI_STATUS_IGNORE[]))
n, reqs, ref_idx, something(status, Consts.MPI_STATUS_IGNORE[]))
idx = ref_idx[]
if idx == Consts.MPI_UNDEFINED[]
return nothing
end
i = Int(idx) + 1
update!(reqs, i)
return i
end
function Waitany(reqs::RequestSet, ::Type{Status})
function Waitany(reqs::AbstractVector{Request}, ::Type{Status})
status = Ref(STATUS_ZERO)
i = Waitany(reqs, status)
return i, status[]
end
Waitany(reqs::Vector{Request}, args...) = Waitany(RequestSet(reqs), args...)

"""
flag, idx = Testany(reqs::AbstractVector{Request}[, status::Ref{Status}])
Expand All @@ -364,31 +302,29 @@ The optional `status` argument can be used to obtain the return `Status` of the
# External links
$(_doc_external("MPI_Testany"))
"""
function Testany(reqs::RequestSet, status::Union{Ref{Status}, Nothing}=nothing)
function Testany(reqs::AbstractVector{Request}, status::Union{Ref{Status}, Nothing}=nothing)
ref_idx = Ref{Cint}()
rflag = Ref{Cint}()
n = length(reqs)
# int MPI_Testany(int count, MPI_Request array_of_requests[], int *index,
# int *flag, MPI_Status *status)
@mpichk ccall((:MPI_Testany, libmpi), Cint,
(Cint, Ptr{MPI_Request}, Ptr{Cint}, Ptr{Cint}, MPIPtr),
n, reqs.vals, ref_idx, rflag, something(status, Consts.MPI_STATUS_IGNORE[]))
n, reqs, ref_idx, rflag, something(status, Consts.MPI_STATUS_IGNORE[]))
idx = ref_idx[]
flag = rflag[] != 0

if idx == Consts.MPI_UNDEFINED[]
return flag, nothing
end
i = Int(idx) + 1
update!(reqs, i)
return flag, i
end
function Testany(reqs::RequestSet, ::Type{Status})
function Testany(reqs::AbstractVector{Request}, ::Type{Status})
status = Ref(STATUS_ZERO)
flag, i = Testany(reqs, status)
return flag, i, status[]
end
Testany(reqs::Vector{Request}, args...) = Testany(RequestSet(reqs), args...)

"""
inds = Waitsome(reqs::AbstractVector{Request}[, statuses::Vector{Status}])
Expand All @@ -404,7 +340,7 @@ completed request.
# External links
$(_doc_external("MPI_Waitsome"))
"""
function Waitsome(reqs::RequestSet, statuses::Union{AbstractVector{Status},Nothing}=nothing)
function Waitsome(reqs::AbstractVector{Request}, statuses::Union{AbstractVector{Status},Nothing}=nothing)
ref_nout = Ref{Cint}()
n = length(reqs)
idxs = Vector{Cint}(undef, n)
Expand All @@ -415,22 +351,20 @@ function Waitsome(reqs::RequestSet, statuses::Union{AbstractVector{Status},Nothi
# MPI_Status array_of_statuses[])
@mpichk ccall((:MPI_Waitsome, libmpi), Cint,
(Cint, Ptr{MPI_Request}, Ptr{Cint}, Ptr{Cint}, Ptr{Status}),
n, reqs.vals, ref_nout, idxs, something(statuses, Consts.MPI_STATUSES_IGNORE[]))
n, reqs, ref_nout, idxs, something(statuses, Consts.MPI_STATUSES_IGNORE[]))
nout = Int(ref_nout[])
# This can happen if there were no valid requests
if nout == Consts.MPI_UNDEFINED[]
return nothing
end
update!(reqs)
return [Int(idxs[i]) + 1 for i = 1:nout]
end
function Waitsome(reqs::RequestSet, ::Type{Status})
function Waitsome(reqs::AbstractVector{Request}, ::Type{Status})
statuses = Array{Status}(undef, length(reqs))
inds = Waitsome(reqs, statuses)
resize!(statuses, isnothing(inds) ? 0 : length(inds))
return inds, statuses
end
Waitsome(reqs::Vector{Request}, args...) = Waitsome(RequestSet(reqs), args...)

"""
inds = Testsome(reqs::AbstractVector{Request}[, statuses::Vector{Status}])
Expand All @@ -446,7 +380,7 @@ completed request.
# External links
$(_doc_external("MPI_Testsome"))
"""
function Testsome(reqs::RequestSet, statuses::Union{AbstractVector{Status},Nothing}=nothing)
function Testsome(reqs::AbstractVector{Request}, statuses::Union{AbstractVector{Status},Nothing}=nothing)
ref_nout = Ref{Cint}()
n = length(reqs)
idxs = Vector{Cint}(undef, n)
Expand All @@ -457,22 +391,20 @@ function Testsome(reqs::RequestSet, statuses::Union{AbstractVector{Status},Nothi
# MPI_Status array_of_statuses[])
@mpichk ccall((:MPI_Testsome, libmpi), Cint,
(Cint, Ptr{MPI_Request}, Ptr{Cint}, Ptr{Cint}, Ptr{Status}),
n, reqs.vals, ref_nout, idxs, something(statuses, Consts.MPI_STATUSES_IGNORE[]))
n, reqs, ref_nout, idxs, something(statuses, Consts.MPI_STATUSES_IGNORE[]))
nout = Int(ref_nout[])
# This can happen if there were no valid requests
if nout == Consts.MPI_UNDEFINED[]
return nothing
end
update!(reqs)
return [Int(idxs[i]) + 1 for i = 1:nout]
end
function Testsome(reqs::RequestSet, ::Type{Status})
function Testsome(reqs::AbstractVector{Request}, ::Type{Status})
statuses = Array{Status}(undef, length(reqs))
inds = Testsome(reqs, statuses)
resize!(statuses, isnothing(inds) ? 0 : length(inds))
return inds, statuses
end
Testsome(reqs::Vector{Request}, args...) = Testsome(RequestSet(reqs), args...)

"""
Cancel!(req::Request)
Expand Down
16 changes: 6 additions & 10 deletions src/pointtopoint.jl
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,13 @@ Isend(data, comm::Comm; dest::Integer, tag::Integer=0) =
Isend(data, dest, tag, comm)

function Isend(buf::Buffer, dest::Integer, tag::Integer, comm::Comm)
req = Request()
rreq = Ref{MPI_Request}()
# int MPI_Isend(const void* buf, int count, MPI_Datatype datatype, int dest,
# int tag, MPI_Comm comm, MPI_Request *request)
@mpichk ccall((:MPI_Isend, libmpi), Cint,
(MPIPtr, Cint, MPI_Datatype, Cint, Cint, MPI_Comm, Ptr{MPI_Request}),
buf.data, buf.count, buf.datatype, dest, tag, comm, req)
req.buffer = buf
finalizer(free, req)
return req
buf.data, buf.count, buf.datatype, dest, tag, comm, rreq)
return Request(rreq[])
end
Isend(data, dest::Integer, tag::Integer, comm::Comm) =
Isend(Buffer_send(data), dest, tag, comm)
Expand Down Expand Up @@ -205,15 +203,13 @@ $(_doc_external("MPI_Irecv"))
Irecv!(recvbuf, comm::Comm; source::Integer=Consts.MPI_ANY_SOURCE[], tag::Integer=Consts.MPI_ANY_TAG[]) =
Irecv!(recvbuf, source, tag, comm)
function Irecv!(buf::Buffer, source::Integer, tag::Integer, comm::Comm)
req = Request()
rreq = Ref{MPI_Request}()
# int MPI_Irecv(void* buf, int count, MPI_Datatype datatype, int source,
# int tag, MPI_Comm comm, MPI_Request *request)
@mpichk ccall((:MPI_Irecv, libmpi), Cint,
(MPIPtr, Cint, MPI_Datatype, Cint, Cint, MPI_Comm, Ptr{MPI_Request}),
buf.data, buf.count, buf.datatype, source, tag, comm, req)
req.buffer = buf
finalizer(free, req)
return req
buf.data, buf.count, buf.datatype, source, tag, comm, rreq)
return Request(rreq[])
end
Irecv!(data, source::Integer, tag::Integer, comm::Comm) =
Irecv!(Buffer(data), source, tag, comm)
Expand Down
Loading