Skip to content

Commit 65f23a0

Browse files
committed
feat(dashboard): Wired up new collectors
1 parent 583773d commit 65f23a0

4 files changed

Lines changed: 164 additions & 28 deletions

File tree

src/dashboard-service/app.ts

Lines changed: 58 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
1+
import { getMasterDbClient, getReadReplicaDbClient } from '../database/client'
2+
import { IKPICollector, SnapshotService } from './services/snapshot-service'
13
import { createDashboardRouter } from './api/dashboard-router'
24
import { createLogger } from '../factories/logger-factory'
35
import { DashboardServiceConfig } from './config'
46
import { DashboardWebSocketHub } from './ws/dashboard-ws-hub'
57
import express from 'express'
68
import { getHealthRequestHandler } from './handlers/request-handlers/get-health-request-handler'
79
import http from 'http'
10+
import { IncrementalKPICollectorService } from './services/incremental-kpi-collector-service'
11+
import { KPICollectorService } from './services/kpi-collector-service'
812
import { PollingScheduler } from './polling/polling-scheduler'
9-
import { SnapshotService } from './services/snapshot-service'
13+
import { StatefulIncrementalKPICollectorService } from './services/stateful-incremental-service'
1014
import { WebSocketServer } from 'ws'
11-
1215
const debug = createLogger('dashboard-service:app')
1316

