Skip to content

Commit d548dee

Browse files
authored
Merge pull request #61 from JuliaParallel/context
Giant refactor to move all global state into a `ClusterContext`
2 parents f79539f + ca7d19e commit d548dee

13 files changed

Lines changed: 270 additions & 247 deletions

docs/src/_changelog.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,14 @@ CurrentModule = DistributedNext
77
This documents notable changes in DistributedNext.jl. The format is based on
88
[Keep a Changelog](https://keepachangelog.com).
99

10+
## Unreleased
11+
12+
### Changed
13+
- The internals were completely refactored to move all global variables into a
14+
single struct ([#61]). This should not be a user-visible change, but of course
15+
it's possible that some things slipped through the cracks so please open an
16+
issue if you encounter any bugs.
17+
1018
## [v1.2.0] - 2026-03-21
1119

1220
### Added

src/DistributedNext.jl

Lines changed: 65 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ function _check_distributed_active()
8888
return false
8989
end
9090

91-
if isdefined(Base.loaded_modules[distributed_pkgid].LPROC, :cookie) && inited[]
91+
if isdefined(Base.loaded_modules[distributed_pkgid].LPROC, :cookie) && CTX.inited[]
9292
@warn "DistributedNext has detected that the Distributed stdlib may be in use. Be aware that these libraries are not compatible, you should use either one or the other."
9393
return true
9494
else
@@ -123,8 +123,7 @@ Base.lock(l::Lockable) = lock(l.lock)
123123
Base.trylock(l::Lockable) = trylock(l.lock)
124124
Base.unlock(l::Lockable) = unlock(l.lock)
125125

126-
const REF_ID = Threads.Atomic{Int}(1)
127-
next_ref_id() = Threads.atomic_add!(REF_ID, 1)
126+
next_ref_id() = Threads.atomic_add!(CTX.ref_id, 1)
128127

129128
struct RRID
130129
whence::Int
@@ -149,10 +148,69 @@ include("pmap.jl")
149148
include("managers.jl") # LocalManager and SSHManager
150149
include("precompile.jl")
151150

152-
_stdlib_watcher_timer::Union{Timer, Nothing} = nothing
151+
# Bundles all mutable global state for a distributed cluster into a single
152+
# object. Currently a single global instance (`CTX`) is used, but multiple
153+
# independent clusters could be supported in the future.
154+
@kwdef struct ClusterContext
155+
# Process identity
156+
lproc::LocalProcess = LocalProcess()
157+
role::Ref{Symbol} = Ref{Symbol}(:master)
158+
159+
# Process group
160+
pgrp::ProcessGroup = ProcessGroup([])
161+
162+
# Worker registries
163+
map_pid_wrkr::Lockable{Dict{Int, Union{Worker, LocalProcess}}, ReentrantLock} = Lockable(Dict{Int, Union{Worker, LocalProcess}}())
164+
map_sock_wrkr::Lockable{IdDict{Any, Any}, ReentrantLock} = Lockable(IdDict())
165+
map_del_wrkr::Lockable{Set{Int}, ReentrantLock} = Lockable(Set{Int}())
166+
map_pid_statuses::Lockable{Dict{Int, Any}, ReentrantLock} = Lockable(Dict{Int, Any}())
167+
168+
# Lifecycle callbacks
169+
worker_starting_callbacks::Dict{Any, Base.Callable} = Dict{Any, Base.Callable}()
170+
worker_started_callbacks::Dict{Any, Base.Callable} = Dict{Any, Base.Callable}()
171+
worker_exiting_callbacks::Dict{Any, Base.Callable} = Dict{Any, Base.Callable}()
172+
worker_exited_callbacks::Dict{Any, Base.Callable} = Dict{Any, Base.Callable}()
173+
174+
# Cluster manager
175+
cluster_manager::Ref{ClusterManager} = Ref{ClusterManager}()
176+
177+
# Synchronization
178+
worker_lock::ReentrantLock = ReentrantLock()
179+
inited::Threads.Atomic{Bool} = Threads.Atomic{Bool}(false)
180+
next_pid::Threads.Atomic{Int} = Threads.Atomic{Int}(2) # 1 is reserved for the client (always)
181+
182+
# Remote references
183+
ref_id::Threads.Atomic{Int} = Threads.Atomic{Int}(1)
184+
# Tracks whether a particular `AbstractRemoteRef` (identified by its RRID)
185+
# exists on this worker. The `client_refs` lock is also used to synchronize
186+
# access to `.refs` and associated `clientset` state.
187+
client_refs::WeakKeyDict{AbstractRemoteRef, Nothing} = WeakKeyDict{AbstractRemoteRef, Nothing}() # used as a WeakKeySet
188+
any_gc_flag::Threads.Condition = Threads.Condition()
189+
190+
# Serialization state
191+
object_numbers::WeakKeyDict = WeakKeyDict()
192+
obj_number_salt::Ref{Int} = Ref(0)
193+
known_object_data::Dict{UInt64, Any} = Dict{UInt64, Any}()
194+
195+
# Worker pools / macros
196+
default_worker_pool::Ref{Union{AbstractWorkerPool, Nothing}} = Ref{Union{AbstractWorkerPool, Nothing}}(nothing)
197+
next_worker_idx::Threads.Atomic{Int} = Threads.Atomic{Int}(0)
198+
199+
# Network / SSH
200+
tunnel_counter::Threads.Atomic{Int} = Threads.Atomic{Int}(1)
201+
tunnel_hosts_map::Dict{String, Semaphore} = Dict{String, Semaphore}()
202+
client_port::Ref{UInt16} = Ref{UInt16}(0)
203+
204+
# Scoped value for exited callback pid
205+
exited_callback_pid::ScopedValue{Int} = ScopedValue(-1)
206+
207+
# Stdlib watcher
208+
stdlib_watcher_timer::Ref{Union{Timer, Nothing}} = Ref{Union{Timer, Nothing}}(nothing)
209+
end
210+
211+
const CTX = ClusterContext()
153212

154213
function __init__()
155-
global _stdlib_watcher_timer
156214
init_parallel()
157215

158216
if ccall(:jl_generating_output, Cint, ()) == 0
@@ -161,12 +219,12 @@ function __init__()
161219
# cluster cookie has been set, which is most likely to have been done
162220
# through Distributed.init_multi() being called by Distributed.addprocs() or
163221
# something.
164-
_stdlib_watcher_timer = Timer(0; interval=1) do timer
222+
CTX.stdlib_watcher_timer[] = Timer(0; interval=1) do timer
165223
if _check_distributed_active()
166224
close(timer)
167225
end
168226
end
169-
atexit(() -> close(_stdlib_watcher_timer))
227+
atexit(() -> close(CTX.stdlib_watcher_timer[]))
170228
end
171229
end
172230

0 commit comments

Comments
 (0)