diff --git a/plugins/replication/README.md b/plugins/replication/README.md new file mode 100644 index 0000000..144f3a5 --- /dev/null +++ b/plugins/replication/README.md @@ -0,0 +1,83 @@ +# Replication Plugin + +Pulls data from your configured **external data source** (e.g. a Postgres database on +Supabase) into the **internal Durable Object SQLite** database. This lets a StarbaseDB +instance act as a close-to-edge replica that can be queried directly instead of always +reaching out to the upstream database. + +Replication is **append-only** and **pull-based**: for each table you tell the plugin +which column to track (e.g. `id` or `created_at`), and on every pull it only fetches +rows newer than the highest value it has already seen. The last seen value per table is +stored in a `tmp_replication_checkpoints` table inside the internal database. + +## Configuration + +```ts +import { ReplicationPlugin } from '../plugins/replication' + +const replicationPlugin = new ReplicationPlugin({ + // Which tables to replicate, and the append-only column to track for each. + tables: [ + { name: 'users', trackBy: 'id', schema: 'public' }, + { + name: 'orders', + trackBy: 'created_at', + schema: 'public', + destinationTable: 'orders_replica', + }, + ], + // How often the plugin auto-pulls when requests arrive (seconds). 0 disables + // automatic pulling. Defaults to 300 (5 minutes). + intervalSeconds: 300, + // Maximum rows fetched per table per pull. Defaults to 1000. + batchSize: 1000, +}) + +const plugins = [ + // ... + replicationPlugin, +] satisfies StarbasePlugin[] +``` + +| Option | Required | Default | Description | +| ------------------ | -------- | ------- | ------------------------------------------------------------- | +| `tables` | yes | — | Tables to replicate. Each needs `name` and `trackBy`. | +| `trackBy` | yes | — | Monotonically increasing column used for append-only polling. | +| `schema` | no | — | Schema on the external source (e.g. `public`). 
| +| `destinationTable` | no | `name` | Internal table name to write into. | +| `intervalSeconds` | no | `300` | Auto-pull cadence. `0` disables automatic pulls. | +| `batchSize` | no | `1000` | Max rows fetched per table per pull. | + +Table, column and schema names are validated as SQL identifiers (letters, numbers and +underscores). Invalid configuration throws a `ReplicationConfigurationError` at startup +so misconfiguration fails loudly instead of at query time. + +## How data is pulled + +Because Cloudflare Workers cannot run free-standing timers, the `intervalSeconds` +cadence is evaluated lazily on incoming requests: when the interval has elapsed since +the last successful pull, the work is kicked off in the background via `waitUntil` so +query latency is unaffected. + +You can also trigger a pull explicitly: + +- **Programmatically:** `await replicationPlugin.pull()` +- **Over HTTP (admin only):** `POST /replication/pull` +- **On a precise schedule:** wire it to the [Cron plugin](../cron/README.md) and call + `pull()` from its `onEvent` callback. + +```ts +cronPlugin.onEvent(async ({ name }) => { + if (name === 'replicate') { + await replicationPlugin.pull() + } +}, ctx) +``` + +## Notes + +- The destination tables must already exist in the internal database (e.g. created via + the Studio plugin or a migration). Rows are written with `INSERT OR REPLACE`, so + re-pulling an already seen row is idempotent rather than a duplicate-key error. +- Checkpoints are stored as JSON so the original value type (numeric `id` vs string + `created_at`) is preserved when comparing against the source on the next pull. 
diff --git a/plugins/replication/index.test.ts b/plugins/replication/index.test.ts new file mode 100644 index 0000000..fa6cdd4 --- /dev/null +++ b/plugins/replication/index.test.ts @@ -0,0 +1,189 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest' +import { ReplicationPlugin } from './index' +import { ReplicationConfigurationError } from './utils' + +// In-memory stand-in for the internal Durable Object SQLite database. It understands +// just enough of the plugin's SQL to track checkpoints and record upserts. +function makeRpc() { + const checkpoints = new Map< + string, + { last_value: string | null; updated_at: number } + >() + const upserts: { sql: string; params: unknown[] }[] = [] + + const executeQuery = vi.fn( + async ({ sql, params = [] }: { sql: string; params?: unknown[] }) => { + if (sql.includes('CREATE TABLE')) { + return [] + } + if (sql.includes('SELECT last_value')) { + const cp = checkpoints.get(params[0] as string) + return cp ? [{ last_value: cp.last_value }] : [] + } + if (sql.includes('MAX(updated_at)')) { + const values = [...checkpoints.values()] + return [ + { + last_pulled_at: values.length + ? 
Math.max(...values.map((c) => c.updated_at)) + : null, + }, + ] + } + if (sql.includes('INTO tmp_replication_checkpoints')) { + checkpoints.set(params[0] as string, { + last_value: params[1] as string, + updated_at: params[2] as number, + }) + return [] + } + upserts.push({ sql, params }) + return [] + } + ) + + return { rpc: { executeQuery }, checkpoints, upserts } +} + +function makePlugin( + rows: Record[], + readSpy = vi.fn(async () => rows) +) { + const plugin = new ReplicationPlugin({ + tables: [{ name: 'users', trackBy: 'id' }], + intervalSeconds: 0, + readExternal: readSpy, + }) + const store = makeRpc() + ;(plugin as any).dataSource = { + rpc: store.rpc, + source: 'internal', + external: {}, + } + ;(plugin as any).config = { role: 'admin' } + return { plugin, store, readSpy } +} + +describe('ReplicationPlugin - configuration', () => { + it('should reject construction with no tables', () => { + expect(() => new ReplicationPlugin({ tables: [] })).toThrowError( + ReplicationConfigurationError + ) + }) + + it('should reject a non-positive batchSize', () => { + expect( + () => + new ReplicationPlugin({ + tables: [{ name: 'users', trackBy: 'id' }], + batchSize: 0, + }) + ).toThrowError(ReplicationConfigurationError) + }) + + it('should name itself for the registry', () => { + const plugin = new ReplicationPlugin({ + tables: [{ name: 'users', trackBy: 'id' }], + }) + expect(plugin.name).toBe('starbasedb:replication') + expect(plugin.pathPrefix).toBe('/replication') + }) +}) + +describe('ReplicationPlugin - pull', () => { + beforeEach(() => { + vi.clearAllMocks() + }) + + it('should seed the full table on the first pull (no checkpoint)', async () => { + const rows = [ + { id: 1, name: 'Ada' }, + { id: 2, name: 'Linus' }, + ] + const { plugin, store, readSpy } = makePlugin(rows) + + const result = await plugin.pull() + + // First pull must not filter — it seeds everything. 
+ const [sql] = readSpy.mock.calls[0] + expect(sql).not.toContain('WHERE') + + expect(store.upserts).toHaveLength(2) + expect(store.upserts[0].sql).toContain('INSERT OR REPLACE INTO users') + expect(result).toEqual([{ table: 'users', rowsReplicated: 2 }]) + + // Checkpoint advances to the highest tracked value, type-preserved as JSON. + expect(store.checkpoints.get('users')?.last_value).toBe( + JSON.stringify(2) + ) + }) + + it('should only fetch newer rows once a checkpoint exists', async () => { + const { plugin, store, readSpy } = makePlugin([{ id: 1 }]) + await plugin.pull() + + // Subsequent pull should pass the stored checkpoint as a typed param. + readSpy.mockResolvedValueOnce([{ id: 2 }]) + await plugin.pull() + + const [sql, params] = readSpy.mock.calls[1] + expect(sql).toContain('WHERE id > ?') + expect(params).toEqual([1]) + expect(store.checkpoints.get('users')?.last_value).toBe( + JSON.stringify(2) + ) + }) + + it('should not advance the checkpoint or write when no new rows exist', async () => { + const { plugin, store } = makePlugin([]) + + const result = await plugin.pull() + + expect(store.upserts).toHaveLength(0) + expect(store.checkpoints.has('users')).toBe(false) + expect(result).toEqual([{ table: 'users', rowsReplicated: 0 }]) + }) + + it('should throw when used before initialization', async () => { + const plugin = new ReplicationPlugin({ + tables: [{ name: 'users', trackBy: 'id' }], + }) + await expect(plugin.pull()).rejects.toThrowError( + 'ReplicationPlugin not properly initialized' + ) + }) + + it('should replicate multiple tables independently', async () => { + const plugin = new ReplicationPlugin({ + tables: [ + { name: 'users', trackBy: 'id' }, + { name: 'orders', trackBy: 'created_at' }, + ], + intervalSeconds: 0, + readExternal: vi + .fn() + .mockResolvedValueOnce([{ id: 7 }]) + .mockResolvedValueOnce([ + { created_at: '2024-05-01' }, + { created_at: '2024-06-01' }, + ]), + }) + const store = makeRpc() + ;(plugin as any).dataSource = { 
+ rpc: store.rpc, + source: 'internal', + external: {}, + } + ;(plugin as any).config = { role: 'admin' } + + const result = await plugin.pull() + + expect(result).toEqual([ + { table: 'users', rowsReplicated: 1 }, + { table: 'orders', rowsReplicated: 2 }, + ]) + expect(store.checkpoints.get('orders')?.last_value).toBe( + JSON.stringify('2024-06-01') + ) + }) +}) diff --git a/plugins/replication/index.ts b/plugins/replication/index.ts new file mode 100644 index 0000000..1f48674 --- /dev/null +++ b/plugins/replication/index.ts @@ -0,0 +1,301 @@ +import { StarbaseApp, StarbaseDBConfiguration } from '../../src/handler' +import { executeQuery } from '../../src/operation' +import { StarbasePlugin } from '../../src/plugin' +import { DataSource } from '../../src/types' +import { createResponse } from '../../src/utils' +import { + assertPositiveInteger, + buildSelectQuery, + buildUpsertQuery, + nextCheckpointValue, + normalizeTables, + NormalizedReplicationTable, + ReplicationConfigurationError, + ReplicationTable, +} from './utils' + +const SQL_QUERIES = { + CREATE_CHECKPOINT_TABLE: ` + CREATE TABLE IF NOT EXISTS tmp_replication_checkpoints ( + table_name TEXT NOT NULL PRIMARY KEY, + last_value TEXT, + updated_at INTEGER + ) + `, + GET_CHECKPOINT: ` + SELECT last_value FROM tmp_replication_checkpoints WHERE table_name = ? + `, + SET_CHECKPOINT: ` + INSERT OR REPLACE INTO tmp_replication_checkpoints (table_name, last_value, updated_at) + VALUES (?, ?, ?) + `, + LAST_PULLED_AT: ` + SELECT MAX(updated_at) AS last_pulled_at FROM tmp_replication_checkpoints + `, +} + +const DEFAULT_INTERVAL_SECONDS = 300 +const DEFAULT_BATCH_SIZE = 1000 + +/** Outcome of replicating a single table during a pull. */ +export interface ReplicationResult { + table: string + rowsReplicated: number +} + +/** A function that reads rows from the external data source. Injectable for testing. 
/**
 * A function that reads rows from the external data source. Injectable for testing.
 *
 * @param sql - Statement to run against the external source.
 * @param params - Values bound to the `?` placeholders in `sql`.
 * @returns The rows produced by the statement, one object per row.
 */
