Skip to content

Commit bf86b16

Browse files
committed
Rename the WorkerState instances and add an exterminated state
The new `WorkerState_exterminated` state is for indicating that a worker was killed by something other than us.
1 parent 4ef4ac2 commit bf86b16

3 files changed

Lines changed: 34 additions & 19 deletions

File tree

src/cluster.jl

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,15 @@ mutable struct WorkerConfig
100100
end
101101
end
102102

103-
@enum WorkerState W_CREATED W_CONNECTED W_TERMINATING W_TERMINATED W_UNKNOWN_STATE
103+
@enum WorkerState begin
104+
WorkerState_created
105+
WorkerState_connected
106+
WorkerState_terminating # rmprocs() has been called on the worker
107+
WorkerState_terminated # Worker was gracefully removed
108+
WorkerState_exterminated # Worker was forcefully removed (not by us)
109+
WorkerState_unknown
110+
end
111+
104112
mutable struct Worker
105113
id::Int
106114
msg_lock::Threads.ReentrantLock # Lock for del_msgs, add_msgs, and gcflag
@@ -131,7 +139,7 @@ mutable struct Worker
131139
w.manager = manager
132140
w.config = config
133141
w.version = version
134-
set_worker_state(w, W_CONNECTED)
142+
set_worker_state(w, WorkerState_connected)
135143
register_worker_streams(w)
136144
w
137145
end
@@ -142,7 +150,7 @@ mutable struct Worker
142150
@lock map_pid_wrkr if haskey(map_pid_wrkr[], id)
143151
return map_pid_wrkr[][id]
144152
end
145-
w=new(id, Threads.ReentrantLock(), [], [], false, W_CREATED, Threads.Condition(), time(), conn_func)
153+
w=new(id, Threads.ReentrantLock(), [], [], false, WorkerState_created, Threads.Condition(), time(), conn_func)
146154
w.initialized = Event()
147155
register_worker(w)
148156
w
@@ -158,8 +166,15 @@ function set_worker_state(w, state)
158166
end
159167
end
160168

169+
# Helper function to check if a worker is dead or not. It's recommended to use
170+
# this instead of checking Worker.state manually.
171+
function is_worker_dead(w::Worker)
172+
state = @atomic w.state
173+
return state === WorkerState_terminated || state === WorkerState_exterminated
174+
end
175+
161176
function check_worker_state(w::Worker)
162-
if (@atomic w.state) === W_CREATED
177+
if (@atomic w.state) === WorkerState_created
163178
if !isclusterlazy()
164179
if PGRP.topology === :all_to_all
165180
# Since higher pids connect with lower pids, the remote worker
@@ -198,7 +213,7 @@ function exec_conn_func(w::Worker)
198213
end
199214

200215
function wait_for_conn(w)
201-
if (@atomic w.state) === W_CREATED
216+
if (@atomic w.state) === WorkerState_created
202217
timeout = worker_timeout() - (time() - w.ct_time)
203218
timeout <= 0 && error("peer $(w.id) has not connected to $(myid())")
204219

@@ -211,7 +226,7 @@ function wait_for_conn(w)
211226
errormonitor(T)
212227
lock(w.c_state) do
213228
wait(w.c_state)
214-
(@atomic w.state) === W_CREATED && error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds")
229+
(@atomic w.state) === WorkerState_created && error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds")
215230
end
216231
end
217232
nothing
@@ -673,7 +688,7 @@ function create_worker(manager::ClusterManager, wconfig::WorkerConfig)
673688
if (jw.id != 1) && (jw.id < w.id)
674689
lock(jw.c_state) do
675690
# wait for wl to join
676-
if (@atomic jw.state) === W_CREATED
691+
if (@atomic jw.state) === WorkerState_created
677692
wait(jw.c_state)
678693
end
679694
end
@@ -700,7 +715,7 @@ function create_worker(manager::ClusterManager, wconfig::WorkerConfig)
700715

