Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,40 @@ impl Client {
rx.recv()?
}

/// Invokes the provided function with a [`rusqlite::Connection`],
/// blocking the current thread until completion.
///
/// Maps the result error type to a custom error; designed to be
/// used in conjunction with [`query_and_then`](https://docs.rs/rusqlite/latest/rusqlite/struct.CachedStatement.html#method.query_and_then).
pub fn conn_and_then_blocking<F, T, E>(&self, func: F) -> Result<T, E>
where
F: FnOnce(&Connection) -> Result<T, E> + Send + 'static,
T: Send + 'static,
E: From<rusqlite::Error> + From<Error> + Send + 'static,
{
let rx = self
.enqueue_blocking(move |conn| run_catching_and_then(conn, |conn| func(conn)))
.map_err(Error::from)?;
rx.recv().map_err(Error::from)?
}

/// Invokes the provided function with a mutable [`rusqlite::Connection`],
/// blocking the current thread until completion.
///
/// Maps the result error type to a custom error; designed to be
/// used in conjunction with [`query_and_then`](https://docs.rs/rusqlite/latest/rusqlite/struct.CachedStatement.html#method.query_and_then).
pub fn conn_mut_and_then_blocking<F, T, E>(&self, func: F) -> Result<T, E>
where
F: FnOnce(&mut Connection) -> Result<T, E> + Send + 'static,
T: Send + 'static,
E: From<rusqlite::Error> + From<Error> + Send + 'static,
{
let rx = self
.enqueue_blocking(move |conn| run_catching_and_then(conn, func))
.map_err(Error::from)?;
rx.recv().map_err(Error::from)?
}

/// Closes the underlying sqlite connection, blocking the current thread
/// until complete.
///
Expand Down
54 changes: 54 additions & 0 deletions src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,32 @@ impl Pool {
self.get().conn_mut(func).await
}

/// Invokes the provided function with a [`rusqlite::Connection`].
///
/// Maps the result error type to a custom error; designed to be
/// used in conjunction with [`query_and_then`](https://docs.rs/rusqlite/latest/rusqlite/struct.CachedStatement.html#method.query_and_then).
pub async fn conn_and_then<F, T, E>(&self, func: F) -> Result<T, E>
where
F: FnOnce(&Connection) -> Result<T, E> + Send + 'static,
T: Send + 'static,
E: From<rusqlite::Error> + From<Error> + Send + 'static,
{
self.get().conn_and_then(func).await
}

/// Invokes the provided function with a mutable [`rusqlite::Connection`].
///
/// Maps the result error type to a custom error; designed to be
/// used in conjunction with [`query_and_then`](https://docs.rs/rusqlite/latest/rusqlite/struct.CachedStatement.html#method.query_and_then).
pub async fn conn_mut_and_then<F, T, E>(&self, func: F) -> Result<T, E>
where
F: FnOnce(&mut Connection) -> Result<T, E> + Send + 'static,
T: Send + 'static,
E: From<rusqlite::Error> + From<Error> + Send + 'static,
{
self.get().conn_mut_and_then(func).await
}

/// Closes the underlying sqlite connections.
///
/// After this method returns, all calls to `self::conn()` or
Expand Down Expand Up @@ -365,6 +391,34 @@ impl Pool {
self.get().conn_mut_blocking(func)
}

/// Invokes the provided function with a [`rusqlite::Connection`], blocking
/// the current thread.
///
/// Maps the result error type to a custom error; designed to be
/// used in conjunction with [`query_and_then`](https://docs.rs/rusqlite/latest/rusqlite/struct.CachedStatement.html#method.query_and_then).
pub fn conn_and_then_blocking<F, T, E>(&self, func: F) -> Result<T, E>
where
F: FnOnce(&Connection) -> Result<T, E> + Send + 'static,
T: Send + 'static,
E: From<rusqlite::Error> + From<Error> + Send + 'static,
{
self.get().conn_and_then_blocking(func)
}

/// Invokes the provided function with a mutable [`rusqlite::Connection`],
/// blocking the current thread.
///
/// Maps the result error type to a custom error; designed to be
/// used in conjunction with [`query_and_then`](https://docs.rs/rusqlite/latest/rusqlite/struct.CachedStatement.html#method.query_and_then).
pub fn conn_mut_and_then_blocking<F, T, E>(&self, func: F) -> Result<T, E>
where
F: FnOnce(&mut Connection) -> Result<T, E> + Send + 'static,
T: Send + 'static,
E: From<rusqlite::Error> + From<Error> + Send + 'static,
{
self.get().conn_mut_and_then_blocking(func)
}

/// Closes the underlying sqlite connections, blocking the current thread.
///
/// After this method returns, all calls to `self::conn_blocking()` or
Expand Down
127 changes: 127 additions & 0 deletions tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,32 @@ fn journal_modes() -> [(JournalMode, &'static str); 6] {
]
}

