Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,15 @@

### Added

- **WebSocket server-initiated ping with dead client detection.**
`WebSocket.encode-ping` encodes a ping frame. The server automatically sends
ping frames to idle WebSocket connections (after `App.ws-ping-interval`
seconds, default 30) and closes connections that miss
`App.ws-max-missed-pongs` consecutive pongs (default 3). Pong responses from
clients reset the counter. HTTP connections still use the simple idle timeout.
Setting `ws-ping-interval` to 0 or negative disables pinging.
`App.ws-ping-action` exposes the pure decision function for testing.

- **Multipart form-data parsing.** `FormPart` type represents a single part
from a `multipart/form-data` request, with `name`, optional `filename`,
`content-type`, and `body` fields. `Form.decode-multipart` parses a
Expand Down Expand Up @@ -43,6 +52,10 @@

### Changed

- `ConnState` gains `ws-ping-count` and `ws-last-ping` maps for tracking
server-initiated ping state per WebSocket connection.
- `sweep-idle` now takes a `poll` parameter and sends ping frames to idle
WebSocket connections instead of closing them immediately.
- `WSFrame` gains `rsv` (`Int`) and `masked` (`Bool`) fields in addition to
the existing `fin` field.
- `WSFrame` gains a `fin` field (`Bool`) for the FIN bit.
Expand Down
173 changes: 172 additions & 1 deletion test/websocket.carp
Original file line number Diff line number Diff line change
Expand Up @@ -697,4 +697,175 @@
(match (web-try-ws-upgrade &buf (App.ws-routes &app))
(Maybe.Just info) (< @(Pair.a (Pair.b &info)) 0)
(Maybe.Nothing) false))
"upgrade with missing version returns sentinel (ri < 0)"))
"upgrade with missing version returns sentinel (ri < 0)")

; ---------------------------------------------------------------------------
; Ping frame encoding
; ---------------------------------------------------------------------------

(assert-equal test
9
(let [f (WebSocket.encode-ping &(the (Array Byte) []))]
(match (WebSocket.decode-frame &f 0)
(Maybe.Just frame) @(WSFrame.opcode &frame)
(Maybe.Nothing) -1))
"encode-ping empty has opcode 9")

(assert-equal test
0
(let [f (WebSocket.encode-ping &(the (Array Byte) []))]
(match (WebSocket.decode-frame &f 0)
(Maybe.Just frame) (Array.length (WSFrame.payload &frame))
(Maybe.Nothing) -1))
"encode-ping empty has 0-byte payload")

(assert-equal test
(Byte.from-int 137)
; 0x89 = FIN + ping
(let [f (WebSocket.encode-ping &(the (Array Byte) []))]
@(Array.unsafe-nth &f 0))
"encode-ping sets FIN + opcode 9")

(assert-equal test
&[(Byte.from-int 1) (Byte.from-int 2) (Byte.from-int 3)]
&(let [data [(Byte.from-int 1) (Byte.from-int 2) (Byte.from-int 3)]
f (WebSocket.encode-ping &data)]
(match (WebSocket.decode-frame &f 0)
(Maybe.Just frame) @(WSFrame.payload &frame)
(Maybe.Nothing) (the (Array Byte) [])))
"encode-ping with payload round-trips through decode")

(assert-equal test
2
(let [f (WebSocket.encode-ping &(the (Array Byte) []))] (Array.length &f))
"encode-ping empty is 2 bytes (header only)")

; ---------------------------------------------------------------------------
; Ping/pong symmetry: ping and pong with same payload
; ---------------------------------------------------------------------------

(assert-equal test
true
(let [payload [(Byte.from-int 42)]
ping (WebSocket.encode-ping &payload)
pong (WebSocket.encode-pong &payload)
ping-pl (match (WebSocket.decode-frame &ping 0)
(Maybe.Just f) @(WSFrame.payload &f)
(Maybe.Nothing) (the (Array Byte) []))
pong-pl (match (WebSocket.decode-frame &pong 0)
(Maybe.Just f) @(WSFrame.payload &f)
(Maybe.Nothing) (the (Array Byte) []))]
(= &ping-pl &pong-pl))
"ping and pong with same payload have identical payloads")

; ---------------------------------------------------------------------------
; WebSocket ping state machine (App.ws-ping-action)
; Returns: 0 = nothing, 1 = send ping, 2 = close as dead
; Params: ping-interval max-missed idle pcount since-ping write-pending
; ---------------------------------------------------------------------------

