Skip to content

Commit 93eeb8b

Browse files
committed
OHTTP keys should be rotated
This pr addresses #445. It implements OHTTP-key rotation to payjoin-mailroom Mailroom operators can now decide the time interval for keys to be rotated. Also if a key has expired, a 422 error is returned to clients. Clients can handle they key-rotation via the cach-control header returned by the directory.
1 parent 9ed621f commit 93eeb8b

5 files changed

Lines changed: 373 additions & 55 deletions

File tree

payjoin-mailroom/src/config.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ pub struct Config {
1212
pub storage_dir: PathBuf,
1313
#[serde(deserialize_with = "deserialize_duration_secs")]
1414
pub timeout: Duration,
15+
#[serde(deserialize_with = "deserialize_optional_duration_secs")]
16+
pub ohttp_keys_max_age: Option<Duration>,
1517
pub v1: Option<V1Config>,
1618
#[cfg(feature = "telemetry")]
1719
pub telemetry: Option<TelemetryConfig>,
@@ -85,6 +87,7 @@ impl Default for Config {
8587
listener: "[::]:8080".parse().expect("valid default listener address"),
8688
storage_dir: PathBuf::from("./data"),
8789
timeout: Duration::from_secs(30),
90+
ohttp_keys_max_age: Some(Duration::from_secs(7 * 24 * 60 * 60)),
8891
v1: None,
8992
#[cfg(feature = "telemetry")]
9093
telemetry: None,
@@ -104,17 +107,33 @@ where
104107
Ok(Duration::from_secs(secs))
105108
}
106109

110+
fn deserialize_optional_duration_secs<'de, D>(deserializer: D) -> Result<Option<Duration>, D::Error>
111+
where
112+
D: serde::Deserializer<'de>,
113+
{
114+
let secs: Option<u64> = Option::deserialize(deserializer)?;
115+
match secs {
116+
None => Ok(None),
117+
Some(0) => Err(<D::Error as serde::de::Error>::custom(
118+
"ohttp_keys_max_age must be greater than 0 seconds when set",
119+
)),
120+
Some(s) => Ok(Some(Duration::from_secs(s))),
121+
}
122+
}
123+
107124
impl Config {
108125
pub fn new(
109126
listener: ListenerAddress,
110127
storage_dir: PathBuf,
111128
timeout: Duration,
129+
ohttp_keys_max_age: Option<Duration>,
112130
v1: Option<V1Config>,
113131
) -> Self {
114132
Self {
115133
listener,
116134
storage_dir,
117135
timeout,
136+
ohttp_keys_max_age,
118137
v1,
119138
#[cfg(feature = "telemetry")]
120139
telemetry: None,

payjoin-mailroom/src/directory.rs

Lines changed: 192 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
1+
use std::path::PathBuf;
12
use std::pin::Pin;
23
use std::str::FromStr;
34
use std::sync::Arc;
45
use std::task::{Context, Poll};
6+
use std::time::{Duration, Instant};
57

68
use anyhow::Result;
79
use axum::body::{Body, Bytes};
8-
use axum::http::header::{HeaderValue, ACCESS_CONTROL_ALLOW_ORIGIN, CONTENT_TYPE};
10+
use axum::http::header::{HeaderValue, ACCESS_CONTROL_ALLOW_ORIGIN, CACHE_CONTROL, CONTENT_TYPE};
911
use axum::http::{Method, Request, Response, StatusCode, Uri};
1012
use http_body_util::BodyExt;
1113
use payjoin::directory::{ShortId, ShortIdError, ENCAPSULATED_MESSAGE_BYTES};
14+
use tokio::sync::RwLock;
1215
use tracing::{debug, error, trace, warn};
1316

1417
use crate::db::{Db, Error as DbError, SendableError};
@@ -28,6 +31,97 @@ const V1_VERSION_UNSUPPORTED_RES_JSON: &str =
2831

2932
pub type BoxError = Box<dyn std::error::Error + Send + Sync>;
3033

34+
// Two-slot OHTTP key set supporting rotation overlap.
35+
//
36+
// Key IDs alternate between 1 and 2. The current key is served to new
37+
// clients; both slots are accepted for decapsulation so that clients
38+
// with a cached previous key still work during the overlap window.
39+
#[derive(Debug)]
40+
pub struct KeyRotatingServer {
41+
keys: [Option<ohttp::Server>; 2],
42+
current_key_id: u8,
43+
current_key_created_at: Instant,
44+
}
45+
46+
impl KeyRotatingServer {
47+
pub fn from_single(server: ohttp::Server, key_id: u8) -> Self {
48+
assert!(key_id == 1 || key_id == 2, "key_id must be 1 or 2");
49+
let mut keys = [None, None];
50+
keys[(key_id - 1) as usize] = Some(server);
51+
Self { current_key_id: key_id, keys, current_key_created_at: Instant::now() }
52+
}
53+
54+
pub fn from_pair(
55+
current: (u8, ohttp::Server),
56+
previous: Option<(u8, ohttp::Server)>,
57+
current_key_age: Duration,
58+
) -> Self {
59+
assert!(current.0 == 1 || current.0 == 2, "key_id must be 1 or 2");
60+
let mut keys = [None, None];
61+
keys[(current.0 - 1) as usize] = Some(current.1);
62+
if let Some((id, server)) = previous {
63+
assert!(id == 1 || id == 2, "key_id must be 1 or 2");
64+
keys[(id - 1) as usize] = Some(server);
65+
}
66+
let created_at = Instant::now().checked_sub(current_key_age).unwrap_or_else(Instant::now);
67+
Self { current_key_id: current.0, keys, current_key_created_at: created_at }
68+
}
69+
70+
pub fn current_key_id(&self) -> u8 { self.current_key_id }
71+
pub fn current_key_created_at(&self) -> Instant { self.current_key_created_at }
72+
pub fn next_key_id(&self) -> u8 {
73+
if self.current_key_id == 1 {
74+
2
75+
} else {
76+
1
77+
}
78+
}
79+
80+
// Look up the server matching the key_id in an OHTTP message and
81+
// decapsulate. The first byte of an OHTTP encapsulated request is the
82+
// key identifier (RFC 9458 Section 4.3).
83+
pub fn decapsulate(
84+
&self,
85+
ohttp_body: &[u8],
86+
) -> std::result::Result<(Vec<u8>, ohttp::ServerResponse), ohttp::Error> {
87+
let key_id = ohttp_body.first().copied().unwrap_or(0);
88+
let server = key_id
89+
.checked_sub(1)
90+
.filter(|&i| (i as usize) < 2)
91+
.and_then(|i| self.keys[i as usize].as_ref());
92+
match server {
93+
Some(s) => s.decapsulate(ohttp_body),
94+
None => Err(ohttp::Error::KeyId),
95+
}
96+
}
97+
98+
// Encode the current key's config for serving to clients.
99+
pub fn encode_current(&self) -> std::result::Result<Vec<u8>, ohttp::Error> {
100+
self.keys[(self.current_key_id - 1) as usize]
101+
.as_ref()
102+
.expect("current key must exist")
103+
.config()
104+
.encode()
105+
}
106+
107+
// Install a new key as current, displacing whatever occupied that slot.
108+
// The old current key remains in its slot for overlap decapsulation
109+
// until retire() clears it.
110+
pub fn rotate(&mut self, server: ohttp::Server) {
111+
let new_key_id = self.next_key_id();
112+
self.keys[(new_key_id - 1) as usize] = Some(server);
113+
self.current_key_id = new_key_id;
114+
self.current_key_created_at = Instant::now();
115+
}
116+
117+
// Clear a key slot so it is no longer accepted for decapsulation.
118+
pub fn retire(&mut self, key_id: u8) {
119+
assert!(key_id == 1 || key_id == 2, "key_id must be 1 or 2");
120+
assert_ne!(key_id, self.current_key_id, "cannot retire the current key");
121+
self.keys[(key_id - 1) as usize] = None;
122+
}
123+
}
124+
31125
/// Opaque blocklist of Bitcoin addresses stored as script pubkeys.
32126
///
33127
/// Addresses are converted to `ScriptBuf` at parse time so that
@@ -91,7 +185,8 @@ fn parse_address_lines(text: &str) -> std::collections::HashSet<bitcoin::ScriptB
91185
#[derive(Clone)]
92186
pub struct Service<D: Db> {
93187
db: D,
94-
ohttp: ohttp::Server,
188+
ohttp: Arc<RwLock<KeyRotatingServer>>,
189+
ohttp_keys_max_age: Option<Duration>,
95190
sentinel_tag: SentinelTag,
96191
v1: Option<V1>,
97192
}
@@ -117,10 +212,18 @@ where
117212
}
118213

119214
impl<D: Db> Service<D> {
120-
pub fn new(db: D, ohttp: ohttp::Server, sentinel_tag: SentinelTag, v1: Option<V1>) -> Self {
121-
Self { db, ohttp, sentinel_tag, v1 }
215+
pub fn new(
216+
db: D,
217+
ohttp: Arc<RwLock<KeyRotatingServer>>,
218+
ohttp_keys_max_age: Option<Duration>,
219+
sentinel_tag: SentinelTag,
220+
v1: Option<V1>,
221+
) -> Self {
222+
Self { db, ohttp, ohttp_keys_max_age, sentinel_tag, v1 }
122223
}
123224

225+
pub fn ohttp_key_set(&self) -> &Arc<RwLock<KeyRotatingServer>> { &self.ohttp }
226+
124227
async fn serve_request<B>(&self, req: Request<B>) -> Result<Response<Body>>
125228
where
126229
B: axum::body::HttpBody<Data = Bytes> + Send + 'static,
@@ -200,11 +303,15 @@ impl<D: Db> Service<D> {
200303
.map_err(|e| HandlerError::BadRequest(anyhow::anyhow!(e.into())))?
201304
.to_bytes();
202305

203-
// Decapsulate OHTTP request
204-
let (bhttp_req, res_ctx) = self
205-
.ohttp
206-
.decapsulate(&ohttp_body)
207-
.map_err(|e| HandlerError::OhttpKeyRejection(e.into()))?;
306+
// Decapsulate OHTTP request using the key matching the message's key_id.
307+
// Drop the read guard immediately so long-polling handlers don't
308+
// block key rotation or other readers waiting behind a queued writer.
309+
let (bhttp_req, res_ctx) = {
310+
let keyset = self.ohttp.read().await;
311+
keyset
312+
.decapsulate(&ohttp_body)
313+
.map_err(|e| HandlerError::OhttpKeyRejection(e.into()))?
314+
};
208315
let mut cursor = std::io::Cursor::new(bhttp_req);
209316
let req = bhttp::Message::read_bhttp(&mut cursor)
210317
.map_err(|e| HandlerError::BadRequest(e.into()))?;
@@ -378,13 +485,22 @@ impl<D: Db> Service<D> {
378485
}
379486

380487
async fn get_ohttp_keys(&self) -> Result<Response<Body>, HandlerError> {
381-
let ohttp_keys = self
382-
.ohttp
383-
.config()
384-
.encode()
385-
.map_err(|e| HandlerError::InternalServerError(e.into()))?;
488+
let keyset = self.ohttp.read().await;
489+
let ohttp_keys =
490+
keyset.encode_current().map_err(|e| HandlerError::InternalServerError(e.into()))?;
386491
let mut res = Response::new(full(ohttp_keys));
387492
res.headers_mut().insert(CONTENT_TYPE, HeaderValue::from_static("application/ohttp-keys"));
493+
if let Some(max_age) = self.ohttp_keys_max_age {
494+
let remaining = max_age.saturating_sub(keyset.current_key_created_at().elapsed());
495+
res.headers_mut().insert(
496+
CACHE_CONTROL,
497+
HeaderValue::from_str(&format!(
498+
"public, s-maxage={}, immutable",
499+
remaining.as_secs()
500+
))
501+
.expect("valid header value"),
502+
);
503+
}
388504
Ok(res)
389505
}
390506

@@ -412,6 +528,60 @@ impl<D: Db> Service<D> {
412528
}
413529
}
414530

531+
// Grace period after rotation during which the old key is still
532+
// accepted. Accounts for network latency and reasonable clock skew
533+
// without letting two valid key IDs persist long enough to fingerprint
534+
// clients by which key they present.
535+
const ROTATION_OVERLAP: Duration = Duration::from_secs(30);
536+
537+
// Background task that rotates OHTTP keys on a fixed interval.
538+
//
539+
// Every interval the task generates a fresh key, persists it to
540+
// keys_dir, and installs it as the current key. The previous key is
541+
// accepted for ROTATION_OVERLAP after rotation, then retired so that
542+
// only a single valid key exists most of the time.
543+
pub fn spawn_key_rotation(
544+
keyset: Arc<RwLock<KeyRotatingServer>>,
545+
keys_dir: PathBuf,
546+
interval: Duration,
547+
) {
548+
tokio::spawn(async move {
549+
loop {
550+
tokio::time::sleep(interval).await;
551+
552+
let new_key_id = keyset.read().await.next_key_id();
553+
554+
let config = match crate::key_config::gen_ohttp_server_config_with_id(new_key_id) {
555+
Ok(c) => c,
556+
Err(e) => {
557+
tracing::error!("Failed to generate OHTTP key: {e}");
558+
continue;
559+
}
560+
};
561+
let _ = tokio::fs::remove_file(keys_dir.join(format!("{new_key_id}.ikm"))).await;
562+
if let Err(e) = crate::key_config::persist_key_config(&config, &keys_dir) {
563+
tracing::error!("Failed to persist OHTTP key: {e}");
564+
continue;
565+
}
566+
567+
let old_key_id = {
568+
let mut ks = keyset.write().await;
569+
let old = ks.current_key_id();
570+
ks.rotate(config.into_server());
571+
old
572+
};
573+
tracing::info!("Rotated OHTTP keys: key_id {old_key_id} -> {new_key_id}");
574+
575+
tokio::time::sleep(ROTATION_OVERLAP).await;
576+
577+
keyset.write().await.retire(old_key_id);
578+
let old_path = keys_dir.join(format!("{old_key_id}.ikm"));
579+
let _ = tokio::fs::remove_file(&old_path).await;
580+
tracing::info!("Retired OHTTP key_id {old_key_id}");
581+
}
582+
});
583+
}
584+
415585
fn handle_peek<E: SendableError>(
416586
result: Result<Arc<Vec<u8>>, DbError<E>>,
417587
timeout_response: Response<Body>,
@@ -485,8 +655,8 @@ impl HandlerError {
485655
}
486656
HandlerError::OhttpKeyRejection(e) => {
487657
const OHTTP_KEY_REJECTION_RES_JSON: &str = r#"{"type":"https://iana.org/assignments/http-problem-types#ohttp-key", "title": "key identifier unknown"}"#;
488-
warn!("Bad request: Key configuration rejected: {}", e);
489-
*res.status_mut() = StatusCode::BAD_REQUEST;
658+
warn!("Key configuration rejected: {}", e);
659+
*res.status_mut() = StatusCode::UNPROCESSABLE_ENTITY;
490660
res.headers_mut()
491661
.insert(CONTENT_TYPE, HeaderValue::from_static("application/problem+json"));
492662
*res.body_mut() = full(OHTTP_KEY_REJECTION_RES_JSON);
@@ -592,9 +762,9 @@ mod tests {
592762
async fn test_service(v1: Option<V1>) -> Service<FilesDb> {
593763
let dir = tempfile::tempdir().expect("tempdir");
594764
let db = FilesDb::init(Duration::from_millis(100), dir.keep()).await.expect("db init");
595-
let ohttp: ohttp::Server =
596-
crate::key_config::gen_ohttp_server_config().expect("ohttp config").into();
597-
Service::new(db, ohttp, SentinelTag::new([0u8; 32]), v1)
765+
let config = crate::key_config::gen_ohttp_server_config().expect("ohttp config");
766+
let keyset = Arc::new(RwLock::new(KeyRotatingServer::from_single(config.into_server(), 1)));
767+
Service::new(db, keyset, None, SentinelTag::new([0u8; 32]), v1)
598768
}
599769

600770
/// A valid ShortId encoded as bech32 for use in URL paths.
@@ -826,9 +996,9 @@ mod tests {
826996
let dir = tempfile::tempdir().expect("tempdir");
827997
let db = FilesDb::init(Duration::from_millis(100), dir.keep()).await.expect("db init");
828998
let db = MetricsDb::new(db, metrics);
829-
let ohttp: ohttp::Server =
830-
crate::key_config::gen_ohttp_server_config().expect("ohttp config").into();
831-
let svc = Service::new(db, ohttp, SentinelTag::new([0u8; 32]), None);
999+
let config = crate::key_config::gen_ohttp_server_config().expect("ohttp config");
1000+
let keyset = Arc::new(RwLock::new(KeyRotatingServer::from_single(config.into_server(), 1)));
1001+
let svc = Service::new(db, keyset, None, SentinelTag::new([0u8; 32]), None);
8321002

8331003
let id = valid_short_id_path();
8341004
let res = svc

0 commit comments

Comments
 (0)