export type ExternalReader = (
    sql: string,
    params: unknown[]
) => Promise<Record<string, unknown>[]>

export interface ReplicationPluginOptions {
    /** Tables to pull from the external source into the internal DO SQLite. */
    tables: ReplicationTable[]
    /**
     * How often (in seconds) the plugin will automatically pull when requests come
     * in. Set to `0` to disable automatic pulling and only pull on demand via
     * `pull()` or the `/replication/pull` endpoint. Defaults to 300 (5 minutes).
     */
    intervalSeconds?: number
    /** Maximum number of rows fetched per table per pull. Defaults to 1000. */
    batchSize?: number
    /**
     * Optional override for reading from the external source. Defaults to querying
     * the configured external data source. Primarily useful for testing or custom
     * source connectors.
     */
    readExternal?: ExternalReader
}

/**
 * Pulls data from an external data source into the internal Durable Object SQLite
 * database so the instance can serve as a close-to-edge replica. Replication is
 * append-only: each table tracks the highest value seen for a user-defined column
 * (e.g. `id` or `created_at`) and only fetches newer rows on subsequent pulls.
 *
 * Checkpoints live in `tmp_replication_checkpoints`, keyed by destination table.
 * The same table also holds one heartbeat row (see `HEARTBEAT_KEY`) that records
 * when the last pull completed, so the auto-pull interval is honoured even when
 * the source has no new rows.
 */
