Skip to content

Commit 3504496

Browse files
DanGouldspacebear21
authored andcommitted
Clear V1 payload from memory after first read
V1 payjoin requests carry plaintext PSBTs that should not linger in memory longer than necessary. Wrap the V1 payload in Option and take() it on first read so subsequent reads see AlreadyRead instead of the raw transaction data.
1 parent f458784 commit 3504496

3 files changed

Lines changed: 83 additions & 10 deletions

File tree

payjoin-directory/src/db/files.rs

Lines changed: 80 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,9 @@ struct V2WaitMapEntry {
3737

3838
#[derive(Debug)]
3939
struct V1WaitMapEntry {
40-
payload: Arc<Vec<u8>>,
40+
/// The V1 payload. `take()`n after the first read for data minimization —
41+
/// plaintext PSBTs should not linger in memory longer than needed.
42+
payload: Option<Arc<Vec<u8>>>,
4143
sender: oneshot::Sender<Vec<u8>>,
4244
}
4345

@@ -325,9 +327,12 @@ impl DbTrait for Db {
325327
impl Mailboxes {
326328
async fn read(&mut self, id: &ShortId) -> io::Result<Option<Arc<Vec<u8>>>> {
327329
// V1 POST requests are only stored in memory since they are
328-
// unencrypted. Check this hash table first.
329-
if let Some(V1WaitMapEntry { payload, .. }) = self.pending_v1.get(id) {
330-
return Ok(Some(payload.clone()));
330+
// unencrypted. Check this hash table first. Use take() for data
331+
// minimization — clear the plaintext payload after first read.
332+
if let Some(entry) = self.pending_v1.get_mut(id) {
333+
if let Some(payload) = entry.payload.take() {
334+
return Ok(Some(payload));
335+
}
331336
}
332337

333338
// V2 requests are stored on disk
@@ -358,8 +363,11 @@ impl Mailboxes {
358363
return Err(Error::OverCapacity);
359364
}
360365

361-
if self.pending_v1.contains_key(id) {
362-
return Err(Error::OverCapacity);
366+
if let Some(entry) = self.pending_v1.get(id) {
367+
if entry.payload.is_some() {
368+
return Err(Error::OverCapacity);
369+
}
370+
return Err(Error::AlreadyRead);
363371
}
364372

365373
let receiver = self
@@ -419,13 +427,17 @@ impl Mailboxes {
419427
let payload = payload.clone();
420428
let (sender, receiver) = oneshot::channel::<Vec<u8>>();
421429
ret = Some(receiver);
422-
V1WaitMapEntry { payload, sender }
430+
V1WaitMapEntry { payload: Some(payload), sender }
423431
});
424432

425-
// If there are pending readers, satisfy them and mark the payload as read
433+
// If there are pending readers, satisfy them with the payload
434+
// and clear the in-memory copy for data minimization
426435
if let Some(pending) = self.pending_v2.remove(id) {
427436
trace!("notifying pending readers for {} (v1 fallback)", id);
428-
pending.sender.send(payload).expect("sending on oneshot channel must succeed");
437+
pending.sender.send(payload.clone()).expect("sending on oneshot channel must succeed");
438+
if let Some(entry) = self.pending_v1.get_mut(id) {
439+
entry.payload.take();
440+
}
429441
}
430442

431443
Ok(ret)
@@ -568,6 +580,9 @@ pub enum Error {
568580
/// Operation rejected due to lack of capacity
569581
OverCapacity,
570582

583+
/// Indicates receiver already consumed the plaintext V1 request payload
584+
AlreadyRead,
585+
571586
/// Indicates the sender that was waiting for the reply is no longer there
572587
V1SenderUnavailable,
573588

@@ -584,6 +599,7 @@ impl From<Error> for super::Error<std::io::Error> {
584599
match val {
585600
Error::V1SenderUnavailable => super::Error::V1SenderUnavailable,
586601
Error::OverCapacity => super::Error::OverCapacity,
602+
Error::AlreadyRead => super::Error::AlreadyRead,
587603
Error::IO(e) => super::Error::Operational(e),
588604
}
589605
}
@@ -603,6 +619,7 @@ impl std::fmt::Display for Error {
603619
use Error::*;
604620
match self {
605621
OverCapacity => "Database over capacity".fmt(f),
622+
AlreadyRead => "Mailbox payload already read".fmt(f),
606623
V1SenderUnavailable => "Sender no longer connected".fmt(f),
607624
IO(e) => write!(f, "Internal Error: {e}"),
608625
}
@@ -780,7 +797,7 @@ async fn test_v2_wait() -> std::io::Result<()> {
780797

781798
match db.wait_for_v2_payload(&id).await {
782799
Err(super::Error::Timeout(_)) => {}
783-
res => panic!("expected timeout, got {:?}", res),
800+
res => panic!("expected timeout, got {res:?}"),
784801
}
785802

786803
let read_task1 = tokio::spawn({
@@ -870,6 +887,59 @@ async fn test_v1_wait() -> std::io::Result<()> {
870887
Ok(())
871888
}
872889

890+
#[tokio::test]
891+
async fn test_v1_data_minimization() -> std::io::Result<()> {
892+
let dir = tempfile::tempdir()?;
893+
894+
let db = Arc::new(
895+
Db::init(Duration::from_millis(500), dir.path().to_owned())
896+
.await
897+
.expect("initializing mailbox database should succeed"),
898+
);
899+
900+
let id = ShortId([0u8; 8]);
901+
902+
// Spawn v1 sender in background
903+
let v1_sender_task = tokio::spawn({
904+
let db = db.clone();
905+
async move { db.post_v1_request_and_wait_for_response(&id, b"request".to_vec()).await }
906+
});
907+
908+
// Small delay to let v1 request post
909+
tokio::time::sleep(Duration::from_millis(10)).await;
910+
911+
// First read should return the payload
912+
let res = db.wait_for_v2_payload(&id).await.expect("first read should succeed");
913+
assert_eq!(&res[..], b"request", "first read should return the payload");
914+
915+
// Subsequent reads should not return the plaintext payload again.
916+
assert!(
917+
matches!(db.wait_for_v2_payload(&id).await, Err(super::Error::AlreadyRead)),
918+
"subsequent reads should indicate the payload was already consumed"
919+
);
920+
921+
// Verify the payload was cleared from memory by checking directly
922+
{
923+
let guard = db.mailboxes.lock().await;
924+
let entry = guard.pending_v1.get(&id);
925+
assert!(
926+
entry.is_none_or(|e| e.payload.is_none()),
927+
"v1 payload should have been cleared after first read"
928+
);
929+
}
930+
931+
// V1 response flow should still work even after payload was cleared
932+
db.post_v1_response(&id, b"response".to_vec()).await.expect("posting response should succeed");
933+
934+
let res = v1_sender_task
935+
.await
936+
.expect("joining task should succeed")
937+
.expect("v1 sender should get response");
938+
assert_eq!(&res[..], b"response", "v1 sender should receive the response");
939+
940+
Ok(())
941+
}
942+
873943
// Simulate elapsed time deterministically by shifting stored timestamps
874944
// backward instead of sleeping. tokio::time::pause() can't be used because
875945
// prune compares against SystemTime (timestamps originate from disk).

payjoin-directory/src/db/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ pub enum Error<OperationalError: SendableError> {
1616
Operational(OperationalError),
1717
Timeout(tokio::time::error::Elapsed),
1818
OverCapacity,
19+
AlreadyRead,
1920
V1SenderUnavailable,
2021
}
2122

@@ -33,6 +34,7 @@ impl<E: SendableError> std::fmt::Display for Error<E> {
3334
Operational(error) => write!(f, "Db error: {error}"),
3435
Timeout(timeout) => write!(f, "Timeout: {timeout}"),
3536
OverCapacity => "Database over capacity".fmt(f),
37+
AlreadyRead => "Mailbox payload already read".fmt(f),
3638
V1SenderUnavailable => "Sender no longer connected".fmt(f),
3739
}
3840
}

payjoin-directory/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -453,6 +453,7 @@ fn handle_peek<Error: db::SendableError>(
453453
db::Error::OverCapacity => Err(HandlerError::ServiceUnavailable(anyhow::Error::msg(
454454
"mailbox storage at capacity",
455455
))),
456+
db::Error::AlreadyRead => Ok(timeout_response),
456457
db::Error::V1SenderUnavailable => Err(HandlerError::SenderGone(anyhow::Error::msg(
457458
"Sender is unavailable try a new request",
458459
))),

0 commit comments

Comments
 (0)