@@ -11,6 +11,8 @@ import okhttp3.WebSocketListener
1111import org.json.JSONArray
1212import org.json.JSONObject
1313import java.util.concurrent.TimeUnit
14+ import java.util.concurrent.atomic.AtomicBoolean
15+ import java.util.concurrent.atomic.AtomicInteger
1416
1517class DDPClient {
1618 private data class QueuedMethodCall (
@@ -28,17 +30,29 @@ class DDPClient {
2830
2931 private var webSocket: WebSocket ? = null
3032 private val client: OkHttpClient = sharedClient
31- private var sendCounter = 0
33+ private val sendCounter = AtomicInteger (0 )
34+
35+ @Volatile
3236 private var isConnected = false
37+
3338 private val mainHandler = Handler (Looper .getMainLooper())
3439
3540 private val pendingCallbacks = mutableMapOf<String , (JSONObject ) - > Unit > ()
3641 private val queuedMethodCalls = mutableListOf<QueuedMethodCall >()
42+
43+ @Volatile
3744 private var connectedCallback: ((Boolean ) -> Unit )? = null
3845
46+ @Volatile
47+ private var connectTimeoutRunnable: Runnable ? = null
48+
49+ private val connectResultDelivered = AtomicBoolean (false )
50+
3951 var onCollectionMessage: ((JSONObject ) -> Unit )? = null
4052
4153 fun connect (host : String , callback : (Boolean ) -> Unit ) {
54+ resetConnectHandshakeState()
55+
4256 val wsUrl = buildWebSocketURL(host)
4357
4458 Log .d(TAG , " Connecting to $wsUrl " )
@@ -60,16 +74,26 @@ class DDPClient {
6074 }
6175
6276 override fun onMessage (webSocket : WebSocket , text : String ) {
77+ if (webSocket != = this @DDPClient.webSocket) return
6378 handleMessage(text)
6479 }
6580
6681 override fun onFailure (webSocket : WebSocket , t : Throwable , response : Response ? ) {
82+ if (webSocket != = this @DDPClient.webSocket) return
6783 Log .e(TAG , " WebSocket failure: ${t.message} " )
6884 isConnected = false
69- mainHandler.post { callback(false ) }
85+ mainHandler.post {
86+ // Re-check identity on main thread: disconnect()+connect() can interleave
87+ // between the outer guard (OkHttp thread) and this runnable's execution.
88+ // Without this re-check, a stale failure can hijack a newly installed
89+ // connectedCallback via the CAS in tryDeliverConnectOutcome.
90+ if (webSocket != = this @DDPClient.webSocket) return @post
91+ tryDeliverConnectOutcome(false )
92+ }
7093 }
7194
7295 override fun onClosed (webSocket : WebSocket , code : Int , reason : String ) {
96+ if (webSocket != = this @DDPClient.webSocket) return
7397 Log .d(TAG , " WebSocket closed: $code $reason " )
7498 isConnected = false
7599 }
@@ -134,6 +158,7 @@ class DDPClient {
134158 fun disconnect () {
135159 Log .d(TAG , " Disconnecting" )
136160 isConnected = false
161+ cancelConnectTimeout()
137162 synchronized(pendingCallbacks) { pendingCallbacks.clear() }
138163 clearQueuedMethodCalls()
139164 connectedCallback = null
@@ -143,10 +168,10 @@ class DDPClient {
143168 }
144169
145170 private fun nextMessage (msg : String ): JSONObject {
146- sendCounter++
171+ val nextId = sendCounter.incrementAndGet()
147172 return JSONObject ().apply {
148173 put(" msg" , msg)
149- put(" id" , " ddp-$sendCounter " )
174+ put(" id" , " ddp-$nextId " )
150175 }
151176 }
152177
@@ -211,14 +236,44 @@ class DDPClient {
211236 }
212237 }
213238
239+ private fun resetConnectHandshakeState () {
240+ connectResultDelivered.set(false )
241+ }
242+
243+ /* *
244+ * Delivers at most one outcome for the WebSocket connect handshake ([connect] callback).
245+ * Uses [AtomicBoolean] so [onFailure], the connect-timeout runnable, and `"connected"` cannot
246+ * all report conflicting results.
247+ */
248+ private fun tryDeliverConnectOutcome (success : Boolean , connectTimeout : Boolean = false) {
249+ if (! connectResultDelivered.compareAndSet(false , true )) {
250+ return
251+ }
252+ cancelConnectTimeout()
253+ val cb = connectedCallback
254+ connectedCallback = null
255+ if (connectTimeout) {
256+ Log .e(TAG , " Connect timeout" )
257+ }
258+ mainHandler.post {
259+ cb?.invoke(success)
260+ }
261+ }
262+
214263 private fun waitForConnected (timeoutMs : Long , callback : (Boolean ) -> Unit ) {
215264 connectedCallback = callback
216- mainHandler.postDelayed({
217- val cb = connectedCallback ? : return @postDelayed
218- connectedCallback = null
219- Log .e(TAG , " Connect timeout" )
220- cb(false )
221- }, timeoutMs)
265+ cancelConnectTimeout()
266+ val runnable = Runnable {
267+ connectTimeoutRunnable = null
268+ tryDeliverConnectOutcome(false , connectTimeout = true )
269+ }
270+ connectTimeoutRunnable = runnable
271+ mainHandler.postDelayed(runnable, timeoutMs)
272+ }
273+
274+ private fun cancelConnectTimeout () {
275+ connectTimeoutRunnable?.let { mainHandler.removeCallbacks(it) }
276+ connectTimeoutRunnable = null
222277 }
223278
224279 private fun handleMessage (text : String ) {
@@ -231,10 +286,7 @@ class DDPClient {
231286 when (json.optString(" msg" )) {
232287 " connected" -> {
233288 isConnected = true
234- mainHandler.removeCallbacksAndMessages(null )
235- val cb = connectedCallback
236- connectedCallback = null
237- cb?.let { mainHandler.post { it(true ) } }
289+ tryDeliverConnectOutcome(true )
238290 }
239291
240292 " ping" -> {
@@ -301,4 +353,24 @@ class DDPClient {
301353 val scheme = if (useSsl) " wss" else " ws"
302354 return " $scheme ://$normalizedHost /websocket"
303355 }
356+
357+ internal fun testStartConnectTimeout (timeoutMs : Long , callback : (Boolean ) -> Unit ) {
358+ resetConnectHandshakeState()
359+ waitForConnected(timeoutMs, callback)
360+ }
361+
362+ internal fun testDeliverRawMessage (text : String ) {
363+ handleMessage(text)
364+ }
365+
366+ internal fun testDeliverConnectFailure (fromWebSocket : WebSocket ? = null) {
367+ mainHandler.post {
368+ if (fromWebSocket != null && fromWebSocket != = this @DDPClient.webSocket) return @post
369+ tryDeliverConnectOutcome(false )
370+ }
371+ }
372+
373+ internal fun testSetActiveWebSocket (ws : WebSocket ? ) {
374+ this .webSocket = ws
375+ }
304376}
0 commit comments