55
66use crate :: {
77 api:: janus_api:: { JanusApi , JanusApiError , QueryHandle , QueryResult , ResultSource } ,
8- parsing:: janusql_parser:: JanusQLParser ,
9- parsing:: rdf_parser,
108 registry:: query_registry:: { QueryId , QueryRegistry } ,
119 storage:: segmented_storage:: StreamingSegmentedStorage ,
1210 stream_bus:: { BrokerType , MqttConfig , StreamBus , StreamBusConfig } ,
@@ -30,9 +28,11 @@ use std::{
3028 } ,
3129 time:: Instant ,
3230} ;
33- use tokio:: sync:: mpsc ;
31+ use tokio:: sync:: broadcast ;
3432use tower_http:: cors:: { Any , CorsLayer } ;
3533
34+ const RESULT_BROADCAST_CAPACITY : usize = 1024 ;
35+
3636/// Request to register a new query
3737#[ derive( Debug , Deserialize ) ]
3838pub struct RegisterQueryRequest {
@@ -139,7 +139,12 @@ pub struct AppState {
139139 pub registry : Arc < QueryRegistry > ,
140140 pub storage : Arc < StreamingSegmentedStorage > ,
141141 pub replay_state : Arc < Mutex < ReplayState > > ,
142- pub query_handles : Arc < Mutex < HashMap < QueryId , Arc < Mutex < QueryHandle > > > > > ,
142+ pub query_streams : Arc < Mutex < HashMap < QueryId , QueryResultBroadcast > > > ,
143+ }
144+
145+ #[ derive( Clone ) ]
146+ pub struct QueryResultBroadcast {
147+ pub sender : broadcast:: Sender < QueryResult > ,
143148}
144149
145150pub struct ReplayState {
@@ -204,30 +209,42 @@ pub fn create_server(
204209 registry : Arc < QueryRegistry > ,
205210 storage : Arc < StreamingSegmentedStorage > ,
206211) -> Router {
212+ create_server_with_state ( janus_api, registry, storage) . 0
213+ }
214+
215+ /// Create the HTTP server and return the shared state for testing/integration.
216+ pub fn create_server_with_state (
217+ janus_api : Arc < JanusApi > ,
218+ registry : Arc < QueryRegistry > ,
219+ storage : Arc < StreamingSegmentedStorage > ,
220+ ) -> ( Router , Arc < AppState > ) {
207221 let state = Arc :: new ( AppState {
208222 janus_api,
209223 registry,
210224 storage,
211225 replay_state : Arc :: new ( Mutex :: new ( ReplayState :: default ( ) ) ) ,
212- query_handles : Arc :: new ( Mutex :: new ( HashMap :: new ( ) ) ) ,
226+ query_streams : Arc :: new ( Mutex :: new ( HashMap :: new ( ) ) ) ,
213227 } ) ;
214228
215229 // Configure CORS
216230 let cors = CorsLayer :: new ( ) . allow_origin ( Any ) . allow_methods ( Any ) . allow_headers ( Any ) ;
217231
218- Router :: new ( )
232+ let router = Router :: new ( )
219233 . route ( "/api/queries" , post ( register_query) )
220234 . route ( "/api/queries" , get ( list_queries) )
221235 . route ( "/api/queries/:id" , get ( get_query) )
222- . route ( "/api/queries/:id" , delete ( stop_query ) )
236+ . route ( "/api/queries/:id" , delete ( delete_query ) )
223237 . route ( "/api/queries/:id/start" , post ( start_query) )
238+ . route ( "/api/queries/:id/stop" , post ( stop_query) )
224239 . route ( "/api/queries/:id/results" , get ( stream_results) )
225240 . route ( "/api/replay/start" , post ( start_replay) )
226241 . route ( "/api/replay/stop" , post ( stop_replay) )
227242 . route ( "/api/replay/status" , get ( replay_status) )
228243 . route ( "/health" , get ( health_check) )
229244 . layer ( cors)
230- . with_state ( state)
245+ . with_state ( Arc :: clone ( & state) ) ;
246+
247+ ( router, state)
231248}
232249
233250/// Health check endpoint
@@ -297,34 +314,59 @@ async fn start_query(
297314 Path ( query_id) : Path < String > ,
298315) -> Result < Json < SuccessResponse > , ApiError > {
299316 let handle = state. janus_api . start_query ( & query_id) ?;
317+ let ( sender, _) = broadcast:: channel ( RESULT_BROADCAST_CAPACITY ) ;
318+ let sender_for_forwarder = sender. clone ( ) ;
319+
320+ std:: thread:: spawn ( move || forward_query_results ( handle, sender_for_forwarder) ) ;
300321
301- // Store the handle for WebSocket streaming
302322 state
303- . query_handles
323+ . query_streams
304324 . lock ( )
305325 . unwrap ( )
306- . insert ( query_id. clone ( ) , Arc :: new ( Mutex :: new ( handle ) ) ) ;
326+ . insert ( query_id. clone ( ) , QueryResultBroadcast { sender } ) ;
307327
308328 Ok ( Json ( SuccessResponse {
309329 message : format ! ( "Query '{}' started successfully" , query_id) ,
310330 } ) )
311331}
312332
313- /// DELETE /api/queries/:id - Stop a running query
333+ /// POST /api/queries/:id/stop - Stop a running query
314334async fn stop_query (
315335 State ( state) : State < Arc < AppState > > ,
316336 Path ( query_id) : Path < String > ,
317337) -> Result < Json < SuccessResponse > , ApiError > {
318338 state. janus_api . stop_query ( & query_id) ?;
319339
320- // Remove the handle
321- state. query_handles . lock ( ) . unwrap ( ) . remove ( & query_id) ;
340+ state. query_streams . lock ( ) . unwrap ( ) . remove ( & query_id) ;
322341
323342 Ok ( Json ( SuccessResponse {
324343 message : format ! ( "Query '{}' stopped successfully" , query_id) ,
325344 } ) )
326345}
327346
347+ /// DELETE /api/queries/:id - Unregister a query from the registry.
348+ async fn delete_query (
349+ State ( state) : State < Arc < AppState > > ,
350+ Path ( query_id) : Path < String > ,
351+ ) -> Result < Json < SuccessResponse > , ApiError > {
352+ if state. janus_api . is_running ( & query_id) {
353+ return Err ( ApiError :: BadRequest ( format ! (
354+ "Query '{}' is running. Stop it before deleting." ,
355+ query_id
356+ ) ) ) ;
357+ }
358+
359+ state
360+ . registry
361+ . unregister ( & query_id)
362+ . map_err ( |e| ApiError :: NotFound ( e. to_string ( ) ) ) ?;
363+ state. query_streams . lock ( ) . unwrap ( ) . remove ( & query_id) ;
364+
365+ Ok ( Json ( SuccessResponse {
366+ message : format ! ( "Query '{}' deleted successfully" , query_id) ,
367+ } ) )
368+ }
369+
328370/// WS /api/queries/:id/results - Stream query results via WebSocket
329371async fn stream_results (
330372 ws : WebSocketUpgrade ,
@@ -336,49 +378,54 @@ async fn stream_results(
336378 return Err ( ApiError :: NotFound ( format ! ( "Query '{}' not found" , query_id) ) ) ;
337379 }
338380
339- Ok ( ws. on_upgrade ( move |socket| handle_websocket ( socket, state, query_id) ) )
340- }
341-
342- async fn handle_websocket ( mut socket : WebSocket , state : Arc < AppState > , query_id : String ) {
343- // Create a channel for results
344- let ( tx, mut rx) = mpsc:: unbounded_channel :: < QueryResult > ( ) ;
345-
346- // Spawn a task to receive results from the query handle
347- let handles = state. query_handles . clone ( ) ;
348- let query_id_clone = query_id. clone ( ) ;
381+ let sender = state
382+ . query_streams
383+ . lock ( )
384+ . unwrap ( )
385+ . get ( & query_id)
386+ . map ( |stream| stream. sender . clone ( ) )
387+ . ok_or_else ( || {
388+ ApiError :: BadRequest ( format ! (
389+ "Query '{}' is not running. Start it before subscribing to results." ,
390+ query_id
391+ ) )
392+ } ) ?;
349393
350- tokio:: spawn ( async move {
351- loop {
352- // Try to get the query handle
353- let handle_opt = {
354- let handles_lock = handles. lock ( ) . unwrap ( ) ;
355- handles_lock. get ( & query_id_clone) . cloned ( )
356- } ;
394+ Ok ( ws. on_upgrade ( move |socket| handle_websocket ( socket, sender. subscribe ( ) , query_id) ) )
395+ }
357396
358- if let Some ( handle_arc) = handle_opt {
359- let handle = handle_arc. lock ( ) . unwrap ( ) ;
397+ fn forward_query_results ( handle : QueryHandle , sender : broadcast:: Sender < QueryResult > ) {
398+ while let Some ( result) = handle. receive ( ) {
399+ let _ = sender. send ( result) ;
400+ }
401+ }
360402
361- // Non-blocking receive
362- if let Some ( result) = handle. try_receive ( ) {
363- if tx. send ( result) . is_err ( ) {
364- break ;
365- }
403+ async fn handle_websocket (
404+ mut socket : WebSocket ,
405+ mut receiver : broadcast:: Receiver < QueryResult > ,
406+ query_id : String ,
407+ ) {
408+ loop {
409+ let result = match receiver. recv ( ) . await {
410+ Ok ( result) => result,
411+ Err ( broadcast:: error:: RecvError :: Closed ) => break ,
412+ Err ( broadcast:: error:: RecvError :: Lagged ( skipped) ) => {
413+ let warning = serde_json:: json!( {
414+ "query_id" : query_id,
415+ "type" : "lagged" ,
416+ "dropped_messages" : skipped,
417+ } ) ;
418+ if socket. send ( Message :: Text ( warning. to_string ( ) ) ) . await . is_err ( ) {
419+ break ;
366420 }
367- } else {
368- // Query handle not found, wait a bit and retry
369- tokio:: time:: sleep ( tokio:: time:: Duration :: from_millis ( 100 ) ) . await ;
421+ continue ;
370422 }
423+ } ;
371424
372- // Small delay to prevent busy waiting
373- tokio:: time:: sleep ( tokio:: time:: Duration :: from_millis ( 10 ) ) . await ;
374- }
375- } ) ;
376-
377- // Send results to WebSocket
378- while let Some ( result) = rx. recv ( ) . await {
379425 let json_result = serde_json:: json!( {
380426 "query_id" : result. query_id,
381427 "timestamp" : result. timestamp,
428+ "type" : "result" ,
382429 "source" : match result. source {
383430 ResultSource :: Historical => "historical" ,
384431 ResultSource :: Live => "live" ,
@@ -567,7 +614,8 @@ pub async fn start_server(
567614 println ! ( " GET /api/queries - List all registered queries" ) ;
568615 println ! ( " GET /api/queries/:id - Get query details" ) ;
569616 println ! ( " POST /api/queries/:id/start - Start executing a query" ) ;
570- println ! ( " DELETE /api/queries/:id - Stop a running query" ) ;
617+ println ! ( " POST /api/queries/:id/stop - Stop a running query" ) ;
618+ println ! ( " DELETE /api/queries/:id - Delete a stopped query" ) ;
571619 println ! ( " WS /api/queries/:id/results - Stream query results (WebSocket)" ) ;
572620 println ! ( " POST /api/replay/start - Start stream bus replay" ) ;
573621 println ! ( " POST /api/replay/stop - Stop stream bus replay" ) ;
0 commit comments