diff --git a/base/darray.jl b/base/darray.jl index e8ca15b979f6c..207d7a8db4f9e 100644 --- a/base/darray.jl +++ b/base/darray.jl @@ -50,6 +50,13 @@ DArray(init, dims) = DArray(init, dims, workers()[1:min(nworkers(),maximum(dims) # new DArray similar to an existing one DArray(init, d::DArray) = DArray(init, size(d), procs(d), [size(d.chunks)...]) +similar(d::DArray, T, dims::Dims)= DArray(I->Array(T, map(length,I)), dims, procs(d)) +similar(d::DArray, T)= similar(d, T, size(d)) +similar{T}(d::DArray{T}, dims::Dims)= similar(d, T, dims) +similar{T}(d::DArray{T})= similar(d, T, size(d)) + +eltype{T}(d::DArray{T}) = T + size(d::DArray) = d.dims procs(d::DArray) = d.pmap @@ -298,3 +305,13 @@ map(f::Callable, d::DArray) = DArray(I->map(f, localpart(d)), d) reduce(f::Function, d::DArray) = mapreduce(fetch, f, { @spawnat p reduce(f, localpart(d)) for p in procs(d) }) + + +function map!(f::Callable, d::DArray) + @sync begin + for p in procs(d) + @spawnat p map!(f, localpart(d)) + end + end +end + diff --git a/base/sharedarray.jl b/base/sharedarray.jl index 5a9bb15c6c804..16be546e48c6f 100644 --- a/base/sharedarray.jl +++ b/base/sharedarray.jl @@ -211,7 +211,31 @@ function shmem_randn(dims; kwargs...) end shmem_randn(I::Int...; kwargs...) = shmem_randn(I; kwargs...) -similar(S::SharedArray, T, dims::Dims) = similar(S.s, T, dims) +similar(S::SharedArray, T, dims::Dims) = SharedArray(T, dims; pids=procs(S)) +similar(S::SharedArray, T) = similar(S, T, size(S)) +similar(S::SharedArray, dims::Dims) = similar(S, eltype(S), dims) +similar(S::SharedArray) = similar(S, eltype(S), size(S)) + +eltype(S::SharedArray) = eltype(S.s) + +map(f::Callable, S::SharedArray) = (S2 = similar(S); S2[:] = S[:]; map!(f, S2); S2) + +reduce(f::Function, S::SharedArray) = + mapreduce(fetch, f, + { @spawnat p reduce(f, S.loc_subarr_1d) for p in procs(S) }) + + +function map!(f::Callable, S::SharedArray) + @sync begin + for p in procs(S) + @spawnat p begin + for idx in localindexes(S) + S.s[idx] = f(S.s[idx]) + end + end + end + end +end function print_shmem_limits(slen) @@ -284,3 +308,4 @@ function assert_same_host(procs) return (first_privip != getipaddr()) ? false : true end + diff --git a/test/parallel.jl b/test/parallel.jl index 73d07df09f043..606fcd77c8525 100644 --- a/test/parallel.jl +++ b/test/parallel.jl @@ -20,6 +20,14 @@ a = convert(Matrix{Float64}, d) @test fetch(@spawnat id_me localpart(d)[1,1]) == d[1,1] @test fetch(@spawnat id_other localpart(d)[1,1]) == d[1,101] +d=DArray(I->fill(myid(), map(length,I)), (10,10), [id_me, id_other]) +d2 = map(x->1, d) +@test reduce(+, d2) == 100 + +@test reduce(+, d) == ((50*id_me) + (50*id_other)) +map!(x->1, d) +@test reduce(+, d) == 100 + @unix_only begin @@ -84,6 +92,15 @@ A = convert(SharedArray, AA) B = convert(SharedArray, AA') @test B*A == AA'*AA +d=SharedArray(Int64, (10,10); init = D->fill!(D.loc_subarr_1d, myid()), pids=[id_me, id_other]) +d2 = map(x->1, d) +@test reduce(+, d2) == 100 + +@test reduce(+, d) == ((50*id_me) + (50*id_other)) +map!(x->1, d) +@test reduce(+, d) == 100 + + end # @unix_only(SharedArray tests)