export class ReplicationPlugin extends StarbasePlugin {
    /**
     * Reserved `table_name` under which the completion time of the last pull is
     * stored. It starts with `$`, which the identifier validation applied to every
     * configured destination table rejects, so it can never collide with a real
     * checkpoint row.
     */
    private static readonly HEARTBEAT_KEY = '$last_pull'

    public pathPrefix: string = '/replication'
    private dataSource?: DataSource
    private config?: StarbaseDBConfiguration
    private executionContext?: ExecutionContext
    private readonly tables: NormalizedReplicationTable[]
    private readonly intervalMs: number
    private readonly batchSize: number
    private readonly externalReader?: ExternalReader
    /** Guards against overlapping pulls on this plugin instance. */
    private isPulling = false

    /**
     * @param options - Tables to replicate plus optional cadence, batch size and
     *   reader override.
     * @throws ReplicationConfigurationError when the tables, `intervalSeconds` or
     *   `batchSize` are invalid.
     */
    constructor(options: ReplicationPluginOptions) {
        super('starbasedb:replication', {
            requiresAuth: true,
        })

        this.tables = normalizeTables(options.tables)

        const intervalSeconds =
            options.intervalSeconds ?? DEFAULT_INTERVAL_SECONDS
        // `0` is the documented "auto-pull disabled" value; anything else must be
        // a positive integer.
        if (intervalSeconds !== 0) {
            assertPositiveInteger(intervalSeconds, 'intervalSeconds')
        }
        this.intervalMs = intervalSeconds * 1000

        this.batchSize = assertPositiveInteger(
            options.batchSize ?? DEFAULT_BATCH_SIZE,
            'batchSize'
        )

        this.externalReader = options.readExternal
    }

