Skip to content

Commit 6af5c2b

Browse files
committed
refactor(lds)/docs(lds): forward options from subscribeToLogicalDecoding to PgLogicalDecoding and add doc comments to describe them
1 parent 1f252e7 commit 6af5c2b

2 files changed

Lines changed: 12 additions & 19 deletions

File tree

packages/lds/src/index.ts

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,13 @@
22
import PgLogicalDecoding, {
33
changeToRecord,
44
changeToPk,
5+
LdsOptions,
56
} from "./pg-logical-decoding";
67
import FatalError from "./fatal-error";
78

8-
export interface Options {
9-
slotName?: string;
10-
tablePattern?: string;
9+
export interface Options extends LdsOptions {
10+
/** Number of milliseconds between polls. Defaults to `200`. */
1111
sleepDuration?: number;
12-
temporary?: boolean;
1312
}
1413

1514
const DROP_STALE_SLOTS_INTERVAL = 15 * 60 * 1000;
@@ -57,18 +56,9 @@ export default async function subscribeToLogicalDecoding(
5756
callback: AnnounceCallback,
5857
options: Options = {}
5958
): Promise<LDSubscription> {
60-
const {
61-
slotName = "postgraphile",
62-
tablePattern = "*.*",
63-
sleepDuration = 200,
64-
temporary = false,
65-
} = options;
59+
const { sleepDuration = 200 } = options;
6660
let lastLsn: string | null = null;
67-
const client = new PgLogicalDecoding(connectionString, {
68-
tablePattern,
69-
slotName,
70-
temporary,
71-
});
61+
const client = new PgLogicalDecoding(connectionString, options);
7262

7363
// We must do this before we create the temporary slot, since errors will release a temporary slot immediately
7464
await client.dropStaleSlots();
@@ -81,7 +71,7 @@ export default async function subscribeToLogicalDecoding(
8171
} else if (e.code === "42710") {
8272
// Slot already exists; ignore.
8373
} else if (e.code === "42602") {
84-
throw new FatalError(`Invalid slot name '${slotName}'?`, e);
74+
throw new FatalError(`Invalid slot name '${client.slotName}'?`, e);
8575
} else {
8676
console.error(
8777
"An unhandled error occurred when attempting to create the replication slot:"
@@ -151,7 +141,7 @@ export default async function subscribeToLogicalDecoding(
151141
}
152142
}
153143
}
154-
if (!temporary && nextStaleCheck < Date.now()) {
144+
if (!client.temporary && nextStaleCheck < Date.now()) {
155145
// Roughly every 15 minutes, drop stale slots.
156146
nextStaleCheck = Date.now() + DROP_STALE_SLOTS_INTERVAL;
157147
client.dropStaleSlots().catch(e => {

packages/lds/src/pg-logical-decoding.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,9 +94,12 @@ const toLsnData = ([lsn, data]: [string, string]): Payload => ({
9494
data: JSON.parse(data),
9595
});
9696

97-
interface Options {
97+
export interface LdsOptions {
98+
/** The 'add-tables' wal2json parameter. Defaults to `*.*`. */
9899
tablePattern?: string;
100+
/** The [replication slot](https://www.postgresql.org/docs/current/logicaldecoding-explanation.html#LOGICALDECODING-REPLICATION-SLOTS) identifier. Defaults to `postgraphile`. */
99101
slotName?: string;
102+
/** Whether `.createSlot()` should create a temporary replication slot which will be limited to the `client` session and gets cleaned up automatically. Defaults to `false`. */
100103
temporary?: boolean;
101104
}
102105

@@ -108,7 +111,7 @@ export default class PgLogicalDecoding extends EventEmitter {
108111
private pool: pg.Pool | null;
109112
private client: Promise<pg.PoolClient> | null;
110113

111-
constructor(connectionString: string, options?: Options) {
114+
constructor(connectionString: string, options?: LdsOptions) {
112115
super();
113116
this.connectionString = connectionString;
114117
const {

0 commit comments

Comments
 (0)