Skip to content

Commit 07bda63

Browse files
committed
Add support for worker statuses
1 parent 9d9cf14 commit 07bda63

7 files changed

Lines changed: 200 additions & 18 deletions

File tree

Project.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ version = "1.1.1"
44

55
[deps]
66
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
7+
ScopedValues = "7e506255-f358-4e82-b7e4-beb19740aa63"
78
Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b"
89
Sockets = "6462fe0b-24de-5631-8697-dd941f90decc"
910

@@ -20,6 +21,7 @@ LibSSH = "0.7"
2021
LinearAlgebra = "1"
2122
Random = "1"
2223
Revise = "3.7.0"
24+
ScopedValues = "1.6.0"
2325
Serialization = "1"
2426
Sockets = "1"
2527
Test = "1"

docs/src/_changelog.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ This documents notable changes in DistributedNext.jl. The format is based on
1212
### Added
1313
- Implemented callback support for workers being added/removed etc ([#17]).
1414
- Added a package extension to support Revise.jl ([#17]).
15+
- Added support for setting worker statuses with [`setstatus!`](@ref) and
16+
[`getstatus`](@ref) ([#17]).
1517

1618
### Fixed
1719
- Modified the default implementations of methods like `take!` and `wait` on

docs/src/index.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ DistributedNext.rmprocs
1414
DistributedNext.interrupt
1515
DistributedNext.myid
1616
DistributedNext.pmap
17+
DistributedNext.getstatus
18+
DistributedNext.@getstatus
19+
DistributedNext.setstatus!
20+
DistributedNext.@setstatus!
1721
DistributedNext.RemoteException
1822
DistributedNext.ProcessExitedException
1923
DistributedNext.Future

src/DistributedNext.jl

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,12 @@ using Serialization, Sockets
2323
import Serialization: serialize, deserialize
2424
import Sockets: connect, wait_connected
2525

26+
@static if VERSION < v"1.11"
27+
using ScopedValues: ScopedValue, @with
28+
else
29+
using Base.ScopedValues: ScopedValue, @with
30+
end
31+
2632
# NOTE: clusterserialize.jl imports additional symbols from Serialization for use
2733

2834
export

src/cluster.jl

Lines changed: 125 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -891,6 +891,8 @@ const HDR_COOKIE_LEN = 16
891891
const map_pid_wrkr = Lockable(Dict{Int, Union{Worker, LocalProcess}}())
892892
const map_sock_wrkr = Lockable(IdDict())
893893
const map_del_wrkr = Lockable(Set{Int}())
894+
const _exited_callback_pid = ScopedValue{Int}(-1)
895+
const map_pid_statuses = Lockable(Dict{Int, Any}())
894896
const worker_starting_callbacks = Dict{Any, Base.Callable}()
895897
const worker_started_callbacks = Dict{Any, Base.Callable}()
896898
const worker_exiting_callbacks = Dict{Any, Base.Callable}()
@@ -1042,9 +1044,9 @@ segfaulting etc). Chooses and returns a unique key for the callback if `key` is
10421044
not specified.
10431045
10441046
The callback will be called with the worker ID and the final
1045-
`Distributed.WorkerState` of the worker, e.g. `f(w::Int, state)`. `state` is an
1046-
enum, a value of `WorkerState_terminated` means a graceful exit and a value of
1047-
`WorkerState_exterminated` means the worker died unexpectedly.
1047+
`DistributedNext.WorkerState` of the worker, e.g. `f(w::Int, state)`. `state` is
1048+
an enum, a value of `WorkerState_terminated` means a graceful exit and a value
1049+
of `WorkerState_exterminated` means the worker died unexpectedly.
10481050
10491051
All worker-exited callbacks will be executed concurrently. If a callback throws
10501052
an exception it will be caught and printed.
@@ -1238,6 +1240,112 @@ Identical to [`workers()`](@ref) except that the current worker is filtered out.
12381240
"""
12391241
other_workers() = filter(!=(myid()), workers())
12401242

1243+
"""
1244+
@setstatus! x
1245+
@setstatus! x pid
1246+
1247+
Set the status for the calling module on worker `pid` (defaults to the current
1248+
worker) to `x`. `x` may be any serializable object but it's recommended to keep
1249+
it small enough to cheaply send over a network. Statuses can be retrieved inside
1250+
worker-exited callbacks (see [`add_worker_exited_callback`](@ref)) before the
1251+
worker is fully deregistered.
1252+
1253+
Statuses are keyed by the calling `Module`, so multiple libraries can
1254+
independently track their own status on the same worker without conflicting.
1255+
1256+
This can be handy if you want a way to know what a worker is doing at any given
1257+
time, or (in combination with a worker-exited callback) for knowing what a
1258+
worker was last doing before it died.
1259+
1260+
# Examples
1261+
```julia-repl
1262+
julia> DistributedNext.@setstatus! "working on dataset 42"
1263+
"working on dataset 42"
1264+
1265+
julia> DistributedNext.@getstatus
1266+
"working on dataset 42"
1267+
```
1268+
1269+
See also [`setstatus!`](@ref) for the function form that accepts an explicit module key.
1270+
"""
1271+
macro setstatus!(x)
1272+
mod = __module__
1273+
:(setstatus!($(esc(x)), $mod))
1274+
end
1275+
1276+
macro setstatus!(x, pid)
1277+
mod = __module__
1278+
:(setstatus!($(esc(x)), $mod, $(esc(pid))))
1279+
end
1280+
1281+
"""
1282+
setstatus!(x, mod::Module, pid::Int=myid())
1283+
1284+
Function form of [`@setstatus!`](@ref). Sets the status for module `mod` on
1285+
worker `pid` to `x`.
1286+
"""
1287+
function setstatus!(x, mod::Module, pid::Int=myid())
1288+
if !id_in_procs(pid)
1289+
throw(ArgumentError("Worker $(pid) does not exist, cannot set its status"))
1290+
end
1291+
1292+
if myid() == 1
1293+
@lock map_pid_statuses begin
1294+
statuses = get!(map_pid_statuses[], pid, Dict{Module, Any}())
1295+
statuses[mod] = x
1296+
end
1297+
else
1298+
remotecall_fetch(setstatus!, 1, x, mod, myid())
1299+
end
1300+
end
1301+
1302+
function _getstatus(pid, mod)
1303+
@lock map_pid_statuses begin
1304+
statuses = get(map_pid_statuses[], pid, nothing)
1305+
isnothing(statuses) ? nothing : get(statuses, mod, nothing)
1306+
end
1307+
end
1308+
1309+
"""
1310+
@getstatus
1311+
@getstatus pid
1312+
1313+
Get the status set by the calling module for worker `pid` (defaults to the
1314+
current worker). If one was never explicitly set with [`@setstatus!`](@ref)
1315+
this will return `nothing`.
1316+
1317+
See also [`getstatus`](@ref) for the function form.
1318+
"""
1319+
macro getstatus()
1320+
mod = __module__
1321+
:(getstatus($mod))
1322+
end
1323+
macro getstatus(pid)
1324+
mod = __module__
1325+
:(getstatus($mod, $(esc(pid))))
1326+
end
1327+
1328+
"""
1329+
getstatus(mod::Module, pid::Int=myid())
1330+
1331+
Function form of [`@getstatus`](@ref). Gets the status for module `mod` on
1332+
worker `pid`. Returns `nothing` if no status was set.
1333+
"""
1334+
function getstatus(mod::Module, pid::Int=myid())
1335+
# During the worker-exited callbacks this function may be called, at which
1336+
# point the worker will not exist in procs(). Thus we check whether the function is
1337+
# being called for an exited worker and allow it if so.
1338+
if !id_in_procs(pid) && _exited_callback_pid[] != pid
1339+
throw(ArgumentError("Worker $(pid) does not exist, cannot get its status"))
1340+
end
1341+
1342+
if myid() == 1
1343+
_getstatus(pid, mod)
1344+
else
1345+
remotecall_fetch(getstatus, 1, mod, pid)
1346+
end
1347+
end
1348+
12411349
function cluster_mgmt_from_master_check()
12421350
if myid() != 1
12431351
throw(ErrorException("Only process 1 can add and remove workers"))
@@ -1463,15 +1571,22 @@ function deregister_worker(pg, pid)
14631571
end
14641572
end
14651573

1466-
# Call callbacks on the master
14671574
if myid() == 1
1468-
for (name, callback) in worker_exited_callbacks
1469-
try
1470-
callback(pid, w.state)
1471-
catch ex
1472-
@error "Error when running worker-exited callback '$(name)'" exception=(ex, catch_backtrace())
1473-
end
1575+
params = default_addprocs_params(w.manager)
1576+
warning_interval = params[:callback_warning_interval]
1577+
1578+
# Call callbacks on the master, with the scoped value set so that
1579+
# getstatus() can be called for the exiting worker without failing the
1580+
# pid check. We go to some effort to make sure this works after
1581+
# deregistering the worker because if it's called beforehand the worker
1582+
# will incorrectly be shown in e.g. procs().
1583+
@with _exited_callback_pid => pid begin
1584+
_run_callbacks_concurrently("worker-exited", worker_exited_callbacks,
1585+
warning_interval, [(pid, w.state)]; catch_exceptions=true)
14741586
end
1587+
1588+
# Delete its statuses
1589+
@lock map_pid_statuses delete!(map_pid_statuses[], pid)
14751590
end
14761591

14771592
return

test/distributed_exec.jl

Lines changed: 60 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import Revise
44
using DistributedNext, Random, Serialization, Sockets
55
import DistributedNext
6-
import DistributedNext: launch, manage
6+
import DistributedNext: launch, manage, getstatus, setstatus!, @getstatus, @setstatus!
77

88

99
@test cluster_cookie() isa String
@@ -1826,7 +1826,9 @@ end
18261826
let julia = `$(Base.julia_cmd()) --startup-file=no`; mktempdir() do tmp
18271827
pkg_project = joinpath(Base.pkgdir(DistributedNext), "Project.toml")
18281828
project = mkdir(joinpath(tmp, "project"))
1829-
depots = [mkdir(joinpath(tmp, "depot1")), mkdir(joinpath(tmp, "depot2"))]
1829+
# Keep the writable depot in the depots list so that external
1830+
# dependencies (i.e. ScopedValues.jl) can be loaded.
1831+
depots = [mkdir(joinpath(tmp, "depot1")), mkdir(joinpath(tmp, "depot2")), Base.DEPOT_PATH[1]]
18301832
load_path = [mkdir(joinpath(tmp, "load_path")), "@stdlib", "@", pkg_project]
18311833
pathsep = Sys.iswindows() ? ";" : ":"
18321834
env = Dict(
@@ -1935,15 +1937,15 @@ end
19351937
project = mktempdir()
19361938
env = Dict(
19371939
"JULIA_LOAD_PATH" => string(LOAD_PATH[1], $(repr(pathsep)), "@stdlib", $(repr(pathsep)), "$(escaped_pkg_project)"),
1938-
"JULIA_DEPOT_PATH" => DEPOT_PATH[1],
1940+
"JULIA_DEPOT_PATH" => DEPOT_PATH[end],
19391941
"TMPDIR" => ENV["TMPDIR"],
19401942
)
19411943
addprocs(1; env = env, exeflags = `--project=\$(project)`)
19421944
env["JULIA_PROJECT"] = project
19431945
addprocs(1; env = env)
19441946
""" * funcscode * """
19451947
for w in workers()
1946-
@test remotecall_fetch(depot_path, w) == [DEPOT_PATH[1]]
1948+
@test remotecall_fetch(depot_path, w) == [DEPOT_PATH[end]]
19471949
@test remotecall_fetch(load_path, w) == [LOAD_PATH[1], "@stdlib", "$(escaped_pkg_project)"]
19481950
@test remotecall_fetch(active_project, w) == project
19491951
@test remotecall_fetch(Base.active_project, w) == joinpath(project, "Project.toml")
@@ -1983,7 +1985,40 @@ include("splitrange.jl")
19831985
end
19841986
end
19851987

1988+
@testset "Worker statuses" begin
1989+
rmprocs(other_workers())
1990+
1991+
# Test with the local worker using macros
1992+
@test isnothing(@getstatus())
1993+
@setstatus!("foo")
1994+
@test @getstatus() == "foo"
1995+
@test_throws ArgumentError getstatus(Main, 2)
1996+
1997+
# Test with a remote worker using the function form
1998+
pid = only(addprocs(1))
1999+
@test isnothing(getstatus(Main, pid))
2000+
remotecall_wait(setstatus!, pid, "bar", Main, pid)
2001+
@test remotecall_fetch(getstatus, pid, Main) == "bar"
2002+
2003+
# Test that different modules have independent statuses
2004+
setstatus!("from_main", Main, pid)
2005+
setstatus!("from_distributed", DistributedNext, pid)
2006+
@test getstatus(Main, pid) == "from_main"
2007+
@test getstatus(DistributedNext, pid) == "from_distributed"
2008+
2009+
rmprocs(pid)
2010+
end
2011+
19862012
@testset "Worker state callbacks" begin
2013+
# Helper function to wait for a worker to have been completely deregistered
2014+
# (including worker-exited callbacks finished) by waiting for the worker's
2015+
# status to have been deleted. Only works if the worker has a status of
2016+
# course.
2017+
function wait_for_deregistration(pid)
2018+
statuses = DistributedNext.map_pid_statuses
2019+
@test timedwait(() -> @lock(statuses, !haskey(statuses[], pid)), 10) == :ok
2020+
end
2021+
19872022
rmprocs(other_workers())
19882023

19892024
# Adding a callback with an invalid signature should fail
@@ -2040,16 +2075,34 @@ end
20402075
@test length(exiting_workers) == 1
20412076
@test length(exited_workers) == 1
20422077

2043-
# Test that workers that were killed forcefully are detected as such
2078+
# Test that workers that were killed forcefully are detected as such, and
2079+
# that statuses can be retrieved in the callback.
20442080
exit_state = nothing
2045-
exited_key = DistributedNext.add_worker_exited_callback((pid, state) -> exit_state = state)
2081+
last_status = nothing
2082+
exited_key = DistributedNext.add_worker_exited_callback((pid, state) -> (exit_state = state; last_status = @getstatus(pid)))
20462083
pid = only(addprocs(1))
2084+
@setstatus!("foo", pid)
20472085

2086+
# Kill the process with stderr redirected so the error messages don't
2087+
# unnecessarily show up in the logs.
20482088
redirect_stderr(devnull) do
20492089
remote_do(exit, pid)
2050-
timedwait(() -> !isnothing(exit_state), 10)
2090+
wait_for_deregistration(pid)
20512091
end
20522092
@test exit_state == DistributedNext.WorkerState_exterminated
2093+
@test last_status == "foo"
2094+
DistributedNext.remove_worker_exited_callback(exited_key)
2095+
2096+
# Test that exceptions in worker-exited callbacks are caught
2097+
exited_key = DistributedNext.add_worker_exited_callback((pid, state) -> error("foo"))
2098+
@test_logs (:error, r"Error when running worker-exited callback.+") match_mode=:any begin
2099+
pid = only(addprocs(1))
2100+
# Set a dummy status so that wait_for_deregistration() works
2101+
@setstatus!("foo", pid)
2102+
rmprocs(pid)
2103+
2104+
wait_for_deregistration(pid)
2105+
end
20532106
DistributedNext.remove_worker_exited_callback(exited_key)
20542107
end
20552108

test/runtests.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,5 +35,5 @@ include("managers.jl")
3535
include("distributed_stdlib_detection.jl")
3636

3737
@testset "Aqua" begin
38-
Aqua.test_all(DistributedNext)
38+
Aqua.test_all(DistributedNext; stale_deps=(; ignore=[:ScopedValues]))
3939
end

0 commit comments

Comments
 (0)