    /**
     * Installs the middleware that captures the request's data source and
     * configuration, ensures the checkpoint table exists and evaluates the
     * auto-pull interval, and registers the `POST /replication/pull` route.
     */
    override async register(app: StarbaseApp) {
        app.use(async (c, next) => {
            this.dataSource = c?.get('dataSource')
            this.config = c?.get('config')
            // `executionCtx` getter throws when no ExecutionContext is bound (e.g.
            // outside a Workers fetch handler), so access it defensively.
            try {
                this.executionContext = c?.executionCtx
            } catch {
                this.executionContext = undefined
            }
            await this.init()
            await this.maybeAutoPull()
            await next()
        })

        // Manually trigger a replication pull. Restricted to admin users.
        app.post(`${this.pathPrefix}/pull`, async (c) => {
            if (this.config?.role !== 'admin') {
                return createResponse(undefined, 'Unauthorized request', 401)
            }

            const replicated = await this.pull()
            return createResponse({ success: true, replicated }, undefined, 200)
        })
    }

    /** Creates the checkpoint table when it does not exist yet. */
    private async init() {
        if (!this.dataSource) return

        await this.dataSource.rpc.executeQuery({
            sql: SQL_QUERIES.CREATE_CHECKPOINT_TABLE,
            params: [],
        })
    }

    /**
     * Trigger a pull in the background when the configured interval has elapsed since
     * the last pull. Cloudflare Workers cannot run free-standing timers, so the
     * interval is evaluated lazily on incoming requests and the work is detached via
     * `waitUntil` so query latency is unaffected. Without an execution context the
     * pull is awaited inline.
     *
     * The time of the last pull is the newest `updated_at` in the checkpoint table,
     * which includes the heartbeat row written by `pull()`.
     */
    private async maybeAutoPull() {
        if (!this.dataSource || this.intervalMs === 0 || this.isPulling) {
            return
        }

        const result = (await this.dataSource.rpc.executeQuery({
            sql: SQL_QUERIES.LAST_PULLED_AT,
            params: [],
        })) as { last_pulled_at: number | null }[]

        // No rows yet means nothing was ever pulled: treat as "epoch" so the first
        // request seeds the replica.
        const lastPulledAt = result?.[0]?.last_pulled_at ?? 0
        if (Date.now() - lastPulledAt < this.intervalMs) {
            return
        }

        // Auto-pull is best effort: a failure is logged and retried on a later
        // request instead of failing the request that happened to trigger it.
        const pullPromise = this.pull().catch((error) => {
            console.error('Replication auto-pull failed:', error)
        })

        if (this.executionContext) {
            this.executionContext.waitUntil(pullPromise)
        } else {
            await pullPromise
        }
    }

    /**
     * Pull newer rows for every configured table from the external source into the
     * internal database.
     *
     * @returns The number of rows replicated per table, or an empty array when a
     *   pull is already running on this instance.
     * @throws Error when called before the middleware has provided a data source.
     */
    public async pull(): Promise<ReplicationResult[]> {
        if (!this.dataSource) {
            throw new Error('ReplicationPlugin not properly initialized')
        }

        if (this.isPulling) {
            return []
        }

        this.isPulling = true
        try {
            const results: ReplicationResult[] = []
            for (const table of this.tables) {
                results.push(await this.replicateTable(table))
            }

            // Record that a pull completed even if no table had new rows. Table
            // checkpoints are only written when rows arrive, so without this
            // heartbeat an idle source would be re-queried on every request once
            // the interval had elapsed. Skipped when a table throws, so a failed
            // pull is retried on the next request.
            await this.setCheckpoint(ReplicationPlugin.HEARTBEAT_KEY, null)

            return results
        } finally {
            this.isPulling = false
        }
    }

