Skip to content

Commit 6762a4e

Browse files
authored
Omni routing and change to convergence for schema sharding (#620)
* Omni routing and change to convergence for schema sharding * save * Fix test * Add config test * Rename
1 parent 919e5ee commit 6762a4e

20 files changed

Lines changed: 361 additions & 41 deletions

File tree

integration/d_plus/pgdog.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,4 @@ tables = [
2323
"pg_namespace",
2424
"pg_am"
2525
]
26+
sticky = true

pgdog-config/src/core.rs

Lines changed: 62 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ use std::path::PathBuf;
55
use tracing::{info, warn};
66

77
use crate::sharding::ShardedSchema;
8-
use crate::{EnumeratedDatabase, Memory, PassthoughAuth, PreparedStatements, RewriteMode};
8+
use crate::{
9+
EnumeratedDatabase, Memory, OmnishardedTable, PassthoughAuth, PreparedStatements, RewriteMode,
10+
};
911

1012
use super::database::Database;
1113
use super::error::Error;
@@ -227,15 +229,18 @@ impl Config {
227229
tables
228230
}
229231

230-
pub fn omnisharded_tables(&self) -> HashMap<String, Vec<String>> {
232+
pub fn omnisharded_tables(&self) -> HashMap<String, Vec<OmnishardedTable>> {
231233
let mut tables = HashMap::new();
232234

233235
for table in &self.omnisharded_tables {
234236
let entry = tables
235237
.entry(table.database.clone())
236238
.or_insert_with(Vec::new);
237239
for t in &table.tables {
238-
entry.push(t.clone());
240+
entry.push(OmnishardedTable {
241+
name: t.clone(),
242+
sticky_routing: table.sticky,
243+
});
239244
}
240245
}
241246

@@ -647,4 +652,58 @@ password = "users_admin_password"
647652
);
648653
assert!(config_and_users.users.admin.is_none());
649654
}
655+
656+
#[test]
657+
fn test_omnisharded_tables() {
658+
let source = r#"
659+
[general]
660+
host = "0.0.0.0"
661+
port = 6432
662+
663+
[[databases]]
664+
name = "db1"
665+
host = "127.0.0.1"
666+
port = 5432
667+
668+
[[databases]]
669+
name = "db2"
670+
host = "127.0.0.1"
671+
port = 5433
672+
673+
[[omnisharded_tables]]
674+
database = "db1"
675+
tables = ["table_a", "table_b"]
676+
677+
[[omnisharded_tables]]
678+
database = "db1"
679+
tables = ["table_c"]
680+
sticky = true
681+
682+
[[omnisharded_tables]]
683+
database = "db2"
684+
tables = ["table_x"]
685+
"#;
686+
687+
let config: Config = toml::from_str(source).unwrap();
688+
689+
assert_eq!(config.omnisharded_tables.len(), 3);
690+
691+
let tables = config.omnisharded_tables();
692+
693+
assert_eq!(tables.len(), 2);
694+
695+
let db1_tables = tables.get("db1").unwrap();
696+
assert_eq!(db1_tables.len(), 3);
697+
assert_eq!(db1_tables[0].name, "table_a");
698+
assert!(!db1_tables[0].sticky_routing);
699+
assert_eq!(db1_tables[1].name, "table_b");
700+
assert!(!db1_tables[1].sticky_routing);
701+
assert_eq!(db1_tables[2].name, "table_c");
702+
assert!(db1_tables[2].sticky_routing);
703+
704+
let db2_tables = tables.get("db2").unwrap();
705+
assert_eq!(db2_tables.len(), 1);
706+
assert_eq!(db2_tables[0].name, "table_x");
707+
assert!(!db2_tables[0].sticky_routing);
708+
}
650709
}

pgdog-config/src/sharding.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,14 @@ impl From<String> for FlexibleType {
177177
pub struct OmnishardedTables {
178178
pub database: String,
179179
pub tables: Vec<String>,
180+
#[serde(default)]
181+
pub sticky: bool,
182+
}
183+
184+
#[derive(PartialEq, Debug, Clone, Default)]
185+
pub struct OmnishardedTable {
186+
pub name: String,
187+
pub sticky_routing: bool,
180188
}
181189

182190
/// Queries with manual routing rules.

pgdog/src/backend/pool/cluster.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -538,6 +538,8 @@ impl Cluster {
538538
mod test {
539539
use std::{sync::Arc, time::Duration};
540540

541+
use pgdog_config::OmnishardedTable;
542+
541543
use crate::{
542544
backend::{
543545
pool::{Address, Config, PoolConfig, ShardConfig},
@@ -597,7 +599,16 @@ mod test {
597599
hasher: Hasher::Postgres,
598600
..Default::default()
599601
}],
600-
vec!["sharded_omni".into()],
602+
vec![
603+
OmnishardedTable {
604+
name: "sharded_omni".into(),
605+
sticky_routing: false,
606+
},
607+
OmnishardedTable {
608+
name: "sharded_omni_sticky".into(),
609+
sticky_routing: true,
610+
},
611+
],
601612
),
602613
shards,
603614
identifier,

pgdog/src/backend/replication/logical/subscriber/context.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ impl<'a> StreamContext<'a> {
5656
&mut self.prepared_statements,
5757
&self.params,
5858
None,
59+
1,
5960
)?)
6061
}
6162
}

