Skip to content

Commit 203c4a8

Browse files
authored
Clear V1 payload from memory after first read (#1335)
2 parents d0ca19d + 3504496 commit 203c4a8

3 files changed

Lines changed: 83 additions & 10 deletions

File tree

payjoin-directory/src/db/files.rs

Lines changed: 80 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,9 @@ struct V2WaitMapEntry {
3737

3838
#[derive(Debug)]
3939
struct V1WaitMapEntry {
40-
payload: Arc<Vec<u8>>,
40+
/// The V1 payload. Taken (leaving `None`) after the first read for data minimization —
41+
/// plaintext PSBTs should not linger in memory longer than needed.
42+
payload: Option<Arc<Vec<u8>>>,
4143
sender: oneshot::Sender<Vec<u8>>,
4244
}
4345

@@ -325,9 +327,12 @@ impl DbTrait for Db {
325327
impl Mailboxes {
326328
async fn read(&mut self, id: &ShortId) -> io::Result<Option<Arc<Vec<u8>>>> {
327329
// V1 POST requests are only stored in memory since they are
328-
// unencrypted. Check this hash table first.
329-
if let Some(V1WaitMapEntry { payload, .. }) = self.pending_v1.get(id) {
330-
return Ok(Some(payload.clone()));
330+
// unencrypted. Check this hash table first. Use take() for data
331+
// minimization — clear the plaintext payload after first read.
332+
if let Some(entry) = self.pending_v1.get_mut(id) {
333+
if let Some(payload) = entry.payload.take() {
334+
return Ok(Some(payload));
335+
}
331336
}
332337

333338
// V2 requests are stored on disk
@@ -358,8 +363,11 @@ impl Mailboxes {
358363
return Err(Error::OverCapacity);
359364
}
360365

361-
if self.pending_v1.contains_key(id) {
362-
return Err(Error::OverCapacity);
366+
if let Some(entry) = self.pending_v1.get(id) {
367+
if entry.payload.is_some() {
368+
return Err(Error::OverCapacity);
369+
}
370+
return Err(Error::AlreadyRead);
363371
}
364372

365373
let receiver = self
@@ -419,13 +427,17 @@ impl Mailboxes {
419427
let payload = payload.clone();
420428
let (sender, receiver) = oneshot::channel::<Vec<u8>>();
421429
ret = Some(receiver);
422-
V1WaitMapEntry { payload, sender }
430+
V1WaitMapEntry { payload: Some(payload), sender }
423431
});
424432

425-
// If there are pending readers, satisfy them and mark the payload as read
433+
// If there are pending readers, satisfy them with the payload
434+
// and clear the in-memory copy for data minimization
426435
if let Some(pending) = self.pending_v2.remove(id) {
427436
trace!("notifying pending readers for {} (v1 fallback)", id);
428-
pending.sender.send(payload).expect("sending on oneshot channel must succeed");
437+
pending.sender.send(payload.clone()).expect("sending on oneshot channel must succeed");
438+
if let Some(entry) = self.pending_v1.get_mut(id) {
439+
entry.payload.take();
440+
}
429441
}
430442

431443
Ok(ret)
@@ -568,6 +580,9 @@ pub enum Error {
568580
/// Operation rejected due to lack of capacity
569581
OverCapacity,
570582

583+
/// Indicates the receiver already consumed the plaintext V1 request payload
584+
AlreadyRead,
585+
571586
/// Indicates the sender that was waiting for the reply is no longer there
572587
V1SenderUnavailable,
573588

@@ -584,6 +599,7 @@ impl From<Error> for super::Error<std::io::Error> {
584599
match val {
585600
Error::V1SenderUnavailable => super::Error::V1SenderUnavailable,
586601
Error::OverCapacity => super::Error::OverCapacity,
602+
Error::AlreadyRead => super::Error::AlreadyRead,
587603
Error::IO(e) => super::Error::Operational(e),
588604
}
589605
}
@@ -603,6 +619,7 @@ impl std::fmt::Display for Error {
603619
use Error::*;
604620
match self {
605621
OverCapacity => "Database over capacity".fmt(f),
622+
AlreadyRead => "Mailbox payload already read".fmt(f),
606623
V1SenderUnavailable => "Sender no longer connected".fmt(f),
607624
IO(e) => write!(f, "Internal Error: {e}"),
608625
}
@@ -780,7 +797,7 @@ async fn test_v2_wait() -> std::io::Result<()> {
780797

781798
match db.wait_for_v2_payload(&id).await {
782799
Err(super::Error::Timeout(_)) => {}
783-
res => panic!("expected timeout, got {:?}", res),
800+
res => panic!("expected timeout, got {res:?}"),
784801
}
785802

786803
let read_task1 = tokio::spawn({
@@ -870,6 +887,59 @@ async fn test_v1_wait() -> std::io::Result<()> {
870887
Ok(())
871888
}
872889

890+
#[tokio::test]
891+
async fn test_v1_data_minimization() -> std::io::Result<()> {
892+
let dir = tempfile::tempdir()?;
893+
894+
let db = Arc::new(
895+
Db::init(Duration::from_millis(500), dir.path().to_owned())
896+
.await
897+
.expect("initializing mailbox database should succeed"),
898+
);
899+
900+
let id = ShortId([0u8; 8]);
901+
902+
// Spawn v1 sender in background
903+
let v1_sender_task = tokio::spawn({
904+
let db = db.clone();
905+
async move { db.post_v1_request_and_wait_for_response(&id, b"request".to_vec()).await }
906+
});
907+
908+
// Small delay to let v1 request post
909+
tokio::time::sleep(Duration::from_millis(10)).await;
910+
911+
// First read should return the payload
912+
let res = db.wait_for_v2_payload(&id).await.expect("first read should succeed");
913+
assert_eq!(&res[..], b"request", "first read should return the payload");
914+
915+
// Subsequent reads should not return the plaintext payload again.
916+
assert!(
917+
matches!(db.wait_for_v2_payload(&id).await, Err(super::Error::AlreadyRead)),
918+
"subsequent reads should indicate the payload was already consumed"
919+
);
920+
921+
// Verify the payload was cleared from memory by checking directly
922+
{
923+
let guard = db.mailboxes.lock().await;
924+
let entry = guard.pending_v1.get(&id);
925+
assert!(
926+
entry.is_none_or(|e| e.payload.is_none()),
927+
"v1 payload should have been cleared after first read"
928+
);
929+
}
930+
931+
// V1 response flow should still work even after payload was cleared
932+
db.post_v1_response(&id, b"response".to_vec()).await.expect("posting response should succeed");
933+
934+
let res = v1_sender_task
935+
.await
936+
.expect("joining task should succeed")
937+
.expect("v1 sender should get response");
938+
assert_eq!(&res[..], b"response", "v1 sender should receive the response");
939+
940+
Ok(())
941+
}
942+
873943
// Simulate elapsed time deterministically by shifting stored timestamps
874944
// backward instead of sleeping. tokio::time::pause() can't be used because
875945
// prune compares against SystemTime (timestamps originate from disk).

payjoin-directory/src/db/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ pub enum Error<OperationalError: SendableError> {
1616
Operational(OperationalError),
1717
Timeout(tokio::time::error::Elapsed),
1818
OverCapacity,
19+
AlreadyRead,
1920
V1SenderUnavailable,
2021
}
2122

@@ -33,6 +34,7 @@ impl<E: SendableError> std::fmt::Display for Error<E> {
3334
Operational(error) => write!(f, "Db error: {error}"),
3435
Timeout(timeout) => write!(f, "Timeout: {timeout}"),
3536
OverCapacity => "Database over capacity".fmt(f),
37+
AlreadyRead => "Mailbox payload already read".fmt(f),
3638
V1SenderUnavailable => "Sender no longer connected".fmt(f),
3739
}
3840
}

payjoin-directory/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -456,6 +456,7 @@ fn handle_peek<Error: db::SendableError>(
456456
db::Error::OverCapacity => Err(HandlerError::ServiceUnavailable(anyhow::Error::msg(
457457
"mailbox storage at capacity",
458458
))),
459+
db::Error::AlreadyRead => Ok(timeout_response),
459460
db::Error::V1SenderUnavailable => Err(HandlerError::SenderGone(anyhow::Error::msg(
460461
"Sender is unavailable try a new request",
461462
))),

0 commit comments

Comments
 (0)