@@ -131,9 +131,8 @@ pub struct StreamSubscriber {
131131
132132 // Statements
133133 statements : HashMap < Oid , Statements > ,
134-
135- // Partitioned tables dedup.
136- partitioned_dedup : HashSet < Key > ,
134+ // Mapping of table keys to their oid.
135+ keys : HashMap < Key , Oid > ,
137136
138137 // LSNs for each table
139138 table_lsns : HashMap < Oid , i64 > ,
@@ -171,7 +170,6 @@ impl StreamSubscriber {
171170 cluster,
172171 relations : HashMap :: new ( ) ,
173172 statements : HashMap :: new ( ) ,
174- partitioned_dedup : HashSet :: new ( ) ,
175173 table_lsns : HashMap :: new ( ) ,
176174 changed_tables : HashSet :: new ( ) ,
177175 tables : tables
@@ -193,6 +191,7 @@ impl StreamSubscriber {
193191 in_transaction : false ,
194192 query_parser_engine,
195193 missed_rows : MissedRows :: default ( ) ,
194+ keys : HashMap :: default ( ) ,
196195 }
197196 }
198197
@@ -466,73 +465,78 @@ impl StreamSubscriber {
466465 name : table. table . destination_name ( ) . to_string ( ) ,
467466 } ;
468467
469- if self . partitioned_dedup . contains ( & dest_key) {
470- debug ! ( "queries for table {} already prepared" , dest_key) ;
471- return Ok ( ( ) ) ;
472- }
468+ // Partition child tables target the parent on the destination shard,
469+ // we don't need to prepare the same statement per child.
470+ if let Some ( oid) = self . keys . get ( & dest_key) {
471+ let statements = self . statements . get ( oid) . ok_or ( Error :: MissingKey ) ?;
472+ self . statements . insert ( relation. oid , statements. clone ( ) ) ;
473473
474- let insert = Statement :: new ( & table. insert ( false ) , self . query_parser_engine ) ?;
475- let upsert = Statement :: new ( & table. insert ( true ) , self . query_parser_engine ) ?;
476- let update = Statement :: new ( & table. update ( ) , self . query_parser_engine ) ?;
477- let delete = Statement :: new ( & table. delete ( ) , self . query_parser_engine ) ?;
474+ debug ! ( "queries for table {} already prepared" , dest_key) ;
475+ } else {
476+ let insert = Statement :: new ( & table. insert ( false ) , self . query_parser_engine ) ?;
477+ let upsert = Statement :: new ( & table. insert ( true ) , self . query_parser_engine ) ?;
478+ let update = Statement :: new ( & table. update ( ) , self . query_parser_engine ) ?;
479+ let delete = Statement :: new ( & table. delete ( ) , self . query_parser_engine ) ?;
480+
481+ for server in & mut self . connections {
482+ for stmt in & [ & insert, & upsert, & update, & delete] {
483+ debug ! ( "preparing \" {}\" [{}]" , stmt. query( ) , server. addr( ) ) ;
484+ }
478485
479- for server in & mut self . connections {
480- for stmt in & [ & insert, & upsert, & update, & delete] {
481- debug ! ( "preparing \" {}\" [{}]" , stmt. query( ) , server. addr( ) ) ;
486+ server
487+ . send (
488+ & vec ! [
489+ insert. parse( ) . clone( ) . into( ) ,
490+ upsert. parse( ) . clone( ) . into( ) ,
491+ update. parse( ) . clone( ) . into( ) ,
492+ delete. parse( ) . clone( ) . into( ) ,
493+ if self . in_transaction {
494+ Flush . into( )
495+ } else {
496+ Sync . into( )
497+ } ,
498+ ]
499+ . into ( ) ,
500+ )
501+ . await ?;
482502 }
483503
484- server
485- . send (
486- & vec ! [
487- insert. parse( ) . clone( ) . into( ) ,
488- upsert. parse( ) . clone( ) . into( ) ,
489- update. parse( ) . clone( ) . into( ) ,
490- delete. parse( ) . clone( ) . into( ) ,
491- if self . in_transaction {
492- Flush . into( )
493- } else {
494- Sync . into( )
495- } ,
496- ]
497- . into ( ) ,
498- )
499- . await ?;
500- }
501-
502- for server in & mut self . connections {
503- let num_messages = if self . in_transaction { 4 } else { 5 } ;
504- for _ in 0 ..num_messages {
505- let msg = server. read ( ) . await ?;
506- trace ! ( "[{}] --> {:?}" , server. addr( ) , msg) ;
507-
508- match msg. code ( ) {
509- 'E' => {
510- return Err ( Error :: PgError ( Box :: new ( ErrorResponse :: from_bytes (
511- msg. to_bytes ( ) ?,
512- ) ?) ) )
504+ for server in & mut self . connections {
505+ let num_messages = if self . in_transaction { 4 } else { 5 } ;
506+ for _ in 0 ..num_messages {
507+ let msg = server. read ( ) . await ?;
508+ trace ! ( "[{}] --> {:?}" , server. addr( ) , msg) ;
509+
510+ match msg. code ( ) {
511+ 'E' => {
512+ return Err ( Error :: PgError ( Box :: new ( ErrorResponse :: from_bytes (
513+ msg. to_bytes ( ) ?,
514+ ) ?) ) )
515+ }
516+ 'Z' => break ,
517+ '1' => continue ,
518+ c => return Err ( Error :: RelationOutOfSync ( c) ) ,
513519 }
514- 'Z' => break ,
515- '1' => continue ,
516- c => return Err ( Error :: RelationOutOfSync ( c) ) ,
517520 }
518521 }
519- }
520522
521- self . statements . insert (
522- relation. oid ,
523- Statements {
524- insert,
525- upsert,
526- update,
527- delete,
528- omni : !table. is_sharded ( & self . cluster . sharding_schema ( ) . tables ) ,
529- } ,
530- ) ;
523+ self . statements . insert (
524+ relation. oid ,
525+ Statements {
526+ insert,
527+ upsert,
528+ update,
529+ delete,
530+ omni : !table. is_sharded ( & self . cluster . sharding_schema ( ) . tables ) ,
531+ } ,
532+ ) ;
533+
534+ self . keys . insert ( dest_key, relation. oid ) ;
535+ }
531536
532537 // Only record tables we expect to stream changes for.
533538 self . table_lsns . insert ( relation. oid , table. lsn . lsn ) ;
534539 self . relations . insert ( relation. oid , relation) ;
535- self . partitioned_dedup . insert ( dest_key) ;
536540 }
537541
538542 Ok ( ( ) )
0 commit comments