Skip to content

Commit f41c46a

Browse files
committed
Add support for in-process workers to LocalManager
1 parent 869bfd5 commit f41c46a

4 files changed

Lines changed: 59 additions & 14 deletions

File tree

src/DistributedNext.jl

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,11 @@ function Base.close(ctx::ClusterContext)
222222
if !isnothing(ctx.stdlib_watcher_timer)
223223
close(ctx.stdlib_watcher_timer::Timer)
224224
end
225+
226+
# Close all tracked sockets
227+
@lock ctx.map_sock_wrkr for sock in keys(ctx.map_sock_wrkr[])
228+
close(sock)
229+
end
225230
end
226231

227232
const CTX = ScopedValue(ClusterContext())

src/cluster.jl

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,7 @@ mutable struct LocalProcess
231231
bind_port_hint::Int
232232
bind_port::Int
233233
cookie::String
234+
in_process::Bool # true when running as a task, not a separate OS process
234235
LocalProcess() = new(1)
235236
end
236237

@@ -260,7 +261,7 @@ should not be relied upon to always pick the fastest interface.
260261
It does not return.
261262
"""
262263
start_worker(cookie::AbstractString=readline(stdin); kwargs...) = start_worker(stdout, cookie; kwargs...)
263-
function start_worker(out::IO, cookie::AbstractString=readline(stdin); close_stdin::Bool=true, stderr_to_stdout::Bool=true)
264+
function start_worker(out::IO, cookie::AbstractString=readline(stdin); close_stdin::Bool=true, stderr_to_stdout::Bool=true, exit_on_close::Bool=true)
264265
init_multi()
265266

266267
if close_stdin # workers will not use it
@@ -278,8 +279,15 @@ function start_worker(out::IO, cookie::AbstractString=readline(stdin); close_std
278279
sock = listen(interface, CTX[].lproc.bind_port)
279280
end
280281
errormonitor(@async while isopen(sock)
281-
client = accept(sock)
282-
process_messages(client, client, true)
282+
try
283+
client = accept(sock)
284+
process_messages(client, client, true)
285+
catch ex
286+
# An IOError is thrown when the socket is closed
287+
if !(ex isa Base.IOError)
288+
rethrow()
289+
end
290+
end
283291
end)
284292
print(out, "julia_worker:") # print header
285293
print(out, "$(CTX[].lproc.bind_port)#") # print port
@@ -298,13 +306,17 @@ function start_worker(out::IO, cookie::AbstractString=readline(stdin); close_std
298306
# To prevent hanging processes on remote machines, newly launched workers exit if the
299307
# master process does not connect in time.
300308
check_master_connect()
301-
while true; wait(); end
309+
while isopen(out)
310+
sleep(0.1)
311+
end
302312
catch err
303313
print(stderr, "unhandled exception on $(myid()): $(err)\nexiting.\n")
304314
end
305315

306316
close(sock)
307-
exit(0)
317+
if exit_on_close
318+
exit(0)
319+
end
308320
end
309321

310322

src/managers.jl

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -458,8 +458,10 @@ end
458458
# LocalManager
459459
struct LocalManager <: ClusterManager
460460
np::Int
461-
restrict::Bool # Restrict binding to 127.0.0.1 only
461+
restrict::Bool # Restrict binding to 127.0.0.1 only
462+
in_process::Bool # Run workers as tasks in the same process (internal)
462463
end
464+
LocalManager(np, restrict) = LocalManager(np, restrict, false)
463465

464466
"""
465467
addprocs(np::Integer=Sys.CPU_THREADS; restrict=true, kwargs...) -> List of process identifiers
@@ -527,13 +529,30 @@ function launch(manager::LocalManager, params::Dict, launched::Array, c::Conditi
527529
end
528530

529531
for i in 1:manager.np
530-
cmd = `$(julia_cmd(exename)) $exeflags --bind-to $bind_to $(get_worker_arg())`
531-
io = open(detach(setenv(addenv(cmd, env), dir=dir)), "r+")
532-
write_cookie(io)
533-
534532
wconfig = WorkerConfig()
535-
wconfig.process = io
536-
wconfig.io = io.out
533+
534+
if manager.in_process
535+
cookie = cluster_cookie()
536+
worker_ctx = ClusterContext()
537+
worker_ctx.lproc.in_process = true
538+
pipe = Pipe()
539+
Base.link_pipe!(pipe; reader_supports_async=true, writer_supports_async=true)
540+
541+
task = Threads.@spawn @with CTX => worker_ctx begin
542+
start_worker(pipe.in, cookie; close_stdin=false, stderr_to_stdout=false, exit_on_close=false)
543+
end
544+
545+
wconfig.io = pipe.out
546+
wconfig.userdata = (; ctx=worker_ctx, task, pipe)
547+
else
548+
cmd = `$(julia_cmd(exename)) $exeflags --bind-to $bind_to $(get_worker_arg())`
549+
proc = open(detach(setenv(addenv(cmd, env), dir=dir)), "r+")
550+
551+
write_cookie(proc)
552+
wconfig.io = proc.out
553+
wconfig.process = proc
554+
end
555+
537556
wconfig.enable_threaded_blas = params[:enable_threaded_blas]
538557
push!(launched, wconfig)
539558
end
@@ -542,7 +561,7 @@ function launch(manager::LocalManager, params::Dict, launched::Array, c::Conditi
542561
end
543562

544563
function manage(manager::LocalManager, id::Integer, config::WorkerConfig, op::Symbol)
545-
if op === :interrupt
564+
if op === :interrupt && !manager.in_process
546565
kill(config.process::Process, 2)
547566
end
548567
end
@@ -756,6 +775,15 @@ function kill(manager::SSHManager, pid::Int, config::WorkerConfig)
756775
end
757776

758777
function kill(manager::LocalManager, pid::Int, config::WorkerConfig; profile_wait = 6, exit_timeout = 15, term_timeout = 15)
778+
if manager.in_process
779+
(; ctx, task, pipe) = config.userdata
780+
# Close the pipe, which causes the start_worker() task to exit
781+
close(pipe)
782+
wait(task)
783+
close(ctx)
784+
return nothing
785+
end
786+
759787
# profile_wait = 6 is 1s for profile, 5s for the report to show
760788
# First, try sending `exit()` to the remote over the usual control channels
761789
remote_do(exit, pid)

src/process_messages.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ function message_handler_loop(r_stream::IO, w_stream::IO, incoming::Bool)
228228
set_worker_state(werr, oldstate != WorkerState_terminating ? WorkerState_exterminated : WorkerState_terminated)
229229

230230
# If unhandleable error occurred talking to pid 1, exit
231-
if wpid == 1
231+
if wpid == 1 && !ctx.lproc.in_process
232232
if isopen(w_stream)
233233
@error "Fatal error on process $(myid())" exception=e,catch_backtrace()
234234
end

0 commit comments

Comments
 (0)