Skip to content

Commit 5981327

Browse files
committed
runtime: harden query lifecycle state handling
1 parent 922b7db commit 5981327

5 files changed

Lines changed: 123 additions & 18 deletions

File tree

src/api/janus_api.rs

Lines changed: 50 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -99,7 +99,7 @@ struct RunningQuery {
9999
historical_handles: Vec<thread::JoinHandle<()>>,
100100
baseline_handle: Option<thread::JoinHandle<()>>,
101101
live_handle: Option<thread::JoinHandle<()>>,
102-
mqtt_subscriber_handle: Option<thread::JoinHandle<()>>,
102+
mqtt_subscriber_handles: Vec<thread::JoinHandle<()>>,
103103
// shutdown sender signals used to stop the workers
104104
shutdown_senders: Vec<Sender<()>>,
105105
// MQTT subscriber instances (for stopping)
@@ -224,13 +224,13 @@ impl JanusApi {
224224
parsed.baseline.as_ref().map(|baseline| baseline.window_name.clone());
225225
let mut historical_handles = Vec::new();
226226
let mut shutdown_senders = Vec::new();
227-
let status = Arc::new(RwLock::new(
227+
let initial_status =
228228
if !parsed.live_windows.is_empty() && !parsed.historical_windows.is_empty() {
229229
ExecutionStatus::WarmingBaseline
230230
} else {
231231
ExecutionStatus::Running
232-
},
233-
));
232+
};
233+
let status = Arc::new(RwLock::new(initial_status.clone()));
234234

235235
// 4. Spawn historical worker threads (one per historical window)
236236
for (i, window) in parsed.historical_windows.iter().enumerate() {
@@ -308,7 +308,7 @@ impl JanusApi {
308308

309309
// 5. Spawn live worker thread and MQTT subscribers (if there are live windows)
310310
let mut mqtt_subscribers = Vec::new();
311-
let mut mqtt_subscriber_handle = None;
311+
let mut mqtt_subscriber_handles = Vec::new();
312312
let mut baseline_handle = None;
313313

314314
let live_handle = if !parsed.live_windows.is_empty() && !parsed.rspql_query.is_empty() {
@@ -354,6 +354,8 @@ impl JanusApi {
354354
let parsed_clone = parsed.clone();
355355
let processor_for_baseline = Arc::clone(&live_processor);
356356
let status_for_baseline = Arc::clone(&status);
357+
let registry_for_baseline = Arc::clone(&self.registry);
358+
let query_id_for_baseline = query_id.clone();
357359
let baseline_mode = effective_baseline_mode;
358360
let baseline_window = effective_baseline_window.clone();
359361
let (baseline_shutdown_tx, baseline_shutdown_rx) = mpsc::channel::<()>();
@@ -390,12 +392,18 @@ impl JanusApi {
390392
*state = ExecutionStatus::Running;
391393
}
392394
}
395+
let _ =
396+
registry_for_baseline.set_status(&query_id_for_baseline, "Running");
393397
}
394398
Err(err) => {
395399
eprintln!("Async baseline warm-up error: {}", err);
396400
if let Ok(mut state) = status_for_baseline.write() {
397401
*state = ExecutionStatus::Failed(err.to_string());
398402
}
403+
let _ = registry_for_baseline.set_status(
404+
&query_id_for_baseline,
405+
format!("Failed({err})"),
406+
);
399407
}
400408
}
401409
}));
@@ -431,7 +439,7 @@ impl JanusApi {
431439
});
432440

433441
mqtt_subscribers.push(subscriber);
434-
mqtt_subscriber_handle = Some(sub_handle);
442+
mqtt_subscriber_handles.push(sub_handle);
435443
}
436444

437445
// Spawn live worker thread to receive results
@@ -470,6 +478,21 @@ impl JanusApi {
470478
None
471479
};
472480

