Skip to content

Commit 504fd9d

Browse files
authored
fix: apply query_timeout to entire client/server exhange (#755)
Apply `query_timeout` to entire communication exchange between client and server, not just individual server messages. Protects against servers that stopped receiving mid transaction.
1 parent f9b8777 commit 504fd9d

3 files changed

Lines changed: 29 additions & 25 deletions

File tree

pgdog/src/frontend/client/query_engine/multi_step/insert.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ impl<'a> InsertMulti<'a> {
6464
.await?;
6565

6666
while self.engine.backend.has_more_messages() {
67-
let message = self.engine.read_server_message(context).await?;
67+
let message = self.engine.read_server_message().await?;
6868

6969
if self.state.forward(&message)? {
7070
self.engine.process_server_message(context, message).await?;

pgdog/src/frontend/client/query_engine/multi_step/update.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ impl<'a> UpdateMulti<'a> {
172172
let mut checker = ForwardCheck::new(context.client_request);
173173

174174
while self.engine.backend.has_more_messages() {
175-
let message = self.engine.read_server_message(context).await?;
175+
let message = self.engine.read_server_message().await?;
176176
let code = message.code();
177177

178178
if code == 'E' {
@@ -202,7 +202,7 @@ impl<'a> UpdateMulti<'a> {
202202
.await?;
203203

204204
while self.engine.backend.has_more_messages() {
205-
let message = self.engine.read_server_message(context).await?;
205+
let message = self.engine.read_server_message().await?;
206206
self.engine.process_server_message(context, message).await?;
207207
}
208208

@@ -236,7 +236,7 @@ impl<'a> UpdateMulti<'a> {
236236
let mut rows = 0;
237237

238238
while self.engine.backend.has_more_messages() {
239-
let message = self.engine.read_server_message(context).await?;
239+
let message = self.engine.read_server_message().await?;
240240
match message.code() {
241241
'D' => {
242242
row.data_row = DataRow::try_from(message)?;

pgdog/src/frontend/client/query_engine/query.rs

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,28 @@ impl QueryEngine {
5858
}
5959
}
6060

61+
match timeout(
62+
context.timeouts.query_timeout(&State::Active),
63+
self.client_server_exchange(context),
64+
)
65+
.await
66+
{
67+
Ok(response) => response?,
68+
Err(err) => {
69+
// Close the conn, it could be stuck executing a query
70+
// or dead.
71+
self.backend.force_close();
72+
return Err(err.into());
73+
}
74+
}
75+
76+
Ok(())
77+
}
78+
79+
async fn client_server_exchange(
80+
&mut self,
81+
context: &mut QueryEngineContext<'_>,
82+
) -> Result<(), Error> {
6183
match context.rewrite_result.take() {
6284
Some(RewriteResult::InsertSplit(requests)) => {
6385
multi_step::InsertMulti::from_engine(self, requests)
@@ -75,7 +97,7 @@ impl QueryEngine {
7597
&& !self.streaming
7698
&& !self.test_mode.enabled
7799
{
78-
let message = self.read_server_message(context).await?;
100+
let message = self.read_server_message().await?;
79101
self.process_server_message(context, message).await?;
80102
}
81103
}
@@ -90,26 +112,8 @@ impl QueryEngine {
90112
Ok(())
91113
}
92114

93-
pub async fn read_server_message(
94-
&mut self,
95-
context: &mut QueryEngineContext<'_>,
96-
) -> Result<Message, Error> {
97-
let message = match timeout(
98-
context.timeouts.query_timeout(&State::Active),
99-
self.backend.read(),
100-
)
101-
.await
102-
{
103-
Ok(response) => response?,
104-
Err(err) => {
105-
// Close the conn, it could be stuck executing a query
106-
// or dead.
107-
self.backend.force_close();
108-
return Err(err.into());
109-
}
110-
};
111-
112-
Ok(message)
115+
pub async fn read_server_message(&mut self) -> Result<Message, Error> {
116+
Ok(self.backend.read().await?)
113117
}
114118

115119
pub async fn process_server_message(

0 commit comments

Comments
 (0)