@@ -88,7 +88,7 @@ function _check_distributed_active()
8888 return false
8989 end
9090
91- if isdefined (Base. loaded_modules[distributed_pkgid]. LPROC, :cookie ) && CTX. inited[]
91+ if isdefined (Base. loaded_modules[distributed_pkgid]. LPROC, :cookie ) && CTX[] . inited[]
9292 @warn " DistributedNext has detected that the Distributed stdlib may be in use. Be aware that these libraries are not compatible, you should use either one or the other."
9393 return true
9494 else
@@ -123,7 +123,7 @@ Base.lock(l::Lockable) = lock(l.lock)
123123Base. trylock (l:: Lockable ) = trylock (l. lock)
124124Base. unlock (l:: Lockable ) = unlock (l. lock)
125125
126- next_ref_id () = Threads. atomic_add! (CTX. ref_id, 1 )
126+ next_ref_id () = Threads. atomic_add! (CTX[] . ref_id, 1 )
127127
128128struct RRID
129129 whence:: Int
@@ -146,12 +146,11 @@ include("macros.jl") # @spawn and friends
146146include (" workerpool.jl" )
147147include (" pmap.jl" )
148148include (" managers.jl" ) # LocalManager and SSHManager
149- include (" precompile.jl" )
150149
151150# Bundles all mutable global state for a distributed cluster into a single
152- # object. Currently a single global instance ( `CTX`) is used, but multiple
153- # independent clusters could be supported in the future .
154- @kwdef struct ClusterContext
151+ # object. The active context is accessed via the `CTX` ScopedValue, allowing
152+ # multiple independent clusters to coexist in different task scopes .
153+ @kwdef mutable struct ClusterContext
155154 # Process identity
156155 lproc:: LocalProcess = LocalProcess ()
157156 role:: Ref{Symbol} = Ref {Symbol} (:master )
@@ -204,11 +203,42 @@ include("precompile.jl")
204203 # Scoped value for exited callback pid
205204 exited_callback_pid:: ScopedValue{Int} = ScopedValue (- 1 )
206205
206+ # GC messages task
207+ shutting_down:: Threads.Atomic{Bool} = Threads. Atomic {Bool} (false )
208+ gc_msgs_task:: Union{Task, Nothing} = nothing
209+
207210 # Stdlib watcher
208- stdlib_watcher_timer:: Ref{Union{Timer, Nothing}} = Ref {Union{Timer, Nothing}} (nothing )
211+ stdlib_watcher_timer:: Union{Timer, Nothing} = nothing
212+ end
213+
214+ function ClusterContext (f:: Base.Callable ; kwargs... )
215+ ctx = ClusterContext (; kwargs... )
216+ ret = @with CTX => ctx f ()
217+ close (ctx)
218+
219+ return ret
209220end
210221
211- const CTX = ClusterContext ()
222+ function Base. close (ctx:: ClusterContext )
223+ ctx. shutting_down[] = true
224+ if ! isnothing (ctx. gc_msgs_task)
225+ @lock ctx. any_gc_flag notify (ctx. any_gc_flag)
226+ wait (ctx. gc_msgs_task:: Task )
227+ end
228+
229+ if ! isnothing (ctx. stdlib_watcher_timer)
230+ close (ctx. stdlib_watcher_timer:: Timer )
231+ end
232+
233+ # Close all tracked sockets
234+ @lock ctx. map_sock_wrkr for sock in keys (ctx. map_sock_wrkr[])
235+ close (sock)
236+ end
237+ end
238+
239+ const CTX = ScopedValue (ClusterContext ())
240+
241+ include (" precompile.jl" )
212242
213243function __init__ ()
214244 init_parallel ()
@@ -219,13 +249,14 @@ function __init__()
219249 # cluster cookie has been set, which is most likely to have been done
220250 # through Distributed.init_multi() being called by Distributed.addprocs() or
221251 # something.
222- CTX. stdlib_watcher_timer[] = Timer (0 ; interval= 1 ) do timer
252+ CTX[] . stdlib_watcher_timer = Timer (0 ; interval= 1 ) do timer
223253 if _check_distributed_active ()
224254 close (timer)
225255 end
226256 end
227- atexit (() -> close (CTX. stdlib_watcher_timer[]))
228257 end
258+
259+ atexit (() -> close (CTX[]))
229260end
230261
231262end
0 commit comments