Skip to content

Commit 6884194

Browse files
authored
Annotate explain plans with routing decisions (#546)
* initial draft internal explain annotations * clean up formatting logic in query parser * explain plan annotation integration test * explain annotation testing * enable explain annotations only when expanded_explain flag set * Enable expanded_explain in integration tests * Tweak explain recorder * better integration debugging * less strict assertion :(
1 parent e63ed13 commit 6884194

17 files changed

Lines changed: 727 additions & 26 deletions

File tree

integration/pgdog.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ read_write_strategy = "aggressive"
1111
openmetrics_port = 9090
1212
openmetrics_namespace = "pgdog_"
1313
prepared_statements_limit = 500
14+
expanded_explain = true
1415
# dns_ttl = 15_000
1516
query_cache_limit = 500
1617
pub_sub_channel_size = 4098
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
use rust::setup::connections_tokio;
2+
use tokio_postgres::SimpleQueryMessage;
3+
4+
#[tokio::test]
5+
async fn explain_routing_annotations_surface() -> Result<(), Box<dyn std::error::Error>> {
6+
let mut clients = connections_tokio().await;
7+
let sharded = clients.swap_remove(1);
8+
9+
for shard in [0, 1] {
10+
let drop = format!(
11+
"/* pgdog_shard: {} */ DROP TABLE IF EXISTS explain_avg_test",
12+
shard
13+
);
14+
sharded.simple_query(drop.as_str()).await.ok();
15+
}
16+
17+
for shard in [0, 1] {
18+
let create = format!(
19+
"/* pgdog_shard: {} */ CREATE TABLE explain_avg_test(price DOUBLE PRECISION)",
20+
shard
21+
);
22+
sharded.simple_query(create.as_str()).await?;
23+
}
24+
25+
sharded
26+
.simple_query(
27+
"/* pgdog_shard: 0 */ INSERT INTO explain_avg_test(price) VALUES (10.0), (14.0)",
28+
)
29+
.await?;
30+
sharded
31+
.simple_query(
32+
"/* pgdog_shard: 1 */ INSERT INTO explain_avg_test(price) VALUES (18.0), (22.0)",
33+
)
34+
.await?;
35+
36+
let rows = sharded
37+
.simple_query("EXPLAIN SELECT AVG(price) FROM explain_avg_test")
38+
.await?;
39+
40+
let mut plan_lines = vec![];
41+
for message in rows {
42+
if let SimpleQueryMessage::Row(row) = message {
43+
plan_lines.push(row.get(0).unwrap_or_default().to_string());
44+
}
45+
}
46+
47+
assert!(
48+
plan_lines.iter().any(|line| line.contains("Aggregate")),
49+
"missing Aggregate node in EXPLAIN output: {:?}",
50+
plan_lines
51+
);
52+
let routing_header = plan_lines
53+
.iter()
54+
.find(|line| line.contains("PgDog Routing:"));
55+
assert!(
56+
routing_header.is_some(),
57+
"missing PgDog Routing header in EXPLAIN output: {:?}",
58+
plan_lines
59+
);
60+
let summary_line = plan_lines
61+
.iter()
62+
.find(|line| line.contains("Summary:"))
63+
.cloned();
64+
assert!(
65+
summary_line
66+
.as_ref()
67+
.map(|line| line.contains("Summary: shard=all"))
68+
.unwrap_or(false),
69+
"unexpected summary line: {:?} (all lines: {:?})",
70+
summary_line,
71+
plan_lines
72+
);
73+
let broadcast_line = plan_lines
74+
.iter()
75+
.find(|line| line.contains("no sharding key matched"))
76+
.cloned();
77+
assert!(
78+
broadcast_line.is_some(),
79+
"missing broadcast note in EXPLAIN output: {:?}",
80+
plan_lines
81+
);
82+
83+
for shard in [0, 1] {
84+
let drop = format!(
85+
"/* pgdog_shard: {} */ DROP TABLE IF EXISTS explain_avg_test",
86+
shard
87+
);
88+
sharded.simple_query(drop.as_str()).await.ok();
89+
}
90+
91+
Ok(())
92+
}

integration/rust/tests/integration/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ pub mod avg;
33
pub mod ban;
44
pub mod cross_shard_disabled;
55
pub mod distinct;
6+
pub mod explain;
67
pub mod fake_transactions;
78
pub mod maintenance_mode;
89
pub mod notify;

pgdog/src/config/general.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,9 @@ pub struct General {
156156
/// Two-phase commit automatic transactions.
157157
#[serde(default)]
158158
pub two_phase_commit_auto: Option<bool>,
159+
/// Enable expanded EXPLAIN output.
160+
#[serde(default = "General::expanded_explain")]
161+
pub expanded_explain: bool,
159162
}
160163

161164
impl Default for General {
@@ -211,6 +214,7 @@ impl Default for General {
211214
log_disconnections: Self::log_disconnections(),
212215
two_phase_commit: bool::default(),
213216
two_phase_commit_auto: None,
217+
expanded_explain: Self::expanded_explain(),
214218
server_lifetime: Self::server_lifetime(),
215219
}
216220
}
@@ -479,6 +483,10 @@ impl General {
479483
Self::env_bool_or_default("PGDOG_LOG_DISCONNECTIONS", true)
480484
}
481485

486+
pub fn expanded_explain() -> bool {
487+
Self::env_bool_or_default("PGDOG_EXPANDED_EXPLAIN", false)
488+
}
489+
482490
pub fn server_lifetime() -> u64 {
483491
Self::env_or_default(
484492
"PGDOG_SERVER_LIFETIME",

pgdog/src/frontend/client/query_engine/mod.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::{
22
backend::pool::{Connection, Request},
3+
config::config,
34
frontend::{
45
client::query_engine::hooks::QueryEngineHooks,
56
router::{parser::Shard, Route},
@@ -32,6 +33,7 @@ pub mod unknown_command;
3233
#[cfg(test)]
3334
mod testing;
3435

36+
use self::query::ExplainResponseState;
3537
pub use context::QueryEngineContext;
3638
use notify_buffer::NotifyBuffer;
3739
pub use two_pc::phase::TwoPcPhase;
@@ -50,6 +52,7 @@ pub struct QueryEngine {
5052
set_route: Option<Route>,
5153
two_pc: TwoPc,
5254
notify_buffer: NotifyBuffer,
55+
pending_explain: Option<ExplainResponseState>,
5356
}
5457

5558
impl QueryEngine {
@@ -130,13 +133,21 @@ impl QueryEngine {
130133
// to have accurate timings between queries.
131134
self.backend.mirror(context.client_request);
132135

136+
self.pending_explain = None;
137+
133138
let command = self.router.command();
134-
let route = if let Some(ref route) = self.set_route {
139+
let mut route = if let Some(ref route) = self.set_route {
135140
route.clone()
136141
} else {
137142
command.route().clone()
138143
};
139144

145+
if let Some(trace) = route.take_explain() {
146+
if config().config.general.expanded_explain {
147+
self.pending_explain = Some(ExplainResponseState::new(trace));
148+
}
149+
}
150+
140151
// FIXME, we should not to copy route twice.
141152
context.client_request.route = Some(route.clone());
142153

pgdog/src/frontend/client/query_engine/query.rs

Lines changed: 94 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
use tokio::time::timeout;
22

33
use crate::{
4-
frontend::client::TransactionType,
4+
frontend::{client::TransactionType, router::parser::explain_trace::ExplainTrace},
55
net::{
6-
FromBytes, Message, Protocol, ProtocolMessage, Query, ReadyForQuery, ToBytes,
7-
TransactionState,
6+
DataRow, FromBytes, Message, Protocol, ProtocolMessage, Query, ReadyForQuery,
7+
RowDescription, ToBytes, TransactionState,
88
},
99
state::State,
1010
};
@@ -96,9 +96,35 @@ impl QueryEngine {
9696
self.streaming = message.streaming();
9797

9898
let code = message.code();
99+
let payload = if code == 'T' {
100+
Some(message.payload())
101+
} else {
102+
None
103+
};
99104
let mut message = message.backend();
100105
let has_more_messages = self.backend.has_more_messages();
101106

107+
if let Some(bytes) = payload {
108+
if let Some(state) = self.pending_explain.as_mut() {
109+
if let Ok(row_description) = RowDescription::from_bytes(bytes) {
110+
state.capture_row_description(row_description);
111+
} else {
112+
state.annotated = true;
113+
}
114+
}
115+
}
116+
117+
if code == 'C' {
118+
self.emit_explain_rows(context).await?;
119+
}
120+
121+
if code == 'E' {
122+
if let Some(state) = self.pending_explain.as_mut() {
123+
state.annotated = true;
124+
}
125+
self.pending_explain = None;
126+
}
127+
102128
// Messages that we need to send to the client immediately.
103129
// ReadyForQuery (B) | CopyInResponse (B) | ErrorResponse(B) | NoticeResponse(B) | NotificationResponse (B)
104130
let flush = matches!(code, 'Z' | 'G' | 'E' | 'N' | 'A')
@@ -185,6 +211,38 @@ impl QueryEngine {
185211
context.stream.send(&message).await?;
186212
}
187213

214+
if code == 'Z' {
215+
self.pending_explain = None;
216+
}
217+
218+
Ok(())
219+
}
220+
221+
async fn emit_explain_rows(
222+
&mut self,
223+
context: &mut QueryEngineContext<'_>,
224+
) -> Result<(), Error> {
225+
if let Some(state) = self.pending_explain.as_mut() {
226+
if !state.should_emit() {
227+
return Ok(());
228+
}
229+
230+
if state.row_description.is_none() {
231+
return Ok(());
232+
}
233+
234+
for line in state.lines.clone() {
235+
let mut row = DataRow::new();
236+
row.add(line);
237+
let message = row.message()?;
238+
let len = message.len();
239+
context.stream.send(&message).await?;
240+
self.stats.sent(len);
241+
}
242+
243+
state.annotated = true;
244+
}
245+
188246
Ok(())
189247
}
190248

@@ -310,3 +368,36 @@ impl QueryEngine {
310368
}
311369
}
312370
}
371+
372+
#[derive(Debug, Default, Clone)]
373+
pub(super) struct ExplainResponseState {
374+
lines: Vec<String>,
375+
row_description: Option<RowDescription>,
376+
annotated: bool,
377+
supported: bool,
378+
}
379+
380+
impl ExplainResponseState {
381+
pub fn new(trace: ExplainTrace) -> Self {
382+
Self {
383+
lines: trace.render_lines(),
384+
row_description: None,
385+
annotated: false,
386+
supported: false,
387+
}
388+
}
389+
390+
pub fn capture_row_description(&mut self, row_description: RowDescription) {
391+
self.supported = row_description.fields.len() == 1
392+
&& matches!(row_description.field(0).map(|f| f.type_oid), Some(25));
393+
if self.supported {
394+
self.row_description = Some(row_description);
395+
} else {
396+
self.annotated = true;
397+
}
398+
}
399+
400+
pub fn should_emit(&self) -> bool {
401+
self.supported && !self.annotated
402+
}
403+
}

pgdog/src/frontend/router/parser/context.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ pub struct QueryParserContext<'a> {
4444
pub(super) multi_tenant: &'a Option<MultiTenant>,
4545
/// Dry run enabled?
4646
pub(super) dry_run: bool,
47+
/// Expanded EXPLAIN annotations enabled?
48+
pub(super) expanded_explain: bool,
4749
}
4850

4951
impl<'a> QueryParserContext<'a> {
@@ -61,6 +63,7 @@ impl<'a> QueryParserContext<'a> {
6163
pub_sub_enabled: config.config.general.pub_sub_enabled(),
6264
multi_tenant: router_context.cluster.multi_tenant(),
6365
dry_run: config.config.general.dry_run,
66+
expanded_explain: config.config.general.expanded_explain,
6467
router_context,
6568
}
6669
}
@@ -136,4 +139,8 @@ impl<'a> QueryParserContext<'a> {
136139
params,
137140
}
138141
}
142+
143+
pub(super) fn expanded_explain(&self) -> bool {
144+
self.expanded_explain
145+
}
139146
}

0 commit comments

Comments
 (0)