Skip to content

Commit d1417a0

Browse files
committed
Evaluate live extension functions in Janus
1 parent aebd76f commit d1417a0

3 files changed

Lines changed: 236 additions & 4 deletions

File tree

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
# Live Extension Function Architecture
2+
3+
This document describes how Janus executes Janus-specific extension functions for live queries
4+
without modifying the upstream `rsp-rs` crate.
5+
6+
## Flow
7+
8+
```mermaid
9+
flowchart TD
10+
A["JanusQL query"] --> B["JanusQLParser"]
11+
B --> C["Historical windows + SPARQL"]
12+
B --> D["Live windows + RSP-QL"]
13+
14+
D --> E["LiveStreamProcessing"]
15+
E --> F["rsp-rs RSPEngine::initialize()"]
16+
F --> G["rsp-rs stream registry"]
17+
F --> H["rsp-rs CSPARQL windows"]
18+
19+
I["MQTT / live RDF events"] --> G
20+
G --> H
21+
22+
H --> J["Janus window subscriptions"]
23+
J --> K["Merge emitted window content with sibling windows"]
24+
K --> L["Add mirrored static background quads"]
25+
L --> M["Oxigraph Store"]
26+
M --> N["build_evaluator()"]
27+
N --> O["Oxigraph SPARQL execution"]
28+
O --> P["Janus extension functions"]
29+
P --> Q["BindingWithTimestamp / QueryResult"]
30+
31+
C --> R["HistoricalExecutor"]
32+
R --> S["OxigraphAdapter"]
33+
S --> N
34+
```
35+
36+
## Responsibilities
37+
38+
- `rsp-rs`
39+
- stream ingestion
40+
- timestamp-driven window lifecycle
41+
- window materialization
42+
- window closure notifications
43+
44+
- `Janus`
45+
- JanusQL parsing
46+
- historical execution
47+
- live query orchestration
48+
- Janus-specific custom function registration through `build_evaluator()`
49+
- final SPARQL evaluation for both historical and live paths
50+
51+
## Why this design
52+
53+
- Keeps `rsp-rs` minimal and reusable.
54+
- Avoids a Janus-specific fork or API expansion in `rsp-rs`.
55+
- Lets Janus use the same extension-function mechanism on both historical and live queries.
56+
- Intercepts at the materialized-window stage, so Janus does not re-evaluate already-produced live
57+
bindings. Instead, it performs the final SPARQL evaluation itself once per emitted window.

src/stream/live_stream_processing.rs

Lines changed: 135 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,14 @@
44
//! It integrates RSP-QL query execution with Janus's RDFEvent data model.
55
66
use crate::core::RDFEvent;
7+
use crate::extensions::query_options::build_evaluator;
78
use oxigraph::model::{GraphName, NamedNode, Quad, Term};
8-
use rsp_rs::{BindingWithTimestamp, RDFStream, RSPEngine};
9-
use std::collections::HashMap;
9+
use oxigraph::sparql::QueryResults;
10+
use oxigraph::store::Store;
11+
use rsp_rs::{BindingWithTimestamp, RDFStream, RSPEngine, StreamType};
12+
use std::collections::{HashMap, HashSet};
1013
use std::sync::mpsc::{Receiver, RecvError};
14+
use std::sync::{mpsc, Arc, Mutex};
1115

1216
/// Live stream processing engine for RSP-QL queries
1317
pub struct LiveStreamProcessing {
@@ -17,6 +21,8 @@ pub struct LiveStreamProcessing {
1721
streams: HashMap<String, RDFStream>,
1822
/// Result receiver for query results
1923
result_receiver: Option<Receiver<BindingWithTimestamp>>,
24+
/// Static quads mirrored in Janus for Janus-side live query evaluation.
25+
static_data: Arc<Mutex<HashSet<Quad>>>,
2026
/// Flag indicating if processing has started
2127
processing_started: bool,
2228
}
@@ -81,6 +87,7 @@ impl LiveStreamProcessing {
8187
engine,
8288
streams: HashMap::new(),
8389
result_receiver: None,
90+
static_data: Arc::new(Mutex::new(HashSet::new())),
8491
processing_started: false,
8592
})
8693
}
@@ -117,7 +124,7 @@ impl LiveStreamProcessing {
117124
return Err(LiveStreamProcessingError("Processing already started".to_string()));
118125
}
119126