481+
self.registry.increment_execution_count(query_id).map_err(|e| {
482+
JanusApiError::RegistryError(format!(
483+
"Failed to increment execution count for '{}': {}",
484+
query_id, e
485+
))
486+
})?;
487+
self.registry
488+
.set_status(query_id, format!("{:?}", initial_status))
489+
.map_err(|e| {
490+
JanusApiError::RegistryError(format!(
491+
"Failed to update query status for '{}': {}",
492+
query_id, e
493+
))
494+
})?;
495+
473496
// 6. Store running query information
474497
let running = RunningQuery {
475498
metadata,
@@ -479,7 +502,7 @@ impl JanusApi {
479502
historical_handles,
480503
baseline_handle,
481504
live_handle,
482-
mqtt_subscriber_handle,
505+
mqtt_subscriber_handles,
483506
shutdown_senders,
484507
mqtt_subscribers,
485508
};
@@ -506,6 +529,7 @@ impl JanusApi {
506529
let running = running_map.remove(query_id).ok_or_else(|| {
507530
JanusApiError::ExecutionError(format!("Query '{}' is not running", query_id))
508531
})?;
532+
drop(running_map);
509533

510534
// Send shutdown signals
511535
for shutdown_tx in running.shutdown_senders {
@@ -521,6 +545,25 @@ impl JanusApi {
521545
if let Ok(mut status) = running.status.write() {
522546
*status = ExecutionStatus::Stopped;
523547
}
548+
self.registry.set_status(query_id, "Stopped").map_err(|e| {
549+
JanusApiError::RegistryError(format!(
550+
"Failed to update query status for '{}': {}",
551+
query_id, e
552+
))
553+
})?;
554+
555+
for handle in running.historical_handles {
556+
let _ = handle.join();
557+
}
558+
if let Some(handle) = running.baseline_handle {
559+
let _ = handle.join();
560+
}
561+
if let Some(handle) = running.live_handle {
562+
let _ = handle.join();
563+
}
564+
for handle in running.mqtt_subscriber_handles {
565+
let _ = handle.join();
566+
}
524567

525568
Ok(())
526569
}

src/http/server.rs

Lines changed: 1 addition & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -296,15 +296,6 @@ async fn get_query(
296296
.ok_or_else(|| ApiError::NotFound(format!("Query '{}' not found", query_id)))?;
297297

298298
let is_running = state.janus_api.is_running(&query_id);
299-
let status = if is_running {
300-
state
301-
.janus_api
302-
.get_query_status(&query_id)
303-
.map(|s| format!("{:?}", s))
304-
.unwrap_or_else(|| "Unknown".to_string())
305-
} else {
306-
"Registered".to_string()
307-
};
308299

309300
Ok(Json(QueryDetailsResponse {
310301
query_id: metadata.query_id,
@@ -313,7 +304,7 @@ async fn get_query(
313304
registered_at: metadata.registered_at,
314305
execution_count: metadata.execution_count,
315306
is_running,
316-
status,
307+
status: metadata.status,
317308
}))
318309
}
319310

src/registry/query_registry.rs

Lines changed: 16 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -15,6 +15,7 @@ pub struct QueryMetadata {
1515
pub baseline_mode: BaselineBootstrapMode,
1616
pub registered_at: u64,
1717
pub execution_count: u64,
18+
pub status: String,
1819
pub subscribers: Vec<QueryId>,
1920
}
2021

@@ -102,6 +103,7 @@ impl QueryRegistry {
102103
baseline_mode,
103104
registered_at: Self::current_timestamp(),
104105
execution_count: 0,
106+
status: "Registered".to_string(),
105107
subscribers: Vec::new(),
106108
};
107109

@@ -145,6 +147,20 @@ impl QueryRegistry {
145147
Ok(())
146148
}
147149

150+
/// Overwrites the stored `status` string of the query identified by `query_id`.
///
/// `status` accepts anything convertible into a `String`; it is converted and
/// assigned to the entry's `status` field.
///
/// # Errors
/// Returns `QueryRegistryError::QueryNotFound` when `query_id` has no entry.
///
/// # Panics
/// Panics if acquiring the write guard on `queries` fails, because the result
/// of `write()` is unwrapped.
pub fn set_status(
151+
&self,
152+
query_id: &QueryId,
153+
status: impl Into<String>,
154+
) -> Result<(), QueryRegistryError> {
155+
let mut queries = self.queries.write().unwrap();
156+
let query = queries
157+
.get_mut(query_id)
158+
.ok_or_else(|| QueryRegistryError::QueryNotFound(query_id.clone()))?;
159+
160+
query.status = status.into();
161+
Ok(())
162+
}
163+
148164
/// To remove a query from the registry
149165
pub fn unregister(&self, query_id: &QueryId) -> Result<QueryMetadata, QueryRegistryError> {
150166
let mut queries = self.queries.write().unwrap();

tests/http_server_integration_test.rs

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -226,7 +226,8 @@ async fn test_stop_route_stops_running_query_and_delete_requires_stop() {
226226
assert!(get_response.status().is_success());
227227
let get_body: Value = get_response.json().await.expect("invalid get response");
228228
assert_eq!(get_body["is_running"], false);
229-
assert_eq!(get_body["status"], "Registered");
229+
assert_eq!(get_body["status"], "Stopped");
230+
assert_eq!(get_body["execution_count"], 1);
230231

231232
let delete_response = server
232233
.client

tests/janus_api_integration_test.rs

Lines changed: 54 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -355,6 +355,60 @@ fn test_stop_query() {
355355
);
356356
}
357357

358+
// Lifecycle test: registers one query, then checks the registry's
// `execution_count` and `status` after register, start, stop, and restart.
#[test]
359+
fn test_execution_count_and_status_update_across_lifecycle() {
360+
let storage = Arc::new(
361+
StreamingSegmentedStorage::new(StreamingConfig::default())
362+
.expect("Failed to create storage"),
363+
);
364+
let parser = JanusQLParser::new().expect("Failed to create parser");
365+
let registry = Arc::new(QueryRegistry::new());
366+
367+
let api =
368+
JanusApi::new(parser, Arc::clone(&registry), storage).expect("Failed to create API");
369+
370+
let janusql = r#"
371+
PREFIX ex: <http://example.org/>
372+
SELECT ?s
373+
FROM NAMED WINDOW ex:w ON STREAM ex:stream1 [RANGE 1000 STEP 200]
374+
WHERE { WINDOW ex:w { ?s ?p ?o } }
375+
"#;
376+
377+
// A freshly registered query has never executed and reports "Registered".
let metadata = api
378+
.register_query("lifecycle_query".into(), janusql)
379+
.expect("Failed to register query");
380+
assert_eq!(metadata.execution_count, 0);
381+
assert_eq!(metadata.status, "Registered");
382+
383+
let _handle = api
384+
.start_query(&"lifecycle_query".into())
385+
.expect("Failed to start query");
386+
387+
// The first start is expected to count as one execution and report "Running".
let after_start = registry
388+
.get(&"lifecycle_query".into())
389+
.expect("query should exist after start");
390+
assert_eq!(after_start.execution_count, 1);
391+
assert_eq!(after_start.status, "Running");
392+
393+
api.stop_query(&"lifecycle_query".into()).expect("Failed to stop query");
394+
395+
// Stopping must not change the execution count; only the status changes.
let after_stop = registry
396+
.get(&"lifecycle_query".into())
397+
.expect("query should exist after stop");
398+
assert_eq!(after_stop.execution_count, 1);
399+
assert_eq!(after_stop.status, "Stopped");
400+
401+
let _handle = api
402+
.start_query(&"lifecycle_query".into())
403+
.expect("Failed to restart query");
404+
405+
// Restarting the same query is expected to increment the count again.
let after_restart = registry
406+
.get(&"lifecycle_query".into())
407+
.expect("query should exist after restart");
408+
assert_eq!(after_restart.execution_count, 2);
409+
assert_eq!(after_restart.status, "Running");
410+
}
411+
358412
#[test]
359413
fn test_multiple_queries_concurrent() {
360414
let storage = create_test_storage_with_data().expect("Failed to create storage");

0 commit comments

Comments
 (0)