@@ -17,6 +17,7 @@ declare module "pg" {
1717interface Keys {
1818 keynames : Array < string > ;
1919 keytypes : Array < string > ;
20+ keytypeoids : Array < number > ;
2021 keyvalues : Array < any > ;
2122}
2223
@@ -34,7 +35,8 @@ export interface InsertChange extends Change {
3435
3536 // https://github.com/eulerto/wal2json/blob/f81bf7af09324da656be87dfd53d20741c01e1e0/wal2json.c#L969
3637 columnnames : Array < string > ;
37- columntypes : Array < string > ;
38+ columntypes : Array < string > ; // with `include-types` option (default true)
39+ columntypeoids : Array < number > ; // with `include-type-oids` option (default false)
3840 columnvalues : Array < any > ;
3941}
4042
@@ -43,7 +45,8 @@ export interface UpdateChange extends Change {
4345
4446 // https://github.com/eulerto/wal2json/blob/f81bf7af09324da656be87dfd53d20741c01e1e0/wal2json.c#L973
4547 columnnames : Array < string > ;
46- columntypes : Array < string > ;
48+ columntypes : Array < string > ; // with `include-types` option (default true)
49+ columntypeoids : Array < number > ; // with `include-type-oids` option (default false)
4750 columnvalues : Array < any > ;
4851
4952 // https://github.com/eulerto/wal2json/blob/f81bf7af09324da656be87dfd53d20741c01e1e0/wal2json.c#L992-L1003
@@ -57,16 +60,26 @@ export interface DeleteChange extends Change {
5760 oldkeys : Keys ;
5861}
5962
63+ const parse = ( value : any , typeOid : number ) => {
64+ if ( value === null ) return null ;
65+ // wal2json always outputs `bool`s as boolean, not as string, irregardless of `numeric-data-types-as-string`.
66+ if ( typeOid === pg . types . builtins . BOOL ) return value ;
67+ // FIXME: this should use `client.getTypeParser` or have some other non-global option to configure type parsing
68+ const parser = pg . types . getTypeParser ( typeOid , "text" ) ;
69+ return parser ( value ) ;
70+ } ;
71+
6072export const changeToRecord = ( change : InsertChange | UpdateChange ) => {
61- const { columnnames, columnvalues } = change ;
62- return columnnames . reduce ( ( memo , name , i ) => {
63- memo [ name ] = columnvalues [ i ] ;
73+ const { columnnames, columnvalues, columntypeoids } = change ;
74+ return columnnames . reduce < Record < string , any > > ( ( memo , name , i ) => {
75+ memo [ name ] = parse ( columnvalues [ i ] , columntypeoids [ i ] ) ;
6476 return memo ;
6577 } , { } ) ;
6678} ;
6779
6880export const changeToPk = ( change : UpdateChange | DeleteChange ) => {
69- return change . oldkeys . keyvalues ;
81+ const { keyvalues, keytypeoids } = change . oldkeys ;
82+ return keyvalues . map ( ( value , i ) => parse ( value , keytypeoids [ i ] ) ) ;
7083} ;
7184
7285interface Payload {
@@ -173,7 +186,7 @@ export default class PgLogicalDecoding extends EventEmitter {
173186 await this . trackSelf ( client ) ;
174187 try {
175188 const { rows } = await client . query ( {
176- text : `SELECT lsn, data FROM pg_catalog.pg_logical_slot_get_changes($1, $2, $3, 'add-tables', $4::text)` ,
189+ text : `SELECT lsn, data FROM pg_catalog.pg_logical_slot_get_changes($1, $2, $3, 'add-tables', $4::text, 'include-types', 'f', 'include-type-oids', 't', 'numeric-data-types-as-string', 't' )` ,
177190 values : [ this . slotName , uptoLsn , uptoNchanges , this . tablePattern ] ,
178191 rowMode : "array" ,
179192 } ) ;
0 commit comments