120-
let receiver = self.engine.start_processing();
127+
let receiver = self.register_live_callbacks()?;
121128
self.result_receiver = Some(receiver);
122129
self.processing_started = true;
123130

@@ -265,7 +272,8 @@ impl LiveStreamProcessing {
265272
/// * `event` - RDFEvent representing static knowledge
266273
pub fn add_static_data(&mut self, event: RDFEvent) -> Result<(), LiveStreamProcessingError> {
267274
let quad = self.rdf_event_to_quad(&event)?;
268-
self.engine.add_static_data(quad);
275+
self.engine.add_static_data(quad.clone());
276+
self.static_data.lock().unwrap().insert(quad);
269277
Ok(())
270278
}
271279

@@ -420,6 +428,129 @@ impl LiveStreamProcessing {
420428
Ok(Quad::new(subject, predicate, object, graph))
421429
}
422430

431+
fn register_live_callbacks(
432+
&self,
433+
) -> Result<Receiver<BindingWithTimestamp>, LiveStreamProcessingError> {
434+
let parsed_query = self.engine.parsed_query().clone();
435+
let sparql_query = Arc::new(parsed_query.sparql_query.clone());
436+
let (tx, rx) = mpsc::channel();
437+
438+
let mut windows = HashMap::new();
439+
for window_def in &parsed_query.s2r {
440+
let window = self.engine.get_window(&window_def.window_name).ok_or_else(|| {
441+
LiveStreamProcessingError(format!(
442+
"Window '{}' not found in engine",
443+
window_def.window_name
444+
))
445+
})?;
446+
windows.insert(window_def.window_name.clone(), window);
447+
}
448+
let windows = Arc::new(windows);
449+
let static_data = Arc::clone(&self.static_data);
450+
451+
for window_def in parsed_query.s2r {
452+
let window_arc = windows.get(&window_def.window_name).cloned().ok_or_else(|| {
453+
LiveStreamProcessingError(format!(
454+
"Window '{}' not available for subscription",
455+
window_def.window_name
456+
))
457+
})?;
458+
let tx_clone = tx.clone();
459+
let sparql_query = Arc::clone(&sparql_query);
460+
let all_windows = Arc::clone(&windows);
461+
let static_data = Arc::clone(&static_data);
462+
let window_name = window_def.window_name.clone();
463+
let window_width = window_def.width;
464+
465+
let mut window = window_arc.lock().unwrap();
466+
window.subscribe(StreamType::RStream, move |mut container| {
467+
let timestamp = container.last_timestamp_changed;
468+
469+
for (other_name, other_window_arc) in all_windows.iter() {
470+
if other_name == &window_name {
471+
continue;
472+
}
473+
if let Ok(other_window) = other_window_arc.lock() {
474+
if let Some(other_container) =
475+
other_window.get_content_from_window(timestamp)
476+
{
477+
for quad in &other_container.elements {
478+
container.add(quad.clone(), timestamp);
479+
}
480+
}
481+
}
482+
}
483+
484+
match Self::execute_live_query(
485+
&container,
486+
&sparql_query,
487+
&static_data.lock().unwrap(),
488+
) {
489+
Ok(bindings) => {
490+
for binding in bindings {
491+
let result = BindingWithTimestamp {
492+
bindings: binding,
493+
timestamp_from: timestamp,
494+
timestamp_to: timestamp + window_width,
495+
};
496+
let _ = tx_clone.send(result);
497+
}
498+
}
499+
Err(err) => {
500+
eprintln!("Live Janus evaluation error: {}", err);
501+
}
502+
}
503+
});
504+
}
505+
506+
Ok(rx)
507+
}
508+
509+
fn execute_live_query(
510+
container: &rsp_rs::QuadContainer,
511+
query: &str,
512+
static_data: &HashSet<Quad>,
513+
) -> Result<Vec<String>, LiveStreamProcessingError> {
514+
let store = Store::new()
515+
.map_err(|e| LiveStreamProcessingError(format!("Failed to create store: {}", e)))?;
516+
517+
for quad in &container.elements {
518+
store.insert(quad).map_err(|e| {
519+
LiveStreamProcessingError(format!("Failed to insert live quad into store: {}", e))
520+
})?;
521+
}
522+
for quad in static_data {
523+
store.insert(quad).map_err(|e| {
524+
LiveStreamProcessingError(format!(
525+
"Failed to insert static quad into live store: {}",
526+
e
527+
))
528+
})?;
529+
}
530+
531+
let parsed_query = build_evaluator().parse_query(query).map_err(|e| {
532+
LiveStreamProcessingError(format!("Failed to parse live SPARQL: {}", e))
533+
})?;
534+
let results = parsed_query.on_store(&store).execute().map_err(|e| {
535+
LiveStreamProcessingError(format!("Failed to execute live SPARQL: {}", e))
536+
})?;
537+
538+
let mut bindings = Vec::new();
539+
if let QueryResults::Solutions(solutions) = results {
540+
for solution in solutions {
541+
let solution = solution.map_err(|e| {
542+
LiveStreamProcessingError(format!(
543+
"Failed to evaluate live solution binding: {}",
544+
e
545+
))
546+
})?;
547+
bindings.push(format!("{:?}", solution));
548+
}
549+
}
550+
551+
Ok(bindings)
552+
}
553+
423554
/// Returns the list of registered stream URIs
424555
pub fn get_registered_streams(&self) -> Vec<String> {
425556
self.streams.keys().cloned().collect()

tests/live_stream_integration_test.rs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,50 @@ fn test_literal_and_uri_objects() {
272272
assert!(!results.is_empty());
273273
}
274274

275+
#[test]
276+
fn test_live_query_with_janus_extension_function() {
277+
let query = r#"
278+
PREFIX ex: <http://example.org/>
279+
PREFIX janus: <https://janus.rs/fn#>
280+
REGISTER RStream <output> AS
281+
SELECT ?sensor ?reading
282+
FROM NAMED WINDOW ex:w1 ON STREAM ex:stream1 [RANGE 1000 STEP 500]
283+
WHERE {
284+
WINDOW ex:w1 {
285+
?sensor ex:hasReading ?reading .
286+
FILTER(janus:absolute_threshold_exceeded(?reading, "25", "2"))
287+
}
288+
}
289+
"#;
290+
291+
let mut processor = LiveStreamProcessing::new(query.to_string()).unwrap();
292+
processor.register_stream("http://example.org/stream1").unwrap();
293+
processor.start_processing().unwrap();
294+
295+
processor
296+
.add_event(
297+
"http://example.org/stream1",
298+
RDFEvent::new(
299+
0,
300+
"http://example.org/sensor-pass",
301+
"http://example.org/hasReading",
302+
"30",
303+
"",
304+
),
305+
)
306+
.unwrap();
307+
308+
processor.close_stream("http://example.org/stream1", 3000).unwrap();
309+
thread::sleep(Duration::from_millis(500));
310+
311+
let results = processor.collect_results(None).unwrap();
312+
assert!(
313+
results.iter().any(|result| result.bindings.contains("sensor-pass")),
314+
"Expected at least one live result to pass the Janus extension-function filter, got {:?}",
315+
results
316+
);
317+
}
318+
275319
#[test]
276320
fn test_rapid_event_stream() {
277321
let query = r#"

0 commit comments

Comments
 (0)