701716
for wl in wlist
702717
lock(wl.c_state) do
703-
if (@atomic wl.state) === W_CREATED
718+
if (@atomic wl.state) === WorkerState_created
704719
# wait for wl to join
705720
wait(wl.c_state)
706721
end
@@ -918,7 +933,7 @@ function nprocs()
918933
n = length(PGRP.workers)
919934
# filter out workers in the process of being setup/shutdown.
920935
for jw in PGRP.workers
921-
if !isa(jw, LocalProcess) && ((@atomic jw.state) !== W_CONNECTED)
936+
if !isa(jw, LocalProcess) && ((@atomic jw.state) !== WorkerState_connected)
922937
n = n - 1
923938
end
924939
end
@@ -971,7 +986,7 @@ julia> procs()
971986
function procs()
972987
if myid() == 1 || (PGRP.topology === :all_to_all && !isclusterlazy())
973988
# filter out workers in the process of being setup/shutdown.
974-
return Int[x.id for x in PGRP.workers if isa(x, LocalProcess) || ((@atomic x.state) === W_CONNECTED)]
989+
return Int[x.id for x in PGRP.workers if isa(x, LocalProcess) || ((@atomic x.state) === WorkerState_connected)]
975990
else
976991
return Int[x.id for x in PGRP.workers]
977992
end
@@ -988,7 +1003,7 @@ other_procs() = filter(!=(myid()), procs())
9881003
function id_in_procs(id) # faster version of `id in procs()`
9891004
if myid() == 1 || (PGRP.topology === :all_to_all && !isclusterlazy())
9901005
for x in PGRP.workers
991-
if (x.id::Int) == id && (isa(x, LocalProcess) || (@atomic (x::Worker).state) === W_CONNECTED)
1006+
if (x.id::Int) == id && (isa(x, LocalProcess) || (@atomic (x::Worker).state) === WorkerState_connected)
9921007
return true
9931008
end
9941009
end
@@ -1012,7 +1027,7 @@ See also [`other_procs()`](@ref).
10121027
"""
10131028
function procs(pid::Integer)
10141029
if myid() == 1
1015-
all_workers = [x for x in PGRP.workers if isa(x, LocalProcess) || ((@atomic x.state) === W_CONNECTED)]
1030+
all_workers = [x for x in PGRP.workers if isa(x, LocalProcess) || ((@atomic x.state) === WorkerState_connected)]
10161031
if (pid == 1) || (isa(@lock(map_pid_wrkr, map_pid_wrkr[][pid].manager), LocalManager))
10171032
Int[x.id for x in filter(w -> (w.id==1) || (isa(w.manager, LocalManager)), all_workers)]
10181033
else
@@ -1121,7 +1136,7 @@ function _rmprocs(pids, waitfor)
11211136
else
11221137
w = @lock map_pid_wrkr get(map_pid_wrkr[], p, nothing)
11231138
if !isnothing(w)
1124-
set_worker_state(w, W_TERMINATING)
1139+
set_worker_state(w, WorkerState_terminating)
11251140
kill(w.manager, p, w.config)
11261141
push!(rmprocset, w)
11271142
end
@@ -1130,11 +1145,11 @@ function _rmprocs(pids, waitfor)
11301145

11311146
start = time_ns()
11321147
while (time_ns() - start) < waitfor*1e9
1133-
all(w -> (@atomic w.state) === W_TERMINATED, rmprocset) && break
1148+
all(is_worker_dead, rmprocset) && break
11341149
sleep(min(0.1, waitfor - (time_ns() - start)/1e9))
11351150
end
11361151

1137-
unremoved = [wrkr.id for wrkr in filter(w -> (@atomic w.state) !== W_TERMINATED, rmprocset)]
1152+
unremoved = [wrkr.id for wrkr in filter(!is_worker_dead, rmprocset)]
11381153
if length(unremoved) > 0
11391154
estr = string("rmprocs: pids ", unremoved, " not terminated after ", waitfor, " seconds.")
11401155
throw(ErrorException(estr))

src/messages.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ end
194194
function flush_gc_msgs()
195195
try
196196
for w in (PGRP::ProcessGroup).workers
197-
if isa(w,Worker) && ((@atomic w.state) == W_CONNECTED) && w.gcflag
197+
if isa(w,Worker) && ((@atomic w.state) == WorkerState_connected) && w.gcflag
198198
flush_gc_msgs(w)
199199
end
200200
end

src/process_messages.jl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ function message_handler_loop(r_stream::IO, w_stream::IO, incoming::Bool)
210210
handle_msg(msg, header, r_stream, w_stream, version)
211211
end
212212
catch e
213-
oldstate = W_UNKNOWN_STATE
213+
oldstate = WorkerState_unknown
214214

215215
# Check again as it may have been set in a message handler but not propagated to the calling block above
216216
if wpid < 1
@@ -223,7 +223,7 @@ function message_handler_loop(r_stream::IO, w_stream::IO, incoming::Bool)
223223
elseif @lock(map_del_wrkr, !(wpid in map_del_wrkr[]))
224224
werr = worker_from_id(wpid)
225225
oldstate = @atomic werr.state
226-
set_worker_state(werr, W_TERMINATED)
226+
set_worker_state(werr, oldstate != WorkerState_terminating ? WorkerState_exterminated : WorkerState_terminated)
227227

228228
# If unhandleable error occurred talking to pid 1, exit
229229
if wpid == 1
@@ -243,7 +243,7 @@ function message_handler_loop(r_stream::IO, w_stream::IO, incoming::Bool)
243243
close(w_stream)
244244

245245
if (myid() == 1) && (wpid > 1)
246-
if oldstate != W_TERMINATING
246+
if oldstate != WorkerState_terminating
247247
println(stderr, "Worker $wpid terminated.")
248248
rethrow()
249249
end

0 commit comments

Comments
 (0)