    /**
     * Fetch one batch of rows newer than the table's checkpoint, upsert them into
     * the destination table and advance the checkpoint.
     */
    private async replicateTable(
        table: NormalizedReplicationTable
    ): Promise<ReplicationResult> {
        const lastValue = await this.getCheckpoint(table.destinationTable)
        const { sql, params } = buildSelectQuery(
            table,
            lastValue,
            this.batchSize
        )
        const rows = await this.readExternal(sql, params)

        if (!rows.length) {
            return { table: table.destinationTable, rowsReplicated: 0 }
        }

        // Rows are written one by one; the checkpoint only moves after the whole
        // batch succeeded, so a failure part-way through is re-pulled next time.
        for (const row of rows) {
            const upsert = buildUpsertQuery(table.destinationTable, row)
            await this.dataSource!.rpc.executeQuery({
                sql: upsert.sql,
                params: upsert.params,
            })
        }

        // A null checkpoint reads back as "no checkpoint", so persisting one would
        // discard a previously stored position. Keep the existing checkpoint
        // instead.
        const nextValue = nextCheckpointValue(rows, table.trackBy)
        if (nextValue !== null && nextValue !== undefined) {
            await this.setCheckpoint(table.destinationTable, nextValue)
        }

        return { table: table.destinationTable, rowsReplicated: rows.length }
    }

    /**
     * Run a statement against the external source, either through the injected
     * reader or through the configured external data source.
     *
     * @throws Error when neither a reader nor an external data source is available.
     */
    private async readExternal(
        sql: string,
        params: unknown[]
    ): Promise<Record<string, unknown>[]> {
        if (this.externalReader) {
            return this.externalReader(sql, params)
        }

        if (!this.dataSource?.external || !this.config) {
            throw new Error(
                'No external data source configured for replication.'
            )
        }

        // Hyperdrive connections are identified by a connection string, every other
        // external source is queried through the standard external code path.
        const source =
            'connectionString' in this.dataSource.external
                ? 'hyperdrive'
                : 'external'

        const result = await executeQuery({
            sql,
            params,
            isRaw: false,
            dataSource: { ...this.dataSource, source },
            config: this.config,
        })

        return (result as Record<string, unknown>[]) ?? []
    }

    /**
     * Read the stored checkpoint for a destination table.
     *
     * @returns The decoded checkpoint value, or `null` when none is stored.
     */
    private async getCheckpoint(destinationTable: string): Promise<unknown> {
        const result = (await this.dataSource!.rpc.executeQuery({
            sql: SQL_QUERIES.GET_CHECKPOINT,
            params: [destinationTable],
        })) as { last_value: string | null }[]

        const stored = result?.[0]?.last_value
        if (stored === undefined || stored === null) {
            return null
        }

        // Stored as JSON so the original type (number vs string) is preserved for the
        // source comparison — comparing a numeric `id` against a quoted string breaks
        // on strongly typed engines like Postgres.
        try {
            return JSON.parse(stored)
        } catch {
            // Not valid JSON: fall back to the raw stored text.
            return stored
        }
    }

    /** Persist `value` (JSON-encoded) and the current time for a checkpoint key. */
    private async setCheckpoint(destinationTable: string, value: unknown) {
        await this.dataSource!.rpc.executeQuery({
            sql: SQL_QUERIES.SET_CHECKPOINT,
            params: [
                destinationTable,
                JSON.stringify(value ?? null),
                Date.now(),
            ],
        })
    }
}

