Skip to content

Commit aebd76f

Browse files
Merge pull request #10 from SolidLabResearch/codex/on-log-extension-functions
Add ON LOG parsing and historical extension functions
2 parents c47e6c6 + 51b0cc2 commit aebd76f

16 files changed

Lines changed: 557 additions & 22 deletions

benches/historical_fixed.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion};
22
use janus::{
33
execution::historical_executor::HistoricalExecutor,
4-
parsing::janusql_parser::{WindowDefinition, WindowType},
4+
parsing::janusql_parser::{SourceKind, WindowDefinition, WindowType},
55
querying::oxigraph_adapter::OxigraphAdapter,
66
storage::{segmented_storage::StreamingSegmentedStorage, util::StreamingConfig},
77
};
@@ -43,6 +43,7 @@ fn setup(n: usize) -> (Arc<StreamingSegmentedStorage>, WindowDefinition) {
4343
}
4444
let window = WindowDefinition {
4545
window_name: "w".to_string(),
46+
source_kind: SourceKind::Stream,
4647
stream_name: "http://example.org/stream".to_string(),
4748
width: n as u64,
4849
slide: n as u64,

benches/historical_sliding.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion};
22
use janus::{
33
execution::historical_executor::HistoricalExecutor,
4-
parsing::janusql_parser::{WindowDefinition, WindowType},
4+
parsing::janusql_parser::{SourceKind, WindowDefinition, WindowType},
55
querying::oxigraph_adapter::OxigraphAdapter,
66
storage::{segmented_storage::StreamingSegmentedStorage, util::StreamingConfig},
77
};
@@ -53,6 +53,7 @@ fn setup(n: usize) -> (Arc<StreamingSegmentedStorage>, WindowDefinition) {
5353
}
5454
let window = WindowDefinition {
5555
window_name: "w".to_string(),
56+
source_kind: SourceKind::Stream,
5657
stream_name: "http://example.org/stream".to_string(),
5758
width: RANGE_MS,
5859
slide: SLIDE_MS,

src/execution/historical_executor.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -501,6 +501,7 @@ mod tests {
501501

502502
let window = WindowDefinition {
503503
window_name: "test_window".to_string(),
504+
source_kind: crate::parsing::janusql_parser::SourceKind::Stream,
504505
stream_name: "test_stream".to_string(),
505506
width: 1000,
506507
slide: 100,
@@ -528,6 +529,7 @@ mod tests {
528529

529530
let window = WindowDefinition {
530531
window_name: "test_window".to_string(),
532+
source_kind: crate::parsing::janusql_parser::SourceKind::Stream,
531533
stream_name: "test_stream".to_string(),
532534
width: 1000,
533535
slide: 100,

src/extensions/math.rs

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
//! Pure numeric helpers used by Janus extension functions.
2+
3+
/// Magnitude of the gap between a live and a historical reading.
///
/// Computes `|live - hist|`; the result is symmetric in its two arguments.
pub fn abs_diff(live: f64, hist: f64) -> f64 {
    let delta = live - hist;
    delta.abs()
}
7+
8+
/// Relative change from historical to live: `(live - hist) / hist`.
///
/// Returns `f64::NAN` when `hist` is zero (either `0.0` or `-0.0`), because
/// the ratio is undefined without a non-zero baseline. Plain IEEE-754
/// division would instead yield `+inf` / `-inf` for a non-zero `live`, which
/// contradicts the documented contract, so the zero baseline is handled
/// explicitly.
pub fn relative_change(live: f64, hist: f64) -> f64 {
    // `0.0 == -0.0` under IEEE-754, so this single comparison covers both signs.
    if hist == 0.0 {
        return f64::NAN;
    }
    (live - hist) / hist
}
14+
15+
/// Standard score of `value` relative to a distribution described by `mean`
/// and `sigma`: `(value - mean) / sigma`.
///
/// A `sigma` whose magnitude is below `f64::EPSILON` is treated as zero
/// spread, in which case `0.0` is returned instead of dividing.
pub fn zscore(value: f64, mean: f64, sigma: f64) -> f64 {
    let no_spread = sigma.abs() < f64::EPSILON;
    if no_spread {
        return 0.0;
    }

    let deviation = value - mean;
    deviation / sigma
}
25+
26+
#[cfg(test)]
mod tests {
    use super::*;

    /// True when `actual` lies strictly within `tol` of `expected`.
    fn within(actual: f64, expected: f64, tol: f64) -> bool {
        (actual - expected).abs() < tol
    }

    #[test]
    fn abs_diff_positive_delta() {
        assert!(within(abs_diff(3.0, 1.0), 2.0, f64::EPSILON));
    }

    #[test]
    fn abs_diff_negative_delta() {
        assert!(within(abs_diff(1.0, 3.0), 2.0, f64::EPSILON));
    }

    #[test]
    fn relative_change_increase() {
        assert!(within(relative_change(1.1, 1.0), 0.1, 1e-10));
    }

    #[test]
    fn relative_change_zero_hist_is_nan_or_infinite() {
        // Either non-finite outcome (NaN or an infinity) is accepted here.
        let rc = relative_change(1.0, 0.0);
        assert!(!rc.is_finite());
    }

    #[test]
    fn zscore_zero_sigma_returns_zero() {
        assert!(within(zscore(99.0, 1.0, 0.0), 0.0, f64::EPSILON));
    }
}

src/extensions/mod.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
//! Janus SPARQL extension functions.
2+
//!
3+
//! This module contains a reusable function registry and the Oxigraph wiring for
4+
//! Janus-specific extension functions exposed under the `https://janus.rs/fn#`
5+
//! namespace.
6+
7+
pub mod math;
8+
pub mod query_options;
9+
pub mod registry;
10+
pub mod rules;

src/extensions/query_options.rs

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
//! Oxigraph evaluator builder preloaded with Janus extension functions.
2+
3+
use oxigraph::model::{Literal, NamedNode, Term};
4+
use oxigraph::sparql::SparqlEvaluator;
5+
6+
use crate::extensions::math::{abs_diff, relative_change, zscore};
7+
use crate::extensions::registry::{
8+
FunctionRegistry, FN_ABSOLUTE_THRESHOLD, FN_ABS_DIFF, FN_CATCH_UP, FN_IS_OUTLIER,
9+
FN_RELATIVE_CHANGE, FN_RELATIVE_THRESHOLD, FN_TREND_DIVERGENT, FN_VOLATILITY_INCREASE,
10+
FN_ZSCORE,
11+
};
12+
13+
fn term_to_f64(term: &Term) -> Option<f64> {
14+
if let Term::Literal(literal) = term {
15+
literal.value().parse::<f64>().ok()
16+
} else {
17+
None
18+
}
19+
}
20+
21+
fn bool_term(value: bool) -> Term {
22+
Term::Literal(Literal::new_typed_literal(
23+
if value { "true" } else { "false" },
24+
NamedNode::new("http://www.w3.org/2001/XMLSchema#boolean").unwrap(),
25+
))
26+
}
27+
28+
fn decimal_term(value: f64) -> Term {
29+
Term::Literal(Literal::new_typed_literal(
30+
&value.to_string(),
31+
NamedNode::new("http://www.w3.org/2001/XMLSchema#decimal").unwrap(),
32+
))
33+
}
34+
35+
fn args_to_floats(args: &[Term]) -> Option<Vec<f64>> {
36+
args.iter().map(term_to_f64).collect()
37+
}
38+
39+
/// Build a `SparqlEvaluator` with Janus extension functions registered.
40+
pub fn build_evaluator() -> SparqlEvaluator {
41+
let registry = FunctionRegistry::new();
42+
let mut evaluator = SparqlEvaluator::new();
43+
44+
for (uri, rule) in registry.all_rules() {
45+
let name = NamedNode::new(uri).expect("constant URI must be valid");
46+
evaluator = evaluator.with_custom_function(name, move |args| {
47+
let floats = args_to_floats(args)?;
48+
match rule.evaluate(&floats) {
49+
Ok(result) => Some(bool_term(result)),
50+
Err(_) => None,
51+
}
52+
});
53+
}
54+
55+
evaluator = evaluator.with_custom_function(NamedNode::new(FN_ABS_DIFF).unwrap(), |args| {
56+
if args.len() != 2 {
57+
return None;
58+
}
59+
let left = term_to_f64(&args[0])?;
60+
let right = term_to_f64(&args[1])?;
61+
Some(decimal_term(abs_diff(left, right)))
62+
});
63+
64+
evaluator =
65+
evaluator.with_custom_function(NamedNode::new(FN_RELATIVE_CHANGE).unwrap(), |args| {
66+
if args.len() != 2 {
67+
return None;
68+
}
69+
let left = term_to_f64(&args[0])?;
70+
let right = term_to_f64(&args[1])?;
71+
let result = relative_change(left, right);
72+
if result.is_finite() {
73+
Some(decimal_term(result))
74+
} else {
75+
None
76+
}
77+
});
78+
79+
evaluator = evaluator.with_custom_function(NamedNode::new(FN_ZSCORE).unwrap(), |args| {
80+
if args.len() != 3 {
81+
return None;
82+
}
83+
let value = term_to_f64(&args[0])?;
84+
let mean = term_to_f64(&args[1])?;
85+
let sigma = term_to_f64(&args[2])?;
86+
Some(decimal_term(zscore(value, mean, sigma)))
87+
});
88+
89+
evaluator
90+
}
91+
92+
#[cfg(test)]
mod tests {
    use super::*;
    use oxigraph::model::{GraphName, Quad};
    use oxigraph::sparql::QueryResults;
    use oxigraph::store::Store;

    /// Build a `NamedNode` from `uri`, panicking on an invalid IRI.
    fn named(uri: &str) -> NamedNode {
        NamedNode::new(uri).unwrap()
    }

    /// Build a simple (untyped) literal term.
    fn lit(value: &str) -> Term {
        Term::Literal(Literal::new_simple_literal(value))
    }

    /// Insert one `subject predicate "object"` triple into the default graph.
    fn insert(store: &Store, subject: &str, predicate: &str, object: &str) {
        let quad = Quad::new(
            named(subject),
            named(predicate),
            lit(object),
            GraphName::DefaultGraph,
        );
        store.insert(&quad).unwrap();
    }

    #[test]
    fn absolute_threshold_filter_works_in_real_oxigraph_query() {
        let store = Store::new().unwrap();

        // sensor1 differs from its history by 1.5, sensor2 by only 0.1.
        let facts = [
            ("https://janus.rs/test#sensor1", "https://janus.rs/test#live", "2.5"),
            ("https://janus.rs/test#sensor1", "https://janus.rs/test#hist", "1.0"),
            ("https://janus.rs/test#sensor2", "https://janus.rs/test#live", "1.1"),
            ("https://janus.rs/test#sensor2", "https://janus.rs/test#hist", "1.0"),
        ];
        for &(subject, predicate, object) in &facts {
            insert(&store, subject, predicate, object);
        }

        let query = r#"
PREFIX janus: <https://janus.rs/fn#>
PREFIX test: <https://janus.rs/test#>
SELECT ?sensor WHERE {
?sensor test:live ?live ;
test:hist ?hist .
FILTER(janus:absolute_threshold_exceeded(?live, ?hist, 0.2))
}
"#;

        let evaluator = build_evaluator();
        let prepared = evaluator.parse_query(query).unwrap();
        let results = prepared.on_store(&store).execute().unwrap();

        let solutions = match results {
            QueryResults::Solutions(solutions) => solutions,
            _ => panic!("expected SELECT solutions"),
        };

        let rows = solutions.collect::<Result<Vec<_>, _>>().unwrap();
        assert_eq!(rows.len(), 1);

        let sensor = rows[0].get("sensor").unwrap();
        assert_eq!(sensor.to_string(), "<https://janus.rs/test#sensor1>");
    }
}

src/extensions/registry.rs

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
//! Registry of Janus extension-function names.
2+
3+
use std::collections::HashMap;
4+
use std::sync::Arc;
5+
6+
use crate::extensions::rules::{
7+
AbsoluteThreshold, CatchUp, ExtensionRule, IsOutlier, RelativeThreshold, TrendDivergent,
8+
VolatilityIncrease,
9+
};
10+
11+
/// Namespace shared by all Janus extension functions.
12+
pub const JANUS_NS: &str = "https://janus.rs/fn#";
13+
14+
/// Fully qualified URIs for registered scalar and boolean functions.
15+
pub const FN_ABS_DIFF: &str = "https://janus.rs/fn#abs_diff";
16+
pub const FN_RELATIVE_CHANGE: &str = "https://janus.rs/fn#relative_change";
17+
pub const FN_ZSCORE: &str = "https://janus.rs/fn#zscore";
18+
pub const FN_ABSOLUTE_THRESHOLD: &str = "https://janus.rs/fn#absolute_threshold_exceeded";
19+
pub const FN_RELATIVE_THRESHOLD: &str = "https://janus.rs/fn#relative_threshold_exceeded";
20+
pub const FN_CATCH_UP: &str = "https://janus.rs/fn#catch_up";
21+
pub const FN_VOLATILITY_INCREASE: &str = "https://janus.rs/fn#volatility_increase";
22+
pub const FN_IS_OUTLIER: &str = "https://janus.rs/fn#is_outlier";
23+
pub const FN_TREND_DIVERGENT: &str = "https://janus.rs/fn#trend_divergent";
24+
25+
/// Registry that maps Janus function URIs to boolean rule implementations.
26+
pub struct FunctionRegistry {
27+
rules: HashMap<&'static str, Arc<dyn ExtensionRule>>,
28+
}
29+
30+
impl FunctionRegistry {
31+
/// Build the default registry with all boolean rules pre-populated.
32+
pub fn new() -> Self {
33+
let mut rules: HashMap<&'static str, Arc<dyn ExtensionRule>> = HashMap::new();
34+
rules.insert(FN_ABSOLUTE_THRESHOLD, Arc::new(AbsoluteThreshold));
35+
rules.insert(FN_RELATIVE_THRESHOLD, Arc::new(RelativeThreshold));
36+
rules.insert(FN_CATCH_UP, Arc::new(CatchUp));
37+
rules.insert(FN_VOLATILITY_INCREASE, Arc::new(VolatilityIncrease));
38+
rules.insert(FN_IS_OUTLIER, Arc::new(IsOutlier));
39+
rules.insert(FN_TREND_DIVERGENT, Arc::new(TrendDivergent));
40+
Self { rules }
41+
}
42+
43+
/// Look up a boolean rule by its fully-qualified URI.
44+
pub fn lookup(&self, name: &str) -> Option<&dyn ExtensionRule> {
45+
self.rules.get(name).map(Arc::as_ref)
46+
}
47+
48+
/// Iterate over all registry entries.
49+
pub fn all_rules(&self) -> impl Iterator<Item = (&'static str, Arc<dyn ExtensionRule>)> + '_ {
50+
self.rules.iter().map(|(key, value)| (*key, Arc::clone(value)))
51+
}
52+
}
53+
54+
impl Default for FunctionRegistry {
55+
fn default() -> Self {
56+
Self::new()
57+
}
58+
}
59+
60+
#[cfg(test)]
mod tests {
    use super::*;

    /// A URI registered by `FunctionRegistry::new` must resolve to a rule.
    #[test]
    fn lookup_known_rule_returns_some() {
        let found = FunctionRegistry::new()
            .lookup(FN_ABSOLUTE_THRESHOLD)
            .is_some();
        assert!(found);
    }

    /// A URI that was never registered must not resolve.
    #[test]
    fn lookup_unknown_name_returns_none() {
        let missing = FunctionRegistry::new()
            .lookup("https://janus.rs/fn#nonexistent")
            .is_none();
        assert!(missing);
    }
}

0 commit comments

Comments
 (0)