Skip to content

Commit 311f2ed

Browse files
committed
Add a precompilation workload
This makes use of the scoped context and in-process worker support to have both the master and worker running in the same process for precompilation. Also lowered the sleep time for the launch Condition notification to 0.1s. This helps since the Condition is edge-triggered and launching an in-process worker usually takes less than 1s.
1 parent f41c46a commit 311f2ed

6 files changed

Lines changed: 27 additions & 42 deletions

File tree

Project.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ uuid = "fab6aee4-877b-4bac-a744-3eca44acbb6f"
33
version = "1.2.0"
44

55
[deps]
6+
PrecompileTools = "aea7be01-6a6a-4083-8856-8a6e6704d82a"
67
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
78
ScopedValues = "7e506255-f358-4e82-b7e4-beb19740aa63"
89
Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b"
@@ -19,6 +20,7 @@ Aqua = "0.8"
1920
Distributed = "1"
2021
LibSSH = "0.7"
2122
LinearAlgebra = "1"
23+
PrecompileTools = "1"
2224
Random = "1"
2325
Revise = "3.7.0"
2426
ScopedValues = "1.6.0"

docs/src/_changelog.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ This documents notable changes in DistributedNext.jl. The format is based on
1414
single struct ([#61]). This should not be a user-visible change, but of course
1515
it's possible that some things slipped through the cracks so please open an
1616
issue if you encounter any bugs.
17+
- A precompilation workload was added to improve TTFX ([#62]).
1718

1819
## [v1.2.0] - 2026-03-21
1920

ext/ReviseExt.jl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,11 @@ module ReviseExt
33
import DistributedNext
44
import DistributedNext: myid, workers, remotecall
55

6-
import Revise
6+
using PrecompileTools: @recompile_invalidations
77

8+
# Sadly Revise causes quite a few invalidations. TODO: make DistributedNext more
9+
# resistant to invalidations.
10+
@recompile_invalidations import Revise
811

912
struct DistributedNextWorker <: Revise.AbstractWorker
1013
id::Int

src/DistributedNext.jl

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,6 @@ include("macros.jl") # @spawn and friends
146146
include("workerpool.jl")
147147
include("pmap.jl")
148148
include("managers.jl") # LocalManager and SSHManager
149-
include("precompile.jl")
150149

151150
# Bundles all mutable global state for a distributed cluster into a single
152151
# object. The active context is accessed via the `CTX` ScopedValue, allowing
@@ -212,6 +211,14 @@ include("precompile.jl")
212211
stdlib_watcher_timer::Union{Timer, Nothing} = nothing
213212
end
214213

214+
function ClusterContext(f::Base.Callable; kwargs...)
215+
ctx = ClusterContext(; kwargs...)
216+
ret = @with CTX => ctx f()
217+
close(ctx)
218+
219+
return ret
220+
end
221+
215222
function Base.close(ctx::ClusterContext)
216223
ctx.shutting_down[] = true
217224
if !isnothing(ctx.gc_msgs_task)
@@ -231,6 +238,8 @@ end
231238

232239
const CTX = ScopedValue(ClusterContext())
233240

241+
include("precompile.jl")
242+
234243
function __init__()
235244
init_parallel()
236245

src/cluster.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -539,7 +539,7 @@ function addprocs_locked(manager::ClusterManager, params)
539539
if isempty(launched)
540540
istaskdone(t_launch) && break
541541
@async begin
542-
sleep(1)
542+
sleep(0.1)
543543
notify(launch_ntfy)
544544
end
545545
wait(launch_ntfy)

src/precompile.jl

Lines changed: 9 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,10 @@
1-
precompile(Tuple{typeof(DistributedNext.remotecall),Function,Int,Module,Vararg{Any, 100}})
2-
precompile(Tuple{typeof(DistributedNext.procs)})
3-
precompile(Tuple{typeof(DistributedNext.finalize_ref), DistributedNext.Future})
4-
precompile(Tuple{typeof(DistributedNext.setup_launched_worker), DistributedNext.LocalManager, DistributedNext.WorkerConfig, Vector{Int}})
5-
precompile(Tuple{typeof(DistributedNext.process_tcp_streams), Sockets.TCPSocket, Sockets.TCPSocket, Bool})
6-
precompile(Tuple{typeof(Serialization.serialize), DistributedNext.ClusterSerializer{Sockets.TCPSocket}, Array{Any, 1}})
7-
precompile(Tuple{typeof(DistributedNext.parse_connection_info), String})
8-
precompile(Tuple{typeof(DistributedNext._run_callbacks_concurrently), String, Base.Dict{Any, Union{Function, Type}}, Int64, Array{Tuple{DistributedNext.LocalManager, Base.Dict{Symbol, Any}}, 1}})
9-
precompile(Tuple{typeof(DistributedNext._run_callbacks_concurrently), String, Base.Dict{Any, Union{Function, Type}}, Int64, Array{Int64, 1}})
10-
precompile(Tuple{typeof(DistributedNext.read_worker_host_port), Base.PipeEndpoint})
11-
precompile(Tuple{typeof(Base.put!), Base.Channel{Any}, Int64})
12-
precompile(Tuple{typeof(Base.put!), Base.Channel{Any}, WeakRef})
1+
using PrecompileTools: @compile_workload
132

14-
precompile(Tuple{typeof(DistributedNext.handle_msg), DistributedNext.IdentifySocketMsg, DistributedNext.MsgHeader, Sockets.TCPSocket, Sockets.TCPSocket, VersionNumber})
15-
precompile(Tuple{typeof(DistributedNext.handle_msg), DistributedNext.CallWaitMsg, DistributedNext.MsgHeader, Sockets.TCPSocket, Sockets.TCPSocket, VersionNumber})
16-
precompile(Tuple{typeof(DistributedNext.handle_msg), DistributedNext.CallMsg{:call}, DistributedNext.MsgHeader, Sockets.TCPSocket, Sockets.TCPSocket, VersionNumber})
17-
precompile(Tuple{typeof(DistributedNext.handle_msg), DistributedNext.CallMsg{:call_fetch}, DistributedNext.MsgHeader, Sockets.TCPSocket, Sockets.TCPSocket, VersionNumber})
18-
precompile(Tuple{typeof(DistributedNext.handle_msg), DistributedNext.ResultMsg, DistributedNext.MsgHeader, Sockets.TCPSocket, Sockets.TCPSocket, VersionNumber})
19-
precompile(Tuple{typeof(DistributedNext.handle_msg), DistributedNext.IdentifySocketAckMsg, DistributedNext.MsgHeader, Sockets.TCPSocket, Sockets.TCPSocket, VersionNumber})
20-
precompile(Tuple{typeof(DistributedNext.handle_msg), DistributedNext.RemoteDoMsg, DistributedNext.MsgHeader, Sockets.TCPSocket, Sockets.TCPSocket, VersionNumber})
21-
precompile(Tuple{typeof(DistributedNext.handle_msg), DistributedNext.JoinPGRPMsg, DistributedNext.MsgHeader, Sockets.TCPSocket, Sockets.TCPSocket, VersionNumber})
22-
precompile(Tuple{typeof(DistributedNext.handle_msg), DistributedNext.JoinCompleteMsg, DistributedNext.MsgHeader, Sockets.TCPSocket, Sockets.TCPSocket, VersionNumber})
23-
24-
# These lines were obtained from `addprocs(1; exeflags=`--trace-compile=compilation.txt --trace-compile-timing`)`
25-
precompile(Tuple{typeof(DistributedNext.start_worker)})
26-
precompile(Tuple{typeof(DistributedNext.start_worker), Base.PipeEndpoint, String})
27-
precompile(Tuple{typeof(DistributedNext.set_valid_processes), Array{Int64, 1}})
28-
precompile(Tuple{typeof(DistributedNext.terminate_all_workers)})
29-
30-
# This is disabled because it doesn't give much benefit
31-
# and the code in Distributed is poorly typed causing many invalidations
32-
# TODO: Maybe reenable now that Distributed is not in sysimage.
33-
#=
34-
precompile_script *= """
35-
using Distributed
36-
addprocs(2)
37-
pmap(x->iseven(x) ? 1 : 0, 1:4)
38-
@distributed (+) for i = 1:100 Int(rand(Bool)) end
39-
"""
40-
=#
3+
@compile_workload begin
4+
# Run the workload in a separate ClusterContext so the default one stays clean
5+
ClusterContext() do
6+
# Use an in-process worker to avoid spawning a real process during precompilation
7+
pid = only(addprocs(LocalManager(1, true, true)))
8+
rmprocs(pid)
9+
end
10+
end

0 commit comments

Comments
 (0)