; -- Disabled pinging (interval <= 0) --

(assert-equal test
0
(App.ws-ping-action 0 3 100 0 0 false)
"ws-ping-action: interval=0 disables pinging")

(assert-equal test
0
(App.ws-ping-action -5 3 100 0 0 false)
"ws-ping-action: negative interval disables pinging")

; -- Not yet idle enough --

(assert-equal test
0
(App.ws-ping-action 30 3 10 0 0 false)
"ws-ping-action: idle < interval — nothing")

(assert-equal test
0
(App.ws-ping-action 30 3 29 0 0 false)
"ws-ping-action: idle just under interval — nothing")

; -- First ping --

(assert-equal test
1
(App.ws-ping-action 30 3 30 0 0 false)
"ws-ping-action: first ping at exactly interval")

(assert-equal test
1
(App.ws-ping-action 30 3 45 0 0 false)
"ws-ping-action: first ping well past interval")

; -- Subsequent pings --

(assert-equal test
1
(App.ws-ping-action 30 3 65 1 35 false)
"ws-ping-action: second ping when since-ping >= interval")

(assert-equal test
0
(App.ws-ping-action 30 3 65 1 10 false)
"ws-ping-action: too soon since last ping — nothing")

; -- Dead client detection --

(assert-equal test
2
(App.ws-ping-action 30 3 120 3 35 false)
"ws-ping-action: close after max missed pongs")

(assert-equal test
2
(App.ws-ping-action 30 3 200 5 35 false)
"ws-ping-action: close even with pcount > max")

; -- Write-pending guard --

(assert-equal test
0
(App.ws-ping-action 30 3 35 0 0 true)
"ws-ping-action: skip first ping when write pending")

(assert-equal test
0
(App.ws-ping-action 30 3 65 1 35 true)
"ws-ping-action: skip subsequent ping when write pending")

(assert-equal test
2
(App.ws-ping-action 30 3 120 3 35 true)
"ws-ping-action: close dead client even with write pending")

; -- Full lifecycle: fresh → ping → ping → ping → dead --

(assert-equal test
1
(App.ws-ping-action 30 3 30 0 0 false)
"lifecycle step 1: first ping (idle=30, pcount=0)")

(assert-equal test
1
(App.ws-ping-action 30 3 60 1 30 false)
"lifecycle step 2: second ping (idle=60, pcount=1, since-ping=30)")

(assert-equal test
1
(App.ws-ping-action 30 3 90 2 30 false)
"lifecycle step 3: third ping (idle=90, pcount=2, since-ping=30)")

(assert-equal test
2
(App.ws-ping-action 30 3 120 3 30 false)
"lifecycle step 4: dead (idle=120, pcount=3)")

; -- Pong resets the cycle --

(assert-equal test
1
(App.ws-ping-action 30 3 60 0 0 false)
"after pong resets pcount to 0: resumes pinging"))
76 changes: 69 additions & 7 deletions web.carp
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,10 @@ frames. The event loop drains the outbox after the handler returns.")
(hidden encode-pong)
(defn encode-pong [payload] (encode-ws-frame 10 payload))

(doc encode-ping
"encodes a WebSocket ping frame with the given payload (may be empty).")
(defn encode-ping [payload] (encode-ws-frame 9 payload))

(hidden encode-close)
(defn encode-close []
(the (Array Byte) [(Byte.from-int 136) (Byte.from-int 0)]))
Expand Down Expand Up @@ -1114,7 +1118,9 @@ responses.")
ws-route-idx (Map Int Int)
ws-params (Map Int (Map String String))
ws-frag-bufs (Map Int (Array Byte))
ws-frag-opcodes (Map Int Int)])
ws-frag-opcodes (Map Int Int)
ws-ping-count (Map Int Int)
ws-last-ping (Map Int Int)])

(defmodule App
(doc create "creates an empty App with no routes and a default error handler.")
Expand Down Expand Up @@ -1180,6 +1186,8 @@ fallback.")

(def max-request-size 1048576)
(def idle-timeout 60)
(def ws-ping-interval 30)
(def ws-max-missed-pongs 3)
(def running true)

; -----------------------------------------------------------------------
Expand Down Expand Up @@ -1236,7 +1244,9 @@ fallback.")
(Map.remove! (ConnState.ws-route-idx cs) &fd)
(Map.remove! (ConnState.ws-params cs) &fd)
(Map.remove! (ConnState.ws-frag-bufs cs) &fd)
(Map.remove! (ConnState.ws-frag-opcodes cs) &fd)))
(Map.remove! (ConnState.ws-frag-opcodes cs) &fd)
(Map.remove! (ConnState.ws-ping-count cs) &fd)
(Map.remove! (ConnState.ws-last-ping cs) &fd)))

