Skip to content

Commit 38eccf3

Browse files
authored
feat: add 53-bit unique ID generator & database-generated sharded sequences (#858)
### Smaller unique ID _This feature is still experimental and subject to change_. Add support for 53-bit unique ID. This makes them smaller and "javascript-safe" in case identifiers are exposed as integers to the app API. This is configurable through `pgdog.toml`: ```toml [general] unique_id_function = "compact" # or "standard", which is default ``` Trade-offs: it only allows 64 pgdog nodes and 64,000 IDs generated _per second_ per node. This is considerably lower than the standard 64-bit unique ID. It should be used for omnisharded tables only with a low write frequency. ### Database-generated sharded sequences Add the ability to generate cross-shard unique IDs using sequences. This produces smaller starting integers, but can only work with direct-to-shard `INSERT`s. For omnisharded tables, the unique ID function should continue to be used. This is configurable through `pgdog.toml`: ```toml [rewrite] primary_key = "rewrite_omni" # Will only inject Unique ID to omnisharded table inserts, leaving others to be # generated by the database instead. ``` The sharded sequence is installed automatically on all tables with a `BIGINT` primary key when running `pgdog setup` or `SETUP SCHEMA` via the admin DB. ### Identity columns `GENERATED ... AS IDENTITY` is now removed entirely during schema sync. This constraint blocks our implementation of replication and doesn't work with sharded databases, incl. with our unique ID generator because it prevents the column value from being inserted by the query. ### Smaller features - Add `RESET PREPARED` admin command which evicts all unused prepared statements from the global cache - log any notice/warning received from Postgres when executing queries via `execute_checked` ### Bug fixes - `unique_id_min` setting was being ignored by the unique ID generator. This didn't affect any deployments because unique IDs start very large already. 
- Admin command `SETUP SCHEMA` was applied to all database/user pairs; it's now applied to the `schema_owner` only
1 parent d2cb052 commit 38eccf3

31 files changed

Lines changed: 1112 additions & 285 deletions

File tree

.schema/pgdog.schema.json

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@
102102
"tls_verify": "prefer",
103103
"two_phase_commit": false,
104104
"two_phase_commit_auto": null,
105+
"unique_id_function": "standard",
105106
"unique_id_min": 0,
106107
"workers": 2
107108
}
@@ -1025,6 +1026,11 @@
10251026
],
10261027
"default": null
10271028
},
1029+
"unique_id_function": {
1030+
"description": "Unique ID generation function.",
1031+
"$ref": "#/$defs/UniqueIdFunction",
1032+
"default": "standard"
1033+
},
10281034
"unique_id_min": {
10291035
"description": "Minimum ID for unique ID generator.",
10301036
"type": "integer",
@@ -1495,6 +1501,11 @@
14951501
"description": "Automatically rewrite the query and execute it.",
14961502
"type": "string",
14971503
"const": "rewrite"
1504+
},
1505+
{
1506+
"description": "Rewrite only for omnisharded tables.",
1507+
"type": "string",
1508+
"const": "rewrite_omni"
14981509
}
14991510
]
15001511
},
@@ -1817,6 +1828,20 @@
18171828
}
18181829
]
18191830
},
1831+
"UniqueIdFunction": {
1832+
"oneOf": [
1833+
{
1834+
"description": "Standard 64-bit function using the entire 64-bit range.",
1835+
"type": "string",
1836+
"const": "standard"
1837+
},
1838+
{
1839+
"description": "Compact function using the leftest 53-bit range, making it\nJavaScript-safe, so you can pass it as an integer directly\nto the frontend apps.\n\nThe year is 2026 and JavaScript continues to be a pain in the ass.",
1840+
"type": "string",
1841+
"const": "compact"
1842+
}
1843+
]
1844+
},
18201845
"Vector": {
18211846
"type": "object",
18221847
"properties": {

integration/copy_data/pgdog.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,11 @@ name = "source"
99
host = "127.0.0.1"
1010
database_name = "pgdog"
1111

12+
[[sharded_tables]]
13+
database = "source"
14+
column = "tenant_id"
15+
data_type = "bigint"
16+
1217
#
1318
# Reshard 0 -> 2
1419
#

integration/copy_data/setup.sql

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ CREATE TABLE copy_data.with_identity(
4545
tenant_id BIGINT NOT NULL
4646
);
4747

48+
4849
-- Omni (non-sharded) tables: no tenant_id column.
4950
CREATE TABLE IF NOT EXISTS copy_data.countries (
5051
id BIGSERIAL PRIMARY KEY,
@@ -65,6 +66,12 @@ CREATE TABLE IF NOT EXISTS copy_data.categories (
6566
parent_id INT
6667
);
6768

69+
CREATE TABLE copy_data.settings (
70+
id BIGSERIAL PRIMARY KEY,
71+
setting_name TEXT NOT NULL UNIQUE,
72+
setting_value TEXT NOT NULL
73+
);
74+
6875
DROP PUBLICATION IF EXISTS pgdog;
6976
CREATE PUBLICATION pgdog FOR TABLES IN SCHEMA copy_data;
7077

integration/pgdog.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ query_parser_engine = "pg_query_raw"
2424
system_catalogs = "omnisharded_sticky"
2525
reload_schema_on_ddl = false
2626
#idle_healthcheck_delay = 50000000
27-
27+
unique_id_function = "standard"
2828

2929
[memory]
3030
net_buffer = 8096

integration/rust/src/setup.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,19 @@ pub async fn connections_sqlx() -> Vec<Pool<Postgres>> {
4545
}
4646

4747
pub async fn connection_sqlx_direct() -> Pool<Postgres> {
48+
connection_sqlx_direct_db("pgdog").await
49+
}
50+
51+
pub async fn connection_sqlx_direct_db(name: &str) -> Pool<Postgres> {
4852
PgPoolOptions::new()
4953
.max_connections(1)
50-
.connect("postgres://pgdog:pgdog@127.0.0.1:5432/pgdog?application_name=sqlx_direct")
54+
.connect(
55+
format!(
56+
"postgres://pgdog:pgdog@127.0.0.1:5432/{}?application_name=sqlx_direct",
57+
name
58+
)
59+
.as_str(),
60+
)
5161
.await
5262
.unwrap()
5363
}

integration/rust/tests/integration/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ pub mod prepared;
2222
pub mod reload;
2323
pub mod reset;
2424
pub mod rewrite;
25+
pub mod rewrite_omni;
2526
pub mod savepoint;
2627
pub mod set_in_transaction;
2728
pub mod set_sharding_key;
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
use rust::setup::*;
2+
use sqlx::Executor;
3+
4+
#[tokio::test]
5+
async fn test_omni_only_pk_rewrite() {
6+
let shard_0 = connection_sqlx_direct_db("shard_0").await;
7+
let shard_1 = connection_sqlx_direct_db("shard_1").await;
8+
9+
for (shard, pool) in [&shard_0, &shard_1].iter().enumerate() {
10+
pool.execute(
11+
"DROP TABLE IF EXISTS public.test_omni_rewrite_pk_omni, test_omni_rewrite_pk_sharded CASCADE",
12+
)
13+
.await
14+
.unwrap();
15+
16+
pool.execute("CREATE TABLE IF NOT EXISTS public.test_omni_rewrite_pk_omni(id BIGSERIAL PRIMARY KEY, value TEXT NOT NULL)").await.unwrap();
17+
pool.execute("CREATE TABLE IF NOT EXISTS public.test_omni_rewrite_pk_sharded(id BIGSERIAL PRIMARY KEY, customer_id BIGINT NOT NULL, value TEXT NOT NULL)").await.unwrap();
18+
19+
pool.execute(
20+
"SELECT pgdog.install_sharded_sequence('public', 'test_omni_rewrite_pk_sharded', 'id')",
21+
)
22+
.await
23+
.unwrap();
24+
25+
// Configure sharding.
26+
let mut t = pool.begin().await.unwrap();
27+
t.execute("DELETE FROM pgdog.config").await.unwrap();
28+
t.execute(
29+
format!(
30+
"INSERT INTO pgdog.config (shard, shards) VALUES ({}, 2)",
31+
shard
32+
)
33+
.as_str(),
34+
)
35+
.await
36+
.unwrap();
37+
t.commit().await.unwrap();
38+
}
39+
40+
let sharded = connections_sqlx().await.pop().unwrap();
41+
let admin = admin_sqlx().await;
42+
43+
// This will reload the schema as well.
44+
admin
45+
.execute("SET rewrite_primary_key TO 'rewrite_omni'")
46+
.await
47+
.unwrap();
48+
49+
let starting_id: (i64,) = sqlx::query_as("SELECT pgdog.unique_id()")
50+
.fetch_one(&sharded)
51+
.await
52+
.unwrap();
53+
54+
for run in 0..25 {
55+
let omni_id: (i64,) = sqlx::query_as(
56+
"INSERT INTO public.test_omni_rewrite_pk_omni (value) VALUES ($1) RETURNING id",
57+
)
58+
.bind(format!("test_{}", run))
59+
.fetch_one(&sharded)
60+
.await
61+
.unwrap();
62+
63+
assert!(
64+
omni_id.0 > starting_id.0,
65+
"omni ID should be unique_id, but got {}",
66+
omni_id.0
67+
);
68+
69+
let sharded_id: (i64,) = sqlx::query_as(
70+
"INSERT INTO public.test_omni_rewrite_pk_sharded (customer_id, value) VALUES ($1, $2) RETURNING id",
71+
)
72+
.bind(run as i64)
73+
.bind(format!("test_{}", run))
74+
.fetch_one(&sharded)
75+
.await
76+
.unwrap();
77+
78+
assert!(
79+
sharded_id.0 < omni_id.0,
80+
"sharded ID should not be unique_id, but got {}",
81+
sharded_id.0
82+
);
83+
}
84+
85+
sharded.close().await;
86+
let sharded = connections_sqlx().await.pop().unwrap();
87+
88+
// Re-enable sharded unique ID.
89+
admin
90+
.execute("SET rewrite_primary_key TO 'rewrite'")
91+
.await
92+
.unwrap();
93+
94+
// The rewrite is cached in prepared statements
95+
// and the query cache, so we need to be careful to reset it
96+
// _after_ the client disconnected. In production, changing this setting
97+
// definitely requires a restart.
98+
admin.execute("RESET QUERY_CACHE").await.unwrap();
99+
admin.execute("RESET PREPARED").await.unwrap();
100+
101+
for run in 25..50 {
102+
let omni_id: (i64,) = sqlx::query_as(
103+
"INSERT INTO public.test_omni_rewrite_pk_omni (value) VALUES ($1) RETURNING id",
104+
)
105+
.bind(format!("test_{}", run))
106+
.fetch_one(&sharded)
107+
.await
108+
.unwrap();
109+
110+
assert!(
111+
omni_id.0 > starting_id.0,
112+
"omni ID should be unique_id, but got {}",
113+
omni_id.0
114+
);
115+
116+
let sharded_id: (i64,) = sqlx::query_as(
117+
"INSERT INTO public.test_omni_rewrite_pk_sharded (customer_id, value) VALUES ($1, $2) RETURNING id",
118+
)
119+
.bind(run as i64)
120+
.bind(format!("test_{}", run))
121+
.fetch_one(&sharded)
122+
.await
123+
.unwrap();
124+
125+
assert!(
126+
sharded_id.0 > omni_id.0,
127+
"sharded ID should be unique_id, but got {}",
128+
sharded_id.0
129+
);
130+
}
131+
}

integration/schema_sync/dev.sh

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -81,12 +81,25 @@ EOF
8181

8282
diff source.sql destination.sql > diff.txt || true
8383

84-
# Extract column name and type from diff lines, ignoring everything else
85-
# This normalizes across different PG versions and constraint syntaxes
86-
ACTUAL_CONVERSIONS=$(grep '^[<>]' diff.txt | \
87-
grep -E '\b(integer|bigint)\b' | \
88-
sed -E 's/.*[[:space:]]([a-z_]+)[[:space:]]+(integer|bigint).*/\1 \2/' | \
89-
sort -u)
84+
# Extract integer -> bigint conversions from the diff
85+
# 1. Get removed (< integer) and added (> bigint) column lines
86+
# 2. Only keep columns that appear as integer in source AND bigint in destination
87+
REMOVED_INT=$(grep '^<' diff.txt | \
88+
sed -E 's/.*[[:space:]]([a-z_]+)[[:space:]]+integer\b.*/\1/' | \
89+
grep -E '^[a-z_]+$' | sort -u)
90+
91+
ADDED_BIGINT=$(grep '^>' diff.txt | \
92+
sed -E 's/.*[[:space:]]([a-z_]+)[[:space:]]+bigint\b.*/\1/' | \
93+
grep -E '^[a-z_]+$' | sort -u)
94+
95+
# Columns that changed from integer to bigint
96+
CONVERTED=$(comm -12 <(echo "$REMOVED_INT") <(echo "$ADDED_BIGINT"))
97+
98+
# Build the expected format: column_name integer \n column_name bigint
99+
ACTUAL_CONVERSIONS=$(echo "$CONVERTED" | while read col; do
100+
echo "$col integer"
101+
echo "$col bigint"
102+
done | sort -u)
90103

91104
EXPECTED_SORTED=$(echo "$EXPECTED_CONVERSIONS" | sort -u)
92105

integration/schema_sync/pgdog.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,7 @@ database_name = "pgdog1"
1414
name = "destination"
1515
host = "127.0.0.1"
1616
database_name = "pgdog2"
17+
18+
[[sharded_tables]]
19+
database = "destination"
20+
column = "user_id"

pgdog-config/src/general.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use std::str::FromStr;
88
use std::time::Duration;
99

1010
use crate::pooling::ConnectionRecovery;
11+
use crate::UniqueIdFunction;
1112
use crate::{
1213
CopyFormat, CutoverTimeoutAction, LoadSchema, QueryParserEngine, QueryParserLevel,
1314
SystemCatalogsBehavior,
@@ -545,6 +546,10 @@ pub struct General {
545546
#[serde(default)]
546547
pub unique_id_min: u64,
547548

549+
/// Unique ID generation function.
550+
#[serde(default)]
551+
pub unique_id_function: UniqueIdFunction,
552+
548553
/// Changes how system catalog tables (like `pg_database`, `pg_class`, etc.) are treated by the query router.
549554
///
550555
/// _Default:_ `omnisharded_sticky`
@@ -718,6 +723,7 @@ impl Default for General {
718723
cutover_timeout: Self::cutover_timeout(),
719724
cutover_timeout_action: Self::cutover_timeout_action(),
720725
cutover_save_config: bool::default(),
726+
unique_id_function: Self::unique_id_function(),
721727
}
722728
}
723729
}
@@ -817,6 +823,10 @@ impl General {
817823
Self::env_or_default("PGDOG_BAN_REPLICA_LAG_BYTES", i64::MAX as u64)
818824
}
819825

826+
fn unique_id_function() -> UniqueIdFunction {
827+
Self::env_enum_or_default("PGDOG_UNIQUE_ID_FUNCTION")
828+
}
829+
820830
fn cutover_replication_lag_threshold() -> u64 {
821831
Self::env_or_default("PGDOG_CUTOVER_REPLICATION_LAG_THRESHOLD", 0)
822832
// 0 bytes

0 commit comments

Comments
 (0)