Skip to content

Commit 2b9657c

Browse files
committed
feat(lds): make wal2json parameters configurable
* add `LdsOptions.params` * fix `numeric-data-types-as-string` to be set only if `types` are passed * adjust the `parse` function accordingly, to handle `number` values without a type parser
1 parent c5c7e7c commit 2b9657c

1 file changed

Lines changed: 25 additions & 12 deletions

File tree

packages/lds/src/pg-logical-decoding.ts

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -83,45 +83,58 @@ export interface LdsOptions {
8383
temporary?: boolean;
8484
/** (Custom) [type parsers](https://node-postgres.com/features/queries#types) to deserialise the wal2json column string values. Pass `pg.types` to get the default type parsing. Defaults to `undefined`, that is raw values will get emitted. */
8585
types?: pg.CustomTypesConfig;
86+
/** Extra [parameters to be passed to wal2json](https://github.com/eulerto/wal2json?tab=readme-ov-file#parameters). Use e.g. `{'numeric-data-types-as-string', 't'}` to make the type parsers apply to numeric values. */
87+
params?: Record<string, string>;
8688
}
8789

8890
export default class PgLogicalDecoding extends EventEmitter {
8991
public readonly slotName: string;
9092
public readonly temporary: boolean;
91-
private connectionString: string;
92-
private tablePattern: string;
93+
private readonly getChangesQueryText: string;
94+
private readonly parse: (value: any, typeOid: number) => any;
9395
private pool: pg.Pool | null;
9496
private client: Promise<pg.PoolClient> | null;
95-
private readonly parse: (value: any, typeOid: number) => any;
9697

9798
constructor(connectionString: string, options?: LdsOptions) {
9899
super();
99-
this.connectionString = connectionString;
100100
const {
101101
tablePattern = "*.*",
102102
slotName = "postgraphile",
103103
temporary = false,
104104
types,
105+
params,
105106
} = options || {};
106-
this.tablePattern = tablePattern;
107107
this.slotName = slotName;
108108
this.temporary = temporary;
109+
const parametersSql = Object.entries({
110+
"add-tables": tablePattern != "*.*" ? tablePattern : null,
111+
"include-types": "f", // type names are unnecessary
112+
"include-type-oids": types ? "t" : null,
113+
"numeric-data-types-as-string": types ? "t" : null,
114+
...params,
115+
})
116+
.flatMap(entry => (typeof entry[1] == "string" ? entry : []))
117+
.map(pg.Client.prototype.escapeLiteral)
118+
.join(", ");
119+
this.getChangesQueryText = `SELECT lsn, data FROM pg_catalog.pg_logical_slot_get_changes($1, $2, $3, ${parametersSql})`;
109120
this.parse = types
110121
? (value: any, typeOid: number) => {
111122
if (value === null) return null;
112-
// wal2json always outputs `bool`s as boolean, not as string, irregardless of `numeric-data-types-as-string`.
113-
if (typeOid === pg.types.builtins.BOOL) return value;
114-
// FIXME: this should use `client.getTypeParser` or have some other non-global option to configure type parsing
123+
// wal2json always outputs `bool`s as boolean
124+
if (typeof value === "boolean") return value; // assert: typeOid === pg.types.builtins.BOOL
125+
// wal2json outputs numeric data as numbers, unless `numeric-data-types-as-string` is set
126+
if (typeof value === "number") return value;
115127
const parser = pg.types.getTypeParser(typeOid, "text");
116128
return parser(value);
117129
}
118130
: id;
119131
// We just use the pool to get better error handling
120132
this.pool = new pg.Pool({
121-
connectionString: this.connectionString,
133+
connectionString,
122134
max: 1,
123135
});
124136
this.pool.on("error", this.onPoolError);
137+
this.client = null;
125138
}
126139

127140
public async dropStaleSlots() {
@@ -182,9 +195,9 @@ export default class PgLogicalDecoding extends EventEmitter {
182195
const client = await this.getClient();
183196
await this.trackSelf(client);
184197
try {
185-
const { rows } = await client.query({
186-
text: `SELECT lsn, data FROM pg_catalog.pg_logical_slot_get_changes($1, $2, $3, 'add-tables', $4::text, 'include-types', 'f', 'include-type-oids', 't', 'numeric-data-types-as-string', 't')`,
187-
values: [this.slotName, uptoLsn, uptoNchanges, this.tablePattern],
198+
const { rows } = await client.query<[lsn: string, data: string]>({
199+
text: this.getChangesQueryText,
200+
values: [this.slotName, uptoLsn, uptoNchanges],
188201
rowMode: "array",
189202
});
190203
return rows.map(toLsnData);

0 commit comments

Comments
 (0)