Skip to content

Commit cbdf967

Browse files
feat: Import .jsonl file with events into the events table (#414)
* feat: add event import service with batch persistence * feat: add CLI entrypoint for jsonl import * test: add unit tests for event import service * feat: add npm run import script * docs: add jsonl import usage to README * refactor: reuse EventRepository instead of raw queries in import service * fix: enrich event metadata before persistence in import service * fix: remove double onProgress call at end of import
1 parent e918cae commit cbdf967

7 files changed

Lines changed: 683 additions & 22 deletions

File tree

README.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,29 @@ Print the Tor hostname:
230230
./scripts/print_tor_hostname
231231
```
232232
233+
### Importing events from JSON Lines
234+
235+
You can import NIP-01 events from a `.jsonl` file directly into the relay database.
236+
237+
Basic import:
238+
```
239+
npm run import -- ./events.jsonl
240+
```
241+
242+
Set a custom batch size (default: `1000`):
243+
```
244+
npm run import -- ./events.jsonl --batch-size 500
245+
```
246+
247+
The importer:
248+
249+
- Processes the file line-by-line to keep memory usage bounded.
250+
- Validates NIP-01 schema, event id hash, and Schnorr signature before insertion.
251+
- Inserts in database transactions per batch.
252+
- Skips duplicates without failing the whole import.
253+
- Prints progress in the format:
254+
`[Processed: 50,000 | Inserted: 45,000 | Skipped: 5,000 | Errors: 0]`
255+
233256
### Running as a Service
234257
235258
By default this server will run continuously until you stop it with Ctrl+C or until the system restarts.

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
"lint": "eslint --ext .ts ./src ./test",
3232
"lint:report": "eslint -o .lint-reports/eslint.json -f json --ext .ts ./src ./test",
3333
"lint:fix": "npm run lint -- --fix",
34+
"import": "node -r ts-node/register src/import-events.ts",
3435
"db:migrate": "knex migrate:latest",
3536
"db:migrate:rollback": "knex migrate:rollback",
3637
"db:seed": "knex seed:run",

src/@types/repositories.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ export interface IQueryResult<T> extends Pick<Promise<T>, keyof Promise<T> & Exp
1414

1515
export interface IEventRepository {
1616
create(event: Event): Promise<number>
17+
createMany(events: Event[]): Promise<number>
1718
upsert(event: Event): Promise<number>
19+
upsertMany(events: Event[]): Promise<number>
1820
findByFilters(filters: SubscriptionFilter[]): IQueryResult<DBEvent[]>
1921
deleteByPubkeyAndIds(pubkey: Pubkey, ids: EventId[]): Promise<number>
2022
deleteByPubkeyExceptKinds(pubkey: Pubkey, excludedKinds: number[]): Promise<number>

src/import-events.ts

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
import { extname, resolve } from 'path'
2+
3+
import fs from 'fs'
4+
5+
import dotenv from 'dotenv'
6+
7+
dotenv.config()
8+
9+
import {
10+
createEventBatchPersister,
11+
EventImportLineError,
12+
EventImportService,
13+
EventImportStats,
14+
} from './services/event-import-service'
15+
import { EventRepository } from './repositories/event-repository'
16+
import { getMasterDbClient } from './database/client'
17+
18+
interface CliOptions {
19+
batchSize: number
20+
filePath: string
21+
showHelp: boolean
22+
}
23+
24+
const DEFAULT_BATCH_SIZE = 1000
25+
const MAX_ERROR_LOGS = 20
26+
27+
const formatNumber = (value: number): string => value.toLocaleString('en-US')
28+
29+
const formatProgress = (stats: EventImportStats): string => {
30+
return `[Processed: ${formatNumber(stats.processed)} | Inserted: ${formatNumber(stats.inserted)} | Skipped: ${formatNumber(stats.skipped)} | Errors: ${formatNumber(stats.errors)}]`
31+
}
32+
33+
const printUsage = (): void => {
34+
console.log('Usage: npm run import -- <file.jsonl> [--batch-size <number>]')
35+
console.log('Example: npm run import -- ./events.jsonl --batch-size 1000')
36+
}
37+
38+
const parseBatchSize = (value: string): number => {
39+
const parsedValue = Number(value)
40+
41+
if (!Number.isInteger(parsedValue) || parsedValue <= 0) {
42+
throw new Error(`Invalid --batch-size value: ${value}`)
43+
}
44+
45+
return parsedValue
46+
}
47+
48+
const parseCliArgs = (args: string[]): CliOptions => {
49+
let batchSize = DEFAULT_BATCH_SIZE
50+
let filePath: string | undefined
51+
52+
if (args.includes('--help') || args.includes('-h')) {
53+
return {
54+
batchSize,
55+
filePath: '',
56+
showHelp: true,
57+
}
58+
}
59+
60+
for (let i = 0; i < args.length; i++) {
61+
const arg = args[i]
62+
63+
if (arg === '--batch-size') {
64+
const nextArg = args[i + 1]
65+
if (typeof nextArg !== 'string') {
66+
throw new Error('Missing value for --batch-size')
67+
}
68+
69+
batchSize = parseBatchSize(nextArg)
70+
i += 1
71+
continue
72+
}
73+
74+
if (arg.startsWith('--batch-size=')) {
75+
batchSize = parseBatchSize(arg.split('=', 2)[1])
76+
continue
77+
}
78+
79+
if (arg.startsWith('--')) {
80+
throw new Error(`Unknown option: ${arg}`)
81+
}
82+
83+
if (filePath) {
84+
throw new Error(`Unexpected extra argument: ${arg}`)
85+
}
86+
87+
filePath = arg
88+
}
89+
90+
if (!filePath) {
91+
throw new Error('Missing path to .jsonl file')
92+
}
93+
94+
return {
95+
batchSize,
96+
filePath,
97+
showHelp: false,
98+
}
99+
}
100+
101+
const ensureValidInputFile = (filePath: string): string => {
102+
const absolutePath = resolve(process.cwd(), filePath)
103+
104+
if (extname(absolutePath).toLowerCase() !== '.jsonl') {
105+
throw new Error('Input file must have a .jsonl extension')
106+
}
107+
108+
if (!fs.existsSync(absolutePath)) {
109+
throw new Error(`Input file does not exist: ${absolutePath}`)
110+
}
111+
112+
const stats = fs.statSync(absolutePath)
113+
if (!stats.isFile()) {
114+
throw new Error(`Input path is not a file: ${absolutePath}`)
115+
}
116+
117+
return absolutePath
118+
}
119+
120+
const run = async (): Promise<void> => {
121+
const options = parseCliArgs(process.argv.slice(2))
122+
123+
if (options.showHelp) {
124+
printUsage()
125+
return
126+
}
127+
128+
const absoluteFilePath = ensureValidInputFile(options.filePath)
129+
130+
const dbClient = getMasterDbClient()
131+
const eventRepository = new EventRepository(dbClient, dbClient)
132+
const importer = new EventImportService(createEventBatchPersister(eventRepository))
133+
134+
let loggedErrors = 0
135+
let suppressedErrors = 0
136+
137+
const onLineError = ({ lineNumber, reason }: EventImportLineError) => {
138+
if (loggedErrors < MAX_ERROR_LOGS) {
139+
console.warn(`[line ${lineNumber}] ${reason}`)
140+
loggedErrors += 1
141+
return
142+
}
143+
144+
suppressedErrors += 1
145+
}
146+
147+
const onProgress = (stats: EventImportStats) => {
148+
console.log(formatProgress(stats))
149+
}
150+
151+
const startedAt = Date.now()
152+
153+
try {
154+
const stats = await importer.importFromJsonl(absoluteFilePath, {
155+
batchSize: options.batchSize,
156+
onLineError,
157+
onProgress,
158+
})
159+
160+
if (suppressedErrors > 0) {
161+
console.warn(`Suppressed ${formatNumber(suppressedErrors)} additional line errors`)
162+
}
163+
164+
const elapsedSeconds = ((Date.now() - startedAt) / 1000).toFixed(2)
165+
166+
console.log(`Import completed in ${elapsedSeconds}s`)
167+
console.log(formatProgress(stats))
168+
} finally {
169+
await dbClient.destroy()
170+
}
171+
}
172+
173+
if (require.main === module) {
174+
run().catch((error: unknown) => {
175+
if (error instanceof Error) {
176+
console.error(`Import failed: ${error.message}`)
177+
} else {
178+
console.error('Import failed with unknown error')
179+
}
180+
181+
process.exit(1)
182+
})
183+
}

src/repositories/event-repository.ts

Lines changed: 63 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -170,9 +170,22 @@ export class EventRepository implements IEventRepository {
170170
return this.insert(event).then(prop('rowCount') as () => number, () => 0)
171171
}
172172

173-
private insert(event: Event) {
174-
debug('inserting event: %o', event)
175-
const row = applySpec({
173+
public async createMany(events: Event[]): Promise<number> {
174+
if (!events.length) {
175+
return 0
176+
}
177+
178+
const rows = events.map((event) => this.toInsertRow(event))
179+
180+
return this.masterDbClient('events')
181+
.insert(rows)
182+
.onConflict()
183+
.ignore()
184+
.then(prop('rowCount') as () => number, () => 0)
185+
}
186+
187+
private toInsertRow(event: Event) {
188+
return applySpec({
176189
event_id: pipe(prop('id'), toBuffer),
177190
event_pubkey: pipe(prop('pubkey'), toBuffer),
178191
event_created_at: prop('created_at'),
@@ -187,6 +200,11 @@ export class EventRepository implements IEventRepository {
187200
always(null),
188201
),
189202
})(event)
203+
}
204+
205+
private insert(event: Event) {
206+
debug('inserting event: %o', event)
207+
const row = this.toInsertRow(event)
190208

191209
return this.masterDbClient('events')
192210
.insert(row)
@@ -197,9 +215,50 @@ export class EventRepository implements IEventRepository {
197215
public upsert(event: Event): Promise<number> {
198216
debug('upserting event: %o', event)
199217

218+
const row = this.toUpsertRow(event)
219+
220+
const query = this.masterDbClient('events')
221+
.insert(row)
222+
// NIP-16: Replaceable Events
223+
// NIP-33: Parameterized Replaceable Events
224+
.onConflict(
225+
this.masterDbClient.raw(
226+
'(event_pubkey, event_kind, event_deduplication) WHERE (event_kind = 0 OR event_kind = 3 OR event_kind = 41 OR (event_kind >= 10000 AND event_kind < 20000)) OR (event_kind >= 30000 AND event_kind < 40000)'
227+
)
228+
)
229+
.merge(omit(['event_pubkey', 'event_kind', 'event_deduplication'])(row))
230+
.where('events.event_created_at', '<', row.event_created_at)
231+
232+
return {
233+
then: <T1, T2>(onfulfilled: (value: number) => T1 | PromiseLike<T1>, onrejected: (reason: any) => T2 | PromiseLike<T2>) => query.then(prop('rowCount') as () => number).then(onfulfilled, onrejected),
234+
catch: <T>(onrejected: (reason: any) => T | PromiseLike<T>) => query.catch(onrejected),
235+
toString: (): string => query.toString(),
236+
} as Promise<number>
237+
}
238+
239+
public async upsertMany(events: Event[]): Promise<number> {
240+
if (!events.length) {
241+
return 0
242+
}
243+
244+
const rows = events.map((event) => this.toUpsertRow(event))
245+
246+
return this.masterDbClient('events')
247+
.insert(rows)
248+
.onConflict(
249+
this.masterDbClient.raw(
250+
'(event_pubkey, event_kind, event_deduplication) WHERE (event_kind = 0 OR event_kind = 3 OR event_kind = 41 OR (event_kind >= 10000 AND event_kind < 20000)) OR (event_kind >= 30000 AND event_kind < 40000)'
251+
)
252+
)
253+
.merge(['deleted_at', 'event_content', 'event_created_at', 'event_id', 'event_signature', 'event_tags', 'expires_at'])
254+
.whereRaw('"events"."event_created_at" < "excluded"."event_created_at"')
255+
.then(prop('rowCount') as () => number, () => 0)
256+
}
257+
258+
private toUpsertRow(event: Event) {
200259
const toJSON = (input: any) => JSON.stringify(input)
201260

202-
const row = applySpec<DBEvent>({
261+
return applySpec<DBEvent>({
203262
event_id: pipe(prop('id'), toBuffer),
204263
event_pubkey: pipe(prop('pubkey'), toBuffer),
205264
event_created_at: prop('created_at'),
@@ -220,24 +279,6 @@ export class EventRepository implements IEventRepository {
220279
),
221280
deleted_at: always(null),
222281
})(event)
223-
224-
const query = this.masterDbClient('events')
225-
.insert(row)
226-
// NIP-16: Replaceable Events
227-
// NIP-33: Parameterized Replaceable Events
228-
.onConflict(
229-
this.masterDbClient.raw(
230-
'(event_pubkey, event_kind, event_deduplication) WHERE (event_kind = 0 OR event_kind = 3 OR event_kind = 41 OR (event_kind >= 10000 AND event_kind < 20000)) OR (event_kind >= 30000 AND event_kind < 40000)'
231-
)
232-
)
233-
.merge(omit(['event_pubkey', 'event_kind', 'event_deduplication'])(row))
234-
.where('events.event_created_at', '<', row.event_created_at)
235-
236-
return {
237-
then: <T1, T2>(onfulfilled: (value: number) => T1 | PromiseLike<T1>, onrejected: (reason: any) => T2 | PromiseLike<T2>) => query.then(prop('rowCount') as () => number).then(onfulfilled, onrejected),
238-
catch: <T>(onrejected: (reason: any) => T | PromiseLike<T>) => query.catch(onrejected),
239-
toString: (): string => query.toString(),
240-
} as Promise<number>
241282
}
242283

243284
public deleteByPubkeyAndIds(pubkey: string, eventIdsToDelete: EventId[]): Promise<number> {

0 commit comments

Comments
 (0)