Skip to content

Commit bde7cde

Browse files
committed
feat: add opt-in event retention purge (#359)
1 parent 1410824 commit bde7cde

13 files changed

Lines changed: 475 additions & 5 deletions

CONFIGURATION.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,9 @@ Running `nostream` for the first time creates the settings file in `<project_roo
109109
| limits.event.rateLimits[].rate | Maximum number of events during period. |
110110
| limits.event.whitelists.pubkeys | List of public keys to ignore rate limits. |
111111
| limits.event.whitelists.ipAddresses | List of IPs (IPv4 or IPv6) to ignore rate limits. |
112+
| limits.event.retention.maxDays | Maximum number of days to retain events. Purge deletes events that are expired (`expires_at`), soft-deleted (`deleted_at`), or older than this window (`created_at`). Any non-positive value disables retention purge. |
113+
| limits.event.retention.kind.whitelist | Event kinds excluded from retention purge. |
114+
| limits.event.retention.pubkey.whitelist | Public keys excluded from retention purge. |
112115
| limits.client.subscription.maxSubscriptions | Maximum number of subscriptions per connected client. Defaults to 10. Disabled when set to zero. |
113116
| limits.client.subscription.maxFilters | Maximum number of filters per subscription. Defaults to 10. Disabled when set to zero. |
114117
| limits.message.rateLimits[].period | Rate limit period in milliseconds. |

resources/default-settings.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,12 @@ limits:
7676
- "10.10.10.1"
7777
- "::ffff:10.10.10.1"
7878
event:
79+
retention:
80+
maxDays: -1
81+
kind:
82+
whitelist: []
83+
pubkey:
84+
whitelist: []
7985
eventId:
8086
minLeadingZeroBits: 0
8187
kind:

src/@types/repositories.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,25 @@ import { PassThrough } from 'stream'
22

33
import { DatabaseClient, EventId, Pubkey } from './base'
44
import { DBEvent, Event } from './event'
5+
import { EventKinds } from '../constants/base'
56
import { Invoice } from './invoice'
67
import { SubscriptionFilter } from './subscription'
78
import { User } from './user'
89

10+
export type EventKindsRange = [EventKinds, EventKinds]
11+
12+
export interface EventRetentionOptions {
13+
maxDays?: number
14+
kindWhitelist?: (EventKinds | EventKindsRange)[]
15+
pubkeyWhitelist?: Pubkey[]
16+
}
17+
18+
export interface EventPurgeCounts {
19+
deleted: number
20+
expired: number
21+
retained: number
22+
}
23+
924
export type ExposedPromiseKeys = 'then' | 'catch' | 'finally'
1025

1126
export interface IQueryResult<T> extends Pick<Promise<T>, keyof Promise<T> & ExposedPromiseKeys> {
@@ -21,6 +36,7 @@ export interface IEventRepository {
2136
deleteByPubkeyAndIds(pubkey: Pubkey, ids: EventId[]): Promise<number>
2237
deleteByPubkeyExceptKinds(pubkey: Pubkey, excludedKinds: number[]): Promise<number>
2338
hasActiveRequestToVanish(pubkey: Pubkey): Promise<boolean>
39+
deleteExpiredAndRetained(options?: EventRetentionOptions): Promise<EventPurgeCounts>
2440
}
2541

2642
export interface IInvoiceRepository {

src/@types/services.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
import { Invoice } from './invoice'
22
import { Pubkey } from './base'
33

4+
export interface IMaintenanceService {
5+
clearOldEvents(): Promise<void>
6+
}
7+
48
export interface IPaymentsService {
59
getInvoiceFromPaymentsProcessor(invoice: string | Invoice): Promise<Partial<Invoice>>
610
createInvoice(

src/@types/settings.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,20 @@ export interface EventWhitelists {
6969
ipAddresses?: string[]
7070
}
7171

72+
export interface EventRetentionKindLimits {
73+
whitelist?: (EventKinds | EventKindsRange)[]
74+
}
75+
76+
export interface EventRetentionPubkeyLimits {
77+
whitelist?: Pubkey[]
78+
}
79+
80+
export interface EventRetentionLimits {
81+
maxDays?: number
82+
kind?: EventRetentionKindLimits
83+
pubkey?: EventRetentionPubkeyLimits
84+
}
85+
7286
export interface EventLimits {
7387
eventId?: EventIdLimits
7488
pubkey?: PubkeyLimits
@@ -77,6 +91,7 @@ export interface EventLimits {
7791
content?: ContentLimits | ContentLimits[]
7892
rateLimits?: EventRateLimit[]
7993
whitelists?: EventWhitelists
94+
retention?: EventRetentionLimits
8095
}
8196

8297
export interface ClientSubscriptionLimits {

src/app/maintenance-worker.ts

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,24 @@
1+
import { IMaintenanceService, IPaymentsService } from '../@types/services'
12
import { mergeDeepLeft, path, pipe } from 'ramda'
23
import { IRunnable } from '../@types/base'
34

45
import { createLogger } from '../factories/logger-factory'
56
import { delayMs } from '../utils/misc'
67
import { InvoiceStatus } from '../@types/invoice'
7-
import { IPaymentsService } from '../@types/services'
88
import { Settings } from '../@types/settings'
99

1010
const UPDATE_INVOICE_INTERVAL = 60000
1111

1212
const debug = createLogger('maintenance-worker')
1313

1414
export class MaintenanceWorker implements IRunnable {
15-
private interval: NodeJS.Timeout | undefined
15+
private interval: NodeJS.Timer | undefined
16+
private isRunning = false
1617

1718
public constructor(
1819
private readonly process: NodeJS.Process,
1920
private readonly paymentsService: IPaymentsService,
21+
private readonly maintenanceService: IMaintenanceService,
2022
private readonly settings: () => Settings,
2123
) {
2224
this.process
@@ -28,12 +30,28 @@ export class MaintenanceWorker implements IRunnable {
2830
}
2931

3032
public run(): void {
31-
this.interval = setInterval(() => this.onSchedule(), UPDATE_INVOICE_INTERVAL)
33+
this.interval = setInterval(async () => {
34+
if (this.isRunning) {
35+
debug('skipping scheduled maintenance run because previous run is still in progress')
36+
return
37+
}
38+
39+
this.isRunning = true
40+
try {
41+
await this.onSchedule()
42+
} catch (error) {
43+
this.onError(error as Error)
44+
} finally {
45+
this.isRunning = false
46+
}
47+
}, UPDATE_INVOICE_INTERVAL)
3248
}
3349

3450
private async onSchedule(): Promise<void> {
3551
const currentSettings = this.settings()
3652

53+
await this.maintenanceService.clearOldEvents()
54+
3755
if (!path(['payments','enabled'], currentSettings)) {
3856
return
3957
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
import { getMasterDbClient, getReadReplicaDbClient } from '../database/client'
2+
import { createSettings } from './settings-factory'
3+
import { EventRepository } from '../repositories/event-repository'
4+
import { MaintenanceService } from '../services/maintenance-service'
5+
6+
export const createMaintenanceService = () => {
7+
return new MaintenanceService(
8+
new EventRepository(getMasterDbClient(), getReadReplicaDbClient()),
9+
createSettings
10+
)
11+
}
Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,13 @@
1+
import { createMaintenanceService } from './maintenance-service-factory'
12
import { createPaymentsService } from './payments-service-factory'
23
import { createSettings } from './settings-factory'
34
import { MaintenanceWorker } from '../app/maintenance-worker'
45

56
export const maintenanceWorkerFactory = () => {
6-
return new MaintenanceWorker(process, createPaymentsService(), createSettings)
7+
return new MaintenanceWorker(
8+
process,
9+
createPaymentsService(),
10+
createMaintenanceService(),
11+
createSettings
12+
)
713
}

src/repositories/event-repository.ts

Lines changed: 77 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import {
3131
import { ContextMetadataKey, EventDeduplicationMetadataKey, EventExpirationTimeMetadataKey, EventKinds } from '../constants/base'
3232
import { DatabaseClient, EventId } from '../@types/base'
3333
import { DBEvent, Event } from '../@types/event'
34-
import { IEventRepository, IQueryResult } from '../@types/repositories'
34+
import { EventPurgeCounts, EventRetentionOptions, IEventRepository, IQueryResult } from '../@types/repositories'
3535
import { toBuffer, toJSON } from '../utils/transform'
3636
import { createLogger } from '../factories/logger-factory'
3737
import { isGenericTagQuery } from '../utils/filter'
@@ -322,4 +322,80 @@ export class EventRepository implements IEventRepository {
322322

323323
return Boolean(result)
324324
}
325+
326+
public deleteExpiredAndRetained(options?: EventRetentionOptions): Promise<EventPurgeCounts> {
327+
const now = Math.floor(Date.now() / 1000)
328+
const maxDays = options?.maxDays
329+
330+
if (typeof maxDays !== 'number' || isNaN(maxDays) || maxDays <= 0) {
331+
debug('skipping purge: retention.maxDays is not a positive number')
332+
return Promise.resolve({
333+
deleted: 0,
334+
expired: 0,
335+
retained: 0,
336+
})
337+
}
338+
339+
const retentionLimit = now - (maxDays * 86400)
340+
const batchSize = 1000
341+
342+
debug('deleting expired and retained events (retentionLimit: %d, now: %d, batchSize: %d)', retentionLimit, now, batchSize)
343+
344+
const candidates = this.masterDbClient('events')
345+
.select('event_id')
346+
.where(function () {
347+
this.where('expires_at', '<', now)
348+
.orWhereNotNull('deleted_at')
349+
.orWhere('event_created_at', '<', retentionLimit)
350+
})
351+
.modify((query) => {
352+
if (Array.isArray(options?.kindWhitelist) && options.kindWhitelist.length > 0) {
353+
query.whereNot((builder) => {
354+
options.kindWhitelist.forEach((kindOrRange) => {
355+
if (Array.isArray(kindOrRange)) {
356+
builder.orWhereBetween('event_kind', kindOrRange)
357+
} else {
358+
builder.orWhere('event_kind', kindOrRange)
359+
}
360+
})
361+
})
362+
}
363+
364+
if (Array.isArray(options?.pubkeyWhitelist) && options.pubkeyWhitelist.length > 0) {
365+
query.whereNotIn('event_pubkey', map(toBuffer)(options.pubkeyWhitelist))
366+
}
367+
})
368+
.limit(batchSize)
369+
370+
const query = this.masterDbClient('events')
371+
.whereIn('event_id', candidates)
372+
.del(['deleted_at', 'expires_at', 'event_created_at'])
373+
374+
const mapToCounts = (deletedRows: Pick<DBEvent, 'deleted_at' | 'expires_at' | 'event_created_at'>[]): EventPurgeCounts => deletedRows.reduce((counts, row) => {
375+
if (row.deleted_at) {
376+
counts.deleted += 1
377+
} else if (typeof row.expires_at === 'number' && row.expires_at < now) {
378+
counts.expired += 1
379+
} else if (row.event_created_at < retentionLimit) {
380+
counts.retained += 1
381+
}
382+
383+
return counts
384+
}, {
385+
deleted: 0,
386+
expired: 0,
387+
retained: 0,
388+
})
389+
390+
return {
391+
then: <T1, T2>(
392+
onfulfilled: (value: EventPurgeCounts) => T1 | PromiseLike<T1>,
393+
onrejected: (reason: any) => T2 | PromiseLike<T2>
394+
) => query
395+
.then((rows: any) => mapToCounts(rows))
396+
.then(onfulfilled, onrejected),
397+
catch: <T>(onrejected: (reason: any) => T | PromiseLike<T>) => query.catch(onrejected),
398+
toString: (): string => query.toString(),
399+
} as Promise<EventPurgeCounts>
400+
}
325401
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import { createLogger } from '../factories/logger-factory'
2+
import { IEventRepository } from '../@types/repositories'
3+
import { IMaintenanceService } from '../@types/services'
4+
import { Settings } from '../@types/settings'
5+
6+
const debug = createLogger('maintenance-service')
7+
8+
export class MaintenanceService implements IMaintenanceService {
9+
public constructor(
10+
private readonly eventRepository: IEventRepository,
11+
private readonly settings: () => Settings,
12+
) {}
13+
14+
public async clearOldEvents(): Promise<void> {
15+
const currentSettings = this.settings()
16+
const retention = currentSettings.limits?.event?.retention
17+
const maxDays = retention?.maxDays
18+
19+
if (typeof maxDays !== 'number' || isNaN(maxDays) || maxDays <= 0) {
20+
return
21+
}
22+
23+
try {
24+
debug('purging deleted, expired and old events')
25+
const deletedCounts = await this.eventRepository.deleteExpiredAndRetained({
26+
maxDays,
27+
kindWhitelist: retention?.kind?.whitelist,
28+
pubkeyWhitelist: retention?.pubkey?.whitelist,
29+
})
30+
const totalDeleted = deletedCounts.deleted + deletedCounts.expired + deletedCounts.retained
31+
if (totalDeleted > 0) {
32+
console.info(`[Maintenance] Deleted events: deleted=${deletedCounts.deleted}, expired=${deletedCounts.expired}, retained=${deletedCounts.retained}.`)
33+
}
34+
} catch (error) {
35+
console.error('Unable to purge events. Reason:', error)
36+
}
37+
}
38+
}

0 commit comments

Comments
 (0)