Skip to content

Commit 7a7f582

Browse files
authored
feat: pgdog.install_sharded_sequence() is cross-sharded (#881)
Make sure that installing sharded sequences is sent to all shards.
1 parent b8bade1 commit 7a7f582

4 files changed

Lines changed: 126 additions & 20 deletions

File tree

pgdog/src/frontend/router/parser/function.rs

Lines changed: 75 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::collections::HashMap;
1+
use std::collections::{HashMap, HashSet};
22

33
use once_cell::sync::Lazy;
44
use pg_query::{protobuf, Node, NodeEnum};
@@ -20,6 +20,9 @@ static WRITE_ONLY: Lazy<HashMap<&'static str, LockingBehavior>> = Lazy::new(|| {
2020
])
2121
});
2222

23+
static CROSS_SHARD: Lazy<HashSet<(&'static str, &'static str)>> =
24+
Lazy::new(|| HashSet::from([("pgdog", "install_sharded_sequence")]));
25+
2326
#[derive(Debug, Copy, Clone, PartialEq, Default)]
2427
pub enum LockingBehavior {
2528
Lock,
@@ -32,6 +35,7 @@ pub enum LockingBehavior {
3235
pub struct FunctionBehavior {
3336
pub writes: bool,
3437
pub locking_behavior: LockingBehavior,
38+
pub cross_shard: bool,
3539
}
3640

3741
impl FunctionBehavior {
@@ -45,28 +49,49 @@ impl FunctionBehavior {
4549

4650
pub struct Function<'a> {
4751
pub name: &'a str,
52+
pub schema: Option<&'a str>,
4853
}
4954

5055
impl<'a> Function<'a> {
51-
fn from_string(node: &'a Option<NodeEnum>) -> Result<Self, ()> {
52-
match node {
53-
Some(NodeEnum::String(protobuf::String { sval })) => Ok(Self {
54-
name: sval.as_str(),
56+
/// Build a Function from a qualified name list (as found in `FuncCall.funcname`).
57+
/// The last element is the function name; the preceding element (if any) is the
58+
/// schema.
59+
fn from_strings(parts: &'a [Node]) -> Result<Self, ()> {
60+
let str_of = |node: &'a Node| match &node.node {
61+
Some(NodeEnum::String(protobuf::String { sval })) => Ok(sval.as_str()),
62+
_ => Err(()),
63+
};
64+
match parts {
65+
[name] => Ok(Self {
66+
name: str_of(name)?,
67+
schema: None,
68+
}),
69+
[.., schema, name] => Ok(Self {
70+
name: str_of(name)?,
71+
schema: Some(str_of(schema)?),
5572
}),
56-
5773
_ => Err(()),
5874
}
5975
}
6076

6177
/// This function likely writes.
6278
pub fn behavior(&self) -> FunctionBehavior {
79+
let cross_shard = self
80+
.schema
81+
.map(|schema| CROSS_SHARD.contains(&(schema, self.name)))
82+
.unwrap_or(false);
83+
6384
if let Some(locks) = WRITE_ONLY.get(&self.name) {
6485
FunctionBehavior {
6586
writes: true,
6687
locking_behavior: *locks,
88+
cross_shard,
6789
}
6890
} else {
69-
FunctionBehavior::default()
91+
FunctionBehavior {
92+
cross_shard,
93+
..FunctionBehavior::default()
94+
}
7095
}
7196
}
7297
}
@@ -76,9 +101,7 @@ impl<'a> TryFrom<&'a Node> for Function<'a> {
76101
fn try_from(value: &'a Node) -> Result<Self, Self::Error> {
77102
match &value.node {
78103
Some(NodeEnum::FuncCall(func)) => {
79-
if let Some(node) = func.funcname.last() {
80-
return Self::from_string(&node.node);
81-
}
104+
return Self::from_strings(&func.funcname);
82105
}
83106

84107
Some(NodeEnum::TypeCast(cast)) => {
@@ -123,10 +146,52 @@ mod test {
123146
for node in &stmt.target_list {
124147
let func = Function::try_from(node).unwrap();
125148
assert!(func.name.contains("advisory_lock"));
149+
assert!(func.schema.is_none());
150+
assert!(!func.behavior().cross_shard);
126151
}
127152
}
128153

129154
_ => panic!("not a select"),
130155
}
131156
}
157+
158+
fn first_func<R>(query: &str, check: impl FnOnce(Function<'_>) -> R) -> R {
159+
let ast = parse(query).unwrap();
160+
let root = ast.protobuf.stmts.first().unwrap().stmt.as_ref().unwrap();
161+
match root.node.as_ref() {
162+
Some(NodeEnum::SelectStmt(stmt)) => {
163+
let target = stmt.target_list.first().unwrap();
164+
check(Function::try_from(target).unwrap())
165+
}
166+
_ => panic!("not a select"),
167+
}
168+
}
169+
170+
#[test]
171+
fn test_cross_shard_function() {
172+
first_func(
173+
"SELECT pgdog.install_sharded_sequence('foo', 'id')",
174+
|func| {
175+
assert_eq!(func.name, "install_sharded_sequence");
176+
assert_eq!(func.schema, Some("pgdog"));
177+
assert!(func.behavior().cross_shard);
178+
},
179+
);
180+
181+
// Same function name without the schema should not be flagged.
182+
first_func("SELECT install_sharded_sequence('foo', 'id')", |func| {
183+
assert_eq!(func.name, "install_sharded_sequence");
184+
assert!(func.schema.is_none());
185+
assert!(!func.behavior().cross_shard);
186+
});
187+
188+
// Different schema should not be flagged.
189+
first_func(
190+
"SELECT other.install_sharded_sequence('foo', 'id')",
191+
|func| {
192+
assert_eq!(func.schema, Some("other"));
193+
assert!(!func.behavior().cross_shard);
194+
},
195+
);
196+
}
132197
}

pgdog/src/frontend/router/parser/query/select.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,27 @@ impl QueryParser {
2121
context: &mut QueryParserContext,
2222
) -> Result<Command, Error> {
2323
let cte_writes = Self::cte_writes(stmt);
24-
let mut writes = Self::functions(stmt)?;
24+
let mut overrides = Self::functions(stmt)?;
2525

2626
// Write overwrite because of conservative read/write split.
2727
if self.write_override {
28-
writes.writes = true;
28+
overrides.writes = true;
2929
}
3030

3131
if cte_writes {
32-
writes.writes = true;
32+
overrides.writes = true;
33+
}
34+
35+
if overrides.cross_shard {
36+
context
37+
.shards_calculator
38+
.push(ShardWithPriority::new_override_cross_shard_function());
3339
}
3440

3541
// Early return for any direct-to-shard queries.
3642
if context.shards_calculator.shard().is_direct() {
3743
return Ok(Command::Query(
38-
Route::read(context.shards_calculator.shard().clone()).with_write(writes),
44+
Route::read(context.shards_calculator.shard().clone()).with_functions(overrides),
3945
));
4046
}
4147

@@ -83,7 +89,7 @@ impl QueryParser {
8389
.push(ShardWithPriority::new_rr_no_table(shard));
8490

8591
return Ok(Command::Query(
86-
Route::read(context.shards_calculator.shard().clone()).with_write(writes),
92+
Route::read(context.shards_calculator.shard().clone()).with_functions(overrides),
8793
));
8894
}
8995

@@ -201,7 +207,7 @@ impl QueryParser {
201207
query.with_aggregate_rewrite_plan_mut(cached_ast.rewrite_plan.aggregates.clone());
202208
}
203209

204-
Ok(Command::Query(query.with_write(writes)))
210+
Ok(Command::Query(query.with_functions(overrides)))
205211
}
206212

207213
/// Handle the `ORDER BY` clause of a `SELECT` statement.

pgdog/src/frontend/router/parser/query/test/test_functions.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,29 @@ fn test_write_function_nextval() {
3131
assert!(command.route().is_write());
3232
assert!(!command.route().is_lock_session());
3333
}
34+
35+
#[test]
36+
fn test_cross_shard_install_sharded_sequence() {
37+
let mut test = QueryParserTest::new();
38+
39+
let command = test.execute(vec![Query::new(
40+
"SELECT pgdog.install_sharded_sequence('foo', 'id')",
41+
)
42+
.into()]);
43+
44+
assert!(command.route().is_cross_shard());
45+
}
46+
47+
#[test]
48+
fn test_install_sharded_sequence_without_schema_not_cross_shard() {
49+
// Without the `pgdog.` schema qualifier we should not flag the call
50+
// as a cross-shard function — it could be any user-defined function.
51+
let mut test = QueryParserTest::new();
52+
53+
let command = test.execute(vec![Query::new(
54+
"SELECT install_sharded_sequence('foo', 'id')",
55+
)
56+
.into()]);
57+
58+
assert!(!command.route().is_cross_shard());
59+
}

pgdog/src/frontend/router/parser/route.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -273,16 +273,17 @@ impl Route {
273273
self.rollback_savepoint
274274
}
275275

276-
pub fn with_write(mut self, write: FunctionBehavior) -> Self {
277-
self.set_write(write);
276+
pub fn with_functions(mut self, function: FunctionBehavior) -> Self {
277+
self.set_functions(function);
278278
self
279279
}
280280

281-
pub fn set_write(&mut self, write: FunctionBehavior) {
281+
pub fn set_functions(&mut self, function: FunctionBehavior) {
282282
let FunctionBehavior {
283283
writes,
284284
locking_behavior,
285-
} = write;
285+
..
286+
} = function;
286287
self.read = !writes;
287288
self.lock_session = match locking_behavior {
288289
LockingBehavior::Lock => Some(true),
@@ -364,6 +365,7 @@ pub enum OverrideReason {
364365
Transaction,
365366
OnlyOneShard,
366367
RewriteUpdate,
368+
CrossShardFunction,
367369
}
368370

369371
#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd)]
@@ -424,6 +426,13 @@ impl ShardWithPriority {
424426
}
425427
}
426428

429+
pub fn new_override_cross_shard_function() -> Self {
430+
Self {
431+
shard: Shard::All,
432+
source: ShardSource::Override(OverrideReason::CrossShardFunction),
433+
}
434+
}
435+
427436
pub fn new_override_dry_run(shard: Shard) -> Self {
428437
Self {
429438
shard,

0 commit comments

Comments
 (0)