Skip to content

Commit f3e8b85

Browse files
committed
review: fix cdp monitor bugs from code review
1 parent 1fe72dd commit f3e8b85

6 files changed

Lines changed: 160 additions & 52 deletions

File tree

server/lib/cdpmonitor/computed.go

Lines changed: 33 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -61,15 +61,19 @@ func stopTimer(t *time.Timer) {
6161

6262
// resetOnNavigation resets all state machines. Called on Page.frameNavigated.
6363
// Increments navSeq so any AfterFunc callbacks already running will discard their results.
64-
func (s *computedState) resetOnNavigation() {
64+
// inflight is the number of in-flight requests from other sessions
65+
// (e.g. subframes) that were not cleared by the navigation; netPending is set
66+
// to this value instead of zero so that their eventual loadingFinished events
67+
// decrement correctly.
68+
func (s *computedState) resetOnNavigation(inflight int) {
6569
s.mu.Lock()
6670
defer s.mu.Unlock()
6771

6872
s.navSeq++
6973

7074
stopTimer(s.netTimer)
7175
s.netTimer = nil
72-
s.netPending = 0
76+
s.netPending = inflight
7377
s.netFired = false
7478

7579
stopTimer(s.layoutTimer)
@@ -109,21 +113,25 @@ func (s *computedState) onLoadingFinished() {
109113
navSeq := s.navSeq
110114
s.netTimer = time.AfterFunc(networkIdleDebounce, func() {
111115
s.mu.Lock()
112-
defer s.mu.Unlock()
113116
if s.navSeq != navSeq || s.netFired || s.netPending > 0 {
117+
s.mu.Unlock()
114118
return
115119
}
116120
s.netFired = true
117121
s.navNetIdle = true
118-
s.publish(events.Event{
122+
evs := []events.Event{{
119123
Ts: time.Now().UnixMilli(),
120124
Type: EventNetworkIdle,
121125
Category: events.CategoryNetwork,
122126
Source: events.Source{Kind: events.KindCDP},
123127
DetailLevel: events.DetailStandard,
124128
Data: json.RawMessage(`{}`),
125-
})
126-
s.checkNavigationSettled()
129+
}}
130+
evs = append(evs, s.pendingNavigationSettled()...)
131+
s.mu.Unlock()
132+
for _, ev := range evs {
133+
s.publish(ev)
134+
}
127135
})
128136
}
129137

@@ -157,42 +165,51 @@ func (s *computedState) onLayoutShift() {
157165
// emitLayoutSettled is called from the layout timer's AfterFunc goroutine.
158166
func (s *computedState) emitLayoutSettled(navSeq int) {
159167
s.mu.Lock()
160-
defer s.mu.Unlock()
161168
if s.navSeq != navSeq || s.layoutFired || !s.pageLoadSeen {
169+
s.mu.Unlock()
162170
return
163171
}
164172
s.layoutFired = true
165173
s.navLayoutSettled = true
166-
s.publish(events.Event{
174+
evs := []events.Event{{
167175
Ts: time.Now().UnixMilli(),
168176
Type: EventLayoutSettled,
169177
Category: events.CategoryPage,
170178
Source: events.Source{Kind: events.KindCDP},
171179
DetailLevel: events.DetailStandard,
172180
Data: json.RawMessage(`{}`),
173-
})
174-
s.checkNavigationSettled()
181+
}}
182+
evs = append(evs, s.pendingNavigationSettled()...)
183+
s.mu.Unlock()
184+
for _, ev := range evs {
185+
s.publish(ev)
186+
}
175187
}
176188

177189
// onDOMContentLoaded is called on Page.domContentEventFired.
178190
func (s *computedState) onDOMContentLoaded() {
179191
s.mu.Lock()
180-
defer s.mu.Unlock()
181192
s.navDOMLoaded = true
182-
s.checkNavigationSettled()
193+
evs := s.pendingNavigationSettled()
194+
s.mu.Unlock()
195+
for _, ev := range evs {
196+
s.publish(ev)
197+
}
183198
}
184199

185-
// checkNavigationSettled emits navigation_settled if all three flags are set
186-
func (s *computedState) checkNavigationSettled() {
200+
// pendingNavigationSettled returns a navigation_settled event if all three
201+
// conditions are met. Must be called with s.mu held.
202+
func (s *computedState) pendingNavigationSettled() []events.Event {
187203
if s.navDOMLoaded && s.navNetIdle && s.navLayoutSettled && !s.navFired {
188204
s.navFired = true
189-
s.publish(events.Event{
205+
return []events.Event{{
190206
Ts: time.Now().UnixMilli(),
191207
Type: EventNavigationSettled,
192208
Category: events.CategoryPage,
193209
Source: events.Source{Kind: events.KindCDP},
194210
DetailLevel: events.DetailStandard,
195211
Data: json.RawMessage(`{}`),
196-
})
212+
}}
197213
}
214+
return nil
198215
}

server/lib/cdpmonitor/handlers.go

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ func (m *Monitor) dispatchEvent(msg cdpMessage) {
6060
m.handleTargetCreated(msg.Params, msg.SessionID)
6161
case "Target.targetDestroyed":
6262
m.handleTargetDestroyed(msg.Params, msg.SessionID)
63+
case "Target.detachedFromTarget":
64+
m.handleDetachedFromTarget(msg.Params)
6365
}
6466
}
6567

@@ -229,7 +231,9 @@ func (m *Monitor) handleLoadingFinished(params json.RawMessage, sessionID string
229231
}
230232
m.computed.onLoadingFinished()
231233
// Fetch response body async to avoid blocking readLoop; binary types are skipped.
234+
m.asyncWg.Add(1)
232235
go func() {
236+
defer m.asyncWg.Done()
233237
body := m.fetchResponseBody(p.RequestID, sessionID, state)
234238
ev := map[string]any{
235239
"method": state.method,
@@ -341,9 +345,11 @@ func (m *Monitor) handleFrameNavigated(params json.RawMessage, sessionID string)
341345
delete(m.pendingRequests, id)
342346
}
343347
}
348+
inflight := len(m.pendingRequests)
349+
// Reset computed state while still holding pendReqMu so new requests
350+
// arriving concurrently can't increment netPending before the reset.
351+
m.computed.resetOnNavigation(inflight)
344352
m.pendReqMu.Unlock()
345-
346-
m.computed.resetOnNavigation()
347353
}
348354
}
349355

@@ -373,7 +379,9 @@ func (m *Monitor) handleAttachedToTarget(msg cdpMessage) {
373379
m.sessionsMu.Unlock()
374380

375381
// Async to avoid blocking the readLoop.
382+
m.asyncWg.Add(1)
376383
go func() {
384+
defer m.asyncWg.Done()
377385
m.enableDomains(m.getLifecycleCtx(), params.SessionID)
378386
_ = m.injectScript(m.getLifecycleCtx(), params.SessionID)
379387
}()
@@ -404,3 +412,15 @@ func (m *Monitor) handleTargetDestroyed(params json.RawMessage, sessionID string
404412
})
405413
m.publishEvent(EventTargetDestroyed, events.DetailMinimal, events.Source{Kind: events.KindCDP}, "Target.targetDestroyed", data, sessionID)
406414
}
415+
416+
func (m *Monitor) handleDetachedFromTarget(params json.RawMessage) {
417+
var p struct {
418+
SessionID string `json:"sessionId"`
419+
}
420+
if err := json.Unmarshal(params, &p); err != nil || p.SessionID == "" {
421+
return
422+
}
423+
m.sessionsMu.Lock()
424+
delete(m.sessions, p.SessionID)
425+
m.sessionsMu.Unlock()
426+
}

server/lib/cdpmonitor/monitor.go

Lines changed: 56 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,13 @@ type Monitor struct {
5151
lastScreenshotAt atomic.Int64 // unix millis of last capture
5252
screenshotFn func(ctx context.Context, displayNum int) ([]byte, error) // nil → real ffmpeg
5353

54+
asyncWg sync.WaitGroup // tracks in-flight goroutines (fetchResponseBody, enableDomains, etc.)
55+
restartMu sync.Mutex // serializes handleUpstreamRestart to prevent overlapping reconnects
56+
5457
lifecycleCtx context.Context // cancelled on Stop()
5558
cancel context.CancelFunc
5659
done chan struct{}
60+
readReady chan struct{} // closed when readLoop has started reading
5761

5862
running atomic.Bool
5963
}
@@ -112,13 +116,18 @@ func (m *Monitor) Start(parentCtx context.Context) error {
112116
m.lifecycleCtx = ctx
113117
m.cancel = cancel
114118
m.done = make(chan struct{})
119+
m.readReady = make(chan struct{})
115120
m.lifeMu.Unlock()
116121

117122
m.running.Store(true)
118123

119124
go m.readLoop(ctx)
120125
go m.subscribeToUpstream(ctx)
121-
go m.initSession(ctx) // must run after readLoop starts
126+
m.asyncWg.Add(1)
127+
go func() {
128+
defer m.asyncWg.Done()
129+
m.initSession(ctx)
130+
}()
122131

123132
return nil
124133
}
@@ -140,6 +149,10 @@ func (m *Monitor) Stop() {
140149
<-done
141150
}
142151

152+
// Wait for all in-flight async goroutines (fetchResponseBody, enableDomains,
153+
// screenshots) to finish before closing the connection they may be writing to.
154+
m.asyncWg.Wait()
155+
143156
m.lifeMu.Lock()
144157
if m.conn != nil {
145158
_ = m.conn.Close(websocket.StatusNormalClosure, "stopped")
@@ -165,7 +178,7 @@ func (m *Monitor) clearState() {
165178

166179
m.failPendingCommands()
167180

168-
m.computed.resetOnNavigation()
181+
m.computed.resetOnNavigation(0)
169182
}
170183

171184
// failPendingCommands unblocks all in-flight send() calls by delivering an
@@ -194,13 +207,17 @@ func (m *Monitor) readLoop(ctx context.Context) {
194207
m.lifeMu.Lock()
195208
done := m.done
196209
conn := m.conn
210+
readReady := m.readReady
197211
m.lifeMu.Unlock()
198212
defer close(done)
199213

200214
if conn == nil {
201215
return
202216
}
203217

218+
// Signal that readLoop is ready to receive responses.
219+
close(readReady)
220+
204221
for {
205222
_, b, err := conn.Read(ctx)
206223
if err != nil {
@@ -284,7 +301,16 @@ func (m *Monitor) send(ctx context.Context, method string, params any, sessionID
284301

285302
// initSession enables CDP domains, injects the interaction-tracking script,
286303
// and manually attaches to any targets already open when the monitor started.
304+
// It waits for readLoop to be ready before sending any commands.
287305
func (m *Monitor) initSession(ctx context.Context) {
306+
m.lifeMu.Lock()
307+
readReady := m.readReady
308+
m.lifeMu.Unlock()
309+
select {
310+
case <-readReady:
311+
case <-ctx.Done():
312+
return
313+
}
288314
_, _ = m.send(ctx, "Target.setAutoAttach", map[string]any{
289315
"autoAttach": true,
290316
"waitForDebuggerOnStart": false,
@@ -324,7 +350,9 @@ func (m *Monitor) attachExistingTargets(ctx context.Context) {
324350
if alreadyAttached {
325351
continue
326352
}
353+
m.asyncWg.Add(1)
327354
go func(targetID string) {
355+
defer m.asyncWg.Done()
328356
res, err := m.send(ctx, "Target.attachToTarget", map[string]any{
329357
"targetId": targetID,
330358
"flatten": true,
@@ -344,18 +372,26 @@ func (m *Monitor) attachExistingTargets(ctx context.Context) {
344372
}
345373

346374
// restartReadLoop waits for the current readLoop to exit, then starts a new one.
347-
func (m *Monitor) restartReadLoop(ctx context.Context) {
375+
// Returns false if the context was cancelled before the restart completed.
376+
func (m *Monitor) restartReadLoop(ctx context.Context) bool {
348377
m.lifeMu.Lock()
349378
done := m.done
350379
m.lifeMu.Unlock()
351380

352-
<-done
381+
// Wait for old readLoop, but bail if context is cancelled (e.g. Stop called).
382+
select {
383+
case <-done:
384+
case <-ctx.Done():
385+
return false
386+
}
353387

354388
m.lifeMu.Lock()
355389
m.done = make(chan struct{})
390+
m.readReady = make(chan struct{})
356391
m.lifeMu.Unlock()
357392

358393
go m.readLoop(ctx)
394+
return true
359395
}
360396

361397
// subscribeToUpstream reconnects with backoff on Chrome restarts, publishing disconnect/reconnect events.
@@ -377,8 +413,15 @@ func (m *Monitor) subscribeToUpstream(ctx context.Context) {
377413
}
378414

379415
// handleUpstreamRestart tears down the old connection, reconnects with backoff,
380-
// and re-initializes the CDP session.
416+
// and re-initializes the CDP session. Serialized by restartMu to prevent
417+
// overlapping reconnects from rapid successive Chrome restarts.
381418
func (m *Monitor) handleUpstreamRestart(ctx context.Context, newURL string) {
419+
m.restartMu.Lock()
420+
defer m.restartMu.Unlock()
421+
422+
if ctx.Err() != nil {
423+
return
424+
}
382425
m.publish(events.Event{
383426
Ts: time.Now().UnixMilli(),
384427
Type: EventMonitorDisconnected,
@@ -403,8 +446,14 @@ func (m *Monitor) handleUpstreamRestart(ctx context.Context, newURL string) {
403446
return
404447
}
405448

406-
m.restartReadLoop(ctx)
407-
go m.initSession(ctx)
449+
if !m.restartReadLoop(ctx) {
450+
return
451+
}
452+
m.asyncWg.Add(1)
453+
go func() {
454+
defer m.asyncWg.Done()
455+
m.initSession(ctx)
456+
}()
408457

409458
m.publish(events.Event{
410459
Ts: time.Now().UnixMilli(),

server/lib/cdpmonitor/screenshot.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,20 @@ func (m *Monitor) tryScreenshot(ctx context.Context) {
2525
if !m.lastScreenshotAt.CompareAndSwap(last, now) {
2626
return
2727
}
28-
go m.captureScreenshot(ctx)
28+
m.asyncWg.Add(1)
29+
go func() {
30+
defer m.asyncWg.Done()
31+
m.captureScreenshot(ctx)
32+
}()
2933
}
3034

35+
const screenshotTimeout = 10 * time.Second
36+
3137
// captureScreenshot takes a screenshot via ffmpeg x11grab (or the screenshotFn
3238
// seam in tests), optionally downscales it, and publishes a screenshot event.
33-
func (m *Monitor) captureScreenshot(ctx context.Context) {
39+
func (m *Monitor) captureScreenshot(parentCtx context.Context) {
40+
ctx, cancel := context.WithTimeout(parentCtx, screenshotTimeout)
41+
defer cancel()
3442
var pngBytes []byte
3543
var err error
3644

server/lib/cdpmonitor/util.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -99,17 +99,22 @@ func bodyCapFor(mime string) int {
9999
}
100100

101101
// truncateBody caps body at the given limit on a valid UTF-8 boundary.
102+
// The result never splits a multi-byte rune.
102103
func truncateBody(body string, maxBody int) string {
103104
if len(body) <= maxBody {
104105
return body
105106
}
106-
if maxBody <= utf8.UTFMax {
107-
return body[:maxBody]
107+
if maxBody <= 0 {
108+
return ""
108109
}
109-
// Walk back at most UTFMax bytes to find a clean rune boundary.
110-
i := maxBody
111-
for i > maxBody-utf8.UTFMax && !utf8.RuneStart(body[i]) {
112-
i--
110+
// Walk forward through complete runes, stopping before we exceed maxBody.
111+
end := 0
112+
for end < maxBody {
113+
_, size := utf8.DecodeRuneInString(body[end:])
114+
if end+size > maxBody {
115+
break
116+
}
117+
end += size
113118
}
114-
return body[:i]
119+
return body[:end]
115120
}

0 commit comments

Comments
 (0)