Skip to content

Commit 8d3ea65

Browse files
authored
feat: system catalogs tweaks, feat: migrate FKs to bigint too (#735)
- feat: make how we handle system catalogs configurable, i.e. if you want to get info from all shards in one query, set `system_catalogs = "sharded"` - feat: when migrating `integer` primary keys to `bigint`, handle foreign keys as well
1 parent 636d814 commit 8d3ea65

21 files changed

Lines changed: 419 additions & 135 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

integration/pgdog.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ healthcheck_port = 8080
2121
tls_certificate = "integration/tls/cert.pem"
2222
tls_private_key = "integration/tls/key.pem"
2323
query_parser_engine = "pg_query_raw"
24+
system_catalogs = "omnisharded_sticky"
2425

2526
[memory]
2627
net_buffer = 8096

integration/schema_sync/.gitignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,5 @@
11
.claude
2+
destination.sql
3+
source.sql
4+
*.bak
5+
diff.txt

integration/schema_sync/dev.sh

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -45,18 +45,31 @@ for f in source.sql destination.sql; do
4545
sed -i.bak '/^\\restrict.*$/d' $f
4646
sed -i.bak '/^\\unrestrict.*$/d' $f
4747
done
48-
rm -f source.sql.bak destination.sql.bak
49-
50-
# Verify integer primary keys are rewritten to bigint, and no other differences exist
51-
DIFF_OUTPUT=$(diff source.sql destination.sql || true)
52-
echo "$DIFF_OUTPUT" | grep -q 'flag_id integer NOT NULL' || { echo "Expected flag_id integer->bigint rewrite"; exit 1; }
53-
echo "$DIFF_OUTPUT" | grep -q 'flag_id bigint NOT NULL' || { echo "Expected flag_id integer->bigint rewrite"; exit 1; }
54-
echo "$DIFF_OUTPUT" | grep -q 'setting_id integer NOT NULL' || { echo "Expected setting_id integer->bigint rewrite"; exit 1; }
55-
echo "$DIFF_OUTPUT" | grep -q 'setting_id bigint NOT NULL' || { echo "Expected setting_id integer->bigint rewrite"; exit 1; }
56-
sed -i.bak 's/flag_id integer NOT NULL/flag_id bigint NOT NULL/g' source.sql
57-
sed -i.bak 's/setting_id integer NOT NULL/setting_id bigint NOT NULL/g' source.sql
58-
rm -f source.sql.bak
59-
diff source.sql destination.sql
60-
rm source.sql
61-
rm destination.sql
48+
49+
# Expected content changes (without line numbers for portability)
50+
EXPECTED_CHANGES=$(cat <<EOF
51+
< flag_id integer NOT NULL,
52+
> flag_id bigint NOT NULL,
53+
< setting_id integer NOT NULL,
54+
> setting_id bigint NOT NULL,
55+
< override_id integer NOT NULL,
56+
> override_id bigint NOT NULL,
57+
< flag_id integer NOT NULL,
58+
> flag_id bigint NOT NULL,
59+
EOF)
60+
61+
diff source.sql destination.sql > diff.txt || true
62+
63+
# Extract just the content lines (< and >) for comparison
64+
ACTUAL_CHANGES=$(grep '^[<>]' diff.txt)
65+
if [ "$ACTUAL_CHANGES" != "$EXPECTED_CHANGES" ]; then
66+
echo "Schema diff does not match expected changes"
67+
echo "=== Expected ==="
68+
echo "$EXPECTED_CHANGES"
69+
echo "=== Actual ==="
70+
echo "$ACTUAL_CHANGES"
71+
exit 1
72+
fi
73+
74+
rm source.sql destination.sql diff.txt
6275
popd

integration/schema_sync/ecommerce_schema.sql

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1005,3 +1005,12 @@ CREATE TABLE core.feature_flags (
10051005
flag_name VARCHAR(100) NOT NULL UNIQUE,
10061006
is_enabled BOOLEAN NOT NULL DEFAULT FALSE
10071007
);
1008+
1009+
CREATE TABLE core.user_feature_overrides (
1010+
override_id SERIAL PRIMARY KEY,
1011+
user_id BIGINT NOT NULL REFERENCES core.users(user_id) ON DELETE CASCADE,
1012+
flag_id INTEGER NOT NULL REFERENCES core.feature_flags(flag_id) ON DELETE CASCADE,
1013+
is_enabled BOOLEAN NOT NULL,
1014+
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
1015+
UNIQUE(user_id, flag_id)
1016+
);

pgdog-config/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ url = "2"
1313
uuid = { version = "1", features = ["v4", "serde"] }
1414
rand = "*"
1515
pgdog-vector = { path = "../pgdog-vector" }
16+
once_cell = "*"
1617

1718
[dev-dependencies]
1819
tempfile = "3.23.0"

pgdog-config/src/core.rs

Lines changed: 15 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@ use tracing::{info, warn};
66

77
use crate::sharding::ShardedSchema;
88
use crate::{
9-
EnumeratedDatabase, Memory, OmnishardedTable, PassthoughAuth, PreparedStatements,
10-
QueryParserEngine, QueryParserLevel, ReadWriteSplit, RewriteMode, Role,
9+
system_catalogs, EnumeratedDatabase, Memory, OmnishardedTable, PassthoughAuth,
10+
PreparedStatements, QueryParserEngine, QueryParserLevel, ReadWriteSplit, RewriteMode, Role,
11+
SystemCatalogsBehavior,
1112
};
1213

1314
use super::database::Database;
@@ -249,32 +250,19 @@ impl Config {
249250

250251
// Automatically configure system catalogs
251252
// as omnisharded.
252-
if self.general.system_catalogs_omnisharded {
253+
if self.general.system_catalogs != SystemCatalogsBehavior::Sharded {
254+
let sticky_routing = matches!(
255+
self.general.system_catalogs,
256+
SystemCatalogsBehavior::OmnishardedSticky
257+
);
253258
for database in databases {
254259
let entry = tables.entry(database).or_insert_with(Vec::new);
255260

256-
for table in [
257-
"pg_class",
258-
"pg_attribute",
259-
"pg_attrdef",
260-
"pg_index",
261-
"pg_constraint",
262-
"pg_namespace",
263-
"pg_database",
264-
"pg_tablespace",
265-
"pg_type",
266-
"pg_proc",
267-
"pg_operator",
268-
"pg_cast",
269-
"pg_enum",
270-
"pg_range",
271-
"pg_authid",
272-
"pg_am",
273-
] {
274-
if entry.iter().find(|t| t.name == table).is_none() {
261+
for table in system_catalogs() {
262+
if entry.iter().find(|t| t.name == *table).is_none() {
275263
entry.push(OmnishardedTable {
276264
name: table.to_string(),
277-
sticky_routing: true,
265+
sticky_routing,
278266
});
279267
}
280268
}
@@ -765,7 +753,7 @@ password = "users_admin_password"
765753
[general]
766754
host = "0.0.0.0"
767755
port = 6432
768-
system_catalogs_omnisharded = false
756+
system_catalogs = "sharded"
769757
770758
[[databases]]
771759
name = "db1"
@@ -821,7 +809,7 @@ tables = ["table_x"]
821809
[general]
822810
host = "0.0.0.0"
823811
port = 6432
824-
system_catalogs_omnisharded = true
812+
system_catalogs = "omnisharded_sticky"
825813
826814
[[databases]]
827815
name = "db1"
@@ -848,12 +836,12 @@ tables = ["my_table"]
848836
let pg_class = db1_tables.iter().find(|t| t.name == "pg_class").unwrap();
849837
assert!(pg_class.sticky_routing);
850838

851-
// Test with system_catalogs_omnisharded = false
839+
// Test with system_catalogs = "sharded" (no omnisharding)
852840
let source_disabled = r#"
853841
[general]
854842
host = "0.0.0.0"
855843
port = 6432
856-
system_catalogs_omnisharded = false
844+
system_catalogs = "sharded"
857845
858846
[[databases]]
859847
name = "db1"

pgdog-config/src/general.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::path::PathBuf;
55
use std::time::Duration;
66

77
use crate::pooling::ConnectionRecovery;
8-
use crate::{QueryParserEngine, QueryParserLevel};
8+
use crate::{QueryParserEngine, QueryParserLevel, SystemCatalogsBehavior};
99

1010
use super::auth::{AuthType, PassthoughAuth};
1111
use super::database::{LoadBalancingStrategy, ReadWriteSplit, ReadWriteStrategy};
@@ -195,8 +195,8 @@ pub struct General {
195195
#[serde(default)]
196196
pub unique_id_min: u64,
197197
/// System catalogs are omnisharded?
198-
#[serde(default = "General::default_system_catalogs_omnisharded")]
199-
pub system_catalogs_omnisharded: bool,
198+
#[serde(default = "General::default_system_catalogs")]
199+
pub system_catalogs: SystemCatalogsBehavior,
200200
/// Omnisharded queries are sticky by default.
201201
#[serde(default)]
202202
pub omnisharded_sticky: bool,
@@ -268,7 +268,7 @@ impl Default for General {
268268
lsn_check_timeout: Self::lsn_check_timeout(),
269269
lsn_check_delay: Self::lsn_check_delay(),
270270
unique_id_min: u64::default(),
271-
system_catalogs_omnisharded: Self::default_system_catalogs_omnisharded(),
271+
system_catalogs: Self::default_system_catalogs(),
272272
omnisharded_sticky: bool::default(),
273273
}
274274
}
@@ -429,8 +429,8 @@ impl General {
429429
Self::env_or_default("PGDOG_SHUTDOWN_TIMEOUT", 60_000)
430430
}
431431

432-
fn default_system_catalogs_omnisharded() -> bool {
433-
Self::env_or_default("PGDOG_SYSTEM_CATALOGS_OMNISHARDED", true)
432+
fn default_system_catalogs() -> SystemCatalogsBehavior {
433+
Self::env_enum_or_default("PGDOG_SYSTEM_CATALOGS")
434434
}
435435

436436
fn default_shutdown_termination_timeout() -> Option<u64> {

pgdog-config/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ pub mod pooling;
1212
pub mod replication;
1313
pub mod rewrite;
1414
pub mod sharding;
15+
pub mod system_catalogs;
1516
pub mod url;
1617
pub mod users;
1718
pub mod util;
@@ -31,6 +32,7 @@ pub use pooling::{PoolerMode, PreparedStatements};
3132
pub use replication::*;
3233
pub use rewrite::{Rewrite, RewriteMode};
3334
pub use sharding::*;
35+
pub use system_catalogs::system_catalogs;
3436
pub use users::{Admin, Plugin, User, Users};
3537

3638
use std::time::Duration;
@@ -49,7 +51,7 @@ mod test {
4951

5052
#[test]
5153
fn test_max_duration() {
52-
assert!(MAX_DURATION > Duration::from_hours(24 * 7 * 52 * 100)); // 100 years
54+
assert!(MAX_DURATION > Duration::from_secs(24 * 7 * 52 * 100 * 3600)); // 100 years
5355
assert_eq!(MAX_DURATION.as_millis() as i64, i64::MAX);
5456

5557
#[derive(Serialize)]

pgdog-config/src/sharding.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::collections::HashMap;
33
use std::collections::{hash_map::DefaultHasher, HashSet};
44
use std::hash::{Hash, Hasher as StdHasher};
55
use std::path::PathBuf;
6+
use std::str::FromStr;
67
use tracing::{info, warn};
78

89
use super::error::Error;
@@ -314,3 +315,24 @@ pub enum QueryParserEngine {
314315
PgQueryProtobuf,
315316
PgQueryRaw,
316317
}
318+
319+
#[derive(Serialize, Deserialize, Debug, Copy, Clone, PartialEq, Eq, Hash, Default)]
320+
#[serde(rename_all = "snake_case", deny_unknown_fields)]
321+
pub enum SystemCatalogsBehavior {
322+
Omnisharded,
323+
#[default]
324+
OmnishardedSticky,
325+
Sharded,
326+
}
327+
328+
impl FromStr for SystemCatalogsBehavior {
329+
type Err = ();
330+
fn from_str(s: &str) -> Result<Self, Self::Err> {
331+
Ok(match s.to_lowercase().as_str() {
332+
"omnisharded" => Self::Omnisharded,
333+
"omnisharded_sticky" => Self::OmnishardedSticky,
334+
"sharded" => Self::Sharded,
335+
_ => return Err(()),
336+
})
337+
}
338+
}

0 commit comments

Comments
 (0)