|
| 1 | +import { Client, ClientConfig } from 'pg' |
| 2 | +import { createLogger } from '../../factories/logger-factory' |
| 3 | +import { DashboardMetrics } from '../types' |
| 4 | +import { DatabaseClient } from '../../@types/base' |
| 5 | +import { IncrementalKPICollectorService } from './incremental-kpi-collector-service' |
| 6 | + |
| 7 | +const debug = createLogger('dashboard-service:stateful-incremental-kpi-collector') |
| 8 | + |
| 9 | +const DEFAULT_EVENTS_CHANNEL = 'dashboard_events_changed' |
| 10 | +const DEFAULT_USERS_CHANNEL = 'dashboard_users_changed' |
| 11 | + |
| 12 | +const isValidChannelName = (channel: string): boolean => { |
| 13 | + return /^[a-zA-Z_][a-zA-Z0-9_]*$/.test(channel) |
| 14 | +} |
| 15 | + |
| 16 | +const getListenerConnectionConfig = (): ClientConfig => { |
| 17 | + if (process.env.DB_URI) { |
| 18 | + return { |
| 19 | + connectionString: process.env.DB_URI, |
| 20 | + } |
| 21 | + } |
| 22 | + |
| 23 | + return { |
| 24 | + host: process.env.DB_HOST, |
| 25 | + port: Number(process.env.DB_PORT), |
| 26 | + user: process.env.DB_USER, |
| 27 | + password: process.env.DB_PASSWORD, |
| 28 | + database: process.env.DB_NAME, |
| 29 | + } |
| 30 | +} |
| 31 | + |
| 32 | +const defaultMetrics = (): DashboardMetrics => { |
| 33 | + return { |
| 34 | + eventsByKind: [], |
| 35 | + admittedUsers: 0, |
| 36 | + satsPaid: 0, |
| 37 | + topTalkers: { |
| 38 | + allTime: [], |
| 39 | + recent: [], |
| 40 | + }, |
| 41 | + } |
| 42 | +} |
| 43 | + |
| 44 | +export class StatefulIncrementalKPICollectorService { |
| 45 | + private cachedMetrics: DashboardMetrics = defaultMetrics() |
| 46 | + |
| 47 | + private hasCache = false |
| 48 | + |
| 49 | + private isDirty = true |
| 50 | + |
| 51 | + private isListenerReady = false |
| 52 | + |
| 53 | + /** Set to true permanently once close() is called — prevents reconnect loops after shutdown. */ |
| 54 | + private isClosed = false |
| 55 | + |
| 56 | + private listenerClient: Client | undefined |
| 57 | + |
| 58 | + private reconnectTimer: ReturnType<typeof setTimeout> | undefined |
| 59 | + |
| 60 | + private readonly incrementalCollector: IncrementalKPICollectorService |
| 61 | + |
| 62 | + private readonly channels: string[] |
| 63 | + |
| 64 | + private static readonly BASE_DELAY_MS = 500 |
| 65 | + private static readonly MAX_DELAY_MS = 30_000 |
| 66 | + |
| 67 | + /** Backoff state — reset to BASE_DELAY_MS on every successful connect. */ |
| 68 | + private reconnectDelayMs = StatefulIncrementalKPICollectorService.BASE_DELAY_MS |
| 69 | + |
| 70 | + public constructor( |
| 71 | + dbClient: DatabaseClient, |
| 72 | + trackedKinds?: number[], |
| 73 | + topTalkersLimit?: number, |
| 74 | + recentDays?: number, |
| 75 | + ) { |
| 76 | + this.incrementalCollector = new IncrementalKPICollectorService( |
| 77 | + dbClient, |
| 78 | + trackedKinds, |
| 79 | + topTalkersLimit, |
| 80 | + recentDays, |
| 81 | + ) |
| 82 | + |
| 83 | + this.channels = [ |
| 84 | + process.env.DASHBOARD_EVENTS_NOTIFY_CHANNEL ?? DEFAULT_EVENTS_CHANNEL, |
| 85 | + process.env.DASHBOARD_USERS_NOTIFY_CHANNEL ?? DEFAULT_USERS_CHANNEL, |
| 86 | + ] |
| 87 | + } |
| 88 | + |
| 89 | + public async collectMetrics(): Promise<DashboardMetrics> { |
| 90 | + // Kick off a connect attempt if the listener isn't alive yet. |
| 91 | + // We don't await here — the listener is best-effort; data comes from the |
| 92 | + // incremental collector regardless. |
| 93 | + if (!this.isListenerReady && !this.listenerClient) { |
| 94 | + this.scheduleReconnect(0) |
| 95 | + } |
| 96 | + |
| 97 | + if (!this.hasCache || this.isDirty) { |
| 98 | + this.cachedMetrics = await this.incrementalCollector.collectMetrics() |
| 99 | + this.hasCache = true |
| 100 | + this.isDirty = false |
| 101 | + } |
| 102 | + |
| 103 | + return this.cachedMetrics |
| 104 | + } |
| 105 | + |
| 106 | + public async close(): Promise<void> { |
| 107 | + this.isClosed = true |
| 108 | + |
| 109 | + if (this.reconnectTimer) { |
| 110 | + clearTimeout(this.reconnectTimer) |
| 111 | + this.reconnectTimer = undefined |
| 112 | + } |
| 113 | + |
| 114 | + const client = this.listenerClient |
| 115 | + this.listenerClient = undefined |
| 116 | + this.isListenerReady = false |
| 117 | + |
| 118 | + if (!client) { |
| 119 | + return |
| 120 | + } |
| 121 | + |
| 122 | + for (const channel of this.channels) { |
| 123 | + if (!isValidChannelName(channel)) { |
| 124 | + continue |
| 125 | + } |
| 126 | + |
| 127 | + try { |
| 128 | + await client.query(`UNLISTEN ${channel}`) |
| 129 | + } catch (error) { |
| 130 | + console.error('dashboard-service: failed to unlisten channel', { |
| 131 | + channel, |
| 132 | + error, |
| 133 | + }) |
| 134 | + } |
| 135 | + } |
| 136 | + |
| 137 | + client.removeAllListeners('notification') |
| 138 | + client.removeAllListeners('error') |
| 139 | + client.removeAllListeners('end') |
| 140 | + |
| 141 | + try { |
| 142 | + await client.end() |
| 143 | + } catch (error) { |
| 144 | + console.error('dashboard-service: failed to close stateful incremental collector listener', error) |
| 145 | + } |
| 146 | + } |
| 147 | + |
| 148 | + |
| 149 | + /** |
| 150 | + * Schedule a reconnect attempt after `delayMs` milliseconds. |
| 151 | + * Passing 0 connects immediately (used on first call and after close via `close()`). |
| 152 | + */ |
| 153 | + private scheduleReconnect(delayMs: number): void { |
| 154 | + if (this.isClosed || this.reconnectTimer || this.isListenerReady) { |
| 155 | + return |
| 156 | + } |
| 157 | + |
| 158 | + debug('scheduling listener reconnect in %d ms', delayMs) |
| 159 | + |
| 160 | + this.reconnectTimer = setTimeout(() => { |
| 161 | + this.reconnectTimer = undefined |
| 162 | + this.connectListener().catch((err) => { |
| 163 | + // connectListener already logs; just ensure the loop continues. |
| 164 | + debug('connectListener threw unexpectedly: %o', err) |
| 165 | + }) |
| 166 | + }, delayMs) |
| 167 | + } |
| 168 | + |
| 169 | + private async connectListener(): Promise<void> { |
| 170 | + if (this.isClosed || this.isListenerReady) { |
| 171 | + return |
| 172 | + } |
| 173 | + |
| 174 | + const client = new Client(getListenerConnectionConfig()) |
| 175 | + |
| 176 | + client.on('notification', (notification) => { |
| 177 | + if (!notification.channel || !this.channels.includes(notification.channel)) { |
| 178 | + return |
| 179 | + } |
| 180 | + |
| 181 | + this.isDirty = true |
| 182 | + debug('received postgres notification on channel=%s', notification.channel) |
| 183 | + }) |
| 184 | + |
| 185 | + client.on('error', (error) => { |
| 186 | + this.isDirty = true |
| 187 | + this.isListenerReady = false |
| 188 | + console.error('dashboard-service: stateful incremental collector listener error', error) |
| 189 | + // Don't call scheduleReconnect here — 'end' will always fire after 'error' |
| 190 | + // on a pg.Client, so we reconnect from the 'end' handler to avoid double-scheduling. |
| 191 | + }) |
| 192 | + |
| 193 | + client.on('end', () => { |
| 194 | + this.isDirty = true |
| 195 | + this.isListenerReady = false |
| 196 | + this.listenerClient = undefined |
| 197 | + debug('postgres stateful incremental collector listener ended — will reconnect in %d ms', this.reconnectDelayMs) |
| 198 | + |
| 199 | + if (!this.isClosed) { |
| 200 | + this.scheduleReconnect(this.reconnectDelayMs) |
| 201 | + // Exponential backoff, capped at MAX_DELAY_MS. |
| 202 | + this.reconnectDelayMs = Math.min( |
| 203 | + this.reconnectDelayMs * 2, |
| 204 | + StatefulIncrementalKPICollectorService.MAX_DELAY_MS, |
| 205 | + ) |
| 206 | + } |
| 207 | + }) |
| 208 | + |
| 209 | + try { |
| 210 | + await client.connect() |
| 211 | + |
| 212 | + for (const channel of this.channels) { |
| 213 | + if (!isValidChannelName(channel)) { |
| 214 | + console.error('dashboard-service: skipping invalid notify channel name', channel) |
| 215 | + continue |
| 216 | + } |
| 217 | + |
| 218 | + await client.query(`LISTEN ${channel}`) |
| 219 | + } |
| 220 | + |
| 221 | + this.listenerClient = client |
| 222 | + this.isListenerReady = true |
| 223 | + // Reset backoff on successful connect. |
| 224 | + this.reconnectDelayMs = StatefulIncrementalKPICollectorService.BASE_DELAY_MS |
| 225 | + debug('postgres stateful incremental collector listener initialized for channels=%o', this.channels) |
| 226 | + } catch (error) { |
| 227 | + this.isDirty = true |
| 228 | + this.listenerClient = undefined |
| 229 | + this.isListenerReady = false |
| 230 | + console.error('dashboard-service: unable to initialize stateful incremental collector listener', error) |
| 231 | + |
| 232 | + try { |
| 233 | + await client.end() |
| 234 | + } catch (_closeError) { |
| 235 | + // best effort — 'end' handler above will fire and schedule the next reconnect |
| 236 | + } |
| 237 | + } |
| 238 | + } |
| 239 | +} |
0 commit comments