Skip to content

Commit 70d4839

Browse files
authored
fix: oid is u32, not i32 (#873)
Possibly fixes #847. We incorrectly implemented `oid` as `i32`, but it's actually `u32`, so OIDs over 2.2B would be read as `0`. On very busy databases that constantly create/destroy tables/columns, this would overflow. OIDs are not guaranteed to be unique and it gets pretty bad when the oid search space gets small (i.e. very large tables), but we still want to support large OIDs since they are valid.
1 parent afb753d commit 70d4839

12 files changed

Lines changed: 280 additions & 55 deletions

File tree

pgdog-postgres-types/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ pub mod integer;
1111
pub mod interface;
1212
pub mod interval;
1313
pub mod numeric;
14+
pub mod oid;
1415
pub mod text;
1516
pub mod timestamp;
1617
pub mod timestamptz;
@@ -27,5 +28,6 @@ pub use format::Format;
2728
pub use interface::{FromDataType, ToDataRowColumn};
2829
pub use interval::Interval;
2930
pub use numeric::Numeric;
31+
pub use oid::Oid;
3032
pub use timestamp::Timestamp;
3133
pub use timestamptz::TimestampTz;

pgdog-postgres-types/src/oid.rs

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
//! PostgreSQL `oid` data type.
2+
3+
use super::*;
4+
use bytes::{Buf, Bytes};
5+
use std::fmt;
6+
7+
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
8+
pub struct Oid(pub u32);
9+
10+
impl Oid {
11+
pub const fn new(value: u32) -> Self {
12+
Self(value)
13+
}
14+
15+
pub const fn get(self) -> u32 {
16+
self.0
17+
}
18+
}
19+
20+
impl fmt::Display for Oid {
21+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
22+
write!(f, "{}", self.0)
23+
}
24+
}
25+
26+
impl From<u32> for Oid {
27+
fn from(value: u32) -> Self {
28+
Self(value)
29+
}
30+
}
31+
32+
impl From<Oid> for u32 {
33+
fn from(value: Oid) -> Self {
34+
value.0
35+
}
36+
}
37+
38+
impl FromDataType for Oid {
39+
fn decode(bytes: &[u8], encoding: Format) -> Result<Self, Error> {
40+
match encoding {
41+
Format::Binary => {
42+
let bytes: [u8; 4] = bytes.try_into()?;
43+
Ok(Self(bytes.as_slice().get_u32()))
44+
}
45+
46+
Format::Text => {
47+
let s = String::decode(bytes, Format::Text)?;
48+
Ok(Self(s.parse()?))
49+
}
50+
}
51+
}
52+
53+
fn encode(&self, encoding: Format) -> Result<Bytes, Error> {
54+
match encoding {
55+
Format::Text => Ok(Bytes::copy_from_slice(self.0.to_string().as_bytes())),
56+
Format::Binary => Ok(Bytes::copy_from_slice(&self.0.to_be_bytes())),
57+
}
58+
}
59+
}
60+
61+
impl ToDataRowColumn for Oid {
62+
fn to_data_row_column(&self) -> Data {
63+
Bytes::copy_from_slice(self.0.to_string().as_bytes()).into()
64+
}
65+
}
66+
67+
#[cfg(test)]
68+
mod test {
69+
use super::*;
70+
71+
#[test]
72+
fn test_oid_text_round_trip_small() {
73+
let oid = Oid(16384); // typical user-table oid
74+
let encoded = oid.encode(Format::Text).unwrap();
75+
let decoded = Oid::decode(&encoded, Format::Text).unwrap();
76+
assert_eq!(oid, decoded);
77+
}
78+
79+
#[test]
80+
fn test_oid_text_round_trip_above_i32_max() {
81+
// Regression for issue #847: pg_class.oid can exceed i32::MAX in
82+
// long-lived databases. Parsing as i32 used to silently produce 0.
83+
let oid = Oid(2_500_000_000);
84+
let encoded = oid.encode(Format::Text).unwrap();
85+
assert_eq!(&encoded[..], b"2500000000");
86+
let decoded = Oid::decode(&encoded, Format::Text).unwrap();
87+
assert_eq!(oid, decoded);
88+
}
89+
90+
#[test]
91+
fn test_oid_text_round_trip_max() {
92+
let oid = Oid(u32::MAX);
93+
let encoded = oid.encode(Format::Text).unwrap();
94+
let decoded = Oid::decode(&encoded, Format::Text).unwrap();
95+
assert_eq!(oid, decoded);
96+
}
97+
98+
#[test]
99+
fn test_oid_binary_round_trip() {
100+
for value in [0u32, 1, 16384, 2_147_483_647, 2_147_483_648, u32::MAX] {
101+
let oid = Oid(value);
102+
let encoded = oid.encode(Format::Binary).unwrap();
103+
assert_eq!(encoded.len(), 4);
104+
let decoded = Oid::decode(&encoded, Format::Binary).unwrap();
105+
assert_eq!(oid, decoded);
106+
}
107+
}
108+
109+
#[test]
110+
fn test_oid_decode_text_invalid() {
111+
// Non-numeric text should error, not silently produce 0.
112+
let result = Oid::decode(b"not-a-number", Format::Text);
113+
assert!(matches!(result, Err(Error::NotInteger(_))));
114+
}
115+
116+
#[test]
117+
fn test_oid_decode_text_negative_rejected() {
118+
// Negative values are not valid OIDs.
119+
let result = Oid::decode(b"-1", Format::Text);
120+
assert!(matches!(result, Err(Error::NotInteger(_))));
121+
}
122+
123+
#[test]
124+
fn test_oid_decode_binary_wrong_size() {
125+
let result = Oid::decode(&[0u8; 3], Format::Binary);
126+
assert!(result.is_err());
127+
}
128+
129+
#[test]
130+
fn test_oid_display() {
131+
assert_eq!(Oid(0).to_string(), "0");
132+
assert_eq!(Oid(2_500_000_000).to_string(), "2500000000");
133+
}
134+
135+
#[test]
136+
fn test_oid_default_is_zero() {
137+
assert_eq!(Oid::default(), Oid(0));
138+
}
139+
}