1417
export interface DashboardService {
@@ -22,14 +25,36 @@ export interface DashboardService {
2225

2326
export const createDashboardService = (config: DashboardServiceConfig): DashboardService => {
2427
console.info(
25-
'dashboard-service: creating service (host=%s, port=%d, wsPath=%s, pollIntervalMs=%d)',
28+
'dashboard-service: creating service (host=%s, port=%d, wsPath=%s, pollIntervalMs=%d, useDummyData=%s, collectorMode=%s)',
2629
config.host,
2730
config.port,
2831
config.wsPath,
2932
config.pollIntervalMs,
33+
config.useDummyData,
34+
config.collectorMode,
3035
)
3136

32-
const snapshotService = new SnapshotService()
37+
const collector: IKPICollector = config.useDummyData
38+
? {
39+
collectMetrics: async () => ({
40+
eventsByKind: [],
41+
admittedUsers: 0,
42+
satsPaid: 0,
43+
topTalkers: { allTime: [], recent: [] },
44+
}),
45+
}
46+
: (() => {
47+
if (config.collectorMode === 'stateful-incremental') {
48+
return new StatefulIncrementalKPICollectorService(getMasterDbClient())
49+
}
50+
51+
const dbClient = getReadReplicaDbClient()
52+
return config.collectorMode === 'incremental'
53+
? new IncrementalKPICollectorService(dbClient)
54+
: new KPICollectorService(dbClient)
55+
})()
56+
57+
const snapshotService = new SnapshotService(collector)
3358

3459
const app = express()
3560
.disable('x-powered-by')
@@ -44,11 +69,17 @@ export const createDashboardService = (config: DashboardServiceConfig): Dashboar
4469

4570
const webSocketHub = new DashboardWebSocketHub(webSocketServer, () => snapshotService.getSnapshot())
4671

47-
const pollingScheduler = new PollingScheduler(config.pollIntervalMs, () => {
48-
const nextSnapshot = snapshotService.refreshPlaceholder()
49-
debug('poll tick produced snapshot sequence=%d', nextSnapshot.sequence)
50-
webSocketHub.broadcastTick(nextSnapshot.sequence)
51-
webSocketHub.broadcastSnapshot(nextSnapshot)
72+
const pollingScheduler = new PollingScheduler(config.pollIntervalMs, async () => {
73+
const { snapshot, changed } = await snapshotService.refresh()
74+
75+
if (!changed) {
76+
debug('poll tick detected no KPI changes')
77+
return
78+
}
79+
80+
debug('poll tick produced snapshot sequence=%d status=%s', snapshot.sequence, snapshot.status)
81+
webSocketHub.broadcastTick(snapshot.sequence)
82+
webSocketHub.broadcastSnapshot(snapshot)
5283
})
5384

5485
const start = async () => {
@@ -72,13 +103,31 @@ export const createDashboardService = (config: DashboardServiceConfig): Dashboar
72103
})
73104
})
74105

106+
try {
107+
const initialSnapshotRefresh = await snapshotService.refresh()
108+
if (initialSnapshotRefresh.changed) {
109+
debug('initial snapshot prepared with sequence=%d status=%s', initialSnapshotRefresh.snapshot.sequence, initialSnapshotRefresh.snapshot.status)
110+
}
111+
} catch (error) {
112+
console.error('dashboard-service: initial snapshot refresh failed (will retry on next poll)', error)
113+
}
114+
75115
pollingScheduler.start()
76116
console.info('dashboard-service: polling scheduler started')
77117
}
78118

79119
const stop = async () => {
80120
console.info('dashboard-service: stopping service')
81121
pollingScheduler.stop()
122+
123+
if (collector?.close) {
124+
try {
125+
await collector.close()
126+
} catch (error) {
127+
console.error('dashboard-service: failed to close collector resources', error)
128+
}
129+
}
130+
82131
webSocketHub.close()
83132
await new Promise<void>((resolve, reject) => {
84133
if (!webServer.listening) {

src/dashboard-service/config.ts

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,18 @@ export interface DashboardServiceConfig {
33
port: number
44
wsPath: string
55
pollIntervalMs: number
6+
useDummyData: boolean
7+
collectorMode: DashboardCollectorMode
8+
}
9+
10+
export type DashboardCollectorMode = 'full' | 'incremental' | 'stateful-incremental'
11+
12+
const parseBoolean = (value: string | undefined, fallback = false): boolean => {
13+
if (typeof value === 'undefined') {
14+
return fallback
15+
}
16+
17+
return value === '1' || value.toLowerCase() === 'true'
618
}
719

820
const parseInteger = (value: string | undefined, fallback: number): number => {
@@ -18,11 +30,29 @@ const parseInteger = (value: string | undefined, fallback: number): number => {
1830
return parsed
1931
}
2032

33+
const parseCollectorMode = (
34+
value: string | undefined,
35+
fallback: DashboardCollectorMode = 'full',
36+
): DashboardCollectorMode => {
37+
if (typeof value === 'undefined') {
38+
return fallback
39+
}
40+
41+
const normalized = value.toLowerCase()
42+
if (normalized === 'full' || normalized === 'incremental' || normalized === 'stateful-incremental') {
43+
return normalized
44+
}
45+
46+
return fallback
47+
}
48+
2149
export const getDashboardServiceConfig = (): DashboardServiceConfig => {
2250
return {
2351
host: process.env.DASHBOARD_SERVICE_HOST ?? '127.0.0.1',
2452
port: parseInteger(process.env.DASHBOARD_SERVICE_PORT, 8011),
2553
wsPath: process.env.DASHBOARD_WS_PATH ?? '/api/v1/kpis/stream',
2654
pollIntervalMs: parseInteger(process.env.DASHBOARD_POLL_INTERVAL_MS, 5000),
55+
useDummyData: parseBoolean(process.env.DASHBOARD_USE_DUMMY_DATA, false),
56+
collectorMode: parseCollectorMode(process.env.DASHBOARD_COLLECTOR_MODE, 'full'),
2757
}
2858
}
Lines changed: 59 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,81 @@
1-
import { KPISnapshot } from '../types'
1+
import { DashboardMetrics, KPISnapshot } from '../types'
2+
import { createLogger } from '../../factories/logger-factory'
3+
4+
const debug = createLogger('dashboard-service:snapshot-service')
5+
6+
const defaultMetrics = (): DashboardMetrics => ({
7+
eventsByKind: [],
8+
admittedUsers: 0,
9+
satsPaid: 0,
10+
topTalkers: {
11+
allTime: [],
12+
recent: [],
13+
},
14+
})
15+
16+
export interface ISnapshotRefreshResult {
17+
snapshot: KPISnapshot
18+
changed: boolean
19+
}
20+
21+
export interface IKPICollector {
22+
collectMetrics(): Promise<DashboardMetrics>
23+
close?(): Promise<void> | void
24+
}
225

326
export class SnapshotService {
27+
private metricsFingerprint = JSON.stringify(defaultMetrics())
28+
429
private sequence = 0
530

631
private snapshot: KPISnapshot = {
732
sequence: this.sequence,
833
generatedAt: new Date(0).toISOString(),
9-
status: 'placeholder',
10-
metrics: {
11-
eventsByKind: [],
12-
admittedUsers: null,
13-
satsPaid: null,
14-
topTalkers: [],
15-
},
34+
status: 'live',
35+
metrics: defaultMetrics(),
1636
}
1737

38+
public constructor(private readonly collector: IKPICollector) { }
39+
1840
public getSnapshot(): KPISnapshot {
1941
return this.snapshot
2042
}
2143

22-
// Phase 1 placeholder: advances sequence/time so polling and websocket flow can be validated end-to-end.
23-
public refreshPlaceholder(): KPISnapshot {
44+
/**
45+
* Fetches fresh metrics from the collector and updates the snapshot if the
46+
* metrics have changed. Throws if the collector is unavailable — callers
47+
* are responsible for catching and deciding how to surface errors.
48+
*/
49+
public async refresh(): Promise<ISnapshotRefreshResult> {
50+
const metrics = await this.collector.collectMetrics()
51+
const nextFingerprint = JSON.stringify(metrics)
52+
53+
if (nextFingerprint === this.metricsFingerprint && this.snapshot.status === 'live') {
54+
debug('metrics unchanged, skipping snapshot sequence update')
55+
return {
56+
snapshot: this.snapshot,
57+
changed: false,
58+
}
59+
}
60+
61+
this.metricsFingerprint = nextFingerprint
62+
63+
return this.updateSnapshot(metrics, 'live')
64+
}
65+
66+
private updateSnapshot(metrics: DashboardMetrics, status: 'live' | 'stale'): ISnapshotRefreshResult {
2467
this.sequence += 1
2568

2669
this.snapshot = {
27-
...this.snapshot,
2870
sequence: this.sequence,
2971
generatedAt: new Date().toISOString(),
72+
status,
73+
metrics,
3074
}
3175

32-
return this.snapshot
76+
return {
77+
snapshot: this.snapshot,
78+
changed: true,
79+
}
3380
}
3481
}

src/dashboard-service/types.ts

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,26 @@ export interface TopTalker {
33
count: number
44
}
55

6+
export interface EventsByKindCount {
7+
kind: string
8+
count: number
9+
}
10+
11+
export interface DashboardMetrics {
12+
eventsByKind: EventsByKindCount[]
13+
admittedUsers: number
14+
satsPaid: number
15+
topTalkers: {
16+
allTime: TopTalker[]
17+
recent: TopTalker[]
18+
}
19+
}
20+
621
export interface KPISnapshot {
722
sequence: number
823
generatedAt: string
9-
status: 'placeholder'
10-
metrics: {
11-
eventsByKind: Array<{ kind: string, count: number }>
12-
admittedUsers: number | null
13-
satsPaid: number | null
14-
topTalkers: TopTalker[]
15-
}
24+
status: 'live' | 'stale'
25+
metrics: DashboardMetrics
1626
}
1727

1828
export interface DashboardSnapshotResponse {

0 commit comments

Comments
 (0)