pgdog/src/backend/replication/sharded_tables.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,20 @@
11
//! Tables sharded in the database.
2+
use pgdog_config::OmnishardedTable;
3+
24
use crate::{
35
config::{DataType, ShardedTable},
46
frontend::router::sharding::Mapping,
57
net::messages::Vector,
68
};
7-
use std::{collections::HashSet, sync::Arc};
9+
use std::{
10+
collections::{HashMap, HashSet},
11+
sync::Arc,
12+
};
813

914
#[derive(Default, Debug)]
1015
struct Inner {
1116
tables: Vec<ShardedTable>,
12-
omnisharded: HashSet<String>,
17+
omnisharded: HashMap<String, bool>, // Name <-> sticky routing
1318
/// This is set only if we have the same sharding scheme
1419
/// across all tables, i.e., 3 tables with the same data type
1520
/// and list/range/hash function.
@@ -46,7 +51,7 @@ impl From<&[ShardedTable]> for ShardedTables {
4651
}
4752

4853
impl ShardedTables {
49-
pub fn new(tables: Vec<ShardedTable>, omnisharded_tables: Vec<String>) -> Self {
54+
pub fn new(tables: Vec<ShardedTable>, omnisharded_tables: Vec<OmnishardedTable>) -> Self {
5055
let mut common_mapping = HashSet::new();
5156
for table in &tables {
5257
common_mapping.insert((
@@ -69,7 +74,10 @@ impl ShardedTables {
6974
Self {
7075
inner: Arc::new(Inner {
7176
tables,
72-
omnisharded: omnisharded_tables.into_iter().collect(),
77+
omnisharded: omnisharded_tables
78+
.into_iter()
79+
.map(|table| (table.name, table.sticky_routing))
80+
.collect(),
7381
common_mapping,
7482
}),
7583
}
@@ -79,7 +87,7 @@ impl ShardedTables {
7987
&self.inner.tables
8088
}
8189

82-
pub fn omnishards(&self) -> &HashSet<String> {
90+
pub fn omnishards(&self) -> &HashMap<String, bool> {
8391
&self.inner.omnisharded
8492
}
8593

pgdog/src/frontend/client/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use std::net::SocketAddr;
44
use std::sync::Arc;
55
use std::time::{Duration, Instant};
66

7+
use rand::{thread_rng, Rng};
78
use timeouts::Timeouts;
89
use tokio::{select, spawn, time::timeout};
910
use tracing::{debug, enabled, error, info, trace, Level as LogLevel};
@@ -28,7 +29,6 @@ use crate::state::State;
2829
use crate::stats::memory::MemoryUsage;
2930
use crate::util::user_database_from_params;
3031

31-
// pub mod counter;
3232
pub mod query_engine;
3333
pub mod timeouts;
3434

@@ -50,6 +50,7 @@ pub struct Client {
5050
client_request: ClientRequest,
5151
stream_buffer: MessageBuffer,
5252
passthrough_password: Option<String>,
53+
omni_sticky_index: usize,
5354
}
5455

5556
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
@@ -315,6 +316,7 @@ impl Client {
315316
stream_buffer: MessageBuffer::new(config.config.memory.message_buffer),
316317
shutdown: false,
317318
passthrough_password,
319+
omni_sticky_index: thread_rng().gen_range(1..usize::MAX),
318320
}))
319321
}
320322

@@ -342,6 +344,7 @@ impl Client {
342344
stream_buffer: MessageBuffer::new(4096),
343345
shutdown: false,
344346
passthrough_password: None,
347+
omni_sticky_index: 1,
345348
}
346349
}
347350

pgdog/src/frontend/client/query_engine/context.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use rand::{thread_rng, Rng};
2+
13
use crate::{
24
backend::pool::{connection::mirror::Mirror, stats::MemoryStats},
35
frontend::{
@@ -34,6 +36,8 @@ pub struct QueryEngineContext<'a> {
3436
pub(super) admin: bool,
3537
/// Executing rollback statement.
3638
pub(super) rollback: bool,
39+
/// Omnisharded modulo.
40+
pub(super) omni_sticky_index: usize,
3741
}
3842

3943
impl<'a> QueryEngineContext<'a> {
@@ -53,6 +57,7 @@ impl<'a> QueryEngineContext<'a> {
5357
admin: client.admin,
5458
requests_left: 0,
5559
rollback: false,
60+
omni_sticky_index: client.omni_sticky_index,
5661
}
5762
}
5863

@@ -77,6 +82,7 @@ impl<'a> QueryEngineContext<'a> {
7782
admin: false,
7883
requests_left: 0,
7984
rollback: false,
85+
omni_sticky_index: thread_rng().gen_range(1..usize::MAX),
8086
}
8187
}
8288

pgdog/src/frontend/client/query_engine/route_query.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ impl QueryEngine {
5050
context.prepared_statements,
5151
context.params,
5252
context.transaction,
53+
context.omni_sticky_index,
5354
)?;
5455
match self.router.query(router_context) {
5556
Ok(cmd) => {

pgdog/src/frontend/router/cli.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ impl RouterCli {
5555
&mut stmt,
5656
&mut params,
5757
None,
58+
1,
5859
)?)?;
5960
result.push(cmd);
6061
}

0 commit comments

Comments
 (0)