Skip to content

Commit 34de931

Browse files
authored
Merge pull request #21 from JuliaParallel/workerpool-state
Make AbstractWorkerPool methods thread-safe and more consistent
2 parents f3751ca + 1386b3d commit 34de931

3 files changed

Lines changed: 92 additions & 14 deletions

File tree

docs/src/_changelog.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,15 @@ CurrentModule = DistributedNext
77
This documents notable changes in DistributedNext.jl. The format is based on
88
[Keep a Changelog](https://keepachangelog.com).
99

10+
## Unreleased
11+
12+
### Fixed
13+
- Modified the default implementations of methods like `take!` and `wait` on
14+
[`AbstractWorkerPool`](@ref) to be threadsafe and behave more consistently
15+
with each other ([#21]). This is technically breaking, but it's a strict
16+
bugfix to correct previous inconsistent behaviour so it will still land in a
17+
minor release.
18+
1019
## [v1.1.1] - 2026-03-09
1120

1221
### Fixed

src/workerpool.jl

Lines changed: 64 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@ The default implementations of the above (on a `AbstractWorkerPool`) require fie
1616
- `channel::Channel{Int}`
1717
- `workers::Set{Int}`
1818
where `channel` contains free worker pids and `workers` is the set of all workers associated with this pool.
19+
20+
The default implementations of the above handle dead workers by removing them
21+
from the pool. Be aware that since workers could die at any time, depending on
22+
the results of functions like `length` or `isready` is not thread-safe.
1923
"""
2024
abstract type AbstractWorkerPool end
2125

@@ -71,7 +75,43 @@ deserialize(S::AbstractSerializer, t::Type{T}) where {T<:WorkerPool} = T(deseria
7175

7276
wp_local_push!(pool::AbstractWorkerPool, w::Int) = (push!(pool.workers, w); put!(pool.channel, w); pool)
7377
wp_local_length(pool::AbstractWorkerPool) = length(pool.workers)
74-
wp_local_isready(pool::AbstractWorkerPool) = isready(pool.channel)
78+
79+
function check_valid_worker!(pool::AbstractWorkerPool, worker)
80+
if !id_in_procs(worker)
81+
# We abuse the Channel lock to provide thread-safety when we modify the
82+
# worker set.
83+
@lock pool.channel delete!(pool.workers, worker)
84+
return false
85+
else
86+
return true
87+
end
88+
end
89+
90+
function default_and_empty(pool::AbstractWorkerPool)
91+
length(pool) == 0 && pool === default_worker_pool()
92+
end
93+
94+
function wp_local_isready(pool::AbstractWorkerPool)
95+
if default_and_empty(pool)
96+
# This state wouldn't block take!() so we return true
97+
return true
98+
end
99+
100+
# Otherwise we lock the channel to prevent anyone else from touching it and
101+
# take!() until we either run out of workers or get a valid one. Locking is
102+
# necessary to avoid blocking on take!() or fetch().
103+
@lock pool.channel begin
104+
while isready(pool.channel)
105+
worker = take!(pool.channel)
106+
if check_valid_worker!(pool, worker)
107+
put!(pool.channel, worker)
108+
break
109+
end
110+
end
111+
112+
return isready(pool.channel)
113+
end
114+
end
75115

76116
function wp_local_put!(pool::AbstractWorkerPool, w::Int)
77117
# In case of default_worker_pool, the master is implicitly considered a worker, i.e.,
@@ -101,29 +141,39 @@ function wp_local_take!(pool::AbstractWorkerPool)
101141
# Find an active worker
102142
worker = 0
103143
while true
104-
if length(pool) == 0
105-
if pool === default_worker_pool()
106-
# No workers, the master process is used as a worker
107-
worker = 1
108-
break
109-
else
110-
throw(ErrorException("No active worker available in pool"))
111-
end
144+
if default_and_empty(pool)
145+
# No workers, the master process is used as a worker
146+
worker = 1
147+
break
148+
elseif length(pool) == 0
149+
throw(ErrorException("No active worker available in pool"))
112150
end
113151

114152
worker = take!(pool.channel)
115-
if id_in_procs(worker)
153+
if check_valid_worker!(pool, worker)
116154
break
117-
else
118-
delete!(pool.workers, worker) # Remove invalid worker from pool
119155
end
120156
end
121157
return worker
122158
end
123159

124160
function wp_local_wait(pool::AbstractWorkerPool)
125-
wait(pool.channel)
126-
return nothing
161+
if default_and_empty(pool)
162+
# This state wouldn't block take!() so we return
163+
return nothing
164+
end
165+
166+
while true
167+
# We don't use take!(::AbstractWorkerPool) because that will throw if
168+
# the pool is empty. This will wait forever until one becomes
169+
# available.
170+
worker = take!(pool.channel)
171+
172+
if check_valid_worker!(pool, worker)
173+
put!(pool.channel, worker)
174+
return nothing
175+
end
176+
end
127177
end
128178

129179
function remotecall_pool(rc_f, f, pool::AbstractWorkerPool, args...; kwargs...)

test/distributed_exec.jl

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -750,6 +750,25 @@ end
750750
status = timedwait(() -> isready(f), 10)
751751
@test status == :ok
752752

753+
# Test behaviour with missing workers. Note that pool_workers is assigned
754+
# such that the FIFO behaviour of Channel's will ensure that all the tested
755+
# methods will see the bad_worker first.
756+
bad_worker = maximum(workers()) + 1
757+
pool_workers = [bad_worker, 1]
758+
759+
wp = WorkerPool(pool_workers)
760+
@test take!(wp) == 1 # Test take!()
761+
@test !isready(wp)
762+
@test bad_worker wp.workers
763+
764+
@test !isready(WorkerPool([bad_worker]))
765+
766+
wp = WorkerPool(pool_workers)
767+
# This should not hang, and it should end up removing the dead worker
768+
wait(wp)
769+
@test isready(wp)
770+
@test bad_worker wp.workers
771+
753772
# CachingPool tests
754773
wp = CachingPool(workers())
755774
@test [1:100...] == pmap(x->x, wp, 1:100)

0 commit comments

Comments
 (0)