Skip to content

Commit a4bd987

Browse files
author
builder
committed
Basic Cursor Implementatuon
1 parent f6fa202 commit a4bd987

4 files changed

Lines changed: 347 additions & 1 deletion

File tree

rwf/src/model/cursor.rs

Lines changed: 341 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,341 @@
1+
use super::ToConnectionRequest;
2+
use super::{FromRow, Query, Row};
3+
use crate::{
4+
model::{Escape, Placeholders},
5+
prelude::*,
6+
};
7+
use std::{marker::PhantomData, sync::atomic::AtomicI64, time::Instant, vec};
8+
9+
#[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Eq, Ord, Serialize, Deserialize)]
10+
pub enum FetchDirection {
11+
NEXT,
12+
PRIOR,
13+
FIRST,
14+
LAST,
15+
ABSOLUTE(i64),
16+
RELATIVE(i64),
17+
FORWARD(i64),
18+
FORWARD_ALL,
19+
BACKWARD(i64),
20+
BACKWARD_ALL,
21+
}
22+
23+
impl Default for FetchDirection {
24+
fn default() -> Self {
25+
Self::NEXT
26+
}
27+
}
28+
29+
impl std::fmt::Display for FetchDirection {
30+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31+
match self {
32+
Self::NEXT => write!(f, "NEXT"),
33+
Self::PRIOR => write!(f, "PRIOR"),
34+
Self::FIRST => write!(f, "FIRST"),
35+
Self::LAST => write!(f, "LAST"),
36+
Self::ABSOLUTE(n) => write!(f, "{} {}", "ABSOLUTE", n),
37+
Self::RELATIVE(n) => write!(f, "{} {}", "RELATIVE", n),
38+
Self::FORWARD(n) => write!(f, "{} {}", "FORWARD", n),
39+
Self::FORWARD_ALL => write!(f, "FORWARD ALL"),
40+
Self::BACKWARD(n) => write!(f, "{} {}", "BACKWARD", n),
41+
Self::BACKWARD_ALL => write!(f, "BACKWARD ALL"),
42+
}
43+
}
44+
}
45+
46+
impl FetchDirection {
47+
pub fn expected_row_count(&self, row_count: &i64) -> bool {
48+
use FetchDirection::*;
49+
match self {
50+
ABSOLUTE(0) => 0.eq(row_count),
51+
NEXT | PRIOR | FIRST | LAST => 1.eq(row_count),
52+
RELATIVE(n) | ABSOLUTE(n) => 1.eq(row_count),
53+
54+
FORWARD(0) | BACKWARD(0) => 1.eq(row_count),
55+
FORWARD(n) | BACKWARD(n) => n.abs().eq(row_count),
56+
FORWARD_ALL | BACKWARD_ALL => 0.lt(row_count),
57+
}
58+
}
59+
pub fn to_position_update(&self, row_count: &i64) -> Self {
60+
use FetchDirection::*;
61+
if self.expected_row_count(row_count) {
62+
match self {
63+
FORWARD_ALL => FORWARD(*row_count),
64+
BACKWARD_ALL => BACKWARD(*row_count),
65+
fd => *fd,
66+
}
67+
} else {
68+
match self {
69+
NEXT | FIRST | LAST | PRIOR | FORWARD_ALL | BACKWARD_ALL => RELATIVE(0),
70+
ABSOLUTE(_) => FORWARD_ALL,
71+
RELATIVE(n) => {
72+
if 0.eq(n) {
73+
RELATIVE(0)
74+
} else if 0.lt(n) {
75+
FORWARD_ALL
76+
} else {
77+
BACKWARD_ALL
78+
}
79+
}
80+
FORWARD(n) => {
81+
if 0.le(n) {
82+
FORWARD(*row_count)
83+
} else {
84+
FORWARD(-*row_count)
85+
}
86+
}
87+
BACKWARD(n) => {
88+
if 0.le(n) {
89+
BACKWARD(*row_count)
90+
} else {
91+
BACKWARD(-*row_count)
92+
}
93+
}
94+
}
95+
}
96+
}
97+
}
98+
99+
#[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Eq, Ord, Serialize, Deserialize)]
100+
pub enum Sensitivity {
101+
INSENSITIVE,
102+
ASENSITIVE,
103+
SENSITIVE,
104+
}
105+
impl Default for Sensitivity {
106+
fn default() -> Self {
107+
Self::INSENSITIVE
108+
}
109+
}
110+
111+
impl std::fmt::Display for Sensitivity {
112+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113+
match self {
114+
Self::INSENSITIVE => write!(f, "INSENSITIVE"),
115+
Self::ASENSITIVE => write!(f, "ASENSITIVE"),
116+
Self::SENSITIVE => write!(f, "SENSITIVE"),
117+
}
118+
}
119+
}
120+
121+
impl ToSql for FetchDirection {
122+
fn to_sql(&self) -> String {
123+
format!(" {} ", self)
124+
}
125+
}
126+
127+
impl ToSql for Sensitivity {
128+
fn to_sql(&self) -> String {
129+
match self {
130+
Self::SENSITIVE => unimplemented!("Postgres has no support for SENSITIVE Cursors. SENSITIVE is only implemented for compability with the SQL Standard"),
131+
sensitivity => format!(" {} ", sensitivity.to_string())
132+
}
133+
}
134+
}
135+
136+
#[derive(Debug, Clone, Serialize, Deserialize)]
137+
pub struct DeclareCursor<T: FromRow + ?Sized = Row> {
138+
query: Query<T>,
139+
sensitivity: Sensitivity,
140+
hold: bool,
141+
scroll: bool,
142+
name: String,
143+
}
144+
145+
impl<T: FromRow + ?Sized> From<Query<T>> for DeclareCursor<T> {
146+
fn from(value: Query<T>) -> Self {
147+
match value {
148+
Query::Select(select) => {
149+
let sensitivity = if select.lock.is_locked() {
150+
Sensitivity::ASENSITIVE
151+
} else {
152+
Sensitivity::INSENSITIVE
153+
};
154+
let query = Query::Select(select);
155+
DeclareCursor {
156+
query,
157+
sensitivity,
158+
hold: false,
159+
scroll: false,
160+
name: String::new(),
161+
}
162+
}
163+
Query::Picked(picked) => {
164+
let sensitivity = if picked.select.lock.is_locked() {
165+
Sensitivity::ASENSITIVE
166+
} else {
167+
Sensitivity::INSENSITIVE
168+
};
169+
let query = Query::Picked(picked);
170+
DeclareCursor {
171+
query,
172+
sensitivity,
173+
hold: false,
174+
scroll: false,
175+
name: String::new(),
176+
}
177+
}
178+
Query::Raw {
179+
query,
180+
placeholders,
181+
} => DeclareCursor {
182+
query: Query::Raw {
183+
query,
184+
placeholders,
185+
},
186+
sensitivity: Sensitivity::INSENSITIVE,
187+
hold: false,
188+
scroll: false,
189+
name: String::new(),
190+
},
191+
_query => unimplemented!("Cursor are only defined for SELECT Queries"),
192+
}
193+
}
194+
}
195+
196+
impl<T: Model> DeclareCursor<T> {
197+
pub fn insensitive(mut self) -> Self {
198+
self.sensitivity = Sensitivity::INSENSITIVE;
199+
self
200+
}
201+
pub fn asensitive(mut self) -> Self {
202+
self.sensitivity = Sensitivity::ASENSITIVE;
203+
self
204+
}
205+
pub fn hold(mut self) -> Self {
206+
self.hold = !self.hold;
207+
self
208+
}
209+
pub fn scroll(mut self) -> Self {
210+
self.scroll = !self.scroll;
211+
self
212+
}
213+
pub fn name(mut self, name: impl ToString) -> Self {
214+
self.name = name.to_string();
215+
self
216+
}
217+
pub fn placeholders(&self) -> &Placeholders {
218+
self.query.get_placeholders()
219+
}
220+
}
221+
222+
impl<T: Model> ToSql for DeclareCursor<T> {
223+
fn to_sql(&self) -> String {
224+
format!(
225+
r#"DECLARE "{}" BINARY{}{} SCROLL CURSOR {} HOLD FOR {}"#,
226+
self.name.escape(),
227+
self.sensitivity.to_sql(),
228+
self.scroll.then(|| "").unwrap_or("NO"),
229+
self.hold.then(|| "WITH").unwrap_or("WITHOUT"),
230+
self.query.to_sql()
231+
)
232+
}
233+
}
234+
235+
#[derive(Debug)]
236+
pub struct ModelCursor<T: Model> {
237+
name: String,
238+
created: Instant,
239+
used: Instant,
240+
position: AtomicI64,
241+
fetched: AtomicI64,
242+
_marker: PhantomData<T>,
243+
}
244+
245+
struct Fetch<'a>
246+
where
247+
Self: Send + Sync,
248+
{
249+
fetch: bool,
250+
direction: FetchDirection,
251+
cursor: &'a mut dyn Cursor,
252+
}
253+
254+
impl<'a> ToSql for Fetch<'a> {
255+
fn to_sql(&self) -> String {
256+
format!(
257+
r#"{}{}FOR" {}""#,
258+
self.fetch.then(|| "FETCH").unwrap_or("MOVE"),
259+
self.direction.to_sql(),
260+
self.cursor.name().escape()
261+
)
262+
}
263+
}
264+
265+
pub trait Cursor: Sync + Send {
266+
fn name(&self) -> &str;
267+
fn created(&self) -> Instant;
268+
fn last_used(&self) -> Instant;
269+
fn position(&self) -> i64;
270+
fn fetched(&self) -> i64;
271+
fn update(&mut self, fd: FetchDirection, row_count: i64) -> () {
272+
self.update_used();
273+
self.update_fetched(row_count);
274+
self.update_position(fd.to_position_update(&row_count));
275+
}
276+
fn update_position(&mut self, fd: FetchDirection) -> () {
277+
use FetchDirection::*;
278+
let order = std::sync::atomic::Ordering::Relaxed;
279+
let cur = self.get_position_mut();
280+
match fd {
281+
NEXT => cur.fetch_add(1, order),
282+
PRIOR => cur.fetch_add(-1, order),
283+
FIRST => cur.swap(1, order),
284+
LAST => cur.swap(-1, order),
285+
ABSOLUTE(n) => cur.swap(n, order),
286+
RELATIVE(n) => cur.fetch_add(n, order),
287+
FORWARD(n) => cur.fetch_add(n, order),
288+
BACKWARD(n) => cur.fetch_add(-n, order),
289+
FORWARD_ALL => cur.swap(i64::MAX, order),
290+
BACKWARD_ALL => cur.swap(0, order),
291+
};
292+
}
293+
fn update_fetched(&mut self, count: i64) -> () {
294+
self.get_fetched_mut()
295+
.fetch_add(count, std::sync::atomic::Ordering::Relaxed);
296+
}
297+
fn update_used(&mut self) -> ();
298+
fn get_position_mut(&mut self) -> &mut AtomicI64;
299+
fn get_fetched_mut(&mut self) -> &mut AtomicI64;
300+
}
301+
302+
impl<T: Model + Send + Sync> Cursor for ModelCursor<T> {
303+
fn name(&self) -> &str {
304+
self.name.as_str()
305+
}
306+
fn created(&self) -> Instant {
307+
self.created
308+
}
309+
fn last_used(&self) -> Instant {
310+
self.used
311+
}
312+
fn position(&self) -> i64 {
313+
self.position.load(std::sync::atomic::Ordering::Relaxed)
314+
}
315+
fn fetched(&self) -> i64 {
316+
self.fetched.load(std::sync::atomic::Ordering::Relaxed)
317+
}
318+
fn update_used(&mut self) -> () {
319+
self.used = Instant::now();
320+
}
321+
fn get_position_mut(&mut self) -> &mut AtomicI64 {
322+
&mut self.position
323+
}
324+
fn get_fetched_mut(&mut self) -> &mut AtomicI64 {
325+
&mut self.fetched
326+
}
327+
}
328+
329+
#[async_trait]
330+
trait FetchInternal<'a>: Cursor + Send + Sync {
331+
async fn fetch_cursor(
332+
fetch: Fetch<'a>,
333+
req: impl ToConnectionRequest<'_> + Send,
334+
) -> Result<Vec<tokio_postgres::Row>, super::Error> {
335+
let conn = req.to_connection_request()?.connection().unwrap();
336+
337+
let rows = conn.query_cached(fetch.to_sql().as_str(), &[]).await?;
338+
fetch.cursor.update(fetch.direction, rows.len() as i64);
339+
Ok(rows)
340+
}
341+
}

rwf/src/model/lock.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ impl Lock {
1919
self.skip_locked = true;
2020
self
2121
}
22+
pub fn is_locked(&self) -> bool {
23+
self.lock
24+
}
2225
}
2326

2427
impl ToSql for Lock {

rwf/src/model/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use tracing::{error, info};
1515
pub mod callbacks;
1616
pub mod column;
1717
pub mod combine;
18+
pub mod cursor;
1819
pub mod delete;
1920
pub mod error;
2021
pub mod escape;

rwf/src/prelude.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ pub use crate::http::{Cookie, CookieBuilder, Message, Method, Request, Response,
1616
pub use crate::job::{queue_async, queue_delay, Job};
1717
pub use crate::logging::Logger;
1818
pub use crate::model::{
19-
pool::ToConnectionRequest, FilterQuery, Migrations, Model, Pool, Scope, ToSql, ToValue,
19+
pool::ToConnectionRequest, prelude::WithQuery, CombinedQuery, FilterQuery, Migrations, Model,
20+
Pool, Scope, ToColumn, ToSql, ToValue,
2021
};
2122
pub use crate::view::{Template, ToTemplateValue, TurboStream};
2223

0 commit comments

Comments
 (0)