// Re-exported so consumers can catch configuration errors and reuse the types.
+export { ReplicationConfigurationError } +export type { ReplicationTable } diff --git a/plugins/replication/meta.json b/plugins/replication/meta.json new file mode 100644 index 0000000..8436b1d --- /dev/null +++ b/plugins/replication/meta.json @@ -0,0 +1,15 @@ +{ + "version": "1.0.0", + "resources": { + "tables": { + "tmp_replication_checkpoints": {} + }, + "secrets": {}, + "variables": {} + }, + "dependencies": { + "tables": {}, + "secrets": {}, + "variables": {} + } +} diff --git a/plugins/replication/utils.test.ts b/plugins/replication/utils.test.ts new file mode 100644 index 0000000..a37bbb7 --- /dev/null +++ b/plugins/replication/utils.test.ts @@ -0,0 +1,180 @@ +import { describe, it, expect } from 'vitest' +import { + assertPositiveInteger, + assertValidIdentifier, + buildSelectQuery, + buildUpsertQuery, + nextCheckpointValue, + normalizeTables, + ReplicationConfigurationError, +} from './utils' + +describe('assertValidIdentifier', () => { + it('should accept valid identifiers', () => { + expect(assertValidIdentifier('users', 'table name')).toBe('users') + expect(assertValidIdentifier('_created_at', 'column')).toBe( + '_created_at' + ) + expect(assertValidIdentifier('Table1', 'column')).toBe('Table1') + }) + + it('should reject identifiers that could enable SQL injection', () => { + expect(() => + assertValidIdentifier('users; DROP TABLE x', 'table') + ).toThrowError(ReplicationConfigurationError) + expect(() => assertValidIdentifier('1users', 'table')).toThrowError() + expect(() => assertValidIdentifier('user-name', 'table')).toThrowError() + expect(() => assertValidIdentifier('', 'table')).toThrowError() + expect(() => assertValidIdentifier(undefined, 'table')).toThrowError() + }) +}) + +describe('assertPositiveInteger', () => { + it('should accept positive integers', () => { + expect(assertPositiveInteger(1, 'batchSize')).toBe(1) + expect(assertPositiveInteger(1000, 'batchSize')).toBe(1000) + }) + + it('should reject zero, negatives and non-integers', () 
=> { + expect(() => assertPositiveInteger(0, 'batchSize')).toThrowError() + expect(() => assertPositiveInteger(-5, 'batchSize')).toThrowError() + expect(() => assertPositiveInteger(1.5, 'batchSize')).toThrowError() + expect(() => assertPositiveInteger('10', 'batchSize')).toThrowError() + }) +}) + +describe('normalizeTables', () => { + it('should default destinationTable to the source table name', () => { + const result = normalizeTables([{ name: 'users', trackBy: 'id' }]) + expect(result).toEqual([ + { + name: 'users', + trackBy: 'id', + schema: undefined, + destinationTable: 'users', + }, + ]) + }) + + it('should preserve schema and custom destinationTable', () => { + const result = normalizeTables([ + { + name: 'orders', + trackBy: 'created_at', + schema: 'public', + destinationTable: 'orders_replica', + }, + ]) + expect(result[0].destinationTable).toBe('orders_replica') + expect(result[0].schema).toBe('public') + }) + + it('should throw on an empty table list', () => { + expect(() => normalizeTables([])).toThrowError( + ReplicationConfigurationError + ) + }) + + it('should throw on duplicate destination tables', () => { + expect(() => + normalizeTables([ + { name: 'users', trackBy: 'id' }, + { name: 'other', trackBy: 'id', destinationTable: 'users' }, + ]) + ).toThrowError(/Duplicate destination table/) + }) + + it('should throw on an invalid trackBy column', () => { + expect(() => + normalizeTables([{ name: 'users', trackBy: 'id; DROP' }]) + ).toThrowError(ReplicationConfigurationError) + }) +}) + +describe('buildSelectQuery', () => { + const table = { + name: 'users', + trackBy: 'id', + schema: undefined, + destinationTable: 'users', + } + + it('should omit the WHERE clause on the first pull (no checkpoint)', () => { + const { sql, params } = buildSelectQuery(table, null, 500) + expect(sql).toBe('SELECT * FROM users ORDER BY id ASC LIMIT 500') + expect(params).toEqual([]) + }) + + it('should add an append-only WHERE clause when a checkpoint exists', () => { + 
const { sql, params } = buildSelectQuery(table, 42, 500) + expect(sql).toBe( + 'SELECT * FROM users WHERE id > ? ORDER BY id ASC LIMIT 500' + ) + expect(params).toEqual([42]) + }) + + it('should qualify the table with its schema when provided', () => { + const { sql } = buildSelectQuery( + { ...table, schema: 'public' }, + null, + 10 + ) + expect(sql).toBe('SELECT * FROM public.users ORDER BY id ASC LIMIT 10') + }) + + it('should treat a checkpoint of 0 as a valid value, not "no checkpoint"', () => { + const { sql, params } = buildSelectQuery(table, 0, 10) + expect(sql).toContain('WHERE id > ?') + expect(params).toEqual([0]) + }) +}) + +describe('buildUpsertQuery', () => { + it('should build an idempotent INSERT OR REPLACE statement', () => { + const { sql, params } = buildUpsertQuery('users', { + id: 1, + name: 'Ada', + }) + expect(sql).toBe( + 'INSERT OR REPLACE INTO users (id, name) VALUES (?, ?)' + ) + expect(params).toEqual([1, 'Ada']) + }) + + it('should reject empty rows', () => { + expect(() => buildUpsertQuery('users', {})).toThrowError( + ReplicationConfigurationError + ) + }) + + it('should reject malicious column names from the source result set', () => { + expect(() => + buildUpsertQuery('users', { 'id) VALUES (1); DROP': 1 }) + ).toThrowError(ReplicationConfigurationError) + }) +}) + +describe('nextCheckpointValue', () => { + it('should return null for an empty batch', () => { + expect(nextCheckpointValue([], 'id')).toBeNull() + }) + + it('should return the trackBy value of the last (highest) row', () => { + const rows = [{ id: 1 }, { id: 2 }, { id: 3 }] + expect(nextCheckpointValue(rows, 'id')).toBe(3) + }) + + it('should support non-numeric tracking columns', () => { + const rows = [ + { created_at: '2024-01-01' }, + { created_at: '2024-02-01' }, + ] + expect(nextCheckpointValue(rows, 'created_at')).toBe('2024-02-01') + }) + + it('should throw when the trackBy column is missing from the rows', () => { + expect(() => nextCheckpointValue([{ name: 
/**
 * A single table that should be replicated from the external data source into the
 * internal Durable Object SQLite database.
 */
export interface ReplicationTable {
    /** Table name on the external data source. */
    name: string
    /**
     * Column used for append-only polling. Must be monotonically increasing so we
     * can ask the source for "everything newer than what we already have" (e.g. an
     * auto-incrementing `id` or a `created_at` timestamp).
     */
    trackBy: string
    /** Optional schema on the external source (e.g. `public` for Postgres). */
    schema?: string
    /** Optional internal table name. Defaults to `name`. */
    destinationTable?: string
}

/** A `ReplicationTable` after defaults have been applied and validation has passed. */
export interface NormalizedReplicationTable {
    name: string
    trackBy: string
    schema?: string
    destinationTable: string
}

/** Thrown when the plugin is configured with invalid options. */
export class ReplicationConfigurationError extends Error {
    constructor(message: string) {
        super(message)
        this.name = 'ReplicationConfigurationError'
    }
}

// SQL identifiers (table/column/schema names) cannot be passed as bound parameters,
// so we validate them against a strict allow-list to avoid SQL injection when they
// are interpolated into statements.
const IDENTIFIER_PATTERN = /^[A-Za-z_][A-Za-z0-9_]*$/

/**
 * Ensure `value` is a string that is safe to interpolate as a SQL identifier.
 *
 * @param value - Candidate identifier.
 * @param label - Human-readable name of the option, used in the error message.
 * @returns `value`, unchanged, when it is valid.
 * @throws ReplicationConfigurationError when `value` is not a string or contains
 *   anything other than letters, digits and underscores, or starts with a digit.
 */
export function assertValidIdentifier(value: unknown, label: string): string {
    if (typeof value !== 'string' || !IDENTIFIER_PATTERN.test(value)) {
        throw new ReplicationConfigurationError(
            `Invalid ${label}: ${JSON.stringify(value)}. Identifiers may only contain letters, numbers and underscores and may not start with a number.`
        )
    }

    return value
}

/**
 * Validate the configured tables, apply defaults and reject duplicates. Throws a
 * `ReplicationConfigurationError` on any invalid input rather than silently producing
 * a broken replication job.
 *
 * @param tables - Tables as supplied by the user.
 * @returns One normalized entry per input table, in the same order.
 * @throws ReplicationConfigurationError when the list is empty, an identifier is
 *   invalid, or two entries resolve to the same destination table.
 */
export function normalizeTables(
    tables: ReplicationTable[]
): NormalizedReplicationTable[] {
    if (!Array.isArray(tables) || tables.length === 0) {
        throw new ReplicationConfigurationError(
            'At least one table must be configured for replication.'
        )
    }

    // Keyed by the lower-cased name: SQLite resolves table names
    // case-insensitively, so `Users` and `users` are the same destination.
    const seen = new Set<string>()

    return tables.map((table) => {
        assertValidIdentifier(table?.name, 'table name')
        assertValidIdentifier(table?.trackBy, 'trackBy column')

        const destinationTable = table.destinationTable ?? table.name
        assertValidIdentifier(destinationTable, 'destinationTable')

        if (table.schema !== undefined) {
            assertValidIdentifier(table.schema, 'schema')
        }

        const destinationKey = destinationTable.toLowerCase()
        if (seen.has(destinationKey)) {
            throw new ReplicationConfigurationError(
                `Duplicate destination table: ${destinationTable}. Each table may only be replicated once.`
            )
        }
        seen.add(destinationKey)

        return {
            name: table.name,
            trackBy: table.trackBy,
            schema: table.schema,
            destinationTable,
        }
    })
}

/**
 * Ensure `value` is an integer greater than zero.
 *
 * @param value - Candidate number.
 * @param label - Name of the option, used in the error message.
 * @returns `value`, narrowed to `number`.
 * @throws ReplicationConfigurationError for non-numbers, non-integers (including
 *   `NaN` and `Infinity`), zero and negatives.
 */
export function assertPositiveInteger(value: unknown, label: string): number {
    if (typeof value !== 'number' || !Number.isInteger(value) || value <= 0) {
        throw new ReplicationConfigurationError(
            `${label} must be a positive integer, received ${JSON.stringify(value)}.`
        )
    }

    return value
}

/**
 * Build the append-only SELECT statement that fetches rows from the external source
 * that are newer than the last checkpoint. When no checkpoint exists yet (first pull)
 * the WHERE clause is omitted so the full table is seeded.
 *
 * The filter is a strict `>` combined with `LIMIT`, so when a full batch ends in
 * the middle of a run of rows sharing one `trackBy` value, the remaining rows with
 * that value are not selected by the next pull. `trackBy` should therefore be
 * unique.
 *
 * @param table - Table to read; its identifiers are interpolated into the SQL.
 * @param lastValue - Checkpoint to resume after; `null`/`undefined` means none.
 *   Falsy values such as `0` are valid checkpoints.
 * @param batchSize - Maximum number of rows to fetch.
 * @returns The statement and the values for its `?` placeholders.
 * @throws ReplicationConfigurationError when an identifier or `batchSize` is not
 *   safe to interpolate.
 */
export function buildSelectQuery(
    table: NormalizedReplicationTable,
    lastValue: unknown,
    batchSize: number
): { sql: string; params: unknown[] } {
    // Everything below is interpolated rather than bound, and this function is
    // exported, so validate here instead of relying on every caller having done so.
    assertValidIdentifier(table.name, 'table name')
    assertValidIdentifier(table.trackBy, 'trackBy column')
    if (table.schema) {
        assertValidIdentifier(table.schema, 'schema')
    }
    assertPositiveInteger(batchSize, 'batchSize')

    const source = table.schema ? `${table.schema}.${table.name}` : table.name
    const params: unknown[] = []
    let sql = `SELECT * FROM ${source}`

    if (lastValue !== null && lastValue !== undefined) {
        sql += ` WHERE ${table.trackBy} > ?`
        params.push(lastValue)
    }

    sql += ` ORDER BY ${table.trackBy} ASC LIMIT ${batchSize}`

    return { sql, params }
}

/**
 * Build an `INSERT OR REPLACE` statement for a single row so re-pulling an already
 * seen row (e.g. after a partial failure) is idempotent rather than a duplicate-key
 * crash. Column names originate from the external result set, so they are validated
 * as identifiers too.
 *
 * @param destinationTable - Internal table to write into.
 * @param row - Column/value pairs of the row; key order defines column order.
 * @returns The statement and its bound values, in column order.
 * @throws ReplicationConfigurationError when the row is empty or the destination
 *   table or any column name is not a valid identifier.
 */
export function buildUpsertQuery(
    destinationTable: string,
    row: Record<string, unknown>
): { sql: string; params: unknown[] } {
    // Interpolated below just like the column names, so it gets the same check.
    assertValidIdentifier(destinationTable, 'destinationTable')

    const columns = Object.keys(row)

    if (columns.length === 0) {
        throw new ReplicationConfigurationError(
            `Cannot replicate an empty row into ${destinationTable}.`
        )
    }

    columns.forEach((column) => assertValidIdentifier(column, 'column name'))

    const placeholders = columns.map(() => '?').join(', ')
    const sql = `INSERT OR REPLACE INTO ${destinationTable} (${columns.join(', ')}) VALUES (${placeholders})`
    const params = columns.map((column) => row[column])

    return { sql, params }
}

/**
 * Determine the new checkpoint value from a batch of rows. Rows are fetched ordered
 * by `trackBy` ascending, so the last row holding a value is the highest seen.
 *
 * Trailing rows whose `trackBy` is `null`/`undefined` are skipped: engines that
 * sort NULLs last on `ASC` (Postgres does by default) would otherwise yield a
 * `null` checkpoint, which is indistinguishable from "no checkpoint" and makes
 * every pull re-seed from the start.
 *
 * @param rows - Batch ordered by `trackBy` ascending.
 * @param trackBy - Name of the tracked column.
 * @returns The highest non-null tracked value, or `null` when the batch is empty
 *   or holds no non-null value (checkpoint should not advance).
 * @throws ReplicationConfigurationError when an inspected row does not contain
 *   the `trackBy` column at all.
 */
export function nextCheckpointValue(
    rows: Record<string, unknown>[],
    trackBy: string
): unknown {
    // Walk from the highest row downwards; in the common case the first row
    // inspected already carries the answer.
    for (const row of [...rows].reverse()) {
        if (!(trackBy in row)) {
            throw new ReplicationConfigurationError(
                `trackBy column "${trackBy}" was not present in the replicated rows. Make sure it is selected from the source table.`
            )
        }

        const value = row[trackBy]
        if (value !== null && value !== undefined) {
            return value
        }
    }

    return null
}