; -----------------------------------------------------------------------
; Event handlers
Expand Down Expand Up @@ -1401,6 +1411,9 @@ fallback.")
; ping -> pong
(Array.push-back! (WebSocket.outbox &ws)
(WebSocket.encode-pong (WSFrame.payload &frame)))
(= op 10)
; pong — reset server-initiated ping counter
(Map.put! (ConnState.ws-ping-count cs) &fd &0)
(= op 8)
; close
(do
Expand All @@ -1413,7 +1426,6 @@ fallback.")
(set! should-close true)
(break))
())
; pong (10) — ignore
(set! offset (+ offset consumed)))
; Single data frame too large
(> plen App.max-request-size)
Expand Down Expand Up @@ -1695,14 +1707,62 @@ fallback.")
(ignore (Poll.modify poll fd poll-write)))
(conn-done-writing cs poll fd)))))))))))))))))

(doc ws-ping-action "Decides what to do for a WebSocket connection's ping
state. Returns 0 (nothing), 1 (send ping), or 2 (close as dead).
Setting ping-interval to 0 or negative disables pinging.")
(defn ws-ping-action [ping-interval
max-missed
idle
pcount
since-ping
write-pending]
(cond
(<= ping-interval 0) 0
(< idle ping-interval) 0
(>= pcount max-missed) 2
(and (or (= pcount 0) (>= since-ping ping-interval)) (not write-pending))
1
0))

(hidden sweep-idle)
(defn sweep-idle [cs]
(defn sweep-idle [cs poll]
(let [now (System.time)
keys (Map.keys (ConnState.last-active cs))]
(for [j 0 (Array.length &keys)]
(let [rfd @(Array.unsafe-nth &keys j)
ts (Map.get (ConnState.last-active cs) &rfd)]
(when (> (- now ts) App.idle-timeout) (queue-close cs rfd))))))
ts (Map.get (ConnState.last-active cs) &rfd)
idle (- now ts)
is-ws (Map.contains? (ConnState.ws-route-idx cs) &rfd)]
(if is-ws
; WebSocket: ping/pong dead client detection
(let [pcount (if (Map.contains? (ConnState.ws-ping-count cs) &rfd)
(Map.get (ConnState.ws-ping-count cs) &rfd)
0)
last-ping (if (Map.contains? (ConnState.ws-last-ping cs) &rfd)
(Map.get (ConnState.ws-last-ping cs) &rfd)
0)
since-ping (- now last-ping)
write-pending (Map.contains? (ConnState.write-bufs cs) &rfd)
action (ws-ping-action App.ws-ping-interval
App.ws-max-missed-pongs
idle
pcount
since-ping
write-pending)]
(cond
(= action 2) (queue-close cs rfd)
(= action 1)
(let-do [ping-frame (WebSocket.encode-ping &(the (Array Byte)
[]))]
(Map.put! (ConnState.write-bufs cs) &rfd &ping-frame)
(Map.put! (ConnState.write-positions cs) &rfd &0)
(Map.put! (ConnState.keep-alives cs) &rfd &true)
(Map.put! (ConnState.ws-ping-count cs) &rfd &(+ pcount 1))
(Map.put! (ConnState.ws-last-ping cs) &rfd &now)
(ignore (Poll.modify poll rfd poll-write)))
()))
; HTTP: simple idle timeout
(when (> idle App.idle-timeout) (queue-close cs rfd)))))))

(hidden flush-closed)
(defn flush-closed [cs poll]
Expand Down Expand Up @@ -1781,6 +1841,8 @@ For multi-core scaling, run several copies behind a TCP load balancer.")
{}
{}
{}
{}
{}
{})]
(IO.println &(fmt "Listening on %s:%d" host port))
(set! App.running true)
Expand Down Expand Up @@ -1810,7 +1872,7 @@ For multi-core scaling, run several copies behind a TCP load balancer.")
&poll
fd)
())))
(sweep-idle &cs)
(sweep-idle &cs &poll)
(flush-closed &cs &poll))))
(Poll.close poll)
(TcpListener.close listener))))))
Expand Down
Loading