Skip to content

Commit 869bfd5

Browse files
committed
Store the ClusterContext as a ScopedValue
1 parent e018d9c commit 869bfd5

11 files changed

Lines changed: 233 additions & 204 deletions

src/DistributedNext.jl

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ function _check_distributed_active()
8888
return false
8989
end
9090

91-
if isdefined(Base.loaded_modules[distributed_pkgid].LPROC, :cookie) && CTX.inited[]
91+
if isdefined(Base.loaded_modules[distributed_pkgid].LPROC, :cookie) && CTX[].inited[]
9292
@warn "DistributedNext has detected that the Distributed stdlib may be in use. Be aware that these libraries are not compatible, you should use either one or the other."
9393
return true
9494
else
@@ -123,7 +123,7 @@ Base.lock(l::Lockable) = lock(l.lock)
123123
Base.trylock(l::Lockable) = trylock(l.lock)
124124
Base.unlock(l::Lockable) = unlock(l.lock)
125125

126-
next_ref_id() = Threads.atomic_add!(CTX.ref_id, 1)
126+
next_ref_id() = Threads.atomic_add!(CTX[].ref_id, 1)
127127

128128
struct RRID
129129
whence::Int
@@ -149,9 +149,9 @@ include("managers.jl") # LocalManager and SSHManager
149149
include("precompile.jl")
150150

151151
# Bundles all mutable global state for a distributed cluster into a single
152-
# object. Currently a single global instance (`CTX`) is used, but multiple
153-
# independent clusters could be supported in the future.
154-
@kwdef struct ClusterContext
152+
# object. The active context is accessed via the `CTX` ScopedValue, allowing
153+
# multiple independent clusters to coexist in different task scopes.
154+
@kwdef mutable struct ClusterContext
155155
# Process identity
156156
lproc::LocalProcess = LocalProcess()
157157
role::Ref{Symbol} = Ref{Symbol}(:master)
@@ -204,11 +204,27 @@ include("precompile.jl")
204204
# Scoped value for exited callback pid
205205
exited_callback_pid::ScopedValue{Int} = ScopedValue(-1)
206206

207+
# GC messages task
208+
shutting_down::Threads.Atomic{Bool} = Threads.Atomic{Bool}(false)
209+
gc_msgs_task::Union{Task, Nothing} = nothing
210+
207211
# Stdlib watcher
208-
stdlib_watcher_timer::Ref{Union{Timer, Nothing}} = Ref{Union{Timer, Nothing}}(nothing)
212+
stdlib_watcher_timer::Union{Timer, Nothing} = nothing
209213
end
210214

211-
const CTX = ClusterContext()
215+
function Base.close(ctx::ClusterContext)
216+
ctx.shutting_down[] = true
217+
if !isnothing(ctx.gc_msgs_task)
218+
@lock ctx.any_gc_flag notify(ctx.any_gc_flag)
219+
wait(ctx.gc_msgs_task::Task)
220+
end
221+
222+
if !isnothing(ctx.stdlib_watcher_timer)
223+
close(ctx.stdlib_watcher_timer::Timer)
224+
end
225+
end
226+
227+
const CTX = ScopedValue(ClusterContext())
212228

213229
function __init__()
214230
init_parallel()
@@ -219,13 +235,14 @@ function __init__()
219235
# cluster cookie has been set, which is most likely to have been done
220236
# through Distributed.init_multi() being called by Distributed.addprocs() or
221237
# something.
222-
CTX.stdlib_watcher_timer[] = Timer(0; interval=1) do timer
238+
CTX[].stdlib_watcher_timer = Timer(0; interval=1) do timer
223239
if _check_distributed_active()
224240
close(timer)
225241
end
226242
end
227-
atexit(() -> close(CTX.stdlib_watcher_timer[]))
228243
end
244+
245+
atexit(() -> close(CTX[]))
229246
end
230247

231248
end

0 commit comments

Comments
 (0)