Skip to content

Commit bd7f3e2

Browse files
authored
Omnisharded tables routing fix (#618)
* Fix omnishard tables detection * Tests * make explain work
1 parent 74e51d9 commit bd7f3e2

6 files changed

Lines changed: 126 additions & 9 deletions

File tree

integration/d_plus/pgdog.toml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
[general]
2+
expanded_explain = true
3+
4+
[[databases]]
5+
name = "pgdog"
6+
host = "127.0.0.1"
7+
shard = 0
8+
database_name = "shard_0"
9+
10+
[[databases]]
11+
name = "pgdog"
12+
host = "127.0.0.1"
13+
shard = 1
14+
database_name = "shard_1"
15+
16+
[admin]
17+
password = "pgdog"
18+
19+
[[omnisharded_tables]]
20+
database = "pgdog"
21+
tables = [
22+
"pg_class",
23+
"pg_namespace",
24+
"pg_am"
25+
]

integration/d_plus/users.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
[[users]]
2+
name = "pgdog"
3+
database = "pgdog"
4+
password = "pgdog"

pgdog/src/frontend/router/parser/from_clause.rs

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use pg_query::{Node, NodeEnum};
33
use super::*;
44

55
/// Handle FROM <table/join> clause.
6-
#[derive(Copy, Clone)]
6+
#[derive(Copy, Clone, Debug)]
77
pub struct FromClause<'a> {
88
nodes: &'a [Node],
99
}
@@ -64,4 +64,37 @@ impl<'a> FromClause<'a> {
6464

6565
None
6666
}
67+
68+
/// Get all tables from the FROM clause.
69+
pub fn tables(&'a self) -> Vec<Table<'a>> {
70+
let mut tables = vec![];
71+
72+
fn tables_recursive(node: &Node) -> Vec<Table<'_>> {
73+
let mut tables = vec![];
74+
match node.node {
75+
Some(NodeEnum::RangeVar(ref range_var)) => {
76+
tables.push(Table::from(range_var));
77+
}
78+
79+
Some(NodeEnum::JoinExpr(ref join)) => {
80+
if let Some(ref node) = join.larg {
81+
tables.extend(tables_recursive(node));
82+
}
83+
if let Some(ref node) = join.rarg {
84+
tables.extend(tables_recursive(node));
85+
}
86+
}
87+
88+
_ => (),
89+
}
90+
91+
tables
92+
}
93+
94+
for node in self.nodes {
95+
tables.extend(tables_recursive(node));
96+
}
97+
98+
tables
99+
}
67100
}

pgdog/src/frontend/router/parser/query/select.rs

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -105,15 +105,36 @@ impl QueryParser {
105105

106106
let mut query = Route::select(shard, order_by, aggregates, limit, distinct);
107107

108-
let mut omni = false;
108+
// Omnisharded tables check.
109109
if query.is_all_shards() {
110-
if let Some(name) = from_clause.table_name() {
111-
omni = context.sharding_schema.tables.omnishards().contains(name);
112-
}
113-
}
110+
let tables = from_clause.tables();
111+
let omni = tables.iter().all(|table| {
112+
context
113+
.sharding_schema
114+
.tables
115+
.omnishards()
116+
.contains(table.name)
117+
});
118+
119+
if omni {
120+
let shard = round_robin::next() % context.shards;
114121

115-
if omni {
116-
query.set_shard_mut(round_robin::next() % context.shards);
122+
query.set_shard_mut(shard);
123+
124+
if let Some(recorder) = self.recorder_mut() {
125+
recorder.record_entry(
126+
Some(shard.into()),
127+
format!(
128+
"SELECT matched omnisharded tables: {}",
129+
tables
130+
.iter()
131+
.map(|table| table.name)
132+
.collect::<Vec<_>>()
133+
.join(", ")
134+
),
135+
);
136+
}
137+
}
117138
}
118139

119140
// Only rewrite if query is cross-shard.

pgdog/src/frontend/router/parser/query/test.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,33 @@ fn test_omni() {
329329
assert!(matches!(route.shard(), Shard::Direct(_)));
330330
let (_, qp) = command!(q);
331331
assert!(!qp.in_transaction);
332+
333+
// Test that sharded tables take priority.
334+
let q = "
335+
SELECT
336+
sharded_omni.*,
337+
sharded.*
338+
FROM
339+
sharded_omni
340+
INNER JOIN
341+
sharded
342+
ON sharded_omni.id = sharded.i
343+
WHERE sharded.id = 5";
344+
345+
let route = query!(q);
346+
let shard = route.shard().clone();
347+
348+
for _ in 0..5 {
349+
let route = query!(q);
350+
// Test that shard doesn't change (i.e. not round robin)
351+
assert_eq!(&shard, route.shard());
352+
assert!(matches!(shard, Shard::Direct(_)));
353+
}
354+
355+
// Test that all tables have to be omnisharded.
356+
let q = "SELECT * FROM sharded_omni INNER JOIN not_sharded ON sharded_omni.id = not_sharded.id WHERE sharded_omni = $1";
357+
let route = query!(q);
358+
assert!(matches!(route.shard(), Shard::All));
332359
}
333360

334361
#[test]

pgdog/src/frontend/router/parser/where_clause.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use crate::frontend::router::parser::{from_clause::FromClause, Table};
1010

1111
use super::Key;
1212

13-
#[derive(Copy, Clone)]
13+
#[derive(Copy, Clone, Debug)]
1414
pub enum TablesSource<'a> {
1515
Table(Table<'a>),
1616
FromClause(FromClause<'a>),
@@ -54,6 +54,13 @@ impl<'a> TablesSource<'a> {
5454
Self::FromClause(fc) => fc.table_name(),
5555
}
5656
}
57+
58+
pub fn tables(&'a self) -> Vec<Table<'a>> {
59+
match self {
60+
Self::Table(table) => vec![*table],
61+
Self::FromClause(from_clause) => from_clause.tables(),
62+
}
63+
}
5764
}
5865

5966
#[derive(Debug)]

0 commit comments

Comments
 (0)