Skip to content

Commit 8d1771b

Browse files
committed
feat(dashboard): Implemented basic snapshot service, polling service and ws connection management
1 parent 4b48184 commit 8d1771b

3 files changed

Lines changed: 218 additions & 0 deletions

File tree

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import { createLogger } from '../../factories/logger-factory'
2+
3+
type Tick = () => Promise<void> | void
4+
5+
const debug = createLogger('dashboard-service:polling')
6+
7+
export class PollingScheduler {
8+
private timer: NodeJS.Timer | undefined
9+
10+
public constructor(
11+
private readonly intervalMs: number,
12+
private readonly tick: Tick,
13+
) { }
14+
15+
public start(): void {
16+
if (this.timer) {
17+
return
18+
}
19+
20+
debug('starting scheduler with interval %d ms', this.intervalMs)
21+
22+
this.timer = setInterval(() => {
23+
Promise.resolve(this.tick())
24+
.catch((error) => {
25+
console.error('dashboard-service: polling tick failed', error)
26+
})
27+
}, this.intervalMs)
28+
}
29+
30+
public stop(): void {
31+
if (!this.timer) {
32+
return
33+
}
34+
35+
debug('stopping scheduler')
36+
clearInterval(this.timer)
37+
this.timer = undefined
38+
}
39+
40+
public isRunning(): boolean {
41+
return Boolean(this.timer)
42+
}
43+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import { KPISnapshot } from '../types'
2+
3+
export class SnapshotService {
4+
private sequence = 0
5+
6+
private snapshot: KPISnapshot = {
7+
sequence: this.sequence,
8+
generatedAt: new Date(0).toISOString(),
9+
status: 'placeholder',
10+
metrics: {
11+
eventsByKind: [],
12+
admittedUsers: null,
13+
satsPaid: null,
14+
topTalkers: [],
15+
},
16+
}
17+
18+
public getSnapshot(): KPISnapshot {
19+
return this.snapshot
20+
}
21+
22+
// Phase 1 placeholder: advances sequence/time so polling and websocket flow can be validated end-to-end.
23+
public refreshPlaceholder(): KPISnapshot {
24+
this.sequence += 1
25+
26+
this.snapshot = {
27+
...this.snapshot,
28+
sequence: this.sequence,
29+
generatedAt: new Date().toISOString(),
30+
}
31+
32+
return this.snapshot
33+
}
34+
}
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
import { DashboardServerMessage, KPISnapshot } from '../types'
2+
import { RawData, WebSocketServer } from 'ws'
3+
import { createLogger } from '../../factories/logger-factory'
4+
import WebSocket from 'ws'
5+
6+
const debug = createLogger('dashboard-service:ws')
7+
8+
export class DashboardWebSocketHub {
9+
public constructor(
10+
private readonly webSocketServer: WebSocketServer,
11+
private readonly getSnapshot: () => KPISnapshot,
12+
) {
13+
console.info('dashboard-service: websocket hub initialized')
14+
15+
this.webSocketServer
16+
.on('connection', this.onConnection.bind(this))
17+
.on('close', () => {
18+
console.info('dashboard-service: websocket server closed')
19+
})
20+
.on('error', (error) => {
21+
console.error('dashboard-service: websocket server error', error)
22+
})
23+
}
24+
25+
public broadcastSnapshot(snapshot: KPISnapshot): void {
26+
this.broadcast({
27+
type: 'kpi.snapshot',
28+
payload: snapshot,
29+
})
30+
}
31+
32+
public broadcastTick(sequence: number): void {
33+
this.broadcast({
34+
type: 'kpi.tick',
35+
payload: {
36+
at: new Date().toISOString(),
37+
sequence,
38+
},
39+
})
40+
}
41+
42+
public close(): void {
43+
console.info('dashboard-service: closing websocket hub')
44+
this.webSocketServer.clients.forEach((client) => {
45+
client.close()
46+
})
47+
this.webSocketServer.removeAllListeners()
48+
}
49+
50+
private onConnection(client: WebSocket): void {
51+
const connectedClients = this.getConnectedClientsCount()
52+
console.info('dashboard-service: websocket client connected (clients=%d)', connectedClients)
53+
54+
client
55+
.on('close', (code, reason) => {
56+
console.info(
57+
'dashboard-service: websocket client disconnected (code=%d, reason=%s, clients=%d)',
58+
code,
59+
reason.toString(),
60+
this.getConnectedClientsCount(),
61+
)
62+
})
63+
.on('error', (error) => {
64+
console.error('dashboard-service: websocket client error', error)
65+
})
66+
.on('message', (raw) => {
67+
this.onClientMessage(raw)
68+
})
69+
70+
this.send(client, {
71+
type: 'dashboard.connected',
72+
payload: {
73+
at: new Date().toISOString(),
74+
},
75+
})
76+
77+
this.send(client, {
78+
type: 'kpi.snapshot',
79+
payload: this.getSnapshot(),
80+
})
81+
82+
debug('dashboard websocket bootstrap snapshot sent')
83+
}
84+
85+
private onClientMessage(raw: RawData): void {
86+
try {
87+
const rawMessage = this.toUtf8(raw)
88+
const message = JSON.parse(rawMessage)
89+
debug('dashboard websocket client message received: %o', message)
90+
} catch (error) {
91+
console.error('dashboard-service: websocket message parsing failed', error)
92+
}
93+
}
94+
95+
private broadcast(message: DashboardServerMessage): void {
96+
this.webSocketServer.clients.forEach((client) => {
97+
if (client.readyState !== WebSocket.OPEN) {
98+
return
99+
}
100+
this.send(client, message)
101+
})
102+
}
103+
104+
private send(client: WebSocket, message: DashboardServerMessage): void {
105+
if (client.readyState !== WebSocket.OPEN) {
106+
return
107+
}
108+
109+
try {
110+
client.send(JSON.stringify(message))
111+
} catch (error) {
112+
console.error('dashboard-service: websocket send failed', error)
113+
}
114+
}
115+
116+
private toUtf8(raw: RawData): string {
117+
if (typeof raw === 'string') {
118+
return raw
119+
}
120+
121+
if (Buffer.isBuffer(raw)) {
122+
return raw.toString('utf8')
123+
}
124+
125+
if (Array.isArray(raw)) {
126+
return raw.map((chunk) => {
127+
if (Buffer.isBuffer(chunk)) {
128+
return chunk.toString('utf8')
129+
}
130+
131+
return Buffer.from(chunk as ArrayBuffer).toString('utf8')
132+
}).join('')
133+
}
134+
135+
return Buffer.from(raw).toString('utf8')
136+
}
137+
138+
private getConnectedClientsCount(): number {
139+
return Array.from(this.webSocketServer.clients).filter((client) => client.readyState === WebSocket.OPEN).length
140+
}
141+
}

0 commit comments

Comments
 (0)