-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathDistributedNext.jl
More file actions
152 lines (131 loc) · 4.34 KB
/
DistributedNext.jl
File metadata and controls
152 lines (131 loc) · 4.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# This file is a part of Julia. License is MIT: https://julialang.org/license
"""
Tools for distributed parallel processing. This is a soft fork of Distributed.jl
for the purposes of testing new things before merging upstream. Here be dragons!
"""
module DistributedNext
# imports for extension
import Base: getindex, wait, put!, take!, fetch, isready, push!, length,
hash, ==, kill, close, isopen, showerror, iterate, IteratorSize
# imports for use
using Base: Process, Semaphore, JLOptions, buffer_writes, @async_unwrap,
VERSION_STRING, binding_module, atexit, julia_exename,
julia_cmd, AsyncGenerator, acquire, release, invokelatest,
shell_escape_posixly, shell_escape_csh,
shell_escape_wincmd, escape_microsoft_c_args,
uv_error, something, notnothing, isbuffered, mapany, SizeUnknown
using Base.Threads: Event
using Serialization, Sockets
import Serialization: serialize, deserialize
import Sockets: connect, wait_connected
# NOTE: clusterserialize.jl imports additional symbols from Serialization for use
export
@spawn,
@spawnat,
@fetch,
@fetchfrom,
@everywhere,
@distributed,
AbstractWorkerPool,
addprocs,
CachingPool,
clear!,
ClusterManager,
default_worker_pool,
init_worker,
interrupt,
launch,
manage,
myid,
nprocs,
nworkers,
pmap,
procs,
remote,
remotecall,
remotecall_fetch,
remotecall_wait,
remote_do,
rmprocs,
workers,
WorkerPool,
RemoteChannel,
Future,
WorkerConfig,
RemoteException,
ProcessExitedException,
process_messages,
remoteref_id,
channel_from_id,
worker_id_from_socket,
cluster_cookie,
start_worker,
# Used only by shared arrays.
check_same_host
"""
    _check_distributed_active() -> Bool

Report whether the `Distributed` stdlib looks like it is loaded and in use in this process.

Returns `false` when `Distributed` is not present in `Base.loaded_modules`. Otherwise
returns `true`, after logging a warning, only when the `cookie` field of
`Distributed.LPROC` has been assigned and the `inited` flag (defined outside this
function) is set; in every other case it returns `false`.
"""
function _check_distributed_active()
    # Identity (UUID + name) under which the Distributed stdlib is registered.
    pkgid = Base.PkgId(Base.UUID("8ba89e20-285c-5b6f-9357-94700520ee1b"), "Distributed")

    # Distributed was never loaded, so there is nothing to conflict with.
    haskey(Base.loaded_modules, pkgid) || return false

    stdlib = Base.loaded_modules[pkgid]
    cookie_assigned = isdefined(stdlib.LPROC, :cookie)
    # `inited[]` is only consulted once the cookie is known to be assigned.
    if !(cookie_assigned && inited[])
        return false
    end

    @warn "DistributedNext has detected that the Distributed stdlib may be in use. Be aware that these libraries are not compatible, you should use either one or the other."
    return true
end
"""
    _require_callback(mod::Base.PkgId)

Repeat a top-level package load on every other process.

Nothing happens unless `Base.toplevel_load[]` is set, this process has id 1 and more than
one process exists. Otherwise `Base.require(mod)` is run through `remotecall_wait` on
each process other than 1, and the enclosing `@sync` waits for the spawned tasks.
"""
function _require_callback(mod::Base.PkgId)
    should_broadcast = Base.toplevel_load[] && myid() == 1 && nprocs() > 1
    should_broadcast || return nothing

    # broadcast top-level (e.g. from Main) import/using from node 1 (only)
    @sync for pid in procs()
        # Process 1 is the one broadcasting, so it is skipped.
        # Extensions are already loaded on workers by their triggers being loaded
        # so no need to fire the callback upon extension being loaded on master.
        (pid == 1 || Base.loading_extension) && continue

        @async_unwrap remotecall_wait(pid) do
            Base.require(mod)
            nothing
        end
    end
    return nothing
end
# Module-level atomic counter that backs `next_ref_id`; its initial value is 1.
const REF_ID = Threads.Atomic{Int}(1)

"""
    next_ref_id() -> Int

Atomically add one to `REF_ID` and return the value it held before the increment.
"""
function next_ref_id()
    previous = Threads.atomic_add!(REF_ID, 1)
    return previous
end
"""
    RRID(whence, id)
    RRID()

Identifier made of two `Int` fields, `whence` and `id`.

The two-argument form stores the given values (converted to `Int`). The zero-argument
form fills `whence` with `myid()` and `id` with `next_ref_id()`.
"""
struct RRID
    whence::Int
    id::Int

    function RRID(whence, id)
        return new(whence, id)
    end

    function RRID()
        return RRID(myid(), next_ref_id())
    end
end

# Hash both fields: `id` is mixed into the seed first, then `whence`.
function hash(r::RRID, h::UInt)
    id_hash = hash(r.id, h)
    return hash(r.whence, id_hash)
end

# Two identifiers are equal exactly when both fields match.
function ==(r::RRID, s::RRID)
    return r.whence == s.whence && r.id == s.id
end
# Implementation files that make up the module.
# NOTE(review): the order presumably matters (later files likely rely on definitions
# from earlier ones) — confirm before reordering.
include("network_interfaces.jl")
# clusterserialize.jl imports additional symbols from Serialization itself (see the
# note next to the imports at the top of this module).
include("clusterserialize.jl")
include("cluster.jl") # cluster setup and management, addprocs
include("messages.jl")
include("process_messages.jl") # process incoming messages
include("remotecall.jl") # the remotecall* api
include("macros.jl") # @spawn and friends
include("workerpool.jl")
include("pmap.jl")
include("managers.jl") # LocalManager and SSHManager
include("precompile.jl")
# Module initialiser: runs `init_parallel()` and, unless Julia reports that it is
# generating output, starts a background task that polls for the Distributed stdlib.
function __init__()
    init_parallel()

    # NOTE(review): a non-zero `jl_generating_output` presumably means precompilation or
    # image building is in progress — confirm. No watcher is started in that case.
    generating_output = ccall(:jl_generating_output, Cint, ()) != 0
    generating_output && return nothing

    # Start a task to watch for the Distributed stdlib being loaded and
    # initialized to support multiple workers. We do this by checking if the
    # cluster cookie has been set, which is most likely to have been done
    # through Distributed.init_multi() being called by Distributed.addprocs() or
    # something.
    watcher_task = Threads.@spawn begin
        # Poll once per second; the task finishes the first time the check succeeds.
        while !_check_distributed_active()
            sleep(1)
        end
    end
    return errormonitor(watcher_task)
end
end