Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ The library has three core types in `src/`:

- **Client** (`client.rs`): Wraps a single SQLite connection. Spawns a background `std::thread` that receives commands (closures) via a `crossbeam_channel`. Results are returned through `futures_channel::oneshot`. This design makes it runtime-agnostic. Client is cheaply cloneable.

- **Pool** (`pool.rs`): Manages multiple `Client` instances with round-robin selection via an atomic counter. Provides the same API as Client plus `conn_for_each()` for executing on all connections. Defaults to CPU-count connections.
- **Pool** (`pool.rs`): Manages multiple `Client` instances with round-robin selection via an atomic counter. Provides the same API as Client plus `conn_for_each()` for executing on all connections. File-backed and named shared-memory pools default to CPU-count connections; anonymous in-memory pools default to one connection and reject explicit multi-connection configuration.

- **Error** (`error.rs`): Non-exhaustive enum wrapping `rusqlite::Error`, channel errors, and pragma failures.
- **Error** (`error.rs`): Non-exhaustive enum wrapping config errors, `rusqlite::Error`, channel errors, panics, and pragma failures.

All database operations use a closure-based API (e.g., `conn(|conn| { ... })`) to avoid lifetime issues with the cross-thread boundary. Both blocking and async variants exist for all operations.

Expand Down
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ println!("Value is: {value}");
A `Pool` represents a collection of background sqlite3 connections that can be
called concurrently from any thread in your program.

`PoolBuilder::new().open()` and `path(":memory:")` use a single anonymous
in-memory connection by default, since separate SQLite `:memory:` connections
do not share schema or data. File-backed pools default to the logical CPU
count. For multiple connections to a named in-memory database, use
`shared_memory("name")`; this uses SQLite shared-cache mode, which has caveats,
so prefer a file-backed database when possible.

To create a sqlite pool and run a query:

```rust
Expand Down
3 changes: 3 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
pub enum Error {
/// Indicates that the connection to the sqlite database is closed.
Closed,
/// Invalid builder configuration.
Config { message: &'static str },
/// Error updating PRAGMA.
PragmaUpdate {
name: &'static str,
Expand All @@ -29,6 +31,7 @@ impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Error::Closed => write!(f, "connection to sqlite database closed"),
Error::Config { message } => write!(f, "invalid configuration: {message}"),
Error::PragmaUpdate { exp, got, name } => {
write!(f, "updating pragma {name}: expected '{exp}', got '{got}'")
}
Expand Down
7 changes: 7 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@
//! A `Pool` represents a collection of background sqlite3 connections that can be
//! called concurrently from any thread in your program.
//!
//! `PoolBuilder::new().open()` and `path(":memory:")` use a single anonymous
//! in-memory connection by default, since separate SQLite `:memory:`
//! connections do not share schema or data. File-backed pools default to the
//! logical CPU count. For multiple connections to a named in-memory database,
//! use `shared_memory("name")`; this uses SQLite shared-cache mode, which has
//! caveats, so prefer a file-backed database when possible.
//!
//! To create a sqlite pool and run a query:
//!
//! ```rust
Expand Down
165 changes: 125 additions & 40 deletions src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use rusqlite::{Connection, OpenFlags};
#[derive(Clone, Debug, Default)]
pub struct PoolBuilder {
path: Option<PathBuf>,
shared_memory_name: Option<String>,
flags: OpenFlags,
journal_mode: Option<JournalMode>,
vfs: Option<String>,
Expand All @@ -51,6 +52,30 @@ impl PoolBuilder {
/// By default, an in-memory database is used.
pub fn path<P: AsRef<Path>>(mut self, path: P) -> Self {
self.path = Some(path.as_ref().into());
self.shared_memory_name = None;
self
}

/// Use a named shared in-memory sqlite database.
///
/// This opens connections with a URI of the form
/// `file:<name>?mode=memory&cache=shared` and enables
/// [`OpenFlags::SQLITE_OPEN_URI`] and
/// [`OpenFlags::SQLITE_OPEN_SHARED_CACHE`].
///
/// SQLite shared-cache mode has caveats and is discouraged by SQLite for
/// many workloads. Prefer a file-backed database when possible. The
/// in-memory database is deleted after the last connection using this name
/// is closed.
///
/// ```
/// use async_sqlite::PoolBuilder;
///
/// let builder = PoolBuilder::new().shared_memory("my-pool").num_conns(2);
/// ```
pub fn shared_memory<N: AsRef<str>>(mut self, name: N) -> Self {
self.path = None;
self.shared_memory_name = Some(name.as_ref().to_owned());
self
}

Expand Down Expand Up @@ -78,8 +103,11 @@ impl PoolBuilder {

/// Specify the number of sqlite connections to open as part of the pool.
///
/// Defaults to the number of logical CPUs of the current system. Values
/// less than `1` are clamped to `1`.
/// File-backed and shared-memory pools default to the number of logical
/// CPUs of the current system. Anonymous in-memory pools, including
/// `path(":memory:")`, default to `1` connection because each sqlite
/// `:memory:` connection is a separate database. Values less than `1` are
/// clamped to `1`.
///
/// ```
/// use async_sqlite::PoolBuilder;
Expand All @@ -104,30 +132,16 @@ impl PoolBuilder {
/// ```
pub async fn open(self) -> Result<Pool, Error> {
let num_conns = self.get_num_conns();
self.validate(num_conns)?;

// Open the first connection with full config (including journal_mode).
// This must complete before opening remaining connections to avoid
// concurrent PRAGMA writes on a new database file.
let first = ClientBuilder {
path: self.path.clone(),
flags: self.flags,
journal_mode: self.journal_mode,
vfs: self.vfs.clone(),
}
.open()
.await?;
let first = self.client_builder().open().await?;

// Open remaining connections with journal_mode too, so connection-local
// modes are applied consistently across the pool.
let opens = (1..num_conns).map(|_| {
ClientBuilder {
path: self.path.clone(),
flags: self.flags,
journal_mode: self.journal_mode,
vfs: self.vfs.clone(),
}
.open()
});
let opens = (1..num_conns).map(|_| self.client_builder().open());
let mut clients = vec![first];
clients.extend(
join_all(opens)
Expand Down Expand Up @@ -158,30 +172,17 @@ impl PoolBuilder {
/// ```
pub fn open_blocking(self) -> Result<Pool, Error> {
let num_conns = self.get_num_conns();
self.validate(num_conns)?;

// Open the first connection with full config (including journal_mode).
let first = ClientBuilder {
path: self.path.clone(),
flags: self.flags,
journal_mode: self.journal_mode,
vfs: self.vfs.clone(),
}
.open_blocking()?;
let first = self.client_builder().open_blocking()?;

// Open remaining connections with journal_mode too, so connection-local
// modes are applied consistently across the pool.
let mut clients = vec![first];
clients.extend(
(1..num_conns)
.map(|_| {
ClientBuilder {
path: self.path.clone(),
flags: self.flags,
journal_mode: self.journal_mode,
vfs: self.vfs.clone(),
}
.open_blocking()
})
.map(|_| self.client_builder().open_blocking())
.collect::<Result<Vec<Client>, Error>>()?,
);

Expand All @@ -194,11 +195,95 @@ impl PoolBuilder {
}

fn get_num_conns(&self) -> usize {
self.num_conns.unwrap_or_else(|| {
available_parallelism()
.unwrap_or_else(|_| NonZeroUsize::new(1).unwrap())
.into()
})
if let Some(num_conns) = self.num_conns {
return num_conns;
}

if self.is_anonymous_memory() {
return 1;
}

available_parallelism()
.unwrap_or_else(|_| NonZeroUsize::new(1).unwrap())
.into()
}

fn validate(&self, num_conns: usize) -> Result<(), Error> {
if self
.shared_memory_name
.as_ref()
.is_some_and(|name| name.is_empty())
{
return Err(Error::Config {
message: "shared memory database name must not be empty",
});
}

if self.is_anonymous_memory() && num_conns > 1 {
return Err(Error::Config {
message: "anonymous in-memory pools cannot use multiple connections; call path(...) for file-backed pools or shared_memory(...) for named shared in-memory pools",
});
}

Ok(())
}

fn client_builder(&self) -> ClientBuilder {
ClientBuilder {
path: self.connection_path(),
flags: self.connection_flags(),
journal_mode: self.journal_mode,
vfs: self.vfs.clone(),
}
}

fn connection_path(&self) -> Option<PathBuf> {
self.shared_memory_name
.as_deref()
.map(shared_memory_uri)
.or_else(|| self.path.clone())
}

fn connection_flags(&self) -> OpenFlags {
let mut flags = self.flags;
if self.shared_memory_name.is_some() {
flags.insert(OpenFlags::SQLITE_OPEN_URI);
flags.insert(OpenFlags::SQLITE_OPEN_SHARED_CACHE);
flags.remove(OpenFlags::SQLITE_OPEN_PRIVATE_CACHE);
}
flags
}

fn is_anonymous_memory(&self) -> bool {
self.shared_memory_name.is_none()
&& self
.path
.as_deref()
.is_none_or(|path| path == Path::new(":memory:"))
}
}

fn shared_memory_uri(name: &str) -> PathBuf {
let mut uri = String::from("file:");
push_uri_encoded(name, &mut uri);
uri.push_str("?mode=memory&cache=shared");
uri.into()
}

fn push_uri_encoded(input: &str, out: &mut String) {
const HEX: &[u8; 16] = b"0123456789ABCDEF";

for byte in input.bytes() {
match byte {
b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => {
out.push(byte.into());
}
_ => {
out.push('%');
out.push(HEX[(byte >> 4) as usize].into());
out.push(HEX[(byte & 0x0F) as usize].into());
}
}
}
}

Expand Down
Loading
Loading