diff --git a/plugins/replication/README.md b/plugins/replication/README.md new file mode 100644 index 0000000..7160664 --- /dev/null +++ b/plugins/replication/README.md @@ -0,0 +1,70 @@ +# Replication Plugin + +Pulls data from an **external** data source (e.g. a Postgres instance on +Supabase) into the **internal** Durable Object SQLite database so a StarbaseDB +instance can serve as a close-to-edge replica that can be queried instead of +querying the upstream source directly. + +The plugin uses a **pull** mechanism (rather than per-provider push), which is a +better global solution as discussed in +[#72](https://github.com/outerbase/starbasedb/issues/72). + +## How it works + +For each configured table the plugin: + +1. Reads a bounded, ordered batch of rows from the external source using the + existing `executeQuery` operation pointed at the external connection. +2. Auto-creates the target table in the internal SQLite store (based on the + columns returned) if it does not already exist. +3. Upserts the rows into the internal table keyed on a primary key column, so + re-running replication is idempotent. +4. Persists a checkpoint (the last seen value of the configured `cursorColumn`) + in the internal `tmp_replication_checkpoints` table. On subsequent runs only + rows with a cursor value greater than the checkpoint are fetched, enabling + append-only incremental polling. Because the checkpoint lives in SQLite it + survives Durable Object hibernation. + +Each table is replicated independently — a failure replicating one table does +not abort the others (fail-open), and the error is recorded in the checkpoint +row for that table. + +## Configuration + +```ts +import { CronPlugin } from '../plugins/cron' +import { ReplicationPlugin } from '../plugins/replication' + +const cronPlugin = new CronPlugin() + +const replicationPlugin = new ReplicationPlugin({ + cron: cronPlugin, // optional — enables scheduled runs + config: { + // Optional cron expression; requires a CronPlugin instance + a matching + // entry in tmp_cron_tasks whose `name` is "starbasedb:replication". + schedule: '*/5 * * * *', + defaultBatchSize: 1000, + tables: [ + { + sourceTable: 'users', // table on the external source + targetTable: 'users', // internal table (defaults to sourceTable) + cursorColumn: 'id', // append-only polling column (e.g. id / created_at) + primaryKeyColumn: 'id', // upsert key (defaults to cursorColumn) + batchSize: 1000, // rows pulled per run (defaults to defaultBatchSize) + }, + ], + }, +}) +``` + +Add `replicationPlugin` to the `plugins` array passed to `StarbaseDB`. + +## Admin endpoints + +Both endpoints require an admin role. + +- `POST /replication/run` — trigger a replication run for all configured tables. + Pass `?table=` to replicate a single table. Returns the rows + replicated and the advanced cursor value per table. +- `GET /replication/status` — return the current checkpoints (last value, total + rows replicated, last run timestamp, last error) for every replicated table. diff --git a/plugins/replication/index.test.ts b/plugins/replication/index.test.ts new file mode 100644 index 0000000..8211042 --- /dev/null +++ b/plugins/replication/index.test.ts @@ -0,0 +1,143 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest' +import { ReplicationPlugin } from './index' +import { DataSource } from '../../src/types' + +// Mock the external read path so the plugin can be exercised without a real +// external database connection. +const externalRows = vi.fn() +vi.mock('../../src/operation', () => ({ + executeQuery: (opts: any) => externalRows(opts), +})) + +let plugin: ReplicationPlugin +let mockDataSource: DataSource +let rpcExecuteQuery: ReturnType + +beforeEach(() => { + vi.clearAllMocks() + + rpcExecuteQuery = vi.fn().mockResolvedValue([]) + mockDataSource = { + rpc: { executeQuery: rpcExecuteQuery }, + source: 'internal', + external: { + dialect: 'postgresql', + host: 'localhost', + port: 5432, + user: 'u', + password: 'p', + database: 'd', + }, + } as unknown as DataSource + + plugin = new ReplicationPlugin({ + config: { + tables: [ + { + sourceTable: 'users', + cursorColumn: 'id', + }, + ], + }, + }) + // Inject the data source / config the way the register() middleware would. + plugin['dataSource'] = mockDataSource + plugin['config'] = { role: 'admin' } +}) + +describe('ReplicationPlugin - initialization', () => { + it('should be a StarbasePlugin with the expected name and prefix', () => { + expect(plugin).toBeInstanceOf(ReplicationPlugin) + expect(plugin.name).toBe('starbasedb:replication') + expect(plugin.pathPrefix).toBe('/replication') + }) + + it('should create the checkpoint table on init', async () => { + await plugin['init']() + expect(rpcExecuteQuery).toHaveBeenCalledWith({ + sql: expect.stringContaining( + 'CREATE TABLE IF NOT EXISTS tmp_replication_checkpoints' + ), + params: [], + }) + }) +}) + +describe('ReplicationPlugin - replicate()', () => { + it('throws when no external source is configured', async () => { + plugin['dataSource'] = { + ...mockDataSource, + external: undefined, + } as unknown as DataSource + + await expect(plugin.replicate()).rejects.toThrow( + /No external data source/ + ) + }) + + it('reads from the external source ordered by the cursor column and upserts rows internally', async () => { + // No prior checkpoint -> first run with no WHERE filter. + rpcExecuteQuery.mockResolvedValueOnce([]) // GET_CHECKPOINT + externalRows.mockResolvedValueOnce([ + { id: 1, name: 'a' }, + { id: 2, name: 'b' }, + ]) + + const results = await plugin.replicate() + + // External read happened with ORDER BY and no WHERE on the first run. + expect(externalRows).toHaveBeenCalledTimes(1) + const externalCall = externalRows.mock.calls[0][0] + expect(externalCall.dataSource.source).toBe('external') + expect(externalCall.sql).toContain('ORDER BY "id" ASC') + expect(externalCall.sql).not.toContain('WHERE') + + // Two upserts into the internal table happened. + const upserts = rpcExecuteQuery.mock.calls.filter((c) => + String(c[0].sql).startsWith('INSERT INTO "users"') + ) + expect(upserts).toHaveLength(2) + expect(upserts[0][0].sql).toContain('ON CONFLICT("id") DO UPDATE SET') + + // Result reports rows replicated and the advanced cursor value. + expect(results).toEqual([ + { table: 'users', rowsReplicated: 2, lastValue: '2' }, + ]) + }) + + it('uses the stored checkpoint to fetch only newer rows', async () => { + rpcExecuteQuery.mockResolvedValueOnce([{ last_value: '5' }]) // GET_CHECKPOINT + externalRows.mockResolvedValueOnce([{ id: 6, name: 'c' }]) + + await plugin.replicate() + + const externalCall = externalRows.mock.calls[0][0] + expect(externalCall.sql).toContain('WHERE "id" > ?') + expect(externalCall.params).toEqual(['5']) + }) + + it('is fail-open: one failing table does not abort the others', async () => { + plugin = new ReplicationPlugin({ + config: { + tables: [ + { sourceTable: 'good', cursorColumn: 'id' }, + { sourceTable: 'bad', cursorColumn: 'id' }, + ], + }, + }) + plugin['dataSource'] = mockDataSource + plugin['config'] = { role: 'admin' } + + // good -> checkpoint empty, then rows; bad -> checkpoint empty, then throws + rpcExecuteQuery.mockResolvedValueOnce([]) // good GET_CHECKPOINT + externalRows.mockResolvedValueOnce([{ id: 1 }]) // good rows + rpcExecuteQuery.mockResolvedValue([]) // subsequent internal writes + externalRows.mockRejectedValueOnce(new Error('boom')) // bad rows + + const results = await plugin.replicate() + + expect(results).toHaveLength(2) + expect(results[0]).toMatchObject({ table: 'good', rowsReplicated: 1 }) + expect(results[1]).toMatchObject({ table: 'bad', error: 'boom' }) + }) +}) diff --git a/plugins/replication/index.ts b/plugins/replication/index.ts new file mode 100644 index 0000000..fff3474 --- /dev/null +++ b/plugins/replication/index.ts @@ -0,0 +1,428 @@ +import { StarbaseApp, StarbaseDBConfiguration } from '../../src/handler' +import { StarbasePlugin } from '../../src/plugin' +import { executeQuery } from '../../src/operation' +import { DataSource, QueryResult } from '../../src/types' +import { createResponse } from '../../src/utils' +import { CronPlugin } from '../cron' + +/** + * Configuration describing a single external -> internal table replication. + */ +export interface ReplicationTableConfig { + // Name of the table on the external data source to pull rows from. + sourceTable: string + // Name of the table in the internal DO SQLite to write rows into. + // Defaults to `sourceTable` when omitted. + targetTable?: string + // Column used to track incremental progress (e.g. `id` or `created_at`). + // Rows are pulled in ascending order of this column and only rows greater + // than the last replicated value are fetched, enabling append-only polling. + cursorColumn: string + // Column used as the primary key for upserts into the internal table. + // Defaults to `cursorColumn` when omitted. + primaryKeyColumn?: string + // Maximum number of rows to pull per run for this table. + batchSize?: number +} + +export interface ReplicationPluginConfig { + // Tables to replicate from the external source into the internal source. + tables: ReplicationTableConfig[] + // Optional cron expression. When provided (and a CronPlugin instance is + // passed) replication runs automatically on the given interval. + schedule?: string + // Default batch size applied to tables that do not specify their own. + defaultBatchSize?: number +} + +const DEFAULT_BATCH_SIZE = 1000 + +const SQL_QUERIES = { + CREATE_CHECKPOINT_TABLE: ` + CREATE TABLE IF NOT EXISTS tmp_replication_checkpoints ( + target_table TEXT NOT NULL PRIMARY KEY, + cursor_column TEXT NOT NULL, + last_value TEXT, + rows_replicated INTEGER NOT NULL DEFAULT 0, + last_run_at TEXT, + last_error TEXT + ) + `, + GET_CHECKPOINT: ` + SELECT last_value FROM tmp_replication_checkpoints WHERE target_table = ? + `, + GET_ALL_CHECKPOINTS: ` + SELECT target_table, cursor_column, last_value, rows_replicated, last_run_at, last_error + FROM tmp_replication_checkpoints + `, + UPSERT_CHECKPOINT: ` + INSERT INTO tmp_replication_checkpoints + (target_table, cursor_column, last_value, rows_replicated, last_run_at, last_error) + VALUES (?, ?, ?, ?, ?, ?) + ON CONFLICT(target_table) DO UPDATE SET + cursor_column = excluded.cursor_column, + last_value = excluded.last_value, + rows_replicated = tmp_replication_checkpoints.rows_replicated + excluded.rows_replicated, + last_run_at = excluded.last_run_at, + last_error = excluded.last_error + `, +} + +export interface ReplicationRunResult { + table: string + rowsReplicated: number + lastValue: string | null + error?: string +} + +/** + * ReplicationPlugin pulls rows from an external data source (e.g. a Postgres + * instance on Supabase) into the internal Durable Object SQLite database so the + * StarbaseDB instance can serve as a close-to-edge replica. + * + * The pull is incremental and append-only: for each configured table a cursor + * column is tracked and only rows with a cursor value greater than the last + * replicated value are fetched on subsequent runs. Progress is persisted in the + * internal `tmp_replication_checkpoints` table so it survives DO hibernation. + * + * Replication can be triggered manually via the admin HTTP endpoints or, when a + * CronPlugin instance and a `schedule` are provided, automatically on an + * interval. + */ +export class ReplicationPlugin extends StarbasePlugin { + public pathPrefix: string = '/replication' + private dataSource?: DataSource + private config?: StarbaseDBConfiguration + private replicationConfig: ReplicationPluginConfig + private cron?: CronPlugin + private initialized = false + + constructor(opts: { config: ReplicationPluginConfig; cron?: CronPlugin }) { + super('starbasedb:replication', { + requiresAuth: true, + }) + this.replicationConfig = opts.config + this.cron = opts.cron + } + + override async register(app: StarbaseApp) { + app.use(async (c, next) => { + this.dataSource = c?.get('dataSource') + this.config = c?.get('config') + await this.init() + await next() + }) + + // Trigger a replication run for all configured tables (or a single + // table when `?table=` is provided). + app.post(`${this.pathPrefix}/run`, async (c) => { + if (this.config?.role !== 'admin') { + return createResponse(undefined, 'Unauthorized request', 401) + } + + const onlyTable = c.req.query('table') + const results = await this.replicate(onlyTable) + return createResponse({ results }, undefined, 200) + }) + + // Return the current replication checkpoints / status. + app.get(`${this.pathPrefix}/status`, async (c) => { + if (this.config?.role !== 'admin') { + return createResponse(undefined, 'Unauthorized request', 401) + } + + if (!this.dataSource) { + return createResponse(undefined, 'Data source not found', 500) + } + + const checkpoints = (await this.dataSource.rpc.executeQuery({ + sql: SQL_QUERIES.GET_ALL_CHECKPOINTS, + params: [], + })) as QueryResult[] + + return createResponse({ checkpoints }, undefined, 200) + }) + } + + private async init() { + if (this.initialized || !this.dataSource) return + + await this.dataSource.rpc.executeQuery({ + sql: SQL_QUERIES.CREATE_CHECKPOINT_TABLE, + params: [], + }) + + // Register an automatic replication run with the CronPlugin when a + // schedule is configured. The cron callback hits our own run logic. + if (this.cron && this.replicationConfig.schedule) { + this.cron.onEvent(async ({ name }) => { + if (name === this.name) { + await this.replicate() + } + }, this.dataSource.executionContext) + } + + this.initialized = true + } + + /** + * Run replication for all configured tables, or just `onlyTable` when set. + * Each table is replicated independently: a failure on one table does not + * abort replication of the others (fail-open). + */ + public async replicate( + onlyTable?: string + ): Promise { + if (!this.dataSource) { + throw new Error('ReplicationPlugin not properly initialized') + } + + if (!this.dataSource.external) { + throw new Error( + 'No external data source configured to replicate from' + ) + } + + const tables = onlyTable + ? this.replicationConfig.tables.filter( + (t) => (t.targetTable ?? t.sourceTable) === onlyTable + ) + : this.replicationConfig.tables + + const results: ReplicationRunResult[] = [] + + for (const table of tables) { + try { + results.push(await this.replicateTable(table)) + } catch (error) { + const message = + error instanceof Error ? error.message : String(error) + console.error( + `Replication failed for table ${table.sourceTable}:`, + error + ) + await this.saveCheckpoint( + table.targetTable ?? table.sourceTable, + table.cursorColumn, + null, + 0, + message + ) + results.push({ + table: table.targetTable ?? table.sourceTable, + rowsReplicated: 0, + lastValue: null, + error: message, + }) + } + } + + return results + } + + private async replicateTable( + table: ReplicationTableConfig + ): Promise { + const targetTable = table.targetTable ?? table.sourceTable + const primaryKey = table.primaryKeyColumn ?? table.cursorColumn + const batchSize = + table.batchSize ?? + this.replicationConfig.defaultBatchSize ?? + DEFAULT_BATCH_SIZE + + const lastValue = await this.getCheckpoint(targetTable) + + // Pull a bounded, ordered batch of rows from the external source that + // are newer than the last replicated cursor value (append-only polling). + const rows = await this.fetchExternalRows( + table.sourceTable, + table.cursorColumn, + lastValue, + batchSize + ) + + if (rows.length === 0) { + await this.saveCheckpoint( + targetTable, + table.cursorColumn, + lastValue, + 0, + null + ) + return { table: targetTable, rowsReplicated: 0, lastValue } + } + + const columns = Object.keys(rows[0]) + await this.ensureTargetTable(targetTable, columns, primaryKey) + + let newCursorValue = lastValue + for (const row of rows) { + await this.upsertRow(targetTable, columns, primaryKey, row) + const cursorValue = row[table.cursorColumn] + if (cursorValue !== undefined && cursorValue !== null) { + newCursorValue = String(cursorValue) + } + } + + await this.saveCheckpoint( + targetTable, + table.cursorColumn, + newCursorValue, + rows.length, + null + ) + + return { + table: targetTable, + rowsReplicated: rows.length, + lastValue: newCursorValue, + } + } + + /** + * Read a batch of rows from the external data source using the existing + * `executeQuery` operation pointed at the external connection. + */ + private async fetchExternalRows( + sourceTable: string, + cursorColumn: string, + lastValue: string | null, + batchSize: number + ): Promise[]> { + const externalDataSource: DataSource = { + ...this.dataSource!, + source: 'external', + } + + const where = + lastValue !== null + ? `WHERE ${quoteIdentifier(cursorColumn)} > ?` + : '' + const params = lastValue !== null ? [lastValue] : [] + + const sql = + `SELECT * FROM ${quoteIdentifier(sourceTable)} ${where} ` + + `ORDER BY ${quoteIdentifier(cursorColumn)} ASC LIMIT ${Number(batchSize)}` + + const result = await executeQuery({ + sql, + params, + isRaw: false, + dataSource: externalDataSource, + config: this.config!, + }) + + return (result as Record[]) ?? [] + } + + private async ensureTargetTable( + targetTable: string, + columns: string[], + primaryKey: string + ) { + const columnDefs = columns + .map((col) => { + const def = quoteIdentifier(col) + return col === primaryKey ? `${def} PRIMARY KEY` : def + }) + .join(', ') + + await this.dataSource!.rpc.executeQuery({ + sql: `CREATE TABLE IF NOT EXISTS ${quoteIdentifier(targetTable)} (${columnDefs})`, + params: [], + }) + } + + private async upsertRow( + targetTable: string, + columns: string[], + primaryKey: string, + row: Record + ) { + const placeholders = columns.map(() => '?').join(', ') + const columnList = columns.map(quoteIdentifier).join(', ') + const updates = columns + .filter((col) => col !== primaryKey) + .map( + (col) => + `${quoteIdentifier(col)} = excluded.${quoteIdentifier(col)}` + ) + .join(', ') + + // Upsert keyed on the primary key so re-running replication is + // idempotent and never produces duplicate rows. + const conflictClause = updates + ? `ON CONFLICT(${quoteIdentifier(primaryKey)}) DO UPDATE SET ${updates}` + : `ON CONFLICT(${quoteIdentifier(primaryKey)}) DO NOTHING` + + const sql = + `INSERT INTO ${quoteIdentifier(targetTable)} (${columnList}) ` + + `VALUES (${placeholders}) ${conflictClause}` + + await this.dataSource!.rpc.executeQuery({ + sql, + params: columns.map((col) => normalizeValue(row[col])), + }) + } + + private async getCheckpoint(targetTable: string): Promise { + const result = (await this.dataSource!.rpc.executeQuery({ + sql: SQL_QUERIES.GET_CHECKPOINT, + params: [targetTable], + })) as QueryResult[] + + if (result.length === 0) return null + const lastValue = result[0].last_value + return lastValue !== undefined && lastValue !== null + ? String(lastValue) + : null + } + + private async saveCheckpoint( + targetTable: string, + cursorColumn: string, + lastValue: string | null, + rowsReplicated: number, + error: string | null + ) { + await this.dataSource!.rpc.executeQuery({ + sql: SQL_QUERIES.UPSERT_CHECKPOINT, + params: [ + targetTable, + cursorColumn, + lastValue, + rowsReplicated, + new Date().toISOString(), + error, + ], + }) + } +} + +/** + * Quote a SQL identifier (table/column name) to guard against injection via + * configured table/column names. Embedded double quotes are escaped. + */ +function quoteIdentifier(identifier: string): string { + return `"${identifier.replace(/"/g, '""')}"` +} + +/** + * Normalize a value coming from the external source into something the internal + * SQLite store can bind. Objects/arrays are JSON-encoded; everything else is + * passed through. + */ +function normalizeValue(value: unknown): unknown { + if (value === undefined) return null + if ( + value !== null && + typeof value === 'object' && + !(value instanceof Date) + ) { + return JSON.stringify(value) + } + if (value instanceof Date) { + return value.toISOString() + } + return value +} diff --git a/plugins/replication/meta.json b/plugins/replication/meta.json new file mode 100644 index 0000000..972f361 --- /dev/null +++ b/plugins/replication/meta.json @@ -0,0 +1,15 @@ +{ + "version": "1.0.0", + "resources": { + "tables": { + "tmp_replication_checkpoints": "Tracks per-table incremental replication progress (cursor value, row counts, last run, last error)." + }, + "secrets": {}, + "variables": {} + }, + "dependencies": { + "tables": {}, + "secrets": {}, + "variables": {} + } +} diff --git a/src/index.ts b/src/index.ts index 4d08932..8f18605 100644 --- a/src/index.ts +++ b/src/index.ts @@ -12,6 +12,7 @@ import { QueryLogPlugin } from '../plugins/query-log' import { StatsPlugin } from '../plugins/stats' import { CronPlugin } from '../plugins/cron' import { InterfacePlugin } from '../plugins/interface' +import { ReplicationPlugin } from '../plugins/replication' export { StarbaseDBDurableObject } from './do' @@ -211,6 +212,17 @@ export default { const interfacePlugin = new InterfacePlugin() + // Pull-based replication from the external data source into the + // internal DO SQLite so the instance can serve as a close-to-edge + // replica. Configure the tables to replicate (and an optional cron + // schedule) below; with no tables configured the plugin is inert. + const replicationPlugin = new ReplicationPlugin({ + cron: cronPlugin, + config: { + tables: [], + }, + }) + const plugins = [ webSocketPlugin, new StudioPlugin({ @@ -224,6 +236,7 @@ export default { new QueryLogPlugin({ ctx }), cdcPlugin, cronPlugin, + replicationPlugin, new StatsPlugin(), interfacePlugin, ] satisfies StarbasePlugin[]