|
| 1 | +using Test |
| 2 | +using Distributed, Base.Threads |
| 3 | +using Base.Iterators: product |
| 4 | + |
| 5 | +exeflags = ("--startup-file=no", |
| 6 | + "--check-bounds=yes", |
| 7 | + "--depwarn=error", |
| 8 | + "--threads=2") |
| 9 | + |
| 10 | +function call_on(f, wid, tid) |
| 11 | + remotecall(wid) do |
| 12 | + t = Task(f) |
| 13 | + ccall(:jl_set_task_tid, Cvoid, (Any, Cint), t, tid-1) |
| 14 | + schedule(t) |
| 15 | + @assert threadid(t) == tid |
| 16 | + t |
| 17 | + end |
| 18 | +end |
| 19 | + |
| 20 | +# Run function on process holding the data to only serialize the result of f. |
| 21 | +# This becomes useful for things that cannot be serialized (e.g. running tasks) |
| 22 | +# or that would be unnecessarily big if serialized. |
| 23 | +fetch_from_owner(f, rr) = remotecall_fetch(f∘fetch, rr.where, rr) |
| 24 | + |
| 25 | +isdone(rr) = fetch_from_owner(istaskdone, rr) |
| 26 | +isfailed(rr) = fetch_from_owner(istaskfailed, rr) |
| 27 | + |
| 28 | +@testset "RemoteChannel allows put!/take! from thread other than 1" begin |
| 29 | + ws = ts = product(1:2, 1:2) |
| 30 | + timeout = 10.0 |
| 31 | + @testset "from worker $w1 to $w2 via 1" for (w1, w2) in ws |
| 32 | + @testset "from thread $w1.$t1 to $w2.$t2" for (t1, t2) in ts |
| 33 | + # We want (the default) lazyness, so that we wait for `Worker.c_state`! |
| 34 | + procs_added = addprocs(2; exeflags, lazy=true) |
| 35 | + @everywhere procs_added using Base.Threads |
| 36 | + p1 = procs_added[w1] |
| 37 | + p2 = procs_added[w2] |
| 38 | + chan_id = first(procs_added) |
| 39 | + chan = RemoteChannel(chan_id) |
| 40 | + send = call_on(p1, t1) do |
| 41 | + put!(chan, nothing) |
| 42 | + end |
| 43 | + recv = call_on(p2, t2) do |
| 44 | + take!(chan) |
| 45 | + end |
| 46 | + timedwait(() -> isdone(send) && isdone(recv), timeout) |
| 47 | + @test isdone(send) |
| 48 | + @test isdone(recv) |
| 49 | + @test !isfailed(send) |
| 50 | + @test !isfailed(recv) |
| 51 | + rmprocs(procs_added) |
| 52 | + end |
| 53 | + end |
| 54 | +end |
0 commit comments