Skip to content

Commit c5c7e7c

Browse files
committed
feat(lds): make type parsing configurable
* Refactor `changeToRecord` and `changeToPk` from exports into methods of `PgLogicalDecoding` * add `LdsOptions.types` to choose parsers, use noop identity function by default * call `changeToRecord` only once on updates
1 parent 6af5c2b commit c5c7e7c

2 files changed

Lines changed: 39 additions & 31 deletions

File tree

packages/lds/src/index.ts

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,5 @@
11
/* eslint-disable no-console,curly */
2-
import PgLogicalDecoding, {
3-
changeToRecord,
4-
changeToPk,
5-
LdsOptions,
6-
} from "./pg-logical-decoding";
2+
import PgLogicalDecoding, { LdsOptions } from "./pg-logical-decoding";
73
import FatalError from "./fatal-error";
84

95
export interface Options extends LdsOptions {
@@ -108,31 +104,32 @@ export default async function subscribeToLogicalDecoding(
108104
_: "insertC",
109105
schema,
110106
table,
111-
data: changeToRecord(change),
107+
data: client.changeToRecord(change),
112108
};
113109
callback(announcement);
114110
} else if (change.kind === "update") {
111+
const data = client.changeToRecord(change);
115112
const rowAnnouncement: UpdateRowAnnouncement = {
116113
_: "update",
117114
schema,
118115
table,
119-
keys: changeToPk(change),
120-
data: changeToRecord(change),
116+
keys: client.changeToPk(change),
117+
data,
121118
};
122119
callback(rowAnnouncement);
123120
const collectionAnnouncement: UpdateCollectionAnnouncement = {
124121
_: "updateC",
125122
schema,
126123
table,
127-
data: changeToRecord(change),
124+
data,
128125
};
129126
callback(collectionAnnouncement);
130127
} else if (change.kind === "delete") {
131128
const announcement: DeleteRowAnnouncement = {
132129
_: "delete",
133130
schema,
134131
table,
135-
keys: changeToPk(change),
132+
keys: client.changeToPk(change),
136133
};
137134
callback(announcement);
138135
} else {

packages/lds/src/pg-logical-decoding.ts

Lines changed: 32 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -60,27 +60,7 @@ export interface DeleteChange extends Change {
6060
oldkeys: Keys;
6161
}
6262

63-
const parse = (value: any, typeOid: number) => {
64-
if (value === null) return null;
65-
// wal2json always outputs `bool`s as boolean, not as string, irregardless of `numeric-data-types-as-string`.
66-
if (typeOid === pg.types.builtins.BOOL) return value;
67-
// FIXME: this should use `client.getTypeParser` or have some other non-global option to configure type parsing
68-
const parser = pg.types.getTypeParser(typeOid, "text");
69-
return parser(value);
70-
};
71-
72-
export const changeToRecord = (change: InsertChange | UpdateChange) => {
73-
const { columnnames, columnvalues, columntypeoids } = change;
74-
return columnnames.reduce<Record<string, any>>((memo, name, i) => {
75-
memo[name] = parse(columnvalues[i], columntypeoids[i]);
76-
return memo;
77-
}, {});
78-
};
79-
80-
export const changeToPk = (change: UpdateChange | DeleteChange) => {
81-
const { keyvalues, keytypeoids } = change.oldkeys;
82-
return keyvalues.map((value, i) => parse(value, keytypeoids[i]));
83-
};
63+
const id = <T>(value: T): T => value;
8464

8565
interface Payload {
8666
lsn: string;
@@ -101,6 +81,8 @@ export interface LdsOptions {
10181
slotName?: string;
10282
/** Whether `.createSlot()` should create a temporary replication slot which will be limited to the `client` session and gets cleaned up automatically. Defaults to `false`. */
10383
temporary?: boolean;
84+
/** (Custom) [type parsers](https://node-postgres.com/features/queries#types) to deserialise the wal2json column string values. Pass `pg.types` to get the default type parsing. Defaults to `undefined`, that is raw values will get emitted. */
85+
types?: pg.CustomTypesConfig;
10486
}
10587

10688
export default class PgLogicalDecoding extends EventEmitter {
@@ -110,6 +92,7 @@ export default class PgLogicalDecoding extends EventEmitter {
11092
private tablePattern: string;
11193
private pool: pg.Pool | null;
11294
private client: Promise<pg.PoolClient> | null;
95+
private readonly parse: (value: any, typeOid: number) => any;
11396

11497
constructor(connectionString: string, options?: LdsOptions) {
11598
super();
@@ -118,10 +101,21 @@ export default class PgLogicalDecoding extends EventEmitter {
118101
tablePattern = "*.*",
119102
slotName = "postgraphile",
120103
temporary = false,
104+
types,
121105
} = options || {};
122106
this.tablePattern = tablePattern;
123107
this.slotName = slotName;
124108
this.temporary = temporary;
109+
this.parse = types
110+
? (value: any, typeOid: number) => {
111+
if (value === null) return null;
112+
// wal2json always outputs `bool`s as boolean, not as string, irregardless of `numeric-data-types-as-string`.
113+
if (typeOid === pg.types.builtins.BOOL) return value;
114+
// FIXME: this should use `client.getTypeParser` or have some other non-global option to configure type parsing
115+
const parser = pg.types.getTypeParser(typeOid, "text");
116+
return parser(value);
117+
}
118+
: id;
125119
// We just use the pool to get better error handling
126120
this.pool = new pg.Pool({
127121
connectionString: this.connectionString,
@@ -208,6 +202,23 @@ export default class PgLogicalDecoding extends EventEmitter {
208202
}
209203
}
210204

205+
public changeToRecord(
206+
change: InsertChange | UpdateChange
207+
): Record<string, any> {
208+
const { columnnames, columnvalues, columntypeoids } = change;
209+
return columnnames.reduce<Record<string, any>>((memo, name, i) => {
210+
memo[name] = this.parse(columnvalues[i], columntypeoids[i]);
211+
return memo;
212+
}, {});
213+
}
214+
215+
public changeToPk(change: UpdateChange | DeleteChange): any[] {
216+
const { keyvalues, keytypeoids } = change.oldkeys;
217+
return this.parse == id
218+
? keyvalues
219+
: keyvalues.map((value, i) => this.parse(value, keytypeoids[i]));
220+
}
221+
211222
public async close() {
212223
if (!this.temporary) {
213224
const client = await this.getClient();

0 commit comments

Comments
 (0)