Skip to content

Commit 767fb3e

Browse files
committed
Guard concurrent sends with exclusive DB lock and URI/RK checks
Two concurrent send commands targeting the same URI could select different coins. Set PRAGMA locking_mode = EXCLUSIVE so only one process holds write access at a time. Check uniqueness of URI and recceiver pubkey with duplicate checks
1 parent 66492aa commit 767fb3e

4 files changed

Lines changed: 136 additions & 11 deletions

File tree

payjoin-cli/src/app/v2/mod.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ impl AppTrait for App {
229229
Some((sender_state, persister)) => (sender_state, persister),
230230
None => {
231231
let persister =
232-
SenderPersister::new(self.db.clone(), receiver_pubkey.clone())?;
232+
SenderPersister::new(self.db.clone(), bip21, receiver_pubkey)?;
233233
let psbt = self.create_original_psbt(&address, amount, fee_rate)?;
234234
let sender =
235235
SenderBuilder::from_parts(psbt, pj_param, &address, Some(amount))
@@ -307,17 +307,17 @@ impl AppTrait for App {
307307

308308
// Process sender sessions
309309
for session_id in send_session_ids {
310-
let sender_persiter = SenderPersister::from_id(self.db.clone(), session_id.clone());
311-
match replay_sender_event_log(&sender_persiter) {
310+
let sender_persister = SenderPersister::from_id(self.db.clone(), session_id.clone());
311+
match replay_sender_event_log(&sender_persister) {
312312
Ok((sender_state, _)) => {
313313
let self_clone = self.clone();
314314
tasks.push(tokio::spawn(async move {
315-
self_clone.process_sender_session(sender_state, &sender_persiter).await
315+
self_clone.process_sender_session(sender_state, &sender_persister).await
316316
}));
317317
}
318318
Err(e) => {
319319
tracing::error!("An error {:?} occurred while replaying Sender session", e);
320-
Self::close_failed_session(&sender_persiter, &session_id, "sender");
320+
Self::close_failed_session(&sender_persister, &session_id, "sender");
321321
}
322322
}
323323
}

payjoin-cli/src/db/error.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,13 @@ use rusqlite::Error as RusqliteError;
66

77
pub(crate) type Result<T> = std::result::Result<T, Error>;
88

9+
#[cfg(feature = "v2")]
10+
#[derive(Debug)]
11+
pub(crate) enum DuplicateKind {
12+
Uri,
13+
ReceiverPubkey,
14+
}
15+
916
#[derive(Debug)]
1017
pub(crate) enum Error {
1118
Rusqlite(RusqliteError),
@@ -14,6 +21,8 @@ pub(crate) enum Error {
1421
Serialize(serde_json::Error),
1522
#[cfg(feature = "v2")]
1623
Deserialize(serde_json::Error),
24+
#[cfg(feature = "v2")]
25+
DuplicateSendSession(DuplicateKind),
1726
}
1827

1928
impl fmt::Display for Error {
@@ -25,6 +34,15 @@ impl fmt::Display for Error {
2534
Error::Serialize(e) => write!(f, "Serialization failed: {e}"),
2635
#[cfg(feature = "v2")]
2736
Error::Deserialize(e) => write!(f, "Deserialization failed: {e}"),
37+
#[cfg(feature = "v2")]
38+
Error::DuplicateSendSession(DuplicateKind::Uri) => {
39+
write!(f, "A send session for this URI is already active")
40+
}
41+
#[cfg(feature = "v2")]
42+
Error::DuplicateSendSession(DuplicateKind::ReceiverPubkey) => write!(
43+
f,
44+
"A send session with this receiver pubkey is already active under a different URI"
45+
),
2846
}
2947
}
3048
}
@@ -38,6 +56,8 @@ impl std::error::Error for Error {
3856
Error::Serialize(e) => Some(e),
3957
#[cfg(feature = "v2")]
4058
Error::Deserialize(e) => Some(e),
59+
#[cfg(feature = "v2")]
60+
Error::DuplicateSendSession(_) => None,
4161
}
4262
}
4363
}

payjoin-cli/src/db/mod.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,16 @@ pub(crate) fn now() -> i64 {
1515

1616
pub(crate) const DB_PATH: &str = "payjoin.sqlite";
1717

18+
#[derive(Debug)]
1819
pub(crate) struct Database(Pool<SqliteConnectionManager>);
1920

2021
impl Database {
2122
pub(crate) fn create(path: impl AsRef<Path>) -> Result<Self> {
22-
let manager = SqliteConnectionManager::file(path.as_ref());
23+
// locking_mode is a per-connection PRAGMA, so it must be set via
24+
// with_init to apply to every connection the pool creates, not only
25+
// the first one used during init_schema.
26+
let manager = SqliteConnectionManager::file(path.as_ref())
27+
.with_init(|conn| conn.execute_batch("PRAGMA locking_mode = EXCLUSIVE;"));
2328
let pool = Pool::new(manager)?;
2429

2530
// Initialize database schema
@@ -36,6 +41,7 @@ impl Database {
3641
conn.execute(
3742
"CREATE TABLE IF NOT EXISTS send_sessions (
3843
session_id INTEGER PRIMARY KEY AUTOINCREMENT,
44+
pj_uri TEXT NOT NULL,
3945
receiver_pubkey BLOB NOT NULL,
4046
completed_at INTEGER
4147
)",

payjoin-cli/src/db/v2.rs

Lines changed: 104 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,40 @@ impl std::fmt::Display for SessionId {
2020
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", self.0) }
2121
}
2222

23-
#[derive(Clone)]
23+
#[derive(Clone, Debug)]
2424
pub(crate) struct SenderPersister {
2525
db: Arc<Database>,
2626
session_id: SessionId,
2727
}
2828

2929
impl SenderPersister {
30-
pub fn new(db: Arc<Database>, receiver_pubkey: HpkePublicKey) -> crate::db::Result<Self> {
30+
pub fn new(
31+
db: Arc<Database>,
32+
pj_uri: &str,
33+
receiver_pubkey: &HpkePublicKey,
34+
) -> crate::db::Result<Self> {
3135
let conn = db.get_connection()?;
36+
let receiver_pubkey_bytes = receiver_pubkey.to_compressed_bytes();
37+
38+
let (duplicate_uri, duplicate_rk): (bool, bool) = conn.query_row(
39+
"SELECT \
40+
EXISTS(SELECT 1 FROM send_sessions WHERE pj_uri = ?1), \
41+
EXISTS(SELECT 1 FROM send_sessions WHERE receiver_pubkey = ?2)",
42+
params![pj_uri, &receiver_pubkey_bytes],
43+
|row| Ok((row.get(0)?, row.get(1)?)),
44+
)?;
45+
46+
if duplicate_uri {
47+
return Err(Error::DuplicateSendSession(DuplicateKind::Uri));
48+
}
49+
if duplicate_rk {
50+
return Err(Error::DuplicateSendSession(DuplicateKind::ReceiverPubkey));
51+
}
3252

3353
// Create a new session in send_sessions and get its ID
3454
let session_id: i64 = conn.query_row(
35-
"INSERT INTO send_sessions (session_id, receiver_pubkey) VALUES (NULL, ?1) RETURNING session_id",
36-
params![receiver_pubkey.to_compressed_bytes()],
55+
"INSERT INTO send_sessions (pj_uri, receiver_pubkey) VALUES (?1, ?2) RETURNING session_id",
56+
params![pj_uri, &receiver_pubkey_bytes],
3757
|row| row.get(0),
3858
)?;
3959

@@ -42,7 +62,6 @@ impl SenderPersister {
4262

4363
pub fn from_id(db: Arc<Database>, id: SessionId) -> Self { Self { db, session_id: id } }
4464
}
45-
4665
impl SessionPersister for SenderPersister {
4766
type SessionEvent = SenderSessionEvent;
4867
type InternalStorageError = crate::db::error::Error;
@@ -268,3 +287,83 @@ impl Database {
268287
Ok(session_ids)
269288
}
270289
}
290+
291+
#[cfg(all(test, feature = "v2"))]
292+
mod tests {
293+
use std::sync::Arc;
294+
295+
use payjoin::HpkeKeyPair;
296+
297+
use super::*;
298+
299+
fn create_test_db() -> Arc<Database> {
300+
// Use an in-memory database for tests
301+
let manager = r2d2_sqlite::SqliteConnectionManager::memory()
302+
.with_init(|conn| conn.execute_batch("PRAGMA locking_mode = EXCLUSIVE;"));
303+
let pool = r2d2::Pool::new(manager).expect("pool creation should succeed");
304+
let conn = pool.get().expect("connection should succeed");
305+
Database::init_schema(&conn).expect("schema init should succeed");
306+
Arc::new(Database(pool))
307+
}
308+
309+
fn make_receiver_pubkey() -> payjoin::HpkePublicKey { HpkeKeyPair::gen_keypair().1 }
310+
311+
// Second call with the same URI (same active session) should return DuplicateSendSession(Uri).
312+
#[test]
313+
fn test_duplicate_uri_returns_error() {
314+
let db = create_test_db();
315+
let rk1 = make_receiver_pubkey();
316+
let rk2 = make_receiver_pubkey();
317+
let uri = "bitcoin:addr1?pj=https://example.com/BBBBBBBB";
318+
319+
SenderPersister::new(db.clone(), uri, &rk1).expect("first session should succeed");
320+
321+
let err = SenderPersister::new(db, uri, &rk2).expect_err("duplicate URI should fail");
322+
assert!(
323+
matches!(err, Error::DuplicateSendSession(DuplicateKind::Uri)),
324+
"expected DuplicateSendSession(Uri), got: {err:?}"
325+
);
326+
}
327+
328+
// Same receiver pubkey under a different URI should return DuplicateSendSession(ReceiverPubkey).
329+
#[test]
330+
fn test_duplicate_rk_returns_error() {
331+
let db = create_test_db();
332+
let rk = make_receiver_pubkey();
333+
let uri1 = "bitcoin:addr1?pj=https://example.com/CCCCCCCC";
334+
let uri2 = "bitcoin:addr1?pj=https://example.com/DDDDDDDD";
335+
336+
SenderPersister::new(db.clone(), uri1, &rk).expect("first session should succeed");
337+
338+
let err = SenderPersister::new(db, uri2, &rk).expect_err("duplicate RK should fail");
339+
assert!(
340+
matches!(err, Error::DuplicateSendSession(DuplicateKind::ReceiverPubkey)),
341+
"expected DuplicateSendSession(ReceiverPubkey), got: {err:?}"
342+
);
343+
}
344+
345+
// After a session is marked completed, a new session with the same URI must still be rejected
346+
// to prevent address reuse, HPKE receiver-key reuse
347+
#[test]
348+
fn test_completed_session_blocks_reuse() {
349+
let db = create_test_db();
350+
let rk1 = make_receiver_pubkey();
351+
let rk2 = make_receiver_pubkey();
352+
let uri = "bitcoin:addr1?pj=https://example.com/EEEEEEEE";
353+
354+
let persister =
355+
SenderPersister::new(db.clone(), uri, &rk1).expect("first session should succeed");
356+
357+
// Mark the session as completed
358+
use payjoin::persist::SessionPersister;
359+
persister.close().expect("close should succeed");
360+
361+
// A new session with the same URI must be rejected even after completion
362+
let err = SenderPersister::new(db, uri, &rk2)
363+
.expect_err("reuse of a completed session URI must be rejected");
364+
assert!(
365+
matches!(err, Error::DuplicateSendSession(DuplicateKind::Uri)),
366+
"expected DuplicateSendSession(Uri), got: {err:?}"
367+
);
368+
}
369+
}

0 commit comments

Comments
 (0)