#[derive(Debug)]
enum CustomError {
AsyncSqlite,
Rusqlite,
User(&'static str),
}

impl From<Error> for CustomError {
fn from(_value: Error) -> Self {
Self::AsyncSqlite
}
}

impl From<rusqlite::Error> for CustomError {
fn from(_value: rusqlite::Error) -> Self {
Self::Rusqlite
}
}

fn assert_user_error(result: Result<(), CustomError>, expected: &'static str) {
match result {
Err(CustomError::User(actual)) => assert_eq!(actual, expected),
other => panic!("expected CustomError::User({expected:?}), got {other:?}"),
}
}

#[test]
fn test_blocking_client() {
let tmp_dir = tempfile::tempdir().unwrap();
Expand Down Expand Up @@ -59,6 +85,39 @@ fn test_blocking_client() {
client.close_blocking().expect("closing client conn");
}

#[test]
fn test_blocking_client_and_then_api() {
let client = ClientBuilder::new()
.open_blocking()
.expect("client unable to be opened");

client
.conn_and_then_blocking(|conn| {
conn.execute(
"CREATE TABLE testing (id INTEGER PRIMARY KEY, val INTEGER NOT NULL)",
(),
)?;
conn.execute("INSERT INTO testing VALUES (1, ?)", [42])?;
Ok::<(), CustomError>(())
})
.expect("writing schema and seed data");

let val: i64 = client
.conn_mut_and_then_blocking(|conn| {
conn.query_row("SELECT val FROM testing WHERE id=?", [1], |row| row.get(0))
.map_err(CustomError::from)
})
.expect("querying for result");
assert_eq!(val, 42);

assert_user_error(
client.conn_and_then_blocking(|_| Err(CustomError::User("client"))),
"client",
);

client.close_blocking().expect("closing client conn");
}

#[test]
fn test_blocking_default_pool_in_memory_uses_one_connection() {
let pool = PoolBuilder::new()
Expand Down Expand Up @@ -125,6 +184,38 @@ fn test_blocking_pool() {
pool.close_blocking().expect("closing client conn");
}

#[test]
fn test_blocking_pool_and_then_api() {
let pool = PoolBuilder::new()
.open_blocking()
.expect("pool unable to be opened");

pool.conn_and_then_blocking(|conn| {
conn.execute(
"CREATE TABLE testing (id INTEGER PRIMARY KEY, val INTEGER NOT NULL)",
(),
)?;
conn.execute("INSERT INTO testing VALUES (1, ?)", [42])?;
Ok::<(), CustomError>(())
})
.expect("writing schema and seed data");

let val: i64 = pool
.conn_mut_and_then_blocking(|conn| {
conn.query_row("SELECT val FROM testing WHERE id=?", [1], |row| row.get(0))
.map_err(CustomError::from)
})
.expect("querying for result");
assert_eq!(val, 42);

assert_user_error(
pool.conn_and_then_blocking(|_| Err(CustomError::User("pool"))),
"pool",
);

pool.close_blocking().expect("closing pool");
}

#[test]
fn test_blocking_pool_rejects_multi_connection_anonymous_memory() {
let err = match PoolBuilder::new().num_conns(2).open_blocking() {
Expand Down Expand Up @@ -208,6 +299,7 @@ async_test!(test_journal_mode);
async_test!(test_concurrency);
async_test!(test_default_pool_in_memory_uses_one_connection);
async_test!(test_pool);
async_test!(test_pool_and_then_api);
async_test!(test_pool_rejects_multi_connection_anonymous_memory);
async_test!(test_shared_memory_pool);
async_test!(test_shared_memory_rejects_empty_name);
Expand Down Expand Up @@ -346,6 +438,41 @@ async fn test_pool() {
.expect("collecting query results");
}

async fn test_pool_and_then_api() {
let pool = PoolBuilder::new()
.open()
.await
.expect("pool unable to be opened");

pool.conn_and_then(|conn| {
conn.execute(
"CREATE TABLE testing (id INTEGER PRIMARY KEY, val INTEGER NOT NULL)",
(),
)?;
conn.execute("INSERT INTO testing VALUES (1, ?)", [42])?;
Ok::<(), CustomError>(())
})
.await
.expect("writing schema and seed data");

let val: i64 = pool
.conn_mut_and_then(|conn| {
conn.query_row("SELECT val FROM testing WHERE id=?", [1], |row| row.get(0))
.map_err(CustomError::from)
})
.await
.expect("querying for result");
assert_eq!(val, 42);

assert_user_error(
pool.conn_and_then(|_| Err(CustomError::User("pool async")))
.await,
"pool async",
);

pool.close().await.expect("closing pool");
}

async fn test_pool_rejects_multi_connection_anonymous_memory() {
let err = match PoolBuilder::new().num_conns(2).open().await {
Ok(pool) => {
Expand Down
Loading