11use std:: {
22 net:: SocketAddr ,
3- sync:: { Arc , LazyLock , Mutex } ,
3+ sync:: {
4+ Arc , LazyLock , Mutex ,
5+ atomic:: { AtomicBool , Ordering } ,
6+ } ,
47} ;
58
69use defguard_version:: {
710 DefguardComponent , Version ,
811 server:: { DefguardVersionLayer , grpc:: DefguardVersionInterceptor } ,
912} ;
10- use tokio:: sync:: mpsc;
13+ use tokio:: sync:: { mpsc, oneshot } ;
1114use tokio_stream:: wrappers:: UnboundedReceiverStream ;
1215use tonic:: { Request , Response , Status , transport:: Server } ;
1316
1417use crate :: {
1518 CommsChannel , LogsReceiver , MIN_CORE_VERSION , VERSION ,
19+ config:: EnvConfig ,
1620 error:: ApiError ,
1721 grpc:: Configuration ,
1822 proto:: { CertificateInfo , DerPayload , LogEntry , proxy_setup_server} ,
@@ -32,6 +36,7 @@ pub(crate) struct ProxySetupServer {
3236 key_pair : Arc < Mutex < Option < defguard_certs:: RcGenKeyPair > > > ,
3337 logs_rx : LogsReceiver ,
3438 current_session_token : Arc < Mutex < Option < String > > > ,
39+ adoption_expired : Arc < AtomicBool > ,
3540}
3641
3742impl Clone for ProxySetupServer {
@@ -40,6 +45,7 @@ impl Clone for ProxySetupServer {
4045 key_pair : Arc :: clone ( & self . key_pair ) ,
4146 logs_rx : Arc :: clone ( & self . logs_rx ) ,
4247 current_session_token : Arc :: clone ( & self . current_session_token ) ,
48+ adoption_expired : Arc :: clone ( & self . adoption_expired ) ,
4349 }
4450 }
4551}
@@ -50,6 +56,7 @@ impl ProxySetupServer {
5056 key_pair : Arc :: new ( Mutex :: new ( None ) ) ,
5157 logs_rx,
5258 current_session_token : Arc :: new ( Mutex :: new ( None ) ) ,
59+ adoption_expired : Arc :: new ( AtomicBool :: new ( false ) ) ,
5360 }
5461 }
5562
@@ -59,14 +66,37 @@ impl ProxySetupServer {
5966 /// `GetCsr`, `SendCert`. The server shuts down as soon as `SendCert` deposits a
6067 /// `Configuration` into `SETUP_CHANNEL`, after which this function returns the received
6168 /// gRPC configuration (locally generated key pair and remotely signed certificate).
69+ ///
70+ /// A timeout is started in the background using `config.adoption_timeout()`. If the timeout
71+ /// elapses before setup completes, the `adoption_expired` flag is set and incoming `Start`
72+ /// requests are rejected with `failed_precondition` until the Edge is restarted.
73+ /// On successful adoption the timeout is cancelled.
6274 pub ( crate ) async fn await_initial_setup (
6375 & self ,
6476 addr : SocketAddr ,
77+ config : & EnvConfig ,
6578 ) -> Result < Configuration , anyhow:: Error > {
66- info ! ( "gRPC waiting for setup connection from Core on {addr}" ) ;
79+ let adoption_timeout = config. adoption_timeout ( ) ;
80+ info ! (
81+ "gRPC waiting for setup connection from Core on {addr} for {} min" ,
82+ adoption_timeout. as_secs( ) / 60
83+ ) ;
6784
85+ let adoption_expired = Arc :: clone ( & self . adoption_expired ) ;
86+ let ( cancel_tx, cancel_rx) = oneshot:: channel :: < ( ) > ( ) ;
87+ tokio:: spawn ( async move {
88+ tokio:: select! {
89+ _ = tokio:: time:: sleep( adoption_timeout) => {
90+ adoption_expired. store( true , Ordering :: Relaxed ) ;
91+ error!(
92+ "Edge adoption expired and is now blocked. Restart the Edge to enable adoption."
93+ ) ;
94+ }
95+ _ = cancel_rx => { }
96+ }
97+ } ) ;
6898 let own_version = Version :: parse ( VERSION ) ?;
69- debug ! ( "Proxy version: {}" , VERSION ) ;
99+ debug ! ( "Edge version: {}" , VERSION ) ;
70100
71101 let config_slot: Arc < tokio:: sync:: Mutex < Option < Configuration > > > =
72102 Arc :: new ( tokio:: sync:: Mutex :: new ( None ) ) ;
@@ -107,14 +137,17 @@ impl ProxySetupServer {
107137 ApiError :: Unexpected ( "No configuration received after setup" . into ( ) )
108138 } ) ?;
109139
140+ // Skip blocking Edge adoption if adoption was already done
141+ let _ = cancel_tx. send ( ( ) ) ;
142+
110143 Ok ( configuration)
111144 }
112145
113146 fn is_setup_in_progress ( & self ) -> bool {
114147 let in_progress = self
115148 . current_session_token
116149 . lock ( )
117- . expect ( "Failed to acquire lock on current session token during proxy setup" )
150+ . expect ( "Failed to acquire lock on current session token during Edge setup" )
118151 . is_some ( ) ;
119152 debug ! ( "Setup in progress check: {}" , in_progress) ;
120153 in_progress
@@ -124,7 +157,7 @@ impl ProxySetupServer {
124157 debug ! ( "Terminating setup session" ) ;
125158 self . current_session_token
126159 . lock ( )
127- . expect ( "Failed to acquire lock on current session token during proxy setup" )
160+ . expect ( "Failed to acquire lock on current session token during Edge setup" )
128161 . take ( ) ;
129162 debug ! ( "Setup session terminated" ) ;
130163 }
@@ -133,7 +166,7 @@ impl ProxySetupServer {
133166 debug ! ( "Establishing new setup session with Core" ) ;
134167 self . current_session_token
135168 . lock ( )
136- . expect ( "Failed to acquire lock on current session token during proxy setup" )
169+ . expect ( "Failed to acquire lock on current session token during Edge setup" )
137170 . replace ( token) ;
138171 debug ! ( "Setup session established" ) ;
139172 }
@@ -143,7 +176,7 @@ impl ProxySetupServer {
143176 let is_valid = ( * self
144177 . current_session_token
145178 . lock ( )
146- . expect ( "Failed to acquire lock on current session token during proxy setup" ) )
179+ . expect ( "Failed to acquire lock on current session token during Edge setup" ) )
147180 . as_ref ( )
148181 . is_some_and ( |t| t == token) ;
149182 debug ! ( "Authorization validation result: {}" , is_valid) ;
@@ -158,6 +191,12 @@ impl proxy_setup_server::ProxySetup for ProxySetupServer {
158191 #[ instrument( skip( self , request) ) ]
159192 async fn start ( & self , request : Request < ( ) > ) -> Result < Response < Self :: StartStream > , Status > {
160193 debug ! ( "Core initiated setup process, preparing to stream logs" ) ;
194+ if self . adoption_expired . load ( Ordering :: Relaxed ) {
195+ let error_message =
196+ "Edge adoption expired and is now blocked. Restart the Edge to enable adoption." ;
197+ error ! ( "{error_message}" ) ;
198+ return Err ( Status :: failed_precondition ( error_message) ) ;
199+ }
161200 if self . is_setup_in_progress ( ) {
162201 error ! ( "Setup already in progress, rejecting new setup request" ) ;
163202 return Err ( Status :: resource_exhausted ( "Setup already in progress" ) ) ;
@@ -174,7 +213,7 @@ impl proxy_setup_server::ProxySetup for ProxySetupServer {
174213 debug ! ( "Setup session authenticated successfully" ) ;
175214 self . initialize_setup_session ( token. to_string ( ) ) ;
176215
177- debug ! ( "Preparing to forward Proxy logs to Core in real-time" ) ;
216+ debug ! ( "Preparing to forward Edge logs to Core in real-time" ) ;
178217 let logs_rx = self . logs_rx . clone ( ) ;
179218
180219 let ( tx, rx) = mpsc:: unbounded_channel ( ) ;
@@ -208,7 +247,7 @@ impl proxy_setup_server::ProxySetup for ProxySetupServer {
208247 self_clone. clear_setup_session ( ) ;
209248 } ) ;
210249
211- debug ! ( "Log stream established, Core will now receive real-time Proxy logs" ) ;
250+ debug ! ( "Log stream established, Core will now receive real-time Edge logs" ) ;
212251 Ok ( Response :: new ( UnboundedReceiverStream :: new ( rx) ) )
213252 }
214253
@@ -274,7 +313,7 @@ impl proxy_setup_server::ProxySetup for ProxySetupServer {
274313
275314 self . key_pair
276315 . lock ( )
277- . expect ( "Failed to acquire lock on key pair during proxy setup when trying to store generated key pair" )
316+ . expect ( "Failed to acquire lock on key pair during Edge setup when trying to store generated key pair" )
278317 . replace ( key_pair) ;
279318
280319 debug ! ( "Encoding Certificate Signing Request for transmission" ) ;
@@ -331,17 +370,17 @@ impl proxy_setup_server::ProxySetup for ProxySetupServer {
331370 let key_pair = self
332371 . key_pair
333372 . lock ( )
334- . expect ( "Failed to acquire lock on key pair during proxy setup when trying to receive certificate" )
373+ . expect ( "Failed to acquire lock on key pair during Edge setup when trying to receive certificate" )
335374 . take ( ) ;
336375 if let Some ( kp) = key_pair {
337376 kp
338377 } else {
339378 error ! (
340- "Key pair not found during Proxy setup. Key pair generation step might have failed."
379+ "Key pair not found during Edge setup. Key pair generation step might have failed."
341380 ) ;
342381 self . clear_setup_session ( ) ;
343382 return Err ( Status :: internal (
344- "Key pair not found during Proxy setup. Key pair generation step might have failed." ,
383+ "Key pair not found during Edge setup. Key pair generation step might have failed." ,
345384 ) ) ;
346385 }
347386 } ;
@@ -353,7 +392,7 @@ impl proxy_setup_server::ProxySetup for ProxySetupServer {
353392
354393 debug ! ( "Passing configuration to gRPC server for finalization" ) ;
355394 match SETUP_CHANNEL . 0 . lock ( ) . await . send ( Some ( configuration) ) . await {
356- Ok ( ( ) ) => info ! ( "Proxy configuration passed to gRPC server successfully" ) ,
395+ Ok ( ( ) ) => info ! ( "Edge configuration passed to gRPC server successfully" ) ,
357396 Err ( err) => {
358397 error ! ( "Failed to send configuration to gRPC server: {err}" ) ;
359398 self . clear_setup_session ( ) ;
0 commit comments