@@ -13,6 +13,7 @@ import (
1313 "math/rand"
1414 "net"
1515 "net/url"
16+ "sync"
1617 "time"
1718 "unsafe"
1819
@@ -103,6 +104,146 @@ func RemDial(cmdline *C.char) (*C.char, C.int) {
103104 return C .CString (a .ID ), 0
104105}
105106
107+ // ========================================================================
108+ // BufferedConn — goroutine-buffered net.Conn for non-blocking TryRead.
109+ //
110+ // A background goroutine continuously reads from the underlying net.Conn
111+ // into an internal buffer. TryRead / TryWrite are pure memory operations
112+ // that never touch deadlines or block on I/O — the FFI caller gets
113+ // instant results (data or WouldBlock).
114+ //
115+ // This avoids multi-layer timeout conflicts:
116+ // - No SetReadDeadline / SetWriteDeadline manipulation
117+ // - The ONLY timeout is in the Rust session layer (read_exact_with_idle_timeout)
118+ // ========================================================================
119+
120+ type BufferedConn struct {
121+ conn net.Conn
122+
123+ // Read side: goroutine fills readBuf, TryRead drains it.
124+ readMu sync.Mutex
125+ readBuf []byte
126+ readErr error // sticky: once set, all future TryRead return it after buf drain
127+
128+ // Write: passed through to conn directly (Write is typically fast).
129+ // If non-blocking write is needed, add writeBuf + goroutine later.
130+ }
131+
132+ func NewBufferedConn (conn net.Conn ) * BufferedConn {
133+ bc := & BufferedConn {conn : conn }
134+ go bc .readLoop ()
135+ return bc
136+ }
137+
138+ func (bc * BufferedConn ) readLoop () {
139+ buf := make ([]byte , 64 * 1024 )
140+ for {
141+ n , err := bc .conn .Read (buf )
142+
143+ bc .readMu .Lock ()
144+ if n > 0 {
145+ bc .readBuf = append (bc .readBuf , buf [:n ]... )
146+ }
147+ if err != nil {
148+ bc .readErr = err
149+ }
150+ bc .readMu .Unlock ()
151+
152+ if err != nil {
153+ return
154+ }
155+ }
156+ }
157+
158+ // TryRead copies available data from the buffer.
159+ // Returns (n, 0) if data was available, (0, ErrWouldBlock) if empty,
160+ // or (0, ErrDialFailed) on connection error (after buffer is drained).
161+ func (bc * BufferedConn ) TryRead (dst []byte ) (int , int ) {
162+ bc .readMu .Lock ()
163+ defer bc .readMu .Unlock ()
164+
165+ if len (bc .readBuf ) > 0 {
166+ n := copy (dst , bc .readBuf )
167+ bc .readBuf = bc .readBuf [n :]
168+ return n , 0
169+ }
170+
171+ // Buffer empty — check if the connection has an error.
172+ if bc .readErr != nil {
173+ if bc .readErr == io .EOF {
174+ return 0 , 0 // EOF
175+ }
176+ return 0 , ErrDialFailed
177+ }
178+
179+ return 0 , ErrWouldBlock
180+ }
181+
182+ // Write passes through to the underlying conn (blocking).
183+ func (bc * BufferedConn ) Write (src []byte ) (int , int ) {
184+ n , err := bc .conn .Write (src )
185+ if err != nil {
186+ return 0 , ErrDialFailed
187+ }
188+ return n , 0
189+ }
190+
191+ // Close closes the underlying conn. The read goroutine will see the
192+ // error from Read and exit.
193+ func (bc * BufferedConn ) Close () int {
194+ err := bc .conn .Close ()
195+ if err != nil {
196+ return ErrDialFailed
197+ }
198+ return 0
199+ }
200+
201+ // BlockingRead reads from the underlying conn directly (for backward compat).
202+ func (bc * BufferedConn ) BlockingRead (dst []byte ) (int , int ) {
203+ // First drain any buffered data.
204+ bc .readMu .Lock ()
205+ if len (bc .readBuf ) > 0 {
206+ n := copy (dst , bc .readBuf )
207+ bc .readBuf = bc .readBuf [n :]
208+ bc .readMu .Unlock ()
209+ return n , 0
210+ }
211+ readErr := bc .readErr
212+ bc .readMu .Unlock ()
213+
214+ if readErr != nil {
215+ if readErr == io .EOF {
216+ return 0 , 0
217+ }
218+ return 0 , ErrDialFailed
219+ }
220+
221+ // Wait for the goroutine to produce data.
222+ for {
223+ time .Sleep (1 * time .Millisecond )
224+ bc .readMu .Lock ()
225+ if len (bc .readBuf ) > 0 {
226+ n := copy (dst , bc .readBuf )
227+ bc .readBuf = bc .readBuf [n :]
228+ bc .readMu .Unlock ()
229+ return n , 0
230+ }
231+ if bc .readErr != nil {
232+ err := bc .readErr
233+ bc .readMu .Unlock ()
234+ if err == io .EOF {
235+ return 0 , 0
236+ }
237+ return 0 , ErrDialFailed
238+ }
239+ bc .readMu .Unlock ()
240+ }
241+ }
242+
243+ // ========================================================================
244+ // FFI exports
245+ // ========================================================================
246+
106247func memoryDialGo (memhandle string , dst string ) (int , int ) {
107248 memURL := & url.URL {
108249 Scheme : "memory" ,
@@ -119,7 +260,8 @@ func memoryDialGo(memhandle string, dst string) (int, int) {
119260 }
120261
121262 connHandle := rand .Intn (0x7FFFFFFF )
122- conns .Store (connHandle , conn )
263+ // Wrap in BufferedConn for non-blocking TryRead support.
264+ conns .Store (connHandle , NewBufferedConn (conn ))
123265 return connHandle , 0
124266}
125267
@@ -129,66 +271,86 @@ func MemoryDial(memhandle *C.char, dst *C.char) (C.int, C.int) {
129271 return C .int (connHandle ), C .int (errCode )
130272}
131273
132- func memoryReadGo (handleInt int , dst [] byte ) (int , int ) {
133- conn , ok := conns .Load (handleInt )
274+ func getBufferedConn (handleInt int ) (* BufferedConn , int ) {
275+ v , ok := conns .Load (handleInt )
134276 if ! ok {
135- return 0 , ErrArgsParseFailed
136- }
137-
138- buffer := make ([]byte , len (dst ))
139- n , err := conn .(net.Conn ).Read (buffer )
140- if err != nil && err != io .EOF {
141- return 0 , ErrDialFailed
142- }
143-
144- if n > 0 {
145- copy (dst [:n ], buffer [:n ])
277+ return nil , ErrArgsParseFailed
146278 }
147-
148- return n , 0
279+ return v .(* BufferedConn ), 0
149280}
150281
151282//export MemoryRead
152283func MemoryRead (chandle C.int , buf unsafe.Pointer , size C.int ) (C.int , C.int ) {
284+ bc , errCode := getBufferedConn (int (chandle ))
285+ if errCode != 0 {
286+ return 0 , C .int (errCode )
287+ }
153288 cBuf := (* [1 << 30 ]byte )(buf )
154- n , errCode := memoryReadGo ( int ( chandle ), cBuf [:int (size )])
155- return C .int (n ), C .int (errCode )
289+ n , ec := bc . BlockingRead ( cBuf [:int (size )])
290+ return C .int (n ), C .int (ec )
156291}
157292
158- func memoryWriteGo (handleInt int , src []byte ) (int , int ) {
159- conn , ok := conns .Load (handleInt )
160- if ! ok {
161- return 0 , ErrArgsParseFailed
293+ // MemoryTryRead is a non-blocking read: returns data from the internal
294+ // buffer or ErrWouldBlock if no data is available. No deadlines, no
295+ // timeout parameters — pure memory operation.
296+ //
297+ //export MemoryTryRead
298+ func MemoryTryRead (chandle C.int , buf unsafe.Pointer , size C.int ) (C.int , C.int ) {
299+ bc , errCode := getBufferedConn (int (chandle ))
300+ if errCode != 0 {
301+ return 0 , C .int (errCode )
162302 }
303+ cBuf := (* [1 << 30 ]byte )(buf )
304+ n , ec := bc .TryRead (cBuf [:int (size )])
305+ return C .int (n ), C .int (ec )
306+ }
163307
164- n , err := conn .(net.Conn ).Write (src )
165- if err != nil {
166- return 0 , ErrDialFailed
308+ // MemoryReadTimeout is kept for backward compatibility with the previous
309+ // Rust transport version. Prefer MemoryTryRead for new code.
310+ //
311+ //export MemoryReadTimeout
312+ func MemoryReadTimeout (chandle C.int , buf unsafe.Pointer , size C.int , timeoutMs C.int ) (C.int , C.int ) {
313+ if timeoutMs == 0 {
314+ return MemoryTryRead (chandle , buf , size )
315+ }
316+ // Positive timeout: poll TryRead until data or deadline.
317+ bc , errCode := getBufferedConn (int (chandle ))
318+ if errCode != 0 {
319+ return 0 , C .int (errCode )
320+ }
321+ cBuf := (* [1 << 30 ]byte )(buf )
322+ deadline := time .Now ().Add (time .Duration (int (timeoutMs )) * time .Millisecond )
323+ for {
324+ n , ec := bc .TryRead (cBuf [:int (size )])
325+ if ec != ErrWouldBlock || n > 0 {
326+ return C .int (n ), C .int (ec )
327+ }
328+ if time .Now ().After (deadline ) {
329+ return 0 , C .int (ErrWouldBlock )
330+ }
331+ time .Sleep (1 * time .Millisecond )
167332 }
168-
169- return n , 0
170333}
171334
172335//export MemoryWrite
173336func MemoryWrite (chandle C.int , buf unsafe.Pointer , size C.int ) (C.int , C.int ) {
337+ bc , errCode := getBufferedConn (int (chandle ))
338+ if errCode != 0 {
339+ return 0 , C .int (errCode )
340+ }
174341 cBuf := (* [1 << 30 ]byte )(buf )
175- n , errCode := memoryWriteGo ( int ( chandle ), cBuf [:int (size )])
176- return C .int (n ), C .int (errCode )
342+ n , ec := bc . Write ( cBuf [:int (size )])
343+ return C .int (n ), C .int (ec )
177344}
178345
179346func memoryCloseGo (handleInt int ) int {
180- conn , ok := conns . Load (handleInt )
181- if ! ok {
182- return ErrArgsParseFailed
347+ bc , errCode := getBufferedConn (handleInt )
348+ if errCode != 0 {
349+ return errCode
183350 }
184-
185- err := conn .(net.Conn ).Close ()
186- if err != nil {
187- return ErrDialFailed
188- }
189-
351+ ec := bc .Close ()
190352 conns .Delete (handleInt )
191- return 0
353+ return ec
192354}
193355
194356//export MemoryClose
@@ -202,6 +364,9 @@ func CleanupAgent() {
202364 if a , ok := value .(* agent.Agent ); ok {
203365 a .Close (nil )
204366 }
367+ // Drop the registry entry synchronously so the next RemDial can
368+ // reuse the same alias immediately after cleanup returns.
369+ agent .Agents .Delete (key )
205370 return true
206371 })
207372}
0 commit comments