pgdog/src/backend/replication/buffer.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use fnv::FnvHashMap as HashMap;
22
use fnv::FnvHashSet as HashSet;
3+
use pgdog_postgres_types::Oid;
34
use std::collections::VecDeque;
45

56
use crate::backend::ShardingSchema;
@@ -23,10 +24,10 @@ pub struct Buffer {
2324
replication_config: ReplicationConfig,
2425
begin: Option<XLogData>,
2526
message: Option<XLogData>,
26-
relations: HashMap<i32, Relation>,
27-
sent_relations: HashSet<i32>,
27+
relations: HashMap<Oid, Relation>,
28+
sent_relations: HashSet<Oid>,
2829
shard: Shard,
29-
oid: Option<i32>,
30+
oid: Option<Oid>,
3031
buffer: VecDeque<Message>,
3132
sharding_schema: ShardingSchema,
3233
}
@@ -163,7 +164,7 @@ impl Buffer {
163164
Ok(())
164165
}
165166

166-
fn sharding_key(&self, oid: i32) -> Result<(&str, Vec<&str>), Error> {
167+
fn sharding_key(&self, oid: Oid) -> Result<(&str, Vec<&str>), Error> {
167168
let relation = self.relations.get(&oid).ok_or(Error::NoRelationMessage)?;
168169
let columns = relation.columns();
169170
let name = relation.name();

pgdog/src/backend/replication/logical/publisher/queries.rs

Lines changed: 74 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
//!
66
use std::fmt::Display;
77

8+
use pgdog_postgres_types::Oid;
9+
810
use crate::{
911
backend::Server,
1012
net::{DataRow, Format},
@@ -101,7 +103,7 @@ ON (c.relnamespace = n.oid) WHERE n.nspname = $1 AND c.relname = $2";
101103
/// Identifies the columns part of the replica identity for a table.
102104
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
103105
pub struct ReplicaIdentity {
104-
pub oid: i32,
106+
pub oid: Oid,
105107
pub identity: String,
106108
pub kind: String,
107109
}
@@ -148,9 +150,11 @@ FROM
148150

149151
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
150152
pub struct PublicationTableColumn {
153+
/// Column number (`pg_attribute.attnum`). Despite the name, this is not an OID.
151154
pub oid: i32,
152155
pub name: String,
153-
pub type_oid: i32,
156+
/// Type OID (`pg_attribute.atttypid`).
157+
pub type_oid: Oid,
154158
pub identity: bool,
155159
}
156160

@@ -183,6 +187,74 @@ mod test {
183187

184188
use super::*;
185189

190+
#[test]
191+
fn test_replica_identity_decodes_oid_above_i32_max() {
192+
// Regression for issue #847: pg_class.oid is unsigned 32-bit, and in
193+
// long-lived databases it routinely exceeds i32::MAX. Decoding such an
194+
// OID as i32 used to silently produce 0, which caused PgDog to send
195+
// queries against OID 0 and trigger Postgres' "could not open relation
196+
// with OID 0" error. With Oid (u32), this round-trips correctly.
197+
let mut row = DataRow::new();
198+
row.add(Oid(2_500_000_000))
199+
.add("d".to_string())
200+
.add("r".to_string());
201+
let identity = ReplicaIdentity::from(row);
202+
assert_eq!(identity.oid, Oid(2_500_000_000));
203+
assert_eq!(identity.identity, "d");
204+
assert_eq!(identity.kind, "r");
205+
}
206+
207+
#[test]
208+
fn test_replica_identity_substitutes_high_oid_into_columns_query() {
209+
// The COLUMNS query embeds identity.oid as text via Display. Verify
210+
// that a high OID renders as an unsigned decimal, not a negative i32.
211+
let identity = ReplicaIdentity {
212+
oid: Oid(2_500_000_000),
213+
identity: "d".to_string(),
214+
kind: "r".to_string(),
215+
};
216+
let rendered = COLUMNS
217+
.replace("$1", &identity.oid.to_string())
218+
.replace("$2", &identity.oid.to_string());
219+
assert!(rendered.contains("pg_get_replica_identity_index(2500000000)"));
220+
assert!(rendered.contains("a.attrelid = 2500000000"));
221+
assert!(!rendered.contains("(0)"));
222+
assert!(!rendered.contains("= 0 "));
223+
}
224+
225+
#[test]
226+
fn test_publication_table_column_decodes_high_type_oid() {
227+
// pg_attribute.atttypid is also of type oid; user-defined types in
228+
// long-lived databases can exceed i32::MAX.
229+
let mut row = DataRow::new();
230+
row.add(1_i64.to_string()) // attnum
231+
.add("col".to_string())
232+
.add(Oid(3_000_000_000)) // atttypid
233+
.add("t".to_string()); // identity bool
234+
let column = PublicationTableColumn::from(row);
235+
assert_eq!(column.type_oid, Oid(3_000_000_000));
236+
assert_eq!(column.name, "col");
237+
assert!(column.identity);
238+
}
239+
240+
#[tokio::test]
241+
async fn test_oid_above_i32_max_round_trips_from_postgres() {
242+
// Regression for issue #847: ask Postgres to emit a real `oid` value
243+
// above i32::MAX on the wire and verify our text-format decode handles
244+
// it. With the previous i32-based parser this returned 0; with Oid(u32)
245+
// the value round-trips correctly.
246+
let mut server = test_server().await;
247+
let rows: Vec<DataRow> = server
248+
.fetch_all("SELECT 2500000000::oid, 4294967295::oid")
249+
.await
250+
.unwrap();
251+
assert_eq!(rows.len(), 1);
252+
let high: Oid = rows[0].get(0, Format::Text).unwrap();
253+
let max: Oid = rows[0].get(1, Format::Text).unwrap();
254+
assert_eq!(high, Oid(2_500_000_000));
255+
assert_eq!(max, Oid(u32::MAX));
256+
}
257+
186258
#[tokio::test]
187259
async fn test_logical_publisher_queries() {
188260
let mut server = test_server().await;

pgdog/src/backend/replication/logical/publisher/table.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ mod test {
306306
parent_name: "".to_string(),
307307
},
308308
identity: ReplicaIdentity {
309-
oid: 1,
309+
oid: pgdog_postgres_types::Oid(1),
310310
identity: "".to_string(),
311311
kind: "".to_string(),
312312
},
@@ -315,7 +315,7 @@ mod test {
315315
.map(|(name, identity)| PublicationTableColumn {
316316
oid: 1,
317317
name: name.to_string(),
318-
type_oid: 23,
318+
type_oid: pgdog_postgres_types::Oid(23),
319319
identity,
320320
})
321321
.collect(),

pgdog/src/backend/replication/logical/subscriber/stream.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use pg_query::{
1616
NodeEnum,
1717
};
1818
use pgdog_config::QueryParserEngine;
19+
use pgdog_postgres_types::Oid;
1920
use tracing::{debug, trace};
2021

2122
use super::super::{publisher::Table, Error};
@@ -123,19 +124,19 @@ pub struct StreamSubscriber {
123124

124125
// Relation markers sent by the publisher.
125126
// Happens once per connection.
126-
relations: HashMap<i32, Relation>,
127+
relations: HashMap<Oid, Relation>,
127128

128129
// Tables in the publication on the publisher.
129130
tables: HashMap<Key, Table>,
130131

131132
// Statements
132-
statements: HashMap<i32, Statements>,
133+
statements: HashMap<Oid, Statements>,
133134

134135
// Partitioned tables dedup.
135136
partitioned_dedup: HashSet<Key>,
136137

137138
// LSNs for each table
138-
table_lsns: HashMap<i32, i64>,
139+
table_lsns: HashMap<Oid, i64>,
139140

140141
// Connections to shards.
141142
connections: Vec<Server>,
@@ -388,7 +389,7 @@ impl StreamSubscriber {
388389
Ok(())
389390
}
390391

391-
pub(crate) fn lsn_applied(&self, oid: &i32) -> bool {
392+
pub(crate) fn lsn_applied(&self, oid: &Oid) -> bool {
392393
if let Some(table_lsn) = self.table_lsns.get(oid) {
393394
// Don't apply change if table is ahead.
394395
if self.lsn < *table_lsn {

0 commit comments

Comments
 (0)