Skip to content

Commit b2322b5

Browse files
vchuravyjonas-schulze
authored andcommitted
Make Distributed.Worker threadsafe (#38134)
Co-authored-by: Jonas Schulze <[email protected]>
1 parent e6aca89 commit b2322b5

File tree

3 files changed

+66
-4
lines changed

3 files changed

+66
-4
lines changed

stdlib/Distributed/src/cluster.jl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ mutable struct Worker
9999
add_msgs::Array{Any,1}
100100
gcflag::Bool
101101
state::WorkerState
102-
c_state::Condition # wait for state changes
102+
c_state::Event # wait for state changes
103103
ct_time::Float64 # creation time
104104
conn_func::Any # used to setup connections lazily
105105

@@ -133,7 +133,7 @@ mutable struct Worker
133133
if haskey(map_pid_wrkr, id)
134134
return map_pid_wrkr[id]
135135
end
136-
w=new(id, [], [], false, W_CREATED, Condition(), time(), conn_func)
136+
w=new(id, [], [], false, W_CREATED, Event(), time(), conn_func)
137137
w.initialized = Event()
138138
register_worker(w)
139139
w
@@ -144,7 +144,7 @@ end
144144

145145
function set_worker_state(w, state)
146146
w.state = state
147-
notify(w.c_state; all=true)
147+
notify(w.c_state)
148148
end
149149

150150
function check_worker_state(w::Worker)
@@ -190,7 +190,7 @@ function wait_for_conn(w)
190190
timeout = worker_timeout() - (time() - w.ct_time)
191191
timeout <= 0 && error("peer $(w.id) has not connected to $(myid())")
192192

193-
@async (sleep(timeout); notify(w.c_state; all=true))
193+
@async (sleep(timeout); notify(w.c_state))
194194
wait(w.c_state)
195195
w.state === W_CREATED && error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds")
196196
end

stdlib/Distributed/test/distributed_exec.jl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1696,4 +1696,5 @@ include("splitrange.jl")
16961696
# Run topology tests last after removing all workers, since a given
16971697
# cluster at any time only supports a single topology.
16981698
rmprocs(workers())
1699+
include("threads.jl")
16991700
include("topology.jl")

stdlib/Distributed/test/threads.jl

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
using Test
2+
using Distributed, Base.Threads
3+
using Base.Iterators: product
4+
5+
exeflags = ("--startup-file=no",
6+
"--check-bounds=yes",
7+
"--depwarn=error",
8+
"--threads=2")
9+
10+
function call_on(f, wid, tid)
11+
remotecall(wid) do
12+
t = Task(f)
13+
ccall(:jl_set_task_tid, Cvoid, (Any, Cint), t, tid-1)
14+
schedule(t)
15+
@assert threadid(t) == tid
16+
t
17+
end
18+
end
19+
20+
# Run function on process holding the data to only serialize the result of f.
21+
# This becomes useful for things that cannot be serialized (e.g. running tasks)
22+
# or that would be unnecessarily big if serialized.
23+
fetch_from_owner(f, rr) = remotecall_fetch(ffetch, rr.where, rr)
24+
25+
isdone(rr) = fetch_from_owner(istaskdone, rr)
26+
isfailed(rr) = fetch_from_owner(istaskfailed, rr)
27+
28+
@testset "RemoteChannel allows put!/take! from thread other than 1" begin
29+
ws = ts = product(1:2, 1:2)
30+
@testset "from worker $w1 to $w2 via 1" for (w1, w2) in ws
31+
@testset "from thread $w1.$t1 to $w2.$t2" for (t1, t2) in ts
32+
# We want (the default) lazyness, so that we wait for `Worker.c_state`!
33+
procs_added = addprocs(2; exeflags, lazy=true)
34+
@everywhere procs_added using Base.Threads
35+
p1 = procs_added[w1]
36+
p2 = procs_added[w2]
37+
chan_id = first(procs_added)
38+
chan = RemoteChannel(chan_id)
39+
send = call_on(p1, t1) do
40+
put!(chan, nothing)
41+
end
42+
recv = call_on(p2, t2) do
43+
take!(chan)
44+
end
45+
46+
# Wait on the spawned tasks on the owner
47+
@sync begin
48+
@async fetch_from_owner(wait, recv)
49+
@async fetch_from_owner(wait, send)
50+
end
51+
52+
# Check the tasks
53+
@test isdone(send)
54+
@test isdone(recv)
55+
56+
@test !isfailed(send)
57+
@test !isfailed(recv)
58+
rmprocs(procs_added)
59+
end
60+
end
61+
end

0 commit comments

Comments
 (0)