using .CUDA
import .CUDA: CuDevice, CuContext, devices, attribute

using UUIDs

export CuArrayDeviceProc

"Represents a single CUDA GPU device."
struct CuArrayDeviceProc <: Dagger.Processor
    owner::Int         # ID of the worker process that owns this device
    device::Int        # CUDA device ordinal on that worker
    device_uuid::UUID  # UUID of the physical device, stable across local processes
end
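# `@gpuproc` is defined in DaggerGPU's top-level module; it is assumed here to
# generate the generic glue (e.g. converting incoming `Array`s to `CuArray`s)
# that lets this processor type participate in Dagger's move/execute machinery.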
@gpuproc(CuArrayDeviceProc, CuArray)
Dagger.get_parent(proc::CuArrayDeviceProc) = Dagger.OSProc(proc.owner)

# function can_access(this, peer)
#     status = Ref{Cint}()
#     CUDA.cuDeviceCanAccessPeer(status, this, peer)
#     return status[] == 1
# end

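# Dagger.move transfers a chunk's CuArray payload to the destination processor.
# `x.handle` is the chunk's memory-pool reference and `poolget` (presumably
# MemPool.poolget) fetches the underlying array on the process that owns it.
# Four cases are handled below: same worker and same GPU (no copy), same worker
# but a different GPU (direct device-to-device copy), different workers on the
# same node (CUDA IPC), and different nodes (device-to-host copy, serialization
# over the wire, then host-to-device copy).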
function Dagger.move(from::CuArrayDeviceProc, to::CuArrayDeviceProc, x::Dagger.Chunk{T}) where T<:CuArray
    if from == to
        # Same process and GPU, no change
        poolget(x.handle)
    elseif from.owner == to.owner
        # Same process but different GPUs, use DtoD copy
        from_arr = poolget(x.handle)
        to_arr = CUDA.device!(to.device) do
            # Allocate an uninitialized array of matching eltype and shape on the destination device
            CuArray{eltype(from_arr)}(undef, size(from_arr))
        end
        copyto!(to_arr, from_arr)
        to_arr
    elseif Dagger.system_uuid(from.owner) == Dagger.system_uuid(to.owner)
        # Same node, we can use IPC
        ipc_handle, eT, shape = remotecall_fetch(from.owner, x.handle) do h
            arr = poolget(h)
            ipc_handle_ref = Ref{CUDA.CUipcMemHandle}()
            GC.@preserve arr begin
                CUDA.cuIpcGetMemHandle(ipc_handle_ref, pointer(arr))
            end
            (ipc_handle_ref[], eltype(arr), size(arr))
        end
        r_ptr = Ref{CUDA.CUdeviceptr}()
        CUDA.device!(from.device) do # FIXME: Assumes that device IDs are identical across processes
            CUDA.cuIpcOpenMemHandle(r_ptr, ipc_handle, CUDA.CU_IPC_MEM_LAZY_ENABLE_PEER_ACCESS)
        end
        ptr = Base.unsafe_convert(CUDA.CuPtr{eT}, r_ptr[])
        arr = unsafe_wrap(CuArray, ptr, shape; own=false)
        finalizer(arr) do arr
            CUDA.cuIpcCloseMemHandle(pointer(arr))
        end
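        # The IPC-mapped memory still lives on the exporting device; if the
        # destination is a different physical GPU, copy into a fresh array on
        # that device, otherwise hand the wrapped array back directly.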
        if from.device_uuid != to.device_uuid
            CUDA.device!(to.device) do
                to_arr = similar(arr)
                copyto!(to_arr, arr)
                to_arr
            end
        else
            arr
        end
    else
        # Different node, use DtoH, serialization, HtoD
        # TODO UCX
        CuArray(remotecall_fetch(from.owner, x.handle) do h
            Array(poolget(h))
        end)
    end
end

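# Dagger.execute! runs `func` on this processor: Dagger's task-local state is
# forwarded to a fresh Julia task, which binds the target CUDA device and runs
# the call under `CUDA.@sync` so device work finishes before returning. Failures
# are rethrown as a `CapturedException`, matching how Dagger surfaces remote errors.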
function Dagger.execute!(proc::CuArrayDeviceProc, func, args...)
    tls = Dagger.get_tls()
    task = Threads.@spawn begin
        Dagger.set_tls!(tls)
        CUDA.device!(proc.device)
        CUDA.@sync func(args...)
    end
    try
        fetch(task)
    catch err
        @static if VERSION >= v"1.1"
            stk = Base.catch_stack(task)
            err, frames = stk[1]
            rethrow(CapturedException(err, frames))
        else
            rethrow(task.result)
        end
    end
end
Base.show(io::IO, proc::CuArrayDeviceProc) =
    print(io, "CuArrayDeviceProc on worker $(proc.owner), device $(proc.device), uuid $(proc.device_uuid)")

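# Hooks consumed by DaggerGPU's generic frontend: `processor` maps the `:CUDA`
# backend symbol to this processor type, `cancompute` reports whether a usable
# CUDA device is present, and `kernel_backend` selects the device type (here
# `CUDADevice`, presumably from CUDAKernels/KernelAbstractions) used when
# launching portable kernels.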
processor(::Val{:CUDA}) = CuArrayDeviceProc
cancompute(::Val{:CUDA}) = CUDA.has_cuda()
kernel_backend(::CuArrayDeviceProc) = CUDADevice()

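# Register one CuArrayDeviceProc per visible CUDA device on this worker; Dagger
# invokes these callbacks to populate the worker's processor list, and the UUID
# lets peers on the same node recognize when two processors refer to the same
# physical GPU.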
if CUDA.has_cuda()
    for dev in devices()
        Dagger.add_processor_callback!("cuarray_device_$(dev.handle)") do
            CuArrayDeviceProc(Distributed.myid(), dev.handle, CUDA.uuid(dev))
        end
    end
end