Skip to content

Commit c3ff250

Browse files
authored
Merge pull request #65 from JuliaParallel/lock
Replace `lock()` do-method usage with `@lock`
2 parents f4d43ab + 57c20c9 commit c3ff250

3 files changed

Lines changed: 24 additions & 45 deletions

File tree

src/cluster.jl

Lines changed: 9 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ mutable struct Worker
157157
end
158158

159159
function set_worker_state(w, state)
160-
lock(w.c_state) do
160+
@lock w.c_state begin
161161
@atomic w.state = state
162162
notify(w.c_state; all=true)
163163
end
@@ -696,11 +696,9 @@ function create_worker(manager::ClusterManager, wconfig::WorkerConfig)
696696
# require the value of config.connect_at which is set only upon connection completion
697697
for jw in CTX[].pgrp.workers
698698
if (jw.id != 1) && (jw.id < w.id)
699-
lock(jw.c_state) do
699+
@lock jw.c_state if (@atomic jw.state) === WorkerState_created
700700
# wait for wl to join
701-
if (@atomic jw.state) === WorkerState_created
702-
wait(jw.c_state)
703-
end
701+
wait(jw.c_state)
704702
end
705703
push!(join_list, jw)
706704
end
@@ -724,11 +722,9 @@ function create_worker(manager::ClusterManager, wconfig::WorkerConfig)
724722
end
725723

726724
for wl in wlist
727-
lock(wl.c_state) do
728-
if (@atomic wl.state) === WorkerState_created
729-
# wait for wl to join
730-
wait(wl.c_state)
731-
end
725+
@lock wl.c_state if (@atomic wl.state) === WorkerState_created
726+
# wait for wl to join
727+
wait(wl.c_state)
732728
end
733729
push!(join_list, wl)
734730
end
@@ -749,9 +745,7 @@ function create_worker(manager::ClusterManager, wconfig::WorkerConfig)
749745
if timedwait(() -> isready(rr_ntfy_join), timeout) === :timed_out
750746
error("worker did not connect within $timeout seconds")
751747
end
752-
lock(CTX[].client_refs) do
753-
delete!(CTX[].pgrp.refs, ntfy_oid)
754-
end
748+
@lock CTX[].client_refs delete!(CTX[].pgrp.refs, ntfy_oid)
755749

756750
return w.id
757751
end
@@ -1399,8 +1393,7 @@ function rmprocs(pids...; waitfor=typemax(Int), callback_timeout=10)
13991393
end
14001394

14011395
function _rmprocs(pids, waitfor, callback_timeout)
1402-
lock(CTX[].worker_lock)
1403-
try
1396+
@lock CTX[].worker_lock begin
14041397
# Run the callbacks
14051398
callback_tasks = Tuple{Any, Task}[]
14061399
for pid in pids
@@ -1440,8 +1433,6 @@ function _rmprocs(pids, waitfor, callback_timeout)
14401433
estr = string("rmprocs: pids ", unremoved, " not terminated after ", waitfor, " seconds.")
14411434
throw(ErrorException(estr))
14421435
end
1443-
finally
1444-
unlock(CTX[].worker_lock)
14451436
end
14461437
end
14471438

@@ -1547,7 +1538,7 @@ function deregister_worker(pg, pid)
15471538
# delete this worker from our remote reference client sets
15481539
ids = []
15491540
tonotify = []
1550-
lock(CTX[].client_refs) do
1541+
@lock CTX[].client_refs begin
15511542
for (id, rv) in pg.refs
15521543
if in(pid, rv.clientset)
15531544
push!(ids, id)

src/messages.jl

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -174,8 +174,8 @@ function send_msg_(w::Worker, header, msg, now::Bool)
174174
wait(w.initialized)
175175
end
176176
io = w.w_stream
177-
lock(io)
178-
try
177+
178+
@lock io begin
179179
reset_state(w.w_serializer)
180180
serialize_hdr_raw(io, header)
181181
invokelatest(serialize_msg, w.w_serializer, msg) # io is wrapped in w_serializer
@@ -186,8 +186,6 @@ function send_msg_(w::Worker, header, msg, now::Bool)
186186
else
187187
flush(io)
188188
end
189-
finally
190-
unlock(io)
191189
end
192190
end
193191

src/remotecall.jl

