@@ -8,6 +8,13 @@ use rand::Rng;
88use tokio_listener:: { Listener , SystemOptions , UserOptions } ;
99use tower:: Service ;
1010use tracing:: info;
11+ pub mod ohttp;
12+
13+ use http_body_util:: combinators:: BoxBody ;
14+ use hyper:: body:: Bytes ;
15+ use hyper:: { Request , StatusCode } ;
16+ use ohttp:: { OhttpGatewayConfig , OhttpGatewayLayer } ;
17+ use tower:: { ServiceBuilder , ServiceExt } ;
1118
1219pub mod cli;
1320pub mod config;
@@ -16,6 +23,7 @@ pub mod config;
1623struct Services {
1724 directory : payjoin_directory:: Service < payjoin_directory:: FilesDb > ,
1825 relay : ohttp_relay:: Service ,
26+ sentinel_tag : SentinelTag ,
1927}
2028
2129pub async fn serve ( config : Config ) -> anyhow:: Result < ( ) > {
@@ -24,6 +32,7 @@ pub async fn serve(config: Config) -> anyhow::Result<()> {
2432 let services = Services {
2533 directory : init_directory ( & config, sentinel_tag) . await ?,
2634 relay : ohttp_relay:: Service :: new ( sentinel_tag) . await ,
35+ sentinel_tag,
2736 } ;
2837 let app = Router :: new ( ) . fallback ( route_request) . with_state ( services) ;
2938
@@ -56,6 +65,7 @@ pub async fn serve_manual_tls(
5665 let services = Services {
5766 directory : init_directory ( & config, sentinel_tag) . await ?,
5867 relay : ohttp_relay:: Service :: new_with_roots ( root_store, sentinel_tag) . await ,
68+ sentinel_tag,
5969 } ;
6070 let app = Router :: new ( ) . fallback ( route_request) . with_state ( services) ;
6171
@@ -119,22 +129,68 @@ fn init_ohttp_config(
119129 }
120130}
121131
122- async fn route_request (
123- State ( mut services) : State < Services > ,
124- req : axum:: extract:: Request ,
125- ) -> Response {
132+ async fn route_request ( State ( services) : State < Services > , req : axum:: extract:: Request ) -> Response {
126133 if is_relay_request ( & req) {
127- match services. relay . call ( req) . await {
134+ let mut relay = services. relay . clone ( ) ;
135+ match relay. call ( req) . await {
128136 Ok ( res) => res. into_response ( ) ,
129- Err ( e) => ( axum :: http :: StatusCode :: BAD_GATEWAY , e. to_string ( ) ) . into_response ( ) ,
137+ Err ( e) => ( StatusCode :: BAD_GATEWAY , e. to_string ( ) ) . into_response ( ) ,
130138 }
131139 } else {
132- // The directory service handles all other requests (including 404)
133- match services. directory . call ( req) . await {
134- Ok ( res) => res. into_response ( ) ,
135- Err ( e) =>
136- ( axum:: http:: StatusCode :: INTERNAL_SERVER_ERROR , e. to_string ( ) ) . into_response ( ) ,
140+ handle_directory_request ( services, req) . await
141+ }
142+ }
143+
144+ async fn handle_directory_request ( services : Services , req : axum:: extract:: Request ) -> Response {
145+ let ohttp_server = services. directory . ohttp . clone ( ) ;
146+
147+ let ohttp_config = OhttpGatewayConfig :: new ( ohttp_server, services. sentinel_tag ) ;
148+
149+ let ( parts, body) = req. into_parts ( ) ;
150+
151+ use http_body_util:: BodyExt as _;
152+
153+ let body_bytes = body
154+ . collect ( )
155+ . await
156+ . map_err ( |_| "Failed to collect body" )
157+ . expect ( "Failed to collect body" )
158+ . to_bytes ( ) ;
159+
160+ let boxed_body = BoxBody :: new ( http_body_util:: Full :: new ( body_bytes) ) ;
161+
162+ let hyper_req = Request :: from_parts ( parts, boxed_body) ;
163+
164+ let directory_service = tower:: service_fn ( {
165+ let directory = services. directory . clone ( ) ;
166+ move |req : Request < BoxBody < Bytes , hyper:: Error > > | {
167+ let mut dir = directory. clone ( ) ;
168+ async move {
169+ dir. call ( req) . await . map_err ( |e| {
170+ Box :: new ( std:: io:: Error :: other ( e. to_string ( ) ) )
171+ as Box < dyn std:: error:: Error + Send + Sync >
172+ } )
173+ }
137174 }
175+ } ) ;
176+
177+ let mut service_with_ohttp = ServiceBuilder :: new ( )
178+ . layer ( OhttpGatewayLayer :: new ( ohttp_config) )
179+ . service ( directory_service)
180+ . boxed_clone ( ) ;
181+
182+ match service_with_ohttp. ready ( ) . await {
183+ Ok ( ready_service) => match ready_service. call ( hyper_req) . await {
184+ Ok ( response) => {
185+ let ( parts, body) = response. into_parts ( ) ;
186+ let axum_body = axum:: body:: Body :: new ( body) ;
187+ Response :: from_parts ( parts, axum_body) . into_response ( )
188+ }
189+ Err ( e) =>
190+ ( StatusCode :: INTERNAL_SERVER_ERROR , format ! ( "Service error: {}" , e) ) . into_response ( ) ,
191+ } ,
192+ Err ( e) =>
193+ ( StatusCode :: INTERNAL_SERVER_ERROR , format ! ( "Service not ready: {}" , e) ) . into_response ( ) ,
138194 }
139195}
140196
0 commit comments