Lines changed: 13 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -160,9 +160,7 @@ A low-level API which returns the backing `AbstractChannel` for an `id` returned
160160
The call is valid only on the node where the backing channel exists.
161161
"""
162162
function channel_from_id(id)
163-
rv = lock(CTX[].client_refs) do
164-
return get(CTX[].pgrp.refs, id, false)
165-
end
163+
rv = @lock CTX[].client_refs get(CTX[].pgrp.refs, id, false)
166164
if rv === false
167165
throw(ErrorException("Local instance of remote reference not found"))
168166
end
@@ -171,16 +169,18 @@ end
171169

172170
lookup_ref(rrid::RRID, f=def_rv_channel) = lookup_ref(CTX[].pgrp, rrid, f)
173171
function lookup_ref(pg, rrid, f)
174-
return lock(CTX[].client_refs) do
172+
rv = @lock CTX[].client_refs begin
175173
rv = get(pg.refs, rrid, false)
176174
if rv === false
177175
# first we've heard of this ref
178176
rv = RemoteValue(invokelatest(f))
179177
pg.refs[rrid] = rv
180178
push!(rv.clientset, rrid.whence)
181179
end
182-
return rv
180+
rv
183181
end::RemoteValue
182+
183+
return rv
184184
end
185185

186186
"""
@@ -232,9 +232,7 @@ del_client(rr::AbstractRemoteRef) = del_client(remoteref_id(rr), myid())
232232

233233
del_client(id, client) = del_client(CTX[].pgrp, id, client)
234234
function del_client(pg, id, client)
235-
lock(CTX[].client_refs) do
236-
_del_client(pg, id, client)
237-
end
235+
@lock CTX[].client_refs _del_client(pg, id, client)
238236
nothing
239237
end
240238

@@ -297,13 +295,11 @@ function send_del_client_no_lock(rr)
297295
end
298296

299297
function publish_del_msg!(w::Worker, msg)
300-
lock(w.msg_lock) do
298+
@lock w.msg_lock begin
301299
push!(w.del_msgs, msg)
302300
@atomic w.gcflag = true
303301
end
304-
lock(CTX[].any_gc_flag) do
305-
notify(CTX[].any_gc_flag)
306-
end
302+
@lock CTX[].any_gc_flag notify(CTX[].any_gc_flag)
307303
end
308304

309305
function process_worker(rr)
@@ -320,7 +316,7 @@ function process_worker(rr)
320316
end
321317

322318
function add_client(id, client)
323-
lock(CTX[].client_refs) do
319+
@lock CTX[].client_refs begin
324320
rv = lookup_ref(id)
325321
push!(rv.clientset, client)
326322
end
@@ -341,13 +337,11 @@ function send_add_client(rr::AbstractRemoteRef, i)
341337
# to the processor that owns the remote ref. it will add_client
342338
# itself inside deserialize().
343339
w = worker_from_id(rr.where)
344-
lock(w.msg_lock) do
340+
@lock w.msg_lock begin
345341
push!(w.add_msgs, (remoteref_id(rr), i))
346342
@atomic w.gcflag = true
347343
end
348-
lock(CTX[].any_gc_flag) do
349-
notify(CTX[].any_gc_flag)
350-
end
344+
@lock CTX[].any_gc_flag notify(CTX[].any_gc_flag)
351345
end
352346
end
353347

@@ -448,9 +442,7 @@ function remotecall_fetch(f, w::Worker, args...; kwargs...)
448442
rv.waitingfor = w.id
449443
send_msg(w, MsgHeader(RRID(0,0), oid), CallMsg{:call_fetch}(f, args, kwargs))
450444
v = take!(rv)
451-
lock(CTX[].client_refs) do
452-
delete!(CTX[].pgrp.refs, oid)
453-
end
445+
@lock CTX[].client_refs delete!(CTX[].pgrp.refs, oid)
454446
return isa(v, RemoteException) ? throw(v) : v
455447
end
456448

@@ -490,9 +482,7 @@ function remotecall_wait(f, w::Worker, args...; kwargs...)
490482
rr = Future(w)
491483
send_msg(w, MsgHeader(remoteref_id(rr), prid), CallWaitMsg(f, args, kwargs))
492484
v = fetch(rv.c)
493-
lock(CTX[].client_refs) do
494-
delete!(CTX[].pgrp.refs, prid)
495-
end
485+
@lock CTX[].client_refs delete!(CTX[].pgrp.refs, prid)
496486
isa(v, RemoteException) && throw(v)
497487
return rr
498488
end

0 commit comments

Comments
 (0)