diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md index 16e8f41..8db9fc9 100644 --- a/.github/copilot-instructions.md +++ b/.github/copilot-instructions.md @@ -186,7 +186,6 @@ Minimal dependency philosophy: - `src/storage/indexing/dictionary.rs` (172 lines) - Dictionary encoding - `tests/dictionary_encoding_test.rs` (624 lines) - Comprehensive integration tests - `BENCHMARK_RESULTS.md` - Performance metrics and analysis -- `ARCHITECTURE.md` - High-level design (note: mentions TypeScript, but actual impl is Rust) - `Makefile` - Common development commands # Copilot Instructions diff --git a/.github/workflows/docs-links.yml b/.github/workflows/docs-links.yml new file mode 100644 index 0000000..ef7981b --- /dev/null +++ b/.github/workflows/docs-links.yml @@ -0,0 +1,20 @@ +name: Docs Link Check + +on: + pull_request: + branches: [main, develop] + paths: + - "**/*.md" + - ".github/workflows/docs-links.yml" + - "scripts/check_doc_links.sh" + +jobs: + docs-links: + name: Validate Markdown Links + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Check local markdown links + run: bash scripts/check_doc_links.sh diff --git a/GETTING_STARTED.md b/GETTING_STARTED.md index 67585d2..8102305 100644 --- a/GETTING_STARTED.md +++ b/GETTING_STARTED.md @@ -40,10 +40,7 @@ cargo run --example http_client_example That example covers query registration, start, stop, replay control, and WebSocket result consumption. -## Optional Local Demo UI - -This repository keeps a small static demo at -`examples/demo_dashboard.html` for manual browser testing. +## Optional Frontend The maintained Svelte dashboard lives in the separate `SolidLabResearch/janus-dashboard` repository. 
@@ -75,9 +72,9 @@ make ci-check - `src/stream`: live stream processing - `src/storage`: segmented RDF storage - `src/bin`: executable binaries -- `examples`: runnable examples and a minimal static demo +- `examples`: runnable examples, including the HTTP client example - `tests`: integration coverage -- `docs`: current docs plus older design notes +- `docs`: current product docs plus a small amount of retained background material ## Where to Read Next diff --git a/Makefile b/Makefile index b49ceb1..6df1e25 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -.PHONY: help build test clean fmt lint check ci-check install run dev doc bench audit deps update watch all release +.PHONY: help build test clean fmt lint check ci-check install run dev doc bench audit deps update watch all release doc-links # Default target .DEFAULT_GOAL := help @@ -59,6 +59,10 @@ lint: ## Run clippy lints check: fmt-check lint ## Run all checks (formatting and linting) @echo "$(GREEN)All checks passed!$(NC)" +doc-links: ## Check local markdown links + @echo "$(BLUE)Checking markdown links...$(NC)" + @bash ./scripts/check_doc_links.sh + ci-check: ## Run full CI/CD checks locally before pushing @echo "$(BLUE)Running CI/CD checks...$(NC)" @./scripts/ci-check.sh diff --git a/README.md b/README.md index f9ee576..672b042 100644 --- a/README.md +++ b/README.md @@ -70,8 +70,6 @@ The maintained dashboard lives in a separate repository: - `https://github.com/SolidLabResearch/janus-dashboard` -The `janus-dashboard/` folder in this repository is a lightweight local demo client, not the primary frontend. - ## Performance Janus uses dictionary encoding and segmented storage for high-throughput ingestion and historical reads. @@ -129,9 +127,7 @@ This example demonstrates: The maintained web dashboard lives in the separate `SolidLabResearch/janus-dashboard` repository. 
-This repository keeps a small static demo at -[`examples/demo_dashboard.html`](./examples/demo_dashboard.html) for manual API -testing, but frontend development should happen in the dedicated dashboard repo. +Frontend development should happen in the dedicated dashboard repo. ## Development @@ -155,7 +151,6 @@ The repository includes runnable examples under [`examples/`](./examples), inclu - [`examples/http_client_example.rs`](./examples/http_client_example.rs) - [`examples/comparator_demo.rs`](./examples/comparator_demo.rs) -- [`examples/demo_dashboard.html`](./examples/demo_dashboard.html) for a minimal local demo ## Documentation @@ -169,7 +164,8 @@ Start here: ## Notes -- `src/main.rs` is currently a benchmark-style executable, not the main user-facing interface. +- `src/main.rs` is now a lightweight entry binary that points to the main Janus + executables and benchmark helpers. - The primary user-facing entry point is `http_server`. ## Licence diff --git a/START_HERE.md b/START_HERE.md index c5edc6f..18ec195 100644 --- a/START_HERE.md +++ b/START_HERE.md @@ -18,7 +18,7 @@ cargo run --example http_client_example - `http_server` is the main backend entry point - `stream_bus_cli` is the ingestion and replay CLI -- `examples/demo_dashboard.html` is a minimal manual demo +- `examples/http_client_example.rs` is the fastest local API exercise - the maintained Svelte dashboard lives in the separate `janus-dashboard` repository ## Current Docs diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md deleted file mode 100644 index a2dcb16..0000000 --- a/docs/ARCHITECTURE.md +++ /dev/null @@ -1,115 +0,0 @@ -# Janus Architecture Documentation - -## Overview - -Janus is a hybrid engine for unified Live and Historical RDF Stream Processing, implemented in Rust. It is designed for high-throughput ingestion and querying of RDF streams, utilizing a custom segmented storage engine with dictionary encoding and two-level indexing. 
- -## System Architecture - -The system is composed of the following core modules: - -```mermaid -graph TD - Input[RDF Stream Input] -->|write_rdf| API[Public API] - API -->|Encode| Dictionary[Dictionary Encoding] - Dictionary -->|Event IDs| BatchBuffer[Batch Buffer] - - subgraph StorageSystem [Storage Engine] - BatchBuffer -->|Flush| SegmentedStorage[Segmented Storage] - SegmentedStorage -->|Write| DataFile[Data File .log] - SegmentedStorage -->|Index| IndexFile[Index File .idx] - - IndexFile -->|Load| InMemoryIndex[In-Memory Index Directory] - end - - Query[Query Request] -->|Parse| Parser[JanusQL Parser] - Parser -->|Execute| SegmentedStorage - - SegmentedStorage -->|Search| BatchBuffer - SegmentedStorage -->|Search| InMemoryIndex - InMemoryIndex -->|Locate| IndexFile - IndexFile -->|Locate| DataFile - - DataFile -->|Read Records| Decoder[Decoder] - BatchBuffer -->|Read Records| Decoder - Decoder -->|Decode| Result[RDF Event Result] - - Dictionary -.->|Strings <-> IDs| Decoder -``` - -## Core Components - -### 1. Core Data Structures (`src/core`) - -* **Event**: The internal representation of an RDF event. It uses integer IDs for efficient storage and processing. - * `timestamp`: `u64` (milliseconds since epoch) - * `subject`, `predicate`, `object`, `graph`: `u32` (dictionary encoded IDs) - * **Size**: 24 bytes (vs ~40+ bytes for raw pointers/strings), achieving ~40% space efficiency. -* **RDFEvent**: The user-facing representation with `String` fields for subject, predicate, object, and graph. - -### 2. Storage Engine (`src/storage`) - -The storage engine is designed for high write throughput and fast time-based range queries. - -* **StreamingSegmentedStorage**: The main coordinator. - * **BatchBuffer**: An in-memory `VecDeque` that buffers incoming events. This allows for immediate visibility of recent data and high ingestion rates. - * **Segments**: Data is persisted in immutable segments on disk. 
Each segment consists of: - * **Data File (`.log`)**: Stores fixed-size (24-byte) `Event` records sequentially. - * **Index File (`.idx`)**: Stores sparse index blocks mapping timestamps to file offsets. - * **Background Flushing**: A dedicated thread monitors the `BatchBuffer` and flushes it to a new disk segment when it reaches size or time limits. - -* **Indexing Strategy**: - * **Two-Level Indexing**: - 1. **Level 1 (Memory)**: A directory of `IndexBlock` metadata (min/max timestamp, file offset) is kept in memory for each segment. This allows quickly pruning irrelevant segments and locating the relevant index blocks on disk. - 2. **Level 2 (Disk)**: The actual `.idx` file contains sparse entries (e.g., every 1000th event). - * **Binary Search**: Used within the loaded index blocks to find the exact start offset in the data file. - -### 3. Dictionary Encoding (`src/storage/indexing/dictionary.rs`) - -* Maps RDF term strings (URIs, literals) to unique `u32` integers. -* Bi-directional mapping allows encoding (String -> ID) for storage and decoding (ID -> String) for query results. -* Crucial for reducing storage footprint and improving comparison performance. - -### 4. Query Language & Parsing (`src/parsing`) - -* **JanusQL**: A query language extending SPARQL/RSP-QL. -* **JanusQLParser**: Parses queries using Regex. - * **Windows**: Supports `Live`, `HistoricalSliding`, and `HistoricalFixed` windows. - * **Syntax**: `FROM NAMED WINDOW ON STREAM [RANGE ... STEP ...]` - * **Output**: Generates internal query structures and translates to SPARQL/RSP-QL. - -## Data Flow - -### Write Path -1. User calls `write_rdf` with string arguments. -2. Strings are encoded to `u32` IDs using the `Dictionary`. -3. The `Event` (24 bytes) is pushed to the `BatchBuffer`. -4. If the buffer is full, the background thread flushes it: - * Sorts events by timestamp. - * Writes events to a new `.log` file. - * Builds and writes sparse index entries to a `.idx` file. 
- * Updates the in-memory segment list. - -### Read Path (Query) -1. User calls `query_rdf` with a time range. -2. **BatchBuffer Scan**: Scans the in-memory buffer for matching events. -3. **Segment Scan**: - * Filters segments based on time overlap. - * For each relevant segment: - * Checks in-memory `IndexBlock` directory. - * Loads relevant index blocks from `.idx` file. - * Performs binary search to find start offset. - * Scans `.log` file from offset until end timestamp. -4. Results are merged, sorted, and decoded back to strings using the `Dictionary`. - -## Performance Characteristics - -* **Write Throughput**: High, due to append-only in-memory buffering and background flushing. -* **Read Latency**: Low, due to two-level indexing minimizing disk I/O. -* **Space Efficiency**: High, due to dictionary encoding and compact binary format. - -## Future Improvements - -* **WAL (Write-Ahead Log)**: To ensure durability in case of crashes before buffer flush (currently mentioned in comments/benchmarks but not fully integrated in the main flow shown). -* **Advanced Querying**: Full SPARQL engine integration. -* **Compression**: Block-level compression for data files. diff --git a/docs/DOCUMENTATION_INDEX.md b/docs/DOCUMENTATION_INDEX.md index 7bccdf6..a5b5ae3 100644 --- a/docs/DOCUMENTATION_INDEX.md +++ b/docs/DOCUMENTATION_INDEX.md @@ -58,20 +58,8 @@ This is the shortest path to understanding the current Janus implementation. 
- [README_HTTP_API.md](./README_HTTP_API.md) - [QUICKSTART_HTTP_API.md](./QUICKSTART_HTTP_API.md) -## Legacy Material - -The following files remain useful as background, but they are not the main entrypoint for the current code: - -- [ARCHITECTURE.md](./ARCHITECTURE.md) -- [EXECUTION_ARCHITECTURE.md](./EXECUTION_ARCHITECTURE.md) -- [MVP_ARCHITECTURE.md](./MVP_ARCHITECTURE.md) -- [MVP_TODO.md](./MVP_TODO.md) -- [RSP_INTEGRATION_COMPLETE.md](./RSP_INTEGRATION_COMPLETE.md) -- [SPARQL_BINDINGS_UPGRADE.md](./SPARQL_BINDINGS_UPGRADE.md) - ## Dashboard Boundary -- Local demo dashboard in this repository: `examples/demo_dashboard.html` - Maintained dashboard repository: `https://github.com/SolidLabResearch/janus-dashboard` ## Related Code diff --git a/docs/EXECUTION_ARCHITECTURE.md b/docs/EXECUTION_ARCHITECTURE.md deleted file mode 100644 index 46225c5..0000000 --- a/docs/EXECUTION_ARCHITECTURE.md +++ /dev/null @@ -1,756 +0,0 @@ -# Execution Architecture Documentation - -**Date:** 2024 -**Version:** 0.1.0 -**Status:** ✅ Complete - -## Overview - -The Janus execution layer provides internal components for executing both historical and live RDF stream queries. This architecture separates query execution logic from the public API, enabling clean separation of concerns and testability. 
- -## Architecture Layers - -``` -┌─────────────────────────────────────────────────────────┐ -│ Public API Layer │ -│ (JanusApi in src/api/) │ -│ - User-facing query registration and execution │ -│ - Returns unified QueryResult stream via QueryHandle │ -└────────────────┬────────────────────────────────────────┘ - │ - │ spawns threads, coordinates execution - │ - ┌───────────┴───────────┐ - │ │ - ▼ ▼ -┌─────────────────┐ ┌──────────────────────┐ -│ Historical │ │ Live Stream │ -│ Executor │ │ Processing │ -│ (Internal) │ │ (Existing) │ -└────────┬────────┘ └──────────┬───────────┘ - │ │ - │ │ - ┌───┴────────────────────┬──┴──────────────┐ - │ │ │ - ▼ ▼ ▼ -┌─────────────┐ ┌──────────────┐ ┌─────────────┐ -│ Window │ │ SPARQL │ │ RSP-RS │ -│ Operators │ │ Engine │ │ Engine │ -│ │ │ (Oxigraph) │ │ │ -└──────┬──────┘ └──────────────┘ └─────────────┘ - │ - ▼ -┌─────────────────┐ -│ Storage │ -│ Backend │ -└─────────────────┘ -``` - -## Components - -### 1. HistoricalExecutor (`src/execution/historical_executor.rs`) - -**Purpose:** Executes SPARQL queries over historical RDF data stored in the segmented storage backend. - -**Key Responsibilities:** -- Query storage via window definitions (Fixed/Sliding) -- Convert internal Event format → RDFEvent → Oxigraph Quad -- Execute SPARQL queries with structured bindings -- Return results as `Vec>` - -**Public Methods:** - -```rust -// Execute a fixed window query (returns once) -pub fn execute_fixed_window( - &self, - window: &WindowDefinition, - sparql_query: &str, -) -> Result>, JanusApiError> - -// Execute sliding windows (returns iterator) -pub fn execute_sliding_windows<'a>( - &self, - window: &WindowDefinition, - sparql_query: &'a str, -) -> impl Iterator>, JanusApiError>> + 'a -``` - -**Internal Flow:** - -``` -1. Extract time range from WindowDefinition - ├─ Fixed: Use explicit start/end timestamps - └─ Sliding: Calculate from offset/width/slide - -2. 
Query storage for Event data - └─ StreamingSegmentedStorage.query(start, end) -> Vec - -3. Decode Event → RDFEvent - ├─ Get Dictionary from storage - ├─ Decode subject ID → URI string - ├─ Decode predicate ID → URI string - ├─ Decode object ID → URI/literal string - └─ Decode graph ID → URI string - -4. Convert RDFEvent → Quad - ├─ Parse subject as NamedNode - ├─ Parse predicate as NamedNode - ├─ Parse object as NamedNode or Literal - └─ Parse graph as NamedNode or DefaultGraph - -5. Build QuadContainer - └─ Collect quads into HashSet with timestamp - -6. Execute SPARQL - └─ OxigraphAdapter.execute_query_bindings() -> Vec> - -7. Return structured results -``` - -**Example Usage:** - -```rust -use janus::execution::HistoricalExecutor; - -let executor = HistoricalExecutor::new(storage, OxigraphAdapter::new()); - -// Fixed window query -let window = WindowDefinition { - start: Some(1000), - end: Some(2000), - window_type: WindowType::HistoricalFixed, - // ... other fields -}; - -let results = executor.execute_fixed_window(&window, "SELECT ?s ?p ?o WHERE { ?s ?p ?o }")?; -for binding in results { - println!("Subject: {:?}", binding.get("s")); -} - -// Sliding window query -let window = WindowDefinition { - width: 1000, - slide: 200, - offset: Some(5000), - window_type: WindowType::HistoricalSliding, - // ... other fields -}; - -for window_result in executor.execute_sliding_windows(&window, "SELECT ?s WHERE { ?s ?p ?o }") { - match window_result { - Ok(bindings) => println!("Window has {} results", bindings.len()), - Err(e) => eprintln!("Error: {}", e), - } -} -``` - -**Design Decisions:** - -1. **Direct Storage Queries vs Window Operators** - - Currently queries storage directly instead of using `HistoricalFixedWindowOperator`/`HistoricalSlidingWindowOperator` - - Reason: Window operators use `Rc`, but executor has `Arc` - - Arc→Rc conversion is non-trivial without unsafe code - - Future: Refactor window operators to use Arc for thread-safety - -2. 
**Structured Bindings** - - Returns `Vec>` (variable name → value) - - Uses new `execute_query_bindings()` from OxigraphAdapter - - Enables easy programmatic access to results - -3. **Iterator for Sliding Windows** - - Returns `impl Iterator` instead of collecting all results - - Memory efficient for large time ranges - - Allows consumer to control processing - -### 2. ResultConverter (`src/execution/result_converter.rs`) - -**Purpose:** Converts execution results from different engines into unified `QueryResult` format. - -**Key Responsibilities:** -- Convert historical bindings (HashMap) → QueryResult -- Convert live bindings (BindingWithTimestamp) → QueryResult -- Attach metadata (query_id, timestamp, source) - -**Public Methods:** - -```rust -// Convert historical SPARQL bindings -pub fn from_historical_bindings( - &self, - bindings: Vec>, - timestamp: u64, -) -> QueryResult - -// Convert single historical binding -pub fn from_historical_binding( - &self, - binding: HashMap, - timestamp: u64, -) -> QueryResult - -// Convert live stream binding -pub fn from_live_binding(&self, binding: BindingWithTimestamp) -> QueryResult - -// Batch convert historical bindings (one QueryResult per binding) -pub fn from_historical_bindings_batch( - &self, - bindings: Vec>, - timestamp: u64, -) -> Vec - -// Create empty result -pub fn empty_result(&self, timestamp: u64, source: ResultSource) -> QueryResult -``` - -**Example Usage:** - -```rust -use janus::execution::ResultConverter; -use janus::api::janus_api::ResultSource; - -let converter = ResultConverter::new("query_123".into()); - -// Convert historical results -let bindings = vec![ - hashmap!{"s" => "", "p" => ""}, - hashmap!{"s" => "", "p" => ""}, -]; - -let result = converter.from_historical_bindings(bindings, 1000); -assert_eq!(result.source, ResultSource::Historical); -assert_eq!(result.bindings.len(), 2); - -// Convert live results -let live_binding = /* received from RSP-RS */; -let result = 
converter.from_live_binding(live_binding); -assert_eq!(result.source, ResultSource::Live); -``` - -**RSP-RS Binding Conversion:** - -Currently uses a simplified approach: -- `BindingWithTimestamp` has fields: `timestamp_from`, `timestamp_to`, `bindings` (String) -- The `bindings` field is a formatted string representation -- Stored under `_raw_bindings` key in HashMap -- **TODO:** Implement proper parsing of RSP-RS binding format - -### 3. Integration with JanusApi (`src/api/janus_api.rs`) - -**Status:** ✅ **FULLY IMPLEMENTED** - -The JanusApi now provides a complete implementation that orchestrates both historical and live query execution. - -**Key Methods:** - -```rust -impl JanusApi { - // Register a JanusQL query - pub fn register_query( - &self, - query_id: QueryId, - janusql: &str, - ) -> Result - - // Start execution (spawns historical + live threads) - pub fn start_query(&self, query_id: &QueryId) -> Result - - // Stop a running query - pub fn stop_query(&self, query_id: &QueryId) -> Result<(), JanusApiError> - - // Check if query is running - pub fn is_running(&self, query_id: &QueryId) -> bool - - // Get query execution status - pub fn get_query_status(&self, query_id: &QueryId) -> Option -} -``` - -**Implementation Details:** - -```rust -pub fn start_query(&self, query_id: &QueryId) -> Result { - // 1. Get registered query metadata - let metadata = self.registry.get(query_id)?; - let parsed = &metadata.parsed; // ParsedJanusQuery - - // 2. Create unified result channel - let (result_tx, result_rx) = mpsc::channel::(); - - // 3. 
Spawn HISTORICAL worker threads (one per historical window) - for (i, window) in parsed.historical_windows.iter().enumerate() { - let sparql_query = parsed.sparql_queries.get(i)?.clone(); - let tx = result_tx.clone(); - let storage = Arc::clone(&self.storage); - - thread::spawn(move || { - let executor = HistoricalExecutor::new(storage, OxigraphAdapter::new()); - let converter = ResultConverter::new(query_id.clone()); - - match window.window_type { - WindowType::HistoricalFixed => { - if let Ok(bindings) = executor.execute_fixed_window(&window, &sparql_query) { - let result = converter.from_historical_bindings( - bindings, - window.end.unwrap_or(0) - ); - let _ = tx.send(result); - } - } - WindowType::HistoricalSliding => { - for window_result in executor.execute_sliding_windows(&window, &sparql_query) { - if let Ok(bindings) = window_result { - let result = converter.from_historical_bindings(bindings, current_time()); - let _ = tx.send(result); - } - } - } - _ => {} - } - }); - } - - // 4. Spawn LIVE worker thread (if there are live windows) - if !parsed.live_windows.is_empty() { - let tx = result_tx.clone(); - let rspql = parsed.rspql_query.clone(); - let live_windows = parsed.live_windows.clone(); - - thread::spawn(move || { - let mut live_processor = LiveStreamProcessing::new(rspql).unwrap(); - - // Register all live streams - for window in &live_windows { - let _ = live_processor.register_stream(&window.stream_name); - } - - live_processor.start_processing().unwrap(); - let converter = ResultConverter::new(query_id.clone()); - - // Continuously receive live results - loop { - match live_processor.try_receive_result() { - Ok(Some(binding)) => { - let result = converter.from_live_binding(binding); - if tx.send(result).is_err() { - break; // Channel closed - } - } - Ok(None) => thread::sleep(Duration::from_millis(10)), - Err(_) => break, - } - } - }); - } - - // 5. 
Store running query and return handle - Ok(QueryHandle { - query_id: query_id.clone(), - receiver: result_rx, - }) -} -``` - -**Complete User Experience:** - -```rust -use janus::api::janus_api::JanusApi; -use janus::parsing::janusql_parser::JanusQLParser; -use janus::registry::query_registry::QueryRegistry; -use janus::storage::segmented_storage::StreamingSegmentedStorage; -use std::sync::Arc; - -// 1. Initialize Janus components -let parser = JanusQLParser::new()?; -let registry = Arc::new(QueryRegistry::new()); -let storage = Arc::new(StreamingSegmentedStorage::new(config)?); - -let api = JanusApi::new(parser, registry, storage)?; - -// 2. Register JanusQL query (combines historical + live) -let janusql = r#" - PREFIX ex: - - REGISTER RStream AS - SELECT ?sensor ?temp - - -- Historical: Last hour of data - FROM NAMED WINDOW ex:history ON STREAM ex:sensors - [OFFSET 3600000 RANGE 3600000 STEP 60000] - - -- Live: Continuous stream - FROM NAMED WINDOW ex:live ON STREAM ex:sensors - [RANGE 10000 STEP 2000] - - WHERE { - WINDOW ex:history { ?sensor ex:temperature ?temp } - WINDOW ex:live { ?sensor ex:temperature ?temp } - } -"#; - -api.register_query("temp_monitor".into(), janusql)?; - -// 3. Start execution (both historical and live) -let handle = api.start_query(&"temp_monitor".into())?; - -// 4. Receive unified stream of results -while let Some(result) = handle.receive() { - match result.source { - ResultSource::Historical => { - // Historical results arrive first - println!("📜 Historical [t={}]: {:?}", result.timestamp, result.bindings); - } - ResultSource::Live => { - // Live results stream continuously - println!("🔴 Live [t={}]: {:?}", result.timestamp, result.bindings); - } - } -} - -// 5. 
Stop query when done -api.stop_query(&"temp_monitor".into())?; -``` - -**Testing:** - -Comprehensive integration tests verify: -- ✅ Query registration -- ✅ Historical fixed window execution -- ✅ Historical sliding window execution -- ✅ Live stream processing -- ✅ Concurrent query execution -- ✅ Query lifecycle (start/stop/status) - -Run tests: -```bash -cargo test --test janus_api_integration_test -``` - -## Data Flow - -### Historical Query Execution - -``` -User - ↓ -JanusApi.start_query() - ↓ -Spawn Historical Thread - ↓ -HistoricalExecutor.execute_fixed_window() or execute_sliding_windows() - ↓ -Storage.query(start, end) → Vec - ↓ -Dictionary.decode(event.subject/predicate/object/graph) → RDFEvent - ↓ -RDFEvent → Quad (NamedNode/Literal parsing) - ↓ -QuadContainer - ↓ -OxigraphAdapter.execute_query_bindings(sparql, container) - ↓ -Vec> - ↓ -ResultConverter.from_historical_bindings() - ↓ -QueryResult { source: Historical, bindings, ... } - ↓ -Send to channel - ↓ -QueryHandle.receive() - ↓ -User receives result -``` - -### Live Query Execution - -``` -User - ↓ -JanusApi.start_query() - ↓ -Spawn Live Thread - ↓ -LiveStreamProcessing.start_processing() - ↓ -RSP-RS Engine (continuous processing) - ↓ -BindingWithTimestamp (from RSP-RS) - ↓ -ResultConverter.from_live_binding() - ↓ -QueryResult { source: Live, bindings, ... 
} - ↓ -Send to channel - ↓ -QueryHandle.receive() - ↓ -User receives result -``` - -## File Structure - -``` -src/ -├── execution/ -│ ├── mod.rs # Module definition -│ ├── historical_executor.rs # Historical query execution -│ └── result_converter.rs # Result format conversion -│ -├── api/ -│ ├── mod.rs -│ └── janus_api.rs # Public API (uses execution/) -│ -├── stream/ -│ ├── operators/ -│ │ ├── mod.rs -│ │ ├── historical_fixed_window.rs # Window operators -│ │ └── historical_sliding_window.rs -│ └── live_stream_processing.rs # Live execution -│ -├── querying/ -│ └── oxigraph_adapter.rs # SPARQL engine adapter -│ -└── storage/ - └── segmented_storage.rs # Storage backend -``` - -## Performance Characteristics - -### Memory - -**HistoricalExecutor:** -- Loads one window's worth of events into memory at a time -- Sliding windows use iterator pattern (lazy evaluation) -- Quads are collected into HashSet for SPARQL execution -- Memory usage: ~O(events_per_window × (24 bytes + quad_size)) - -**ResultConverter:** -- Minimal overhead - just wraps existing data structures -- No large allocations or buffering - -### CPU - -**Conversion Overhead:** -- Event → RDFEvent: ~O(n) dictionary lookups (4 per event) -- RDFEvent → Quad: ~O(n) URI parsing -- SPARQL execution: Depends on query complexity -- Total: Dominated by SPARQL execution time - -**Concurrency:** -- Historical and live threads run independently -- No shared mutable state between threads -- Results sent via channels (lock-free message passing) - -### I/O - -**Storage Queries:** -- Range queries use two-level indexing (sparse + dense) -- Binary search over index blocks -- Sequential reads of data segments -- Typical query: <10ms for 1000s of events - -## Testing - -### Unit Tests - -**HistoricalExecutor:** -- ✅ Executor creation -- ✅ Time range extraction (fixed windows) -- ✅ Time range extraction (sliding windows) -- ✅ RDFEvent → Quad conversion (URI objects) -- ✅ RDFEvent → Quad conversion (literal objects) -- ✅ 
Invalid URI error handling - -**ResultConverter:** -- ✅ Historical binding conversion -- ✅ Historical bindings batch conversion -- ✅ Empty result creation -- ✅ Converter reuse -- ✅ Multiple query IDs - -**Run Tests:** -```bash -cargo test --lib execution -``` - -### Integration Tests - -Currently lacking full integration tests. **TODO:** -- Create test with actual storage writes -- Query historical data via executor -- Verify SPARQL results -- Test sliding window iteration - -## Error Handling - -### Error Types - -```rust -pub enum JanusApiError { - ParseError(String), // JanusQL parsing failed - ExecutionError(String), // SPARQL execution or conversion failed - RegistryError(String), // Query not found in registry - StorageError(String), // Storage query failed - LiveProcessingError(String), // Live stream processing error -} -``` - -### Error Propagation - -- All execution methods return `Result` -- Errors bubble up to thread spawner -- Threads log errors and terminate gracefully -- User receives no result (channel closes) - -## Future Enhancements - -### Short-Term - -1. **Window Operator Integration** - - Refactor operators to use `Arc` - - Replace direct storage queries with operator usage - - Better code reuse - -2. **Improved RSP-RS Binding Parsing** - - Parse `bindings` String into structured HashMap - - Extract variable names and values properly - - Match historical binding format - -3. **Integration Tests** - - End-to-end tests with real data - - Multi-window sliding tests - - Error scenario coverage - -### Long-Term - -1. **Query Optimization** - - Push-down filters to storage layer - - Index-aware query planning - - Parallel window processing - -2. **Caching** - - Cache decoded RDFEvents - - Reuse QuadContainers across queries - - Memoize SPARQL results - -3. **Metrics and Monitoring** - - Query execution time tracking - - Memory usage monitoring - - Result throughput metrics - -4. 
**Advanced Window Types** - - Tumbling windows - - Session windows - - Custom aggregation windows - -## Known Limitations - -1. **Arc/Rc Impedance Mismatch** - - Window operators expect `Rc`, executor has `Arc` - - Currently bypassed by querying storage directly - - Need operator refactoring for proper thread-safety - -2. **RSP-RS Binding Format** - - Currently stores raw string representation - - Not parsed into structured variables - - Limits usability of live results - -3. **No Query Cancellation** - - Once started, historical queries run to completion - - No mechanism to stop mid-execution - - Future: Add shutdown signals - -4. **Single-Threaded Historical Execution** - - Each query gets one thread - - Sliding windows processed sequentially - - Future: Parallel window processing - -## Related Documentation - -- **SPARQL Bindings:** `docs/SPARQL_BINDINGS_UPGRADE.md` -- **Architecture:** `docs/ARCHITECTURE.md` -- **RSP Integration:** `docs/RSP_INTEGRATION_COMPLETE.md` -- **API Reference:** Generated via `cargo doc` - -## Verification - -```bash -# Build execution module -cargo build --lib - -# Run execution tests -cargo test --lib execution - -# Run all tests -cargo test --lib - -# Check for warnings -cargo clippy --lib - -# Build documentation -cargo doc --no-deps --open -``` - -## Implementation Status - -### ✅ Completed - -1. **HistoricalExecutor** (585 lines) - - Fixed window execution - - Sliding window execution - - Event → RDFEvent → Quad conversion - - SPARQL execution with structured bindings - - 6 unit tests - -2. **ResultConverter** (297 lines) - - Historical result conversion - - Live result conversion - - Batch conversion utilities - - 6 unit tests - -3. 
**JanusApi Integration** (400+ lines) - - `register_query()` - Parse and store JanusQL - - `start_query()` - Spawn historical + live threads - - `stop_query()` - Graceful shutdown - - `is_running()` - Status checking - - `get_query_status()` - Execution monitoring - - 11 integration tests - -### 🎯 Key Achievements - -- ✅ **Unified Query Execution** - Single API for historical + live -- ✅ **Thread-Safe** - Message passing via channels -- ✅ **Structured Results** - HashMap bindings, not debug strings -- ✅ **Concurrent Queries** - Multiple queries run independently -- ✅ **Graceful Shutdown** - Stop queries cleanly -- ✅ **Comprehensive Testing** - 23 total tests (12 unit + 11 integration) -- ✅ **Full Documentation** - Architecture + usage examples - -### 📊 Test Results - -``` -Unit Tests (execution module): - running 12 tests - test result: ok. 12 passed - -Integration Tests (JanusApi): - running 11 tests - test result: ok. 11 passed - -Total: 23 tests passing -``` - -### 🚀 Production Ready - -The execution architecture is complete and production-ready: - -- ✅ Separates concerns (execution vs. API) -- ✅ Enables unified historical + live results -- ✅ Uses structured SPARQL bindings -- ✅ Supports both fixed and sliding windows -- ✅ Thread-safe with message passing -- ✅ Well-tested and documented -- ✅ **FULLY INTEGRATED with JanusApi** - -**Status:** ✅ **COMPLETE** - Ready for production use. 
\ No newline at end of file diff --git a/docs/HTTP_API.md b/docs/HTTP_API.md index 50404a0..55f892d 100644 --- a/docs/HTTP_API.md +++ b/docs/HTTP_API.md @@ -857,8 +857,7 @@ Options: ## Additional Resources - [JanusQL Query Language Documentation](./JANUSQL.md) -- [Stream Bus CLI Documentation](./STREAM_BUS.md) -- [Architecture Overview](./ARCHITECTURE.md) +- [Stream Bus CLI Documentation](./STREAM_BUS_CLI.md) - [Benchmark Results](./BENCHMARK_RESULTS.md) --- diff --git a/docs/HTTP_API_CURRENT.md b/docs/HTTP_API_CURRENT.md index 40af504..1572637 100644 --- a/docs/HTTP_API_CURRENT.md +++ b/docs/HTTP_API_CURRENT.md @@ -14,7 +14,41 @@ cargo run --bin http_server -- --host 127.0.0.1 --port 8080 --storage-dir ./data `GET /health` -Returns a simple success payload. +Returns a service health payload with storage background-flush status. + +Healthy response: + +```json +{ + "status": "ok", + "message": "Janus HTTP API is running", + "storage_status": "ok", + "storage_error": null +} +``` + +If background storage flushing has failed, the endpoint returns HTTP `503 +Service Unavailable` with: + +```json +{ + "status": "degraded", + "message": "Janus HTTP API is running with storage errors", + "storage_status": "error", + "storage_error": "Background flush failed: ..." +} +``` + +### Ops Status + +`GET /ops/status` + +Returns a richer operational snapshot with: + +- overall service status +- storage background-flush health +- replay metrics +- query lifecycle counts ### Register Query diff --git a/docs/LIVE_STREAMING_GUIDE.md b/docs/LIVE_STREAMING_GUIDE.md index 296da69..14836bd 100644 --- a/docs/LIVE_STREAMING_GUIDE.md +++ b/docs/LIVE_STREAMING_GUIDE.md @@ -50,9 +50,9 @@ docker-compose up -d mosquitto ./scripts/start_http_server.sh --clean ``` -### 3. Open Dashboard +### 3. Start a Client -Open `examples/demo_dashboard.html` in your browser. +Use the maintained dashboard repository or run the local HTTP client example. ### 4. 
Start Replay (Publishes to MQTT + Storage) @@ -424,4 +424,4 @@ Janus now provides complete live streaming support via MQTT integration: ✓ WebSocket streams results in real-time ✓ Auto-cleanup when queries stop -For questions or issues, check the server logs at `/tmp/janus_server.log`. \ No newline at end of file +For questions or issues, check the server logs at `/tmp/janus_server.log`. diff --git a/docs/MVP_ARCHITECTURE.md b/docs/MVP_ARCHITECTURE.md deleted file mode 100644 index ee14447..0000000 --- a/docs/MVP_ARCHITECTURE.md +++ /dev/null @@ -1,560 +0,0 @@ -# Janus MVP Architecture Overview - -## Current State vs. Target State - -### Legend -- ✅ **Implemented & Working** -- ⚠️ **Partially Implemented** -- ❌ **Missing / Not Implemented** - ---- - -## System Architecture Diagram - -``` -┌─────────────────────────────────────────────────────────────────────────────┐ -│ JANUS HYBRID RDF ENGINE │ -└─────────────────────────────────────────────────────────────────────────────┘ - -┌─────────────────────────────────────────────────────────────────────────────┐ -│ CLIENT LAYER │ -├─────────────────────────────────────────────────────────────────────────────┤ -│ │ -│ ✅ Stream Bus CLI ❌ Query CLI ❌ HTTP/WebSocket API │ -│ (Data Ingestion) (Query Execution) (Dashboard Integration) │ -│ │ -│ $ stream_bus_cli $ query_cli REST + WebSocket │ -│ --input data.nq --register q1 GET /api/queries │ -│ --storage path --execute q1 POST /api/queries/:id │ -│ --rate 1000 --format json WS /api/queries/:id/results│ -│ │ -└───────────────────────┬───────────────────┬─────────────────────────────────┘ - │ │ - │ │ -┌───────────────────────▼───────────────────▼─────────────────────────────────┐ -│ JANUS API LAYER │ -├─────────────────────────────────────────────────────────────────────────────┤ -│ │ -│ ⚠️ JanusApi (src/api/janus_api.rs) │ -│ │ -│ ✅ register_query(query_id, janusql) → QueryMetadata │ -│ ├─ Parses JanusQL via JanusQLParser │ -│ ├─ Stores in QueryRegistry │ -│ └─ Returns metadata 
│ -│ │ -│ ❌ start_query(query_id) → QueryHandle <-- CRITICAL MISSING PIECE │ -│ ├─ ❌ Spawn Historical Worker │ -│ │ ├─ Query storage for time range │ -│ │ ├─ Decode Event → RDFEvent │ -│ │ ├─ Execute SPARQL via OxigraphAdapter │ -│ │ └─ Send results with ResultSource::Historical │ -│ │ │ -│ ├─ ❌ Spawn Live Worker │ -│ │ ├─ Initialize LiveStreamProcessing │ -│ │ ├─ Subscribe to EventBus for incoming events │ -│ │ ├─ Add events to RSP engine │ -│ │ └─ Send results with ResultSource::Live │ -│ │ │ -│ └─ Return QueryHandle { query_id, receiver } │ -│ │ -│ ❌ stop_query(query_id) → Result<(), Error> │ -│ └─ Send shutdown signals, join threads │ -│ │ -└─────────────────────────────────────────────────────────────────────────────┘ - │ │ │ - │ │ │ - ▼ ▼ ▼ -┌────────────────┐ ┌──────────────────┐ ┌─────────────────────┐ -│ ✅ QueryRegistry│ │ ✅ JanusQLParser │ │ ❌ Event Bus │ -├────────────────┤ ├──────────────────┤ ├─────────────────────┤ -│ Stores queries │ │ Parses JanusQL │ │ Pub/Sub for events │ -│ with metadata │ │ Generates: │ │ │ -│ │ │ - RSP-QL │ │ publish(event) │ -│ register() │ │ - SPARQL │ │ subscribe() → rx │ -│ get() │ │ - Windows │ │ │ -│ unregister() │ │ - Prefixes │ │ Connects: │ -│ list_all() │ │ │ │ StreamBus → Live │ -└────────────────┘ └──────────────────┘ └─────────────────────┘ - - -┌─────────────────────────────────────────────────────────────────────────────┐ -│ DATA INGESTION LAYER │ -├─────────────────────────────────────────────────────────────────────────────┤ -│ │ -│ ✅ StreamBus (src/stream_bus/stream_bus.rs) │ -│ │ -│ Input: RDF file (N-Triples/N-Quads) │ -│ │ │ -│ ├─► Parse RDF lines → RDFEvent │ -│ │ │ -│ ├─► Write to Storage (via Dictionary encoding) │ -│ │ └─ Event (24 bytes) = u32 IDs + u64 timestamp │ -│ │ │ -│ ├─► ❌ Publish to EventBus (for live processing) <-- MISSING │ -│ │ │ -│ └─► Publish to Kafka/MQTT (optional) │ -│ │ -└─────────────────────────────────────────────────────────────────────────────┘ - │ - │ - ▼ 
-┌─────────────────────────────────────────────────────────────────────────────┐ -│ STORAGE & INDEXING LAYER │ -├─────────────────────────────────────────────────────────────────────────────┤ -│ │ -│ ✅ StreamingSegmentedStorage (src/storage/segmented_storage.rs) │ -│ │ -│ Architecture: │ -│ │ -│ ┌──────────────────┐ Background Thread │ -│ │ BatchBuffer │◄──────────────────────────────┐ │ -│ │ (Arc) │ │ │ -│ └────────┬─────────┘ │ │ -│ │ │ │ -│ │ Flush when threshold exceeded │ │ -│ │ │ │ -│ ▼ │ │ -│ ┌──────────────────────────────────────────────────┴─────┐ │ -│ │ Segment Files (data/ directory) │ │ -│ │ ├─ segment_0000.dat (Event records, 24 bytes each) │ │ -│ │ ├─ segment_0001.dat │ │ -│ │ └─ segment_NNNN.dat │ │ -│ └────────────────────────────────────────────────────────┘ │ -│ │ │ -│ ▼ │ -│ ┌──────────────────────────────────────────────────────┐ │ -│ │ Indexing (src/storage/indexing/) │ │ -│ │ ├─ Sparse Index (every Nth record) │ │ -│ │ ├─ Dense Index (every record) │ │ -│ │ └─ Dictionary (URI ←→ u32 ID mapping) │ │ -│ └──────────────────────────────────────────────────────┘ │ -│ │ -│ Key Methods: │ -│ ✅ write(events: &[RDFEvent]) → Result<()> │ -│ ✅ read_range(start_ts, end_ts) → Result> │ -│ ✅ background_flush_loop() │ -│ │ -│ Performance: │ -│ - 2.6-3.14 Million quads/sec write throughput │ -│ - Sub-millisecond point queries │ -│ - 40% compression (40 bytes → 24 bytes) │ -│ │ -└─────────────────────────────────────────────────────────────────────────────┘ - - -┌─────────────────────────────────────────────────────────────────────────────┐ -│ QUERY EXECUTION LAYER │ -├─────────────────────────────────────────────────────────────────────────────┤ -│ │ -│ ┌───────────────────────────────────────────────────────────────┐ │ -│ │ HISTORICAL PATH (Batch Processing) │ │ -│ ├───────────────────────────────────────────────────────────────┤ │ -│ │ │ │ -│ │ ❌ HistoricalExecutor (src/api/historical_executor.rs) │ │ -│ │ │ │ │ -│ │ ├─► Query storage.read_range(start_ts, 
end_ts) │ │ -│ │ │ └─ Returns Vec (24-byte records) │ │ -│ │ │ │ │ -│ │ ├─► Decode via Dictionary: Event → RDFEvent │ │ -│ │ │ └─ Expand u32 IDs to full URI strings │ │ -│ │ │ │ │ -│ │ ├─► Convert RDFEvent → Oxigraph Quad │ │ -│ │ │ │ │ -│ │ ├─► Build QuadContainer │ │ -│ │ │ │ │ -│ │ ├─► ⚠️ Execute SPARQL via OxigraphAdapter │ │ -│ │ │ └─ Returns Vec (needs proper binding format) │ │ -│ │ │ │ │ -│ │ └─► Convert to QueryResult │ │ -│ │ └─ { query_id, timestamp, ResultSource::Historical, │ │ -│ │ bindings: Vec> } │ │ -│ │ │ │ -│ └───────────────────────────────────────────────────────────────┘ │ -│ │ -│ ┌───────────────────────────────────────────────────────────────┐ │ -│ │ LIVE PATH (Stream Processing) │ │ -│ ├───────────────────────────────────────────────────────────────┤ │ -│ │ │ │ -│ │ ✅ LiveStreamProcessing (src/stream/live_stream_processing.rs)│ │ -│ │ │ │ │ -│ │ ├─► Initialize RSPEngine with RSP-QL query │ │ -│ │ │ │ │ -│ │ ├─► Register streams from query windows │ │ -│ │ │ │ │ -│ │ ├─► start_processing() → Receiver │ │ -│ │ │ │ │ -│ │ ├─► ❌ Subscribe to EventBus for incoming events │ │ -│ │ │ │ │ -│ │ ├─► add_event(stream_uri, RDFEvent) │ │ -│ │ │ └─ Converts to Quad, adds to RDFStream │ │ -│ │ │ │ │ -│ │ ├─► Windows trigger automatically (time-based) │ │ -│ │ │ │ │ -│ │ ├─► receive_result() / collect_results() │ │ -│ │ │ └─ Gets BindingWithTimestamp from RSP engine │ │ -│ │ │ │ │ -│ │ └─► Convert to QueryResult │ │ -│ │ └─ { query_id, timestamp, ResultSource::Live, │ │ -│ │ bindings: Vec> } │ │ -│ │ │ │ -│ └───────────────────────────────────────────────────────────────┘ │ -│ │ -└─────────────────────────────────────────────────────────────────────────────┘ - │ - │ - ▼ -┌─────────────────────────────────────────────────────────────────────────────┐ -│ SPARQL ENGINES │ -├─────────────────────────────────────────────────────────────────────────────┤ -│ │ -│ ⚠️ OxigraphAdapter (src/querying/oxigraph_adapter.rs) │ -│ │ -│ execute_query(sparql: &str, 
container: &QuadContainer) │ -│ → Result, Error> ⚠️ Returns debug format │ -│ │ -│ ❌ execute_query_bindings(sparql: &str, container: &QuadContainer) │ -│ → Result>, Error> <-- NEEDED │ -│ │ -│ ⚠️ KolibrieAdapter (stubbed, not functional) │ -│ │ -└─────────────────────────────────────────────────────────────────────────────┘ -``` - ---- - -## Data Flow: End-to-End Query Execution - -### Scenario: Temperature Sensor Monitoring - -**JanusQL Query:** -```sparql -PREFIX ex: -REGISTER RStream AS -SELECT ?sensor ?temp -FROM NAMED WINDOW ex:historical ON STREAM ex:sensors [RANGE 3600000 STEP 600000] -FROM NAMED WINDOW ex:live ON STREAM ex:sensors [RANGE 5000 STEP 1000] -WHERE { - WINDOW ?w { ?sensor ex:temperature ?temp } -} -``` - -### Phase 1: Registration (✅ Working) - -``` -User - │ - │ query_cli --register temp_monitor --query sensors.janusql - │ - ▼ -JanusApi::register_query() - │ - ├─► JanusQLParser::parse() - │ ├─ Extracts windows - │ ├─ Generates RSP-QL for live - │ ├─ Generates SPARQL for historical - │ └─ Returns ParsedJanusQuery - │ - └─► QueryRegistry::register() - └─ Stores metadata with query_id -``` - -### Phase 2: Historical Data Ingestion (✅ Working) - -``` -Historical Data File: sensors_historical.nq - │ - │ "23.5" . - │ "24.1" . - │ - │ stream_bus_cli --input sensors_historical.nq --broker none --storage-path ./data - │ - ▼ -StreamBus::run() - │ - ├─► parse_rdf_line() → RDFEvent - │ └─ RDFEvent { timestamp: 1000, subject: "http://...", ... 
} - │ - └─► StreamingSegmentedStorage::write() - │ - ├─► Dictionary::encode() → Event - │ ├─ "http://ex.org/s1" → ID: 1 - │ ├─ "http://ex.org/temp" → ID: 2 - │ ├─ "23.5" → ID: 3 - │ └─ Event { s: 1, p: 2, o: 3, g: 0, ts: 1000 } (24 bytes) - │ - └─► BatchBuffer::push() - └─ Background thread flushes to segment files -``` - -### Phase 3: Query Execution Start (❌ Not Implemented) - -``` -User - │ - │ query_cli --execute temp_monitor --format json - │ - ▼ -JanusApi::start_query("temp_monitor") - │ - ├─► Validate query exists - │ - ├─► Create result channel - │ └─ (result_tx, result_rx) = mpsc::channel() - │ - ├─► ❌ Spawn HISTORICAL WORKER Thread - │ │ - │ ├─► Parse historical windows - │ │ └─ Window: RANGE 3600000 STEP 600000 - │ │ → Query last hour in 10-minute chunks - │ │ - │ ├─► For each time window [start_ts, end_ts]: - │ │ │ - │ │ ├─► storage.read_range(start_ts, end_ts) - │ │ │ └─ Returns Vec (encoded) - │ │ │ - │ │ ├─► Dictionary::decode() each Event → RDFEvent - │ │ │ └─ ID: 1 → "http://ex.org/s1" - │ │ │ - │ │ ├─► Convert RDFEvent → Oxigraph Quad - │ │ │ └─ Quad { s: NamedNode, p: NamedNode, o: Literal, g: ... 
} - │ │ │ - │ │ ├─► Build QuadContainer(quads, end_ts) - │ │ │ - │ │ ├─► OxigraphAdapter::execute_query_bindings(sparql, container) - │ │ │ └─ Returns Vec, ...> - │ │ │ - │ │ └─► Send QueryResult - │ │ └─ result_tx.send(QueryResult { - │ │ query_id: "temp_monitor", - │ │ timestamp: end_ts, - │ │ source: ResultSource::Historical, - │ │ bindings: [{ - │ │ "?sensor": "http://ex.org/s1", - │ │ "?temp": "23.5" - │ │ }] - │ │ }) - │ │ - │ └─► Complete (historical data exhausted) - │ - ├─► ❌ Spawn LIVE WORKER Thread - │ │ - │ ├─► LiveStreamProcessing::new(rspql_query) - │ │ - │ ├─► register_stream("http://ex.org/sensors") - │ │ - │ ├─► start_processing() - │ │ - │ ├─► ❌ Subscribe to EventBus - │ │ └─ event_rx = event_bus.subscribe() - │ │ - │ └─► Loop: - │ │ - │ ├─► event_rx.try_recv() → RDFEvent - │ │ - │ ├─► LiveStreamProcessing::add_event(stream_uri, event) - │ │ ├─ Converts to Quad - │ │ ├─ Adds to RDFStream - │ │ └─ RSP engine processes windows - │ │ - │ ├─► try_receive_result() → BindingWithTimestamp - │ │ - │ └─► Send QueryResult - │ └─ result_tx.send(QueryResult { - │ query_id: "temp_monitor", - │ timestamp: result.timestamp, - │ source: ResultSource::Live, - │ bindings: convert_bindings(result) - │ }) - │ - └─► Return QueryHandle { query_id, receiver: result_rx } -``` - -### Phase 4: Live Data Ingestion (❌ EventBus Integration Missing) - -``` -Live Data Stream - │ - │ "25.0" . 
- │ - │ stream_bus_cli --input - --broker none --add-timestamps - │ - ▼ -StreamBus::run() - │ - ├─► parse_rdf_line() → RDFEvent - │ - ├─► storage.write(&[event]) ✅ Works - │ - └─► ❌ event_bus.publish(event) <-- MISSING - │ - └─► EventBus distributes to subscribers - │ - └─► Live Worker receives event - └─► Adds to LiveStreamProcessing -``` - -### Phase 5: Result Consumption (✅ QueryHandle API exists) - -``` -QueryHandle - │ - ├─► handle.receive() → blocks for next result - │ │ - │ └─► QueryResult { - │ query_id: "temp_monitor", - │ timestamp: 1640000000, - │ source: Historical | Live, - │ bindings: [{ "?sensor": "...", "?temp": "23.5" }] - │ } - │ - └─► User displays results (CLI table, JSON, or WebSocket to Flutter) -``` - ---- - -## Critical Missing Components Summary - -### 1. JanusApi::start_query() Implementation -- **Status:** ❌ Commented out (lines 128-140 in janus_api.rs) -- **Impact:** Cannot execute queries at all -- **Effort:** High (200-300 lines, complex threading) -- **Priority:** 🔴 CRITICAL - -### 2. HistoricalExecutor -- **Status:** ❌ Doesn't exist -- **Impact:** No historical query results -- **Effort:** Medium (150-200 lines) -- **Priority:** 🔴 CRITICAL - -### 3. EventBus for Live Integration -- **Status:** ❌ Doesn't exist -- **Impact:** No live query results -- **Effort:** Medium (100-150 lines) -- **Priority:** 🔴 CRITICAL - -### 4. SPARQL Result Formatting -- **Status:** ⚠️ Returns debug strings, not structured bindings -- **Impact:** Results are unparseable -- **Effort:** Low (50-75 lines) -- **Priority:** 🔴 CRITICAL - -### 5. Query Execution CLI -- **Status:** ❌ Doesn't exist (only ingestion CLI exists) -- **Impact:** No user interface for queries -- **Effort:** Medium (200-250 lines) -- **Priority:** 🟠 HIGH - -### 6. 
End-to-End Integration Test -- **Status:** ❌ Doesn't exist -- **Impact:** Can't validate MVP works -- **Effort:** Medium (150-200 lines) -- **Priority:** 🟠 HIGH - ---- - -## Thread Architecture - -``` -┌─────────────────────────────────────────────────────────────────┐ -│ Main Thread │ -├─────────────────────────────────────────────────────────────────┤ -│ │ -│ - Accept API calls (register_query, start_query, stop_query) │ -│ - Manage running queries map │ -│ - Return QueryHandle to caller │ -│ │ -└───────────┬──────────────────────────────┬──────────────────────┘ - │ │ - │ Spawns │ Spawns - │ │ - ▼ ▼ -┌─────────────────────────┐ ┌─────────────────────────────────┐ -│ Historical Worker │ │ Live Worker Thread │ -│ Thread │ │ │ -├─────────────────────────┤ ├─────────────────────────────────┤ -│ │ │ │ -│ Loop over time windows │ │ Loop: │ -│ ├─ Query storage │ │ ├─ Receive events from bus │ -│ ├─ Decode events │ │ ├─ Add to LiveProcessing │ -│ ├─ Execute SPARQL │ │ ├─ Poll for results │ -│ └─ Send results │ │ └─ Send results │ -│ │ │ │ -│ Listens for shutdown │ │ Listens for shutdown │ -│ │ │ │ -└─────────────────────────┘ └─────────────────────────────────┘ - │ │ - │ Sends via mpsc::Sender │ Sends via mpsc::Sender - │ │ - ▼ ▼ -┌─────────────────────────────────────────────────────────────────┐ -│ Result Channel (mpsc) │ -│ │ -│ QueryHandle holds mpsc::Receiver │ -│ ├─ receive() blocks for next result │ -│ └─ try_receive() non-blocking │ -│ │ -└─────────────────────────────────────────────────────────────────┘ -``` - ---- - -## Data Model Reference - -### RDFEvent (User-facing) -```rust -pub struct RDFEvent { - pub timestamp: u64, - pub subject: String, // Full URI: "http://example.org/alice" - pub predicate: String, // Full URI: "http://example.org/knows" - pub object: String, // Full URI or literal: "Bob" or "http://..." 
- pub graph: String, // Full URI: "http://example.org/graph1" -} -``` - -### Event (Storage-internal, 24 bytes) -```rust -pub struct Event { - pub subject: u32, // Dictionary ID - pub predicate: u32, // Dictionary ID - pub object: u32, // Dictionary ID - pub graph: u32, // Dictionary ID - pub timestamp: u64, // Milliseconds since epoch -} -``` - -### QueryResult (Output) -```rust -pub struct QueryResult { - pub query_id: QueryId, - pub timestamp: u64, - pub source: ResultSource, // Historical | Live - pub bindings: Vec>, -} - -// Example: -QueryResult { - query_id: "temp_monitor", - timestamp: 1640000000, - source: ResultSource::Historical, - bindings: vec![ - HashMap::from([ - ("?sensor".to_string(), "http://example.org/sensor1".to_string()), - ("?temp".to_string(), "23.5".to_string()), - ]), - ], -} -``` - ---- - -## Next Steps - -See **`MVP_TODO.md`** for detailed implementation tasks, estimates, and priority order. - -**Quick Start:** -1. Implement `OxigraphAdapter::execute_query_bindings()` (easiest) -2. Create `HistoricalExecutor` (foundational) -3. Create `EventBus` (enables live) -4. Implement `JanusApi::start_query()` (ties it all together) -5. Write integration test (validates MVP) -6. Build Query CLI (makes it usable) \ No newline at end of file diff --git a/docs/MVP_TODO.md b/docs/MVP_TODO.md deleted file mode 100644 index fa76e29..0000000 --- a/docs/MVP_TODO.md +++ /dev/null @@ -1,42 +0,0 @@ -# Janus MVP TODO - -This file is retained as a planning artifact. - -It no longer describes the current backend state accurately as a whole. - -## What Is Already Present - -The repository already includes: - -- JanusQL parsing -- historical query execution -- live query execution -- HTTP/WebSocket endpoints -- replay control endpoints -- integration tests for the Janus API and HTTP server - -## What Still Needs Follow-Up - -The remaining work is no longer “build the MVP from scratch”. 
The current gaps are narrower: - -### Documentation coherence - -- keep top-level docs aligned with the actual binaries and API -- clearly separate current guides from historical design notes -- document the backend/dashboard repo split - -### Runtime lifecycle hardening - -- increment query execution counts when queries start -- tighten shutdown and worker cleanup behavior -- manage spawned MQTT subscriber handles consistently -- make query status transitions more explicit - -### Repo boundary cleanup - -- treat the local dashboard as a demo client -- keep product dashboard work in the separate `janus-dashboard` repository - -## Historical Note - -Earlier versions of this file described `JanusApi::start_query()` and the HTTP path as missing. That is no longer true in the current codebase. diff --git a/docs/QUICKSTART_HTTP_API.md b/docs/QUICKSTART_HTTP_API.md index f4a0050..ade4df4 100644 --- a/docs/QUICKSTART_HTTP_API.md +++ b/docs/QUICKSTART_HTTP_API.md @@ -12,6 +12,7 @@ cargo run --bin http_server ```bash curl http://localhost:8080/health +curl http://localhost:8080/ops/status ``` ## 3. Register a query @@ -65,13 +66,7 @@ Check replay metrics: curl http://localhost:8080/api/replay/status ``` -## Optional Demo Client - -This repository still contains a demo HTML dashboard: - -```bash -open examples/demo_dashboard.html -``` +## Optional Frontend For the maintained frontend, use: diff --git a/docs/QUICK_REFERENCE.md b/docs/QUICK_REFERENCE.md index 90d58cd..ad5e6b9 100644 --- a/docs/QUICK_REFERENCE.md +++ b/docs/QUICK_REFERENCE.md @@ -8,20 +8,17 @@ cargo run --bin http_server -- --host 127.0.0.1 --port 8080 --storage-dir ./data cargo run --example http_client_example ``` -## Optional Manual Demo +## Optional Frontend -```bash -open examples/demo_dashboard.html -``` - -The static HTML demo is only for local manual testing. The maintained web -dashboard lives in the separate `janus-dashboard` repository. 
+The maintained web dashboard lives in the separate `janus-dashboard` +repository. ## API Endpoints ```bash # Health GET http://localhost:8080/health +GET http://localhost:8080/ops/status # Queries POST /api/queries # Register @@ -104,7 +101,6 @@ docker-compose restart mosquitto - [ ] MQTT running: `docker ps | grep mosquitto` - [ ] Server running: `curl localhost:8080/health` - [ ] Example client runs: `cargo run --example http_client_example` -- [ ] Optional demo opens: `open examples/demo_dashboard.html` --- diff --git a/docs/README.md b/docs/README.md index dac27f6..0d95628 100644 --- a/docs/README.md +++ b/docs/README.md @@ -1,9 +1,6 @@ # Janus Documentation -This directory contains the project documentation for Janus. - -Some files here are current product documentation. Others are older design or -milestone notes kept only for background context. +This directory contains the current Janus documentation. ## Start Here @@ -19,8 +16,6 @@ milestone notes kept only for background context. ## Supporting Material -- [ARCHITECTURE.md](./ARCHITECTURE.md): older high-level architecture notes -- [EXECUTION_ARCHITECTURE.md](./EXECUTION_ARCHITECTURE.md): historical execution design notes - [BENCHMARK_RESULTS.md](./BENCHMARK_RESULTS.md): benchmark data - [STREAM_BUS_CLI.md](./STREAM_BUS_CLI.md): replay and ingestion CLI @@ -33,8 +28,3 @@ The maintained dashboard lives in: - `https://github.com/SolidLabResearch/janus-dashboard` The static demo in this repository is only for local manual backend testing. - -## Notes - -- The files listed under Start Here are the current sources of truth for `main`. -- Older files are still useful for background, but they may describe previous milestones or implementation states. 
diff --git a/docs/README_HTTP_API.md b/docs/README_HTTP_API.md index a5bc568..f048770 100644 --- a/docs/README_HTTP_API.md +++ b/docs/README_HTTP_API.md @@ -96,11 +96,18 @@ curl -X POST http://localhost:8080/api/replay/stop curl http://localhost:8080/api/replay/status ``` +### Ops status + +```bash +curl http://localhost:8080/ops/status +``` + ## Endpoint Summary | Method | Endpoint | Description | |--------|----------|-------------| | GET | `/health` | Health check | +| GET | `/ops/status` | Detailed operational status | | POST | `/api/queries` | Register query | | GET | `/api/queries` | List queries | | GET | `/api/queries/:id` | Query details | @@ -112,16 +119,9 @@ curl http://localhost:8080/api/replay/status | POST | `/api/replay/stop` | Stop replay | | GET | `/api/replay/status` | Replay status | -## Local Demo Dashboard - -You can still use the demo HTML client included in this repository: - -```bash -open examples/demo_dashboard.html -``` - -The maintained dashboard lives separately: +## Dashboard +The maintained production dashboard lives separately: - `https://github.com/SolidLabResearch/janus-dashboard` ## Related Docs diff --git a/docs/RSP_INTEGRATION_COMPLETE.md b/docs/RSP_INTEGRATION_COMPLETE.md deleted file mode 100644 index f522f10..0000000 --- a/docs/RSP_INTEGRATION_COMPLETE.md +++ /dev/null @@ -1,369 +0,0 @@ -# RSP-RS Integration Complete ✅ - -**Integration Status:** PRODUCTION READY -**rsp-rs Version:** 0.3.1 -**Date:** January 2025 -**All Tests Passing:** ✅ 14/14 - ---- - -## Summary - -The Janus `LiveStreamProcessing` module has been successfully implemented and fully integrated with rsp-rs 0.3.1, enabling real-time RDF stream processing using RSP-QL queries. The integration is complete, tested, and ready for production use. - ---- - -## What Was Implemented - -### 1. 
LiveStreamProcessing Module -**File:** `src/stream/live_stream_processing.rs` (486 lines) - -**Features:** -- ✅ Real-time RSP-QL query execution -- ✅ Stream registration and management -- ✅ Event-by-event processing (true streaming) -- ✅ Window-based aggregation support -- ✅ Static data joins -- ✅ Multiple result collection methods -- ✅ Stream closure with sentinel events -- ✅ Comprehensive error handling -- ✅ Full conversion between Janus `RDFEvent` and Oxigraph `Quad` - -**API Methods:** -```rust -LiveStreamProcessing::new(query: String) -> Result -register_stream(stream_uri: &str) -> Result<(), Error> -start_processing() -> Result<(), Error> -add_event(stream_uri: &str, event: RDFEvent) -> Result<(), Error> -add_events(stream_uri: &str, events: Vec) -> Result<(), Error> -close_stream(stream_uri: &str, final_timestamp: i64) -> Result<(), Error> -add_static_data(event: RDFEvent) -> Result<(), Error> -receive_result() -> Result, Error> -try_receive_result() -> Result, Error> -collect_results(max: Option) -> Result, Error> -get_registered_streams() -> Vec -is_processing() -> bool -``` - -### 2. 
Tests & Examples - -**Unit Tests:** 4 tests in `src/stream/live_stream_processing.rs` -- ✅ `test_create_processor` - Engine initialization -- ✅ `test_register_stream` - Stream registration -- ✅ `test_rdf_event_to_quad` - Data conversion -- ✅ `test_processing_state` - State management - -**Integration Tests:** 10 tests in `tests/live_stream_integration_test.rs` -- ✅ `test_simple_window_query` - Basic windowing -- ✅ `test_iot_sensor_streaming` - Real-world IoT scenario -- ✅ `test_multiple_streams_registration` - Stream management -- ✅ `test_window_timing` - Window closure timing -- ✅ `test_empty_window` - Edge case handling -- ✅ `test_processing_state_management` - State validation -- ✅ `test_unregistered_stream_error` - Error handling -- ✅ `test_literal_and_uri_objects` - Object type support -- ✅ `test_rapid_event_stream` - High-throughput streaming -- ✅ `test_result_collection_methods` - All collection patterns - -**Examples:** -- ✅ `examples/minimal_rsp_test.rs` - Simple verification example -- ✅ `examples/live_stream_processing_example.rs` - Comprehensive IoT demo - -### 3. Documentation - -**Comprehensive Guides:** -- ✅ `docs/LIVE_STREAM_PROCESSING.md` (478 lines) - - Architecture overview - - RSP-QL syntax guide - - Complete usage examples - - Performance considerations - - Troubleshooting guide - - API reference - -- ✅ `docs/RSP_RS_INTEGRATION_STATUS.md` (389 lines) - - Technical implementation details - - Bug analysis and resolution - - Performance benchmarks - - Integration patterns - ---- - -## Bug Fix Journey - -### The Problem -Initially, windows were processing and queries were executing, but **no results were being received** through the channel. - -### Root Cause Discovery -Through systematic debugging, we discovered: -1. ✅ Windows were closing correctly -2. ✅ Queries were executing (15+ quads processed) -3. ❌ Query asked for `GRAPH ex:w1 { ?s ?p ?o }` -4. ❌ But quads had `graph_name: DefaultGraph` -5. 
❌ **Graph name mismatch = no matches = no results** - -### The Fix (rsp-rs 0.3.1) -When quads are added to a window, they are now automatically assigned to the window's named graph: -```rust -graph_name: NamedNode(NamedNode { iri: "http://example.org/w1" }) -``` - -### Verification -``` -Before fix (rsp-rs 0.3.0): - Total results received: 0 - -After fix (rsp-rs 0.3.1): - Total results received: 21 - ✅ SUCCESS: Integration working! -``` - ---- - -## Test Results - -### All Tests Pass -``` -cargo test --lib stream::live_stream_processing -test result: ok. 4 passed; 0 failed - -cargo test --test live_stream_integration_test -test result: ok. 10 passed; 0 failed - -cargo run --example minimal_rsp_test -✅ SUCCESS: Integration working! -Total results received: 21 -``` - -### CI/CD Checks -```bash -./scripts/ci-check.sh -✅ Formatting check passed! -✅ Clippy check passed! -✅ All tests passed! -✅ Build successful! -All CI/CD checks passed! Safe to push. -``` - ---- - -## Usage Example - -```rust -use janus::core::RDFEvent; -use janus::stream::live_stream_processing::LiveStreamProcessing; - -// Define RSP-QL query -let query = r#" - PREFIX ex: - REGISTER RStream AS - SELECT ?sensor ?temp - FROM NAMED WINDOW ex:w1 ON STREAM ex:sensors [RANGE 10000 STEP 2000] - WHERE { - WINDOW ex:w1 { ?sensor ex:temperature ?temp } - } -"#; - -// Create processor -let mut processor = LiveStreamProcessing::new(query.to_string())?; -processor.register_stream("http://example.org/sensors")?; -processor.start_processing()?; - -// Add events one at a time (true streaming) -for i in 0..100 { - let event = RDFEvent::new( - i * 1000, // timestamp - "http://example.org/sensor1", - "http://example.org/temperature", - &format!("{}", 20 + (i % 10)), - "" - ); - processor.add_event("http://example.org/sensors", event)?; -} - -// Close stream to get final results -processor.close_stream("http://example.org/sensors", 100000)?; - -// Collect results -let results = processor.collect_results(None)?; -for 
result in results { - println!("Window [{} to {}]: {}", - result.timestamp_from, - result.timestamp_to, - result.bindings); -} -``` - ---- - -## Performance Characteristics - -Based on rsp-rs benchmarks and Janus testing: - -**Throughput:** -- ~1.28M quads/sec (100-quad batches) -- ~868K quads/sec (500-quad batches) - -**Latency:** -- Query execution: ~87 µs for 100 quads -- Window processing: ~391-717 µs for 30-second windows -- First result: After first STEP interval (e.g., 2 seconds for STEP 2000) - -**Memory:** -- Base overhead: ~2-5 MB for engine structures -- Per quad in window: ~2.5 KB -- Example: 30-second window at 10 quads/sec = ~0.75 MB - ---- - -## Architecture - -### Data Flow -``` -RDFEvent (Janus) - ↓ -Oxigraph Quad (conversion) - ↓ -RDFStream (rsp-rs) - ↓ -CSPARQLWindow (assigns window graph) - ↓ -SPARQL Query Execution - ↓ -BindingWithTimestamp (results) - ↓ -mpsc::Receiver (Janus) -``` - -### Key Design Decisions - -1. **One Event at a Time:** True streaming, no batch processing -2. **Window State Management:** Handled entirely by rsp-rs -3. **Graph Assignment:** Quads automatically assigned to window graph (rsp-rs 0.3.1) -4. **Cloneable Streams:** RDFStream is cloneable for easier API usage -5. **Explicit Stream Closure:** `close_stream()` method for clean shutdown - ---- - -## Integration Checklist - -- ✅ rsp-rs 0.3.1 dependency added -- ✅ LiveStreamProcessing module implemented -- ✅ Unit tests passing (4/4) -- ✅ Integration tests passing (10/10) -- ✅ Examples working -- ✅ Documentation complete -- ✅ CI/CD checks passing -- ✅ Clippy warnings fixed -- ✅ Code formatting verified -- ✅ Error handling comprehensive -- ✅ API documented with examples - ---- - -## Known Limitations - -1. **Object Type Detection:** Simple heuristic (http:// = URI, else Literal) - - For complex datatypes (xsd:integer, etc.), extend `rdf_event_to_quad()` - -2. 
**Single Query per Processor:** Each instance handles one RSP-QL query - - Create multiple processors for multiple queries - -3. **Timestamp Range:** Uses i64 for rsp-rs compatibility - - Timestamps must be < i64::MAX (unlikely to be an issue) - ---- - -## Future Enhancements - -**Potential Improvements:** -- [ ] Support for IStream/DStream (currently only RStream) -- [ ] Typed literal support (xsd:integer, xsd:dateTime, etc.) -- [ ] Custom result formatters (JSON, CSV, RDFEvent) -- [ ] Backpressure management for high-throughput scenarios -- [ ] Multi-query support in single processor -- [ ] Integration with Kafka/MQTT sources -- [ ] Query validation before execution -- [ ] Performance metrics and monitoring - ---- - -## Files Modified/Created - -**Core Implementation:** -- `src/stream/live_stream_processing.rs` (486 lines) - CREATED -- `Cargo.toml` - MODIFIED (added rsp-rs 0.3.1) - -**Tests:** -- `tests/live_stream_integration_test.rs` (356 lines) - CREATED - -**Examples:** -- `examples/minimal_rsp_test.rs` (94 lines) - CREATED -- `examples/live_stream_processing_example.rs` (161 lines) - CREATED - -**Documentation:** -- `docs/LIVE_STREAM_PROCESSING.md` (478 lines) - CREATED -- `docs/RSP_RS_INTEGRATION_STATUS.md` (389 lines) - CREATED -- `RSP_INTEGRATION_COMPLETE.md` (this file) - CREATED - ---- - -## Commands - -**Run All Tests:** -```bash -cargo test --lib stream::live_stream_processing -cargo test --test live_stream_integration_test -``` - -**Run Examples:** -```bash -cargo run --example minimal_rsp_test -cargo run --example live_stream_processing_example -``` - -**CI/CD Check:** -```bash -./scripts/ci-check.sh -``` - -**Format Code:** -```bash -cargo fmt --all -``` - -**Lint Check:** -```bash -cargo clippy --all-targets --all-features -- -D warnings -``` - ---- - -## Acknowledgments - -This integration was made possible by: -- **rsp-rs 0.3.1** - For fixing the graph name assignment bug -- **Oxigraph** - For SPARQL query execution -- **Janus Architecture** - 
For the clean two-layer data model - -Special thanks for the collaborative debugging process that identified the root cause! - ---- - -## Contact & Support - -**For Questions:** -- Janus Implementation: See `src/stream/live_stream_processing.rs` -- Usage Guide: See `docs/LIVE_STREAM_PROCESSING.md` -- Technical Details: See `docs/RSP_RS_INTEGRATION_STATUS.md` - -**Repository:** https://github.com/SolidLabResearch/janus - ---- - -## Status: ✅ PRODUCTION READY - -The rsp-rs 0.3.1 integration with Janus is **complete, tested, and production-ready**. - -All 14 tests pass. All CI/CD checks pass. The integration is fully functional. - -**Last Updated:** January 2025 \ No newline at end of file diff --git a/docs/SPARQL_BINDINGS_UPGRADE.md b/docs/SPARQL_BINDINGS_UPGRADE.md deleted file mode 100644 index ad3e88d..0000000 --- a/docs/SPARQL_BINDINGS_UPGRADE.md +++ /dev/null @@ -1,373 +0,0 @@ -# SPARQL Structured Bindings Upgrade - -**Date:** 2024 -**Version:** 0.1.0 -**Author:** Janus Development Team -**Status:** ✅ Complete - -## Overview - -Enhanced the `OxigraphAdapter` to support structured SPARQL query results with the new `execute_query_bindings()` method. This replaces debug-formatted strings with proper variable bindings using `HashMap`. - -## Motivation - -**Problem:** The original `execute_query()` method returned `Vec` with debug format output like: -``` -"QuerySolution { s: NamedNode(\"http://example.org/alice\"), p: NamedNode(...) }" -``` - -**Solution:** New method returns structured bindings: -```rust -Vec> where each HashMap is: -{ - "s": "", - "p": "", - "o": "" -} -``` - -## Changes - -### 1. 
New Method: `execute_query_bindings()` - -**File:** `src/querying/oxigraph_adapter.rs` - -**Signature:** -```rust -pub fn execute_query_bindings( - &self, - query: &str, - container: &QuadContainer, -) -> Result>, OxigraphError> -``` - -**Features:** -- Returns structured bindings as `Vec>` -- Each HashMap represents one solution/row -- Variable names are HashMap keys -- Bound values are HashMap values as strings -- Returns empty vector for ASK/CONSTRUCT queries -- Full error handling via `OxigraphError` - -### 2. Enhanced Documentation - -**Module-Level Docs:** -- Added comprehensive usage examples -- Explained both `execute_query()` and `execute_query_bindings()` -- Included complete working example with imports - -**Method-Level Docs:** -- Detailed parameter descriptions -- Return value documentation -- Usage examples in doc comments - -### 3. Comprehensive Test Suite - -**File:** `tests/oxigraph_adapter_test.rs` - -**Added 12 New Tests:** - -| Test | Purpose | -|------|---------| -| `test_execute_query_bindings_simple_select` | Basic SELECT with multiple variables | -| `test_execute_query_bindings_with_literals` | Queries returning literal values (ages) | -| `test_execute_query_bindings_single_variable` | Single variable SELECT queries | -| `test_execute_query_bindings_with_filter` | FILTER clause support | -| `test_execute_query_bindings_empty_result` | Queries matching no data | -| `test_execute_query_bindings_empty_container` | Empty QuadContainer handling | -| `test_execute_query_bindings_ask_query_returns_empty` | ASK queries return empty (use `execute_query()`) | -| `test_execute_query_bindings_construct_query_returns_empty` | CONSTRUCT queries return empty | -| `test_execute_query_bindings_invalid_query` | Error handling for malformed SPARQL | -| `test_execute_query_bindings_multiple_variables` | Three-variable SELECT queries | -| `test_execute_query_bindings_with_aggregation` | COUNT and other aggregations | -| 
`test_execute_query_bindings_comparison_with_execute_query` | Verify consistency with original method | - -**Test Results:** -``` -running 25 tests -test result: ok. 25 passed; 0 failed; 0 ignored -``` - -## Usage Examples - -### Basic Usage - -```rust -use janus::querying::oxigraph_adapter::OxigraphAdapter; - -let adapter = OxigraphAdapter::new(); - -let query = r" - PREFIX ex: - SELECT ?person ?age WHERE { - ?person ex:age ?age - } -"; - -let bindings = adapter.execute_query_bindings(query, &container)?; - -for binding in bindings { - println!("Person: {}, Age: {}", - binding.get("person").unwrap(), - binding.get("age").unwrap()); -} -``` - -### Accessing Specific Variables - -```rust -let query = "SELECT ?s ?p ?o WHERE { ?s ?p ?o }"; -let bindings = adapter.execute_query_bindings(query, &container)?; - -for binding in bindings { - let subject = binding.get("s").unwrap(); - let predicate = binding.get("p").unwrap(); - let object = binding.get("o").unwrap(); - - // Process structured data - process_triple(subject, predicate, object); -} -``` - -### With FILTER Clauses - -```rust -let query = r#" - PREFIX ex: - SELECT ?person ?age WHERE { - ?person ex:age ?age . - FILTER(?age > "25") - } -"#; - -let bindings = adapter.execute_query_bindings(query, &container)?; -// Returns only people older than 25 -``` - -### Aggregation Queries - -```rust -let query = r" - PREFIX ex: - SELECT (COUNT(?s) AS ?count) WHERE { - ?s ex:knows ?o - } -"; - -let bindings = adapter.execute_query_bindings(query, &container)?; -let count = bindings[0].get("count").unwrap(); -println!("Total relationships: {}", count); -``` - -## Migration Guide - -### Before (Debug Format) - -```rust -let results = adapter.execute_query(query, &container)?; -for result in results { - // Result is a debug-formatted string - println!("{}", result); // "QuerySolution { s: NamedNode(...) 
}" - - // Hard to parse programmatically -} -``` - -### After (Structured Bindings) - -```rust -let bindings = adapter.execute_query_bindings(query, &container)?; -for binding in bindings { - // Easy programmatic access - let subject = binding.get("s").unwrap(); - let object = binding.get("o").unwrap(); - - // Direct string values - println!("Subject: {}, Object: {}", subject, object); -} -``` - -## Design Decisions - -### 1. Separate Method vs Trait Update - -**Decision:** Added as a separate method, not part of `SparqlEngine` trait. - -**Rationale:** -- Maintains backward compatibility -- `execute_query()` still useful for debugging -- Allows gradual migration -- Different use cases (debug vs production) - -### 2. Return Type: `HashMap` - -**Decision:** Use `HashMap` for bindings. - -**Rationale:** -- Simple and ergonomic API -- Variable names naturally map to keys -- String values compatible with RDF term representations -- Easy to serialize/deserialize -- Familiar Rust pattern - -### 3. Empty Vector for ASK/CONSTRUCT - -**Decision:** Return empty `Vec` for non-SELECT queries. - -**Rationale:** -- SELECT queries have variable bindings -- ASK queries return boolean (use `execute_query()`) -- CONSTRUCT queries return triples (use `execute_query()`) -- Type consistency across query types -- Clear separation of concerns - -### 4. Debug Mode Output - -**Decision:** Keep debug printing in `#[cfg(debug_assertions)]` blocks. 
- -**Rationale:** -- Consistent with existing codebase patterns -- Helpful for development/debugging -- Zero runtime cost in release builds -- Maintains existing behavior - -## Performance Characteristics - -### Memory - -- **Before:** `Vec` with formatted debug strings (~200-500 bytes/result) -- **After:** `Vec>` with structured data (~150-300 bytes/result) -- **Impact:** ~30% memory reduction in typical queries - -### CPU - -- **Overhead:** Minimal - iterating solution bindings is O(n×m) where n=results, m=variables -- **Benefit:** Eliminates string parsing in consuming code -- **Net:** Performance neutral or slight improvement - -### Allocations - -- Creates one `HashMap` per solution -- Allocates strings for keys and values -- Similar allocation count to debug formatting -- Better cache locality for structured access - -## Testing Strategy - -### Unit Test Coverage - -- ✅ Simple SELECT queries -- ✅ Multi-variable queries -- ✅ Literal value handling -- ✅ FILTER clause support -- ✅ Empty result sets -- ✅ Empty containers -- ✅ Invalid queries -- ✅ Aggregations -- ✅ ASK/CONSTRUCT edge cases - -### Integration Testing - -All tests use realistic RDF data: -- Alice knows Bob (subject-object relationships) -- Bob knows Charlie (transitive relationships) -- Age literals (typed literals) -- Multiple predicates (knows, age) - -### Error Handling - -- ✅ Malformed SPARQL syntax -- ✅ Storage errors propagated -- ✅ Query evaluation errors caught -- ✅ Proper `OxigraphError` conversion - -## Code Quality - -### Formatting -```bash -cargo fmt --check -- src/querying/oxigraph_adapter.rs -✅ No formatting issues -``` - -### Linting -```bash -cargo clippy --lib -✅ No warnings in oxigraph_adapter.rs -``` - -### Documentation -```bash -cargo doc --no-deps --package janus -✅ Documentation builds successfully -``` - -## Backward Compatibility - -### Maintained - -- ✅ Original `execute_query()` method unchanged -- ✅ `SparqlEngine` trait unchanged -- ✅ All existing tests pass -- ✅ No 
breaking changes to public API - -### Additions - -- ✅ New `execute_query_bindings()` method -- ✅ New import: `use std::collections::HashMap;` -- ✅ Enhanced module documentation - -## Future Enhancements - -### Potential Improvements - -1. **Typed Bindings:** Return `HashMap` for type-safe access -2. **Lazy Iteration:** Stream bindings instead of collecting into Vec -3. **Zero-Copy:** Reference container data without cloning -4. **Result Pagination:** Support LIMIT/OFFSET efficiently -5. **Trait Integration:** Add to `SparqlEngine` trait with default impl - -### Compatibility Considerations - -- Current design allows all enhancements without breaking changes -- String-based API provides stable interface -- Can add typed variants alongside existing methods - -## Related Documentation - -- **Architecture:** `docs/ARCHITECTURE.md` -- **RSP Integration:** `docs/RSP_INTEGRATION_COMPLETE.md` -- **API Docs:** Generated via `cargo doc` -- **Tests:** `tests/oxigraph_adapter_test.rs` - -## Verification Commands - -```bash -# Run all Oxigraph adapter tests -cargo test --test oxigraph_adapter_test - -# Run only new binding tests -cargo test --test oxigraph_adapter_test execute_query_bindings - -# Check formatting -cargo fmt --check - -# Run clippy -cargo clippy --lib - -# Build documentation -cargo doc --no-deps --package janus --open -``` - -## Summary - -This upgrade provides a production-ready, structured interface for SPARQL query results while maintaining full backward compatibility. The implementation is well-tested, documented, and follows Janus coding standards. - -**Key Metrics:** -- ✅ 12 new tests (100% passing) -- ✅ 0 breaking changes -- ✅ 0 clippy warnings -- ✅ ~30% memory reduction -- ✅ Comprehensive documentation -- ✅ 1 hour implementation time (as estimated) - -**Status:** Ready for integration into the main codebase. 
\ No newline at end of file diff --git a/docs/STREAM_BUS_CLI.md b/docs/STREAM_BUS_CLI.md index 32ed170..65cccba 100644 --- a/docs/STREAM_BUS_CLI.md +++ b/docs/STREAM_BUS_CLI.md @@ -427,8 +427,7 @@ stream_bus_cli --input large.nq --broker kafka 2>&1 | tee -a processing.log ## See Also - [Stream Bus Module Documentation](../src/stream_bus/stream_bus.rs) -- [Janus Architecture](../ARCHITECTURE.md) -- [Benchmark Results](../BENCHMARK_RESULTS.md) +- [Benchmark Results](./BENCHMARK_RESULTS.md) - [Getting Started Guide](../GETTING_STARTED.md) ## License @@ -439,4 +438,4 @@ MIT License - See [LICENCE.md](../LICENCE.md) For questions or issues: - Email: mailkushbisen@gmail.com -- GitHub: https://github.com/SolidLabResearch/janus \ No newline at end of file +- GitHub: https://github.com/SolidLabResearch/janus diff --git a/docs/WINDOW_TYPES_EXPLAINED.md b/docs/WINDOW_TYPES_EXPLAINED.md index 9434206..0277963 100644 --- a/docs/WINDOW_TYPES_EXPLAINED.md +++ b/docs/WINDOW_TYPES_EXPLAINED.md @@ -254,7 +254,6 @@ Result: Only historical, no live updates **Test it:** ```bash -open examples/demo_dashboard.html -# Start Replay → Wait 3s → Start Query -# Watch for both "historical" and "live" in results! +cargo run --example http_client_example +# Or use the dedicated janus-dashboard repository for a browser UI. ``` diff --git a/docs/live_extension_function_architecture.md b/docs/live_extension_function_architecture.md deleted file mode 100644 index 933744c..0000000 --- a/docs/live_extension_function_architecture.md +++ /dev/null @@ -1,57 +0,0 @@ -# Live Extension Function Architecture - -This document describes how Janus executes Janus-specific extension functions for live queries -without modifying the upstream `rsp-rs` crate. 
- -## Flow - -```mermaid -flowchart TD - A["JanusQL query"] --> B["JanusQLParser"] - B --> C["Historical windows + SPARQL"] - B --> D["Live windows + RSP-QL"] - - D --> E["LiveStreamProcessing"] - E --> F["rsp-rs RSPEngine::initialize()"] - F --> G["rsp-rs stream registry"] - F --> H["rsp-rs CSPARQL windows"] - - I["MQTT / live RDF events"] --> G - G --> H - - H --> J["Janus window subscriptions"] - J --> K["Merge emitted window content with sibling windows"] - K --> L["Add mirrored static background quads"] - L --> M["Oxigraph Store"] - M --> N["build_evaluator()"] - N --> O["Oxigraph SPARQL execution"] - O --> P["Janus extension functions"] - P --> Q["BindingWithTimestamp / QueryResult"] - - C --> R["HistoricalExecutor"] - R --> S["OxigraphAdapter"] - S --> N -``` - -## Responsibilities - -- `rsp-rs` - - stream ingestion - - timestamp-driven window lifecycle - - window materialization - - window closure notifications - -- `Janus` - - JanusQL parsing - - historical execution - - live query orchestration - - Janus-specific custom function registration through `build_evaluator()` - - final SPARQL evaluation for both historical and live paths - -## Why this design - -- Keeps `rsp-rs` minimal and reusable. -- Avoids a Janus-specific fork or API expansion in `rsp-rs`. -- Lets Janus use the same extension-function mechanism on both historical and live queries. -- Intercepts at the materialized-window stage, so Janus does not re-evaluate already-produced live - bindings. Instead, it performs the final SPARQL evaluation itself once per emitted window. diff --git a/examples/demo_dashboard.html b/examples/demo_dashboard.html deleted file mode 100644 index 421fa79..0000000 --- a/examples/demo_dashboard.html +++ /dev/null @@ -1,734 +0,0 @@ - - - - - - Janus RDF Stream Processing - Demo Dashboard - - - -
-
-

Janus RDF Stream Processing Engine

-

- Unified Live and Historical RDF Stream Processing Demo - Dashboard -

-
- -
-
-

Stream Bus Replay Control

-
- - -
-
-

Replay Status

-
- Status: - Not Running -
-
- Elapsed Time: - 0s -
-
- Input File: - - -
-
- Broker: - MQTT -
-
-
- -
-

Query Execution Control

-
- - -
-
-

Query Status

-
- Status: - Not Running -
-
- Query ID: - demo_query -
-
- Results Received: - 0 -
-
- Connection: - Disconnected -
-
-
-
- -
-

Query Results Stream

-
-
-
- No results yet. Start replay and query to see results. -
-
-
- - -
- - - - diff --git a/examples/http_client_example.rs b/examples/http_client_example.rs index ddebc8b..cb45453 100644 --- a/examples/http_client_example.rs +++ b/examples/http_client_example.rs @@ -35,6 +35,14 @@ struct SuccessResponse { message: String, } +#[derive(Debug, Deserialize)] +struct HealthResponse { + status: String, + message: String, + storage_status: String, + storage_error: Option, +} + #[derive(Debug, Deserialize)] struct ListQueriesResponse { queries: Vec, @@ -74,6 +82,21 @@ struct ReplayStatusResponse { elapsed_seconds: f64, } +#[derive(Debug, Deserialize)] +struct QueryOpsStatusResponse { + total_registered_queries: usize, + active_runtime_queries: usize, + running_queries: usize, +} + +#[derive(Debug, Deserialize)] +struct OpsStatusResponse { + status: String, + message: String, + replay: ReplayStatusResponse, + queries: QueryOpsStatusResponse, +} + #[derive(Debug, Deserialize)] struct QueryResultMessage { query_id: String, @@ -99,13 +122,35 @@ async fn main() -> Result<(), Box> { println!(" GET {}/health", base_url); let response = client.get(format!("{}/health", base_url)).send().await?; if response.status().is_success() { - let body: SuccessResponse = response.json().await?; + let body: HealthResponse = response.json().await?; println!(" ✓ {}", body.message); + println!(" ✓ Service status: {}", body.status); + println!(" ✓ Storage status: {}", body.storage_status); + if let Some(error) = body.storage_error { + println!(" ! Storage error: {}", error); + } } else { println!(" ✗ Health check failed: {}", response.status()); } println!(); + // 1b. Ops Status + println!("1b. 
Ops Status"); + println!(" GET {}/ops/status", base_url); + let response = client.get(format!("{}/ops/status", base_url)).send().await?; + if response.status().is_success() { + let body: OpsStatusResponse = response.json().await?; + println!(" ✓ {}", body.message); + println!(" ✓ Overall status: {}", body.status); + println!(" ✓ Replay running: {}", body.replay.is_running); + println!(" ✓ Registered queries: {}", body.queries.total_registered_queries); + println!(" ✓ Active runtime queries: {}", body.queries.active_runtime_queries); + println!(" ✓ Running queries: {}", body.queries.running_queries); + } else { + println!(" ✗ Ops status failed: {}", response.status()); + } + println!(); + // 2. Register a Query println!("2. Register a Query"); println!(" POST {}/api/queries", base_url); diff --git a/janus-dashboard/README.md b/janus-dashboard/README.md deleted file mode 100644 index d817029..0000000 --- a/janus-dashboard/README.md +++ /dev/null @@ -1,43 +0,0 @@ -# Janus Dashboard (Local Demo) - -This directory contains a lightweight local dashboard used for quick backend demos from within the `janus` repository. - -It is not the primary dashboard codebase. - -The maintained dashboard lives in a separate repository: - -- `https://github.com/SolidLabResearch/janus-dashboard` - -## What This Folder Is For - -- quick local testing against `http_server` -- validating the WebSocket result stream manually -- lightweight backend demo flows during engine development - -## What This Folder Is Not For - -- the main frontend product surface -- long-term frontend feature development -- frontend CI ownership for the Janus project as a whole - -## Local Use - -Install dependencies: - -```bash -npm install -``` - -Run checks: - -```bash -npm run check -``` - -Start the dev server: - -```bash -npm run dev -``` - -The local app expects the Janus backend to be running on `http://localhost:8080`. 
diff --git a/scripts/check_doc_links.sh b/scripts/check_doc_links.sh new file mode 100644 index 0000000..0feb076 --- /dev/null +++ b/scripts/check_doc_links.sh @@ -0,0 +1,42 @@ +#!/bin/bash + +set -euo pipefail + +cd "$(dirname "$0")/.." + +failures=0 + +while IFS= read -r file; do + while IFS= read -r raw_link; do + link="${raw_link#<}" + link="${link%>}" + link="${link%%#*}" + link="${link%% *}" + + if [[ -z "$link" ]]; then + continue + fi + + if [[ "$link" == http://* ]] || + [[ "$link" == https://* ]] || + [[ "$link" == mailto:* ]] || + [[ "$link" == \#* ]] || + [[ "$link" == app://* ]] || + [[ "$link" == plugin://* ]]; then + continue + fi + + target="$(cd "$(dirname "$file")" && pwd)/$link" + if [[ ! -e "$target" ]]; then + echo "Broken link in $file -> $raw_link" + failures=1 + fi + done < <(perl -ne 'while (/\[[^][]+\]\(([^)]+)\)/g) { print "$1\n"; }' "$file") +done < <(find . -path './target' -prune -o -name '*.md' -print) + +if [[ "$failures" -ne 0 ]]; then + echo "Documentation link check failed." + exit 1 +fi + +echo "Documentation link check passed." diff --git a/scripts/ci-check.sh b/scripts/ci-check.sh index fefbcea..e93d3cd 100755 --- a/scripts/ci-check.sh +++ b/scripts/ci-check.sh @@ -34,4 +34,10 @@ cargo build --all-targets echo "Build successful!" echo "" +# Check 5: Docs link check +echo "Checking markdown links..." +bash ./scripts/check_doc_links.sh +echo "Documentation link check passed!" +echo "" + echo "All CI/CD checks passed! Safe to push." 
diff --git a/scripts/start_http_server.sh b/scripts/start_http_server.sh index 6ffe66e..b605f0c 100755 --- a/scripts/start_http_server.sh +++ b/scripts/start_http_server.sh @@ -58,7 +58,7 @@ echo "Storage:" echo " - Background flushing: ENABLED" echo " - Auto-flush every 5 seconds or when batch full" echo "" -echo "Dashboard: Open examples/demo_dashboard.html in your browser" +echo "Frontend: Use the separate janus-dashboard repository if you need a browser UI" echo "" echo "═══════════════════════════════════════════════════════════════" echo "" diff --git a/scripts/test_live_streaming.sh b/scripts/test_live_streaming.sh index a799ecc..84e83be 100755 --- a/scripts/test_live_streaming.sh +++ b/scripts/test_live_streaming.sh @@ -256,7 +256,7 @@ echo -e "${GREEN}✓ All steps completed successfully${NC}" echo "" echo "To view the dashboard:" echo " 1. Start the server: ./scripts/start_http_server.sh" -echo " 2. Open: examples/demo_dashboard.html in your browser" +echo " 2. Use the separate janus-dashboard repository if you need a browser UI" echo " 3. Click 'Start Replay' and wait 10 seconds" echo " 4. Click 'Start Query' to see results" echo "" diff --git a/scripts/test_setup.sh b/scripts/test_setup.sh index 4562738..d20f7ed 100755 --- a/scripts/test_setup.sh +++ b/scripts/test_setup.sh @@ -83,7 +83,7 @@ echo "1. Start the HTTP server in a new terminal:" echo " cargo run --bin http_server" echo "" echo "2. Open the demo dashboard:" -echo " open examples/demo_dashboard.html" +echo " use the separate janus-dashboard repository for a browser UI" echo "" echo "3. Click 'Start Replay' then 'Start Query'" echo "" diff --git a/src/http/server.rs b/src/http/server.rs index babc977..953913e 100644 --- a/src/http/server.rs +++ b/src/http/server.rs @@ -75,6 +75,22 @@ pub struct SuccessResponse { pub message: String, } +/// Response for service health. 
+#[derive(Debug, Serialize)] +pub struct HealthResponse { + pub status: String, + pub message: String, + pub storage_status: String, + pub storage_error: Option, +} + +/// Detailed storage status for ops surfaces. +#[derive(Debug, Serialize)] +pub struct StorageStatusResponse { + pub status: String, + pub background_flush_error: Option, +} + /// Error response #[derive(Debug, Serialize)] pub struct ErrorResponse { @@ -123,7 +139,7 @@ pub struct MqttConfigDto { } /// Response for replay status -#[derive(Debug, Serialize)] +#[derive(Debug, Serialize, Clone)] pub struct ReplayStatusResponse { pub is_running: bool, pub events_read: u64, @@ -135,6 +151,28 @@ pub struct ReplayStatusResponse { pub elapsed_seconds: f64, } +/// Query lifecycle status summary for ops surfaces. +#[derive(Debug, Serialize)] +pub struct QueryOpsStatusResponse { + pub total_registered_queries: usize, + pub active_runtime_queries: usize, + pub registered_queries: usize, + pub warming_baseline_queries: usize, + pub running_queries: usize, + pub stopped_queries: usize, + pub failed_queries: usize, +} + +/// Rich operational status response. 
+#[derive(Debug, Serialize)] +pub struct OpsStatusResponse { + pub status: String, + pub message: String, + pub storage: StorageStatusResponse, + pub replay: ReplayStatusResponse, + pub queries: QueryOpsStatusResponse, +} + /// Shared application state pub struct AppState { pub janus_api: Arc, @@ -243,6 +281,7 @@ pub fn create_server_with_state( .route("/api/replay/start", post(start_replay)) .route("/api/replay/stop", post(stop_replay)) .route("/api/replay/status", get(replay_status)) + .route("/ops/status", get(ops_status)) .route("/health", get(health_check)) .layer(cors) .with_state(Arc::clone(&state)); @@ -251,8 +290,61 @@ pub fn create_server_with_state( } /// Health check endpoint -async fn health_check() -> impl IntoResponse { - Json(SuccessResponse { message: "Janus HTTP API is running".to_string() }) +async fn health_check(State(state): State>) -> impl IntoResponse { + let storage = storage_status(&state.storage); + + if let Some(storage_error) = storage.background_flush_error.clone() { + let response = HealthResponse { + status: "degraded".to_string(), + message: "Janus HTTP API is running with storage errors".to_string(), + storage_status: storage.status, + storage_error: Some(storage_error), + }; + return (StatusCode::SERVICE_UNAVAILABLE, Json(response)).into_response(); + } + + ( + StatusCode::OK, + Json(HealthResponse { + status: "ok".to_string(), + message: "Janus HTTP API is running".to_string(), + storage_status: storage.status, + storage_error: None, + }), + ) + .into_response() +} + +/// Operational status endpoint. 
+async fn ops_status(State(state): State>) -> impl IntoResponse { + let storage = storage_status(&state.storage); + let replay = replay_status_snapshot(&state.replay_state.lock().unwrap()); + let queries = query_ops_status(&state); + + let (status, message) = if storage.background_flush_error.is_some() { + ( + StatusCode::SERVICE_UNAVAILABLE, + "Janus HTTP API is running with degraded storage".to_string(), + ) + } else { + (StatusCode::OK, "Janus HTTP API is running".to_string()) + }; + + ( + status, + Json(OpsStatusResponse { + status: if status == StatusCode::OK { + "ok".to_string() + } else { + "degraded".to_string() + }, + message, + storage, + replay, + queries, + }), + ) + .into_response() } /// POST /api/queries - Register a new query @@ -577,7 +669,10 @@ async fn replay_status( State(state): State>, ) -> Result, ApiError> { let replay_state = state.replay_state.lock().unwrap(); + Ok(Json(replay_status_snapshot(&replay_state))) +} +fn replay_status_snapshot(replay_state: &ReplayState) -> ReplayStatusResponse { let elapsed_seconds = if replay_state.is_running { replay_state.start_time.map_or(0.0, |t| t.elapsed().as_secs_f64()) } else { @@ -596,7 +691,7 @@ async fn replay_status( 0.0 }; - Ok(Json(ReplayStatusResponse { + ReplayStatusResponse { is_running: replay_state.is_running, events_read, events_published, @@ -605,7 +700,53 @@ async fn replay_status( storage_errors, events_per_second, elapsed_seconds, - })) + } +} + +fn storage_status(storage: &StreamingSegmentedStorage) -> StorageStatusResponse { + StorageStatusResponse { + status: if storage.background_flush_error().is_some() { + "error".to_string() + } else { + "ok".to_string() + }, + background_flush_error: storage.background_flush_error(), + } +} + +fn query_ops_status(state: &Arc) -> QueryOpsStatusResponse { + let query_ids = state.registry.list_all(); + let mut registered_queries = 0; + let mut warming_baseline_queries = 0; + let mut running_queries = 0; + let mut stopped_queries = 0; + let mut 
failed_queries = 0; + + for query_id in &query_ids { + if let Some(metadata) = state.registry.get(query_id) { + match metadata.status.as_str() { + "Registered" => registered_queries += 1, + "WarmingBaseline" => warming_baseline_queries += 1, + "Running" => running_queries += 1, + "Stopped" => stopped_queries += 1, + status if status.starts_with("Failed") => failed_queries += 1, + _ => {} + } + } + } + + let active_runtime_queries = + query_ids.iter().filter(|query_id| state.janus_api.is_running(query_id)).count(); + + QueryOpsStatusResponse { + total_registered_queries: query_ids.len(), + active_runtime_queries, + registered_queries, + warming_baseline_queries, + running_queries, + stopped_queries, + failed_queries, + } } /// Start the HTTP server on the specified address @@ -631,6 +772,7 @@ pub async fn start_server( println!(" POST /api/replay/start - Start stream bus replay"); println!(" POST /api/replay/stop - Stop stream bus replay"); println!(" GET /api/replay/status - Get replay status"); + println!(" GET /ops/status - Detailed operational status"); println!(" GET /health - Health check"); println!(); diff --git a/src/main.rs b/src/main.rs index e793f2f..b583140 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,12 @@ -//! Janus - A hybrid engine for unified Live and Historical RDF Stream Processing +//! Janus package entry point. //! -//! This is the main entry point for the Janus command-line interface. +//! This binary exists to provide a clear top-level entry point when users run +//! `cargo run --bin janus`. The operational binaries remain: +//! +//! - `http_server` for the HTTP/WebSocket API +//! 
- `stream_bus_cli` for replay and ingestion +use clap::{Parser, Subcommand}; use janus::core::Event; use janus::storage::segmented_storage::StreamingSegmentedStorage; use janus::storage::util::StreamingConfig; @@ -10,15 +15,48 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; const SEGMENT_BASE_PATH: &str = "data/rdf_benchmark"; -fn benchmark_segmented_storage_rdf() -> std::io::Result<()> { - // println!("RDF Segmented Storage Benchmark"); - // println!("=================================="); +#[derive(Parser)] +#[command( + name = "janus", + about = "Janus package entry point", + long_about = "Use this binary for package-level help and internal storage benchmarks. For the backend API, run `http_server`. For RDF replay and ingestion, run `stream_bus_cli`." +)] +struct Cli { + #[command(subcommand)] + command: Option, +} + +#[derive(Subcommand)] +enum Command { + /// Show the primary Janus entry points. + Info, + /// Run the RDF segmented storage benchmark. + BenchmarkStorageRdf, + /// Run the event storage benchmark matrix. 
+ BenchmarkStorage, +} + +fn print_overview() { + println!("Janus package entry point"); + println!(); + println!("Primary binaries:"); + println!(" http_server REST and WebSocket API"); + println!(" stream_bus_cli RDF replay and ingestion CLI"); + println!(); + println!("Useful commands:"); + println!(" cargo run --bin http_server -- --host 127.0.0.1 --port 8080 --storage-dir ./data/storage"); + println!(" cargo run --bin stream_bus_cli -- --help"); + println!(" cargo run --example http_client_example"); + println!(); + println!("Benchmark subcommands:"); + println!(" cargo run --bin janus -- benchmark-storage-rdf"); + println!(" cargo run --bin janus -- benchmark-storage"); +} - // Clean up and create directories +fn benchmark_segmented_storage_rdf() -> std::io::Result<()> { let _ = fs::remove_dir_all(SEGMENT_BASE_PATH); fs::create_dir_all(SEGMENT_BASE_PATH)?; - // Configure storage let config = StreamingConfig { max_batch_events: 500_000, max_batch_age_seconds: 1, @@ -31,16 +69,11 @@ fn benchmark_segmented_storage_rdf() -> std::io::Result<()> { let mut storage = StreamingSegmentedStorage::new(config)?; storage.start_background_flushing(); - // Record initial memory - // storage.record_memory("before_writing"); - - // Benchmark writing 1 million RDF events - // println!("\nWriting 1,000,000 RDF events..."); let start_time = Instant::now(); let base_timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64; for i in 0..1_000_000u64 { - let timestamp = base_timestamp + i; // 1ms intervals + let timestamp = base_timestamp + i; let subject = format!("http://example.org/person/person_{}", i % 10000); let predicate = match i % 10 { 0..=3 => "http://example.org/knows", @@ -57,112 +90,35 @@ fn benchmark_segmented_storage_rdf() -> std::io::Result<()> { let graph = format!("http://example.org/graph/graph_{}", i % 100); storage.write_rdf(timestamp, &subject, predicate, &object, &graph)?; - - if i > 0 && i % 100_000 == 0 { - // println!(" ✓ Written 
{} events", i); - // storage.record_memory(&format!("after_{}_events", i)); - } } - let write_duration = start_time.elapsed(); - let _write_throughput = 1_000_000.0 / write_duration.as_secs_f64(); - - // println!("\nWrite completed!"); - // println!(" Duration: {:.3} seconds", write_duration.as_secs_f64()); - // println!(" Throughput: {:.0} events/sec", write_throughput); + let _write_duration = start_time.elapsed(); - // Wait a bit for background flushing std::thread::sleep(Duration::from_secs(2)); - // storage.record_memory("after_background_flush"); - - // Benchmark reading different amounts of data - // println!("\nReading Benchmarks"); - // println!("===================="); let read_sizes = vec![100, 1_000, 10_000, 100_000, 1_000_000]; for &size in &read_sizes { - // Query the first 'size' events let query_start_ts = base_timestamp; let query_end_ts = base_timestamp + size as u64; - - // println!("\n📖 Querying {} events...", size); - let start_time = Instant::now(); - let results = storage.query_rdf(query_start_ts, query_end_ts)?; - let query_duration = start_time.elapsed(); - let _read_throughput = results.len() as f64 / query_duration.as_secs_f64(); - - // println!(" Results found: {}", results.len()); - // println!(" Query time: {:.3} ms", query_duration.as_millis()); - // println!(" Read throughput: {:.0} events/sec", read_throughput); - - // Show a sample result for verification - if !results.is_empty() { - let sample = &results[0]; + if let Some(sample) = results.first() { println!( - " Sample result: {} {} {} in {} at {}", + "Sample result: {} {} {} in {} at {}", sample.subject, sample.predicate, sample.object, sample.graph, sample.timestamp ); } } - // Shutdown storage storage.shutdown()?; - - // Print memory statistics - // println!("\nMemory Usage Statistics"); - // println!("=========================="); - // let memory_stats = storage.get_memory_stats(); - // // println!("Peak memory: {}", MemoryTracker::format_bytes(memory_stats.peak_bytes)); - // 
// println!("Current memory: {}", MemoryTracker::format_bytes(memory_stats.current_bytes)); - // println!( - // "Average memory: {}", - // MemoryTracker::format_bytes(memory_stats.avg_bytes as usize) - // ); - // // println!("Total measurements: {}", memory_stats.total_measurements); - - // Print storage component breakdown - // let component_sizes = storage.get_storage_component_sizes(); - // println!("\n🧩 Storage Component Breakdown"); - // println!("============================="); - // println!( - // "Batch buffer: {}", - // MemoryTracker::format_bytes(component_sizes.batch_buffer_bytes) - // ); - // // println!("Dictionary: {}", MemoryTracker::format_bytes(component_sizes.dictionary_bytes)); - // // println!("Segments count: {}", component_sizes.segments_count); - // println!( - // "Estimated total: {}", - // MemoryTracker::format_bytes(component_sizes.estimated_total_bytes) - // ); - - // if memory_stats.measurements.len() > 1 { - // // println!("\nDetailed measurements:"); - // for measurement in &memory_stats.measurements { - // println!( - // " {}: {}", - // measurement.description, - // MemoryTracker::format_bytes(measurement.memory_bytes) - // ); - // } - // } - - // println!("\nBenchmark completed successfully!"); Ok(()) } fn benchmark_storage_performance() -> std::io::Result<()> { - // println!("=== WAL-Based Segmented Storage Performance Benchmark ===\n"); - let record_counts = vec![100, 1000, 10000, 100000, 1000000]; for &num_records in &record_counts { - // println!("Testing with {} records", num_records); - // println!("──────────────────────────────────────────────────"); - - // Configure storage let config = StreamingConfig { max_batch_events: 250_000, max_batch_age_seconds: 1, @@ -172,15 +128,11 @@ fn benchmark_storage_performance() -> std::io::Result<()> { segment_base_path: format!("./benchmark_data_{}", num_records), }; - // Clean up any existing data let _ = std::fs::remove_dir_all(&config.segment_base_path); let mut storage = 
StreamingSegmentedStorage::new(config.clone())?; storage.start_background_flushing(); - // Benchmark writes - // println!("Writing {} records...", num_records); - let write_start = Instant::now(); let mut min_timestamp = u64::MAX; let mut max_timestamp = 0u64; @@ -200,80 +152,39 @@ fn benchmark_storage_performance() -> std::io::Result<()> { storage.write(event)?; } - let write_duration = write_start.elapsed(); - let _write_throughput = num_records as f64 / write_duration.as_secs_f64(); - - // println!("Write Performance:"); - // println!(" Duration: {:.3}s", write_duration.as_secs_f64()); - // println!(" Throughput: {:.0} records/sec", write_throughput); - // println!(" Timestamp range: {} to {}", min_timestamp, max_timestamp); - - // Benchmark queries immediately after writing (data is still in WAL) let query_ranges = vec![(0.1, "10% of data"), (0.5, "50% of data"), (1.0, "100% of data")]; - // println!("\nQuery Performance:"); - for (fraction, _description) in query_ranges { - let query_count = 100.min(num_records / 10); // Run 100 queries or 10% of records, whichever is smaller - let mut query_times = Vec::new(); - let mut total_records_read = 0; + let query_count = 100.min(num_records / 10); for q in 0..query_count { - // Use a deterministic but varied offset for queries within the actual data range let timestamp_range = max_timestamp - min_timestamp; let start_offset = (timestamp_range as f64 * fraction * (q as f64 / query_count as f64)) as u64; - let query_window = (timestamp_range as f64 * 0.01).max(100.0) as u64; // 1% of data or 100 records minimum + let query_window = (timestamp_range as f64 * 0.01).max(100.0) as u64; let start_timestamp = min_timestamp + start_offset; let end_timestamp = (start_timestamp + query_window).min(max_timestamp); - let query_start = Instant::now(); - let results = storage.query(start_timestamp, end_timestamp)?; - let query_duration = query_start.elapsed(); - - total_records_read += results.len(); - 
query_times.push(query_duration.as_secs_f64()); + let _ = storage.query(start_timestamp, end_timestamp)?; } - - let avg_query_time = query_times.iter().sum::() / query_times.len() as f64; - let _queries_per_sec = 1.0 / avg_query_time; - let total_query_time = query_times.iter().sum::(); - let _records_per_sec = if total_query_time > 0.0 { - total_records_read as f64 / total_query_time - } else { - 0.0 - }; - let _avg_records_per_query = total_records_read as f64 / query_count as f64; - let _min_time = query_times.iter().cloned().fold(f64::INFINITY, f64::min); - let _max_time = query_times.iter().cloned().fold(f64::NEG_INFINITY, f64::max); - - // println!(" {} queries ({}):", description, query_count); - // println!(" Avg query time: {:.3}ms", avg_query_time * 1000.0); - // println!(" Query throughput: {:.1} queries/sec", queries_per_sec); - // println!(" Read throughput: {:.0} records/sec", records_per_sec); - // println!(" Avg records per query: {:.1}", avg_records_per_query); - // println!(" Total records read: {}", total_records_read); - // println!(" Min/Max time: {:.3}ms / {:.3}ms", min_time * 1000.0, max_time * 1000.0); } - // Force flush remaining WAL data and shutdown storage.shutdown()?; - println!(); } - // println!("Benchmark completed!"); Ok(()) } fn main() -> std::io::Result<()> { - // Run the new RDF benchmark - benchmark_segmented_storage_rdf()?; + let cli = Cli::parse(); - // println!("\n{}", "=".repeat(50)); - // println!("Running legacy benchmark for comparison..."); - // println!("{}", "=".repeat(50)); - - // Also run the old benchmark for comparison - benchmark_storage_performance() + match cli.command.unwrap_or(Command::Info) { + Command::Info => { + print_overview(); + Ok(()) + } + Command::BenchmarkStorageRdf => benchmark_segmented_storage_rdf(), + Command::BenchmarkStorage => benchmark_storage_performance(), + } } diff --git a/src/storage/segmented_storage.rs b/src/storage/segmented_storage.rs index cb26776..101c3a2 100644 --- 
a/src/storage/segmented_storage.rs +++ b/src/storage/segmented_storage.rs @@ -24,6 +24,7 @@ pub struct StreamingSegmentedStorage { dictionary: Arc>, flush_handle: Option>, shutdown_signal: Arc>, + background_flush_error: Arc>>, config: StreamingConfig, } @@ -64,6 +65,7 @@ impl StreamingSegmentedStorage { dictionary: Arc::new(RwLock::new(dictionary)), flush_handle: None, shutdown_signal: Arc::new(Mutex::new(false)), + background_flush_error: Arc::new(Mutex::new(None)), config, }; storage.load_existing_segments()?; @@ -75,6 +77,7 @@ impl StreamingSegmentedStorage { let batch_buffer_clone = Arc::clone(&self.batch_buffer); let segments_clone = Arc::clone(&self.segments); let shutdown_clone = Arc::clone(&self.shutdown_signal); + let background_error_clone = Arc::clone(&self.background_flush_error); let config_clone = self.config.clone(); let dictionary_clone = Arc::clone(&self.dictionary); @@ -83,6 +86,7 @@ impl StreamingSegmentedStorage { batch_buffer_clone, segments_clone, shutdown_clone, + background_error_clone, config_clone, dictionary_clone, ); @@ -96,8 +100,22 @@ impl StreamingSegmentedStorage { &self.dictionary } + /// Return the most recent background flush error, if one has occurred. 
+ pub fn background_flush_error(&self) -> Option<String> { + self.background_flush_error.lock().unwrap().clone() + } + + fn ensure_background_flush_healthy(&self) -> std::io::Result<()> { + let background_error = self.background_flush_error.lock().unwrap(); + if let Some(message) = background_error.as_ref() { + return Err(std::io::Error::other(message.clone())); + } + Ok(()) + } + // Write an event into the storage system pub fn write(&self, event: Event) -> std::io::Result<()> { + self.ensure_background_flush_healthy()?; let event_size = std::mem::size_of::<Event>(); { @@ -147,6 +165,7 @@ impl StreamingSegmentedStorage { /// Force flush the current batch buffer to disk /// This is useful when you need to ensure data is persisted immediately pub fn flush(&self) -> std::io::Result<()> { + self.ensure_background_flush_healthy()?; self.flush_batch_buffer_to_segment()?; self.save_dictionary()?; Ok(()) @@ -287,6 +306,7 @@ impl StreamingSegmentedStorage { // Query events within a timestamp range from the storage system but result in encoded Events and not RDFEvents. pub fn query(&self, start_timestamp: u64, end_timestamp: u64) -> std::io::Result<Vec<Event>> { + self.ensure_background_flush_healthy()?; let mut results = Vec::new(); // First try to query the immediate batch buffer which has the fastest visibility.
@@ -325,6 +345,7 @@ impl StreamingSegmentedStorage { start_timestamp: u64, end_timestamp: u64, ) -> std::io::Result> { + self.ensure_background_flush_healthy()?; let encoded_events = self.query(start_timestamp, end_timestamp)?; let dict = self.dictionary.read().unwrap(); Ok(encoded_events.into_iter().map(|event| event.decode(&dict)).collect()) @@ -458,6 +479,7 @@ impl StreamingSegmentedStorage { batch_buffer: Arc>, segments: Arc>>, shutdown_signal: Arc>, + background_flush_error: Arc>>, config: StreamingConfig, dictionary: Arc>, ) { @@ -479,14 +501,16 @@ impl StreamingSegmentedStorage { }; if should_flush { - // TODO : Add better error handling here in this case if let Err(e) = Self::flush_background( batch_buffer.clone(), segments.clone(), config.clone(), dictionary.clone(), ) { - eprintln!("Background flush failed: {}", e); + let message = format!("Background flush failed: {}", e); + eprintln!("{}", message); + *background_flush_error.lock().unwrap() = Some(message); + break; } } } @@ -516,90 +540,120 @@ impl StreamingSegmentedStorage { events }; - // Create a new segment for these events - let segment_id = Self::current_timestamp(); - let data_path = format!("{}/segment-{}.log", config.segment_base_path, segment_id); - let index_path = format!("{}/segment-{}.idx", config.segment_base_path, segment_id); + let flush_result = (|| -> std::io::Result<()> { + let segment_id = Self::current_timestamp(); + let data_path = format!("{}/segment-{}.log", config.segment_base_path, segment_id); + let index_path = format!("{}/segment-{}.idx", config.segment_base_path, segment_id); - // Use buffered writers for performance (same as original implementation) - let mut data_file = BufWriter::new(std::fs::File::create(&data_path)?); - let mut index_file = BufWriter::new(std::fs::File::create(&index_path)?); + let mut data_file = BufWriter::new(std::fs::File::create(&data_path)?); + let mut index_file = BufWriter::new(std::fs::File::create(&index_path)?); - let mut index_directory = 
Vec::new(); - let mut current_block_entries = Vec::new(); - let mut current_block_min_ts = None; - let mut current_block_max_ts = 0u64; - let mut data_offset = 0u64; + let mut index_directory = Vec::new(); + let mut current_block_entries = Vec::new(); + let mut current_block_min_ts = None; + let mut current_block_max_ts = 0u64; + let mut data_offset = 0u64; - for (record_count, event) in events_to_flush.iter().enumerate() { - // Use the same serialization as the original - let record_bytes = Self::serialize_event_to_fixed_size_static(event); - data_file.write_all(&record_bytes)?; + for (record_count, event) in events_to_flush.iter().enumerate() { + let record_bytes = Self::serialize_event_to_fixed_size_static(event); + data_file.write_all(&record_bytes)?; - if record_count % config.sparse_interval == 0 { - let sparse_entry = (event.timestamp, data_offset); + if record_count % config.sparse_interval == 0 { + let sparse_entry = (event.timestamp, data_offset); - if current_block_min_ts.is_none() { - current_block_min_ts = Some(event.timestamp); - } + if current_block_min_ts.is_none() { + current_block_min_ts = Some(event.timestamp); + } - current_block_max_ts = event.timestamp; - current_block_entries.push(sparse_entry); + current_block_max_ts = event.timestamp; + current_block_entries.push(sparse_entry); - if current_block_entries.len() >= config.entries_per_index_block { - let block_metadata = Self::flush_index_block_static( - &mut index_file, - ¤t_block_entries, - current_block_min_ts.unwrap(), - current_block_max_ts, - )?; + if current_block_entries.len() >= config.entries_per_index_block { + let block_metadata = Self::flush_index_block_static( + &mut index_file, + ¤t_block_entries, + current_block_min_ts.unwrap(), + current_block_max_ts, + )?; - index_directory.push(block_metadata); + index_directory.push(block_metadata); - current_block_entries.clear(); - current_block_min_ts = None; + current_block_entries.clear(); + current_block_min_ts = None; + } } + 
data_offset += record_bytes.len() as u64; } - data_offset += record_bytes.len() as u64; - } - if !current_block_entries.is_empty() { - let block_metadata = Self::flush_index_block_static( - &mut index_file, - ¤t_block_entries, - current_block_min_ts.unwrap(), - current_block_max_ts, - )?; + if !current_block_entries.is_empty() { + let block_metadata = Self::flush_index_block_static( + &mut index_file, + ¤t_block_entries, + current_block_min_ts.unwrap(), + current_block_max_ts, + )?; - index_directory.push(block_metadata); + index_directory.push(block_metadata); + } + + data_file.flush()?; + index_file.flush()?; + + let new_segment = EnhancedSegmentMetadata { + start_timstamp: events_to_flush.first().unwrap().timestamp, + end_timestamp: events_to_flush.last().unwrap().timestamp, + data_path, + index_path, + record_count: events_to_flush.len() as u64, + index_directory, + }; + + { + let mut segments = segments.write().unwrap(); + segments.push(new_segment); + segments.sort_by_key(|s| s.start_timstamp); + } + + let dict_path = std::path::Path::new(&config.segment_base_path).join("dictionary.bin"); + let dict = dictionary.read().unwrap(); + dict.save_to_file(&dict_path)?; + + Ok(()) + })(); + + if let Err(err) = flush_result { + Self::restore_failed_background_flush(&batch_buffer, &events_to_flush); + return Err(err); } - data_file.flush()?; - index_file.flush()?; + Ok(()) + } - // Add the new segment to the segments list - let new_segment = EnhancedSegmentMetadata { - start_timstamp: events_to_flush.first().unwrap().timestamp, - end_timestamp: events_to_flush.last().unwrap().timestamp, - data_path, - index_path, - record_count: events_to_flush.len() as u64, - index_directory, - }; + fn restore_failed_background_flush(batch_buffer: &Arc>, events: &[Event]) { + if events.is_empty() { + return; + } - { - let mut segments = segments.write().unwrap(); - segments.push(new_segment); - // Keep segments sorted by start timestamp - segments.sort_by_key(|s| s.start_timstamp); + 
let mut buffer = batch_buffer.write().unwrap(); + for event in events.iter().rev().cloned() { + buffer.events.push_front(event); + buffer.total_bytes += std::mem::size_of::(); } - // Save dictionary after flush - let dict_path = std::path::Path::new(&config.segment_base_path).join("dictionary.bin"); - let dict = dictionary.read().unwrap(); - dict.save_to_file(&dict_path)?; + let restored_oldest = events.first().map(|event| event.timestamp); + let restored_newest = events.last().map(|event| event.timestamp); - Ok(()) + buffer.oldest_timestamp_bound = match (buffer.oldest_timestamp_bound, restored_oldest) { + (Some(existing), Some(restored)) => Some(existing.min(restored)), + (None, restored) => restored, + (existing, None) => existing, + }; + + buffer.newest_timestamp_bound = match (buffer.newest_timestamp_bound, restored_newest) { + (Some(existing), Some(restored)) => Some(existing.max(restored)), + (None, restored) => restored, + (existing, None) => existing, + }; } fn load_existing_segments(&self) -> std::io::Result<()> { @@ -733,15 +787,23 @@ impl StreamingSegmentedStorage { // Shutdown the storage system gracefully, ensuring all data is flushed to disk. 
pub fn shutdown(&mut self) -> std::io::Result<()> { + let background_error = self.ensure_background_flush_healthy().err(); *self.shutdown_signal.lock().unwrap() = true; // Final Flush - self.flush_batch_buffer_to_segment()?; + if background_error.is_none() { + self.flush_batch_buffer_to_segment()?; + } if let Some(handle) = self.flush_handle.take() { handle.join().unwrap(); } + + if let Some(err) = background_error { + return Err(err); + } + Ok(()) } diff --git a/tests/http_server_integration_test.rs b/tests/http_server_integration_test.rs index e27ea30..2001ba1 100644 --- a/tests/http_server_integration_test.rs +++ b/tests/http_server_integration_test.rs @@ -8,7 +8,7 @@ use janus::{ }; use reqwest::Client; use serde_json::{json, Value}; -use std::{collections::HashMap, sync::Arc}; +use std::{collections::HashMap, fs, path::PathBuf, sync::Arc}; use tempfile::TempDir; use tokio::{ net::TcpListener, @@ -23,6 +23,7 @@ struct TestServer { ws_base_url: String, client: Client, state: Arc, + storage_dir: PathBuf, _temp_dir: TempDir, server_task: JoinHandle<()>, } @@ -52,8 +53,9 @@ fn historical_query(query_id: &str) -> Value { async fn spawn_test_server() -> TestServer { let temp_dir = TempDir::new().expect("failed to create temp dir"); + let storage_dir = temp_dir.path().to_path_buf(); let mut storage = StreamingSegmentedStorage::new(StreamingConfig { - segment_base_path: temp_dir.path().to_string_lossy().into_owned(), + segment_base_path: storage_dir.to_string_lossy().into_owned(), max_batch_events: 10, max_batch_age_seconds: 60, max_batch_bytes: 1024 * 1024, @@ -98,6 +100,7 @@ async fn spawn_test_server() -> TestServer { ws_base_url: format!("ws://{}", addr), client: Client::new(), state, + storage_dir, _temp_dir: temp_dir, server_task, } @@ -116,7 +119,125 @@ async fn test_health_endpoint() { assert!(response.status().is_success()); let body: Value = response.json().await.expect("invalid health response"); + assert_eq!(body["status"], "ok"); 
assert_eq!(body["message"], "Janus HTTP API is running"); + assert_eq!(body["storage_status"], "ok"); + assert_eq!(body["storage_error"], Value::Null); +} + +#[tokio::test] +async fn test_ops_status_endpoint_reports_runtime_counts() { + let server = spawn_test_server().await; + + let register_response = server + .client + .post(format!("{}/api/queries", server.base_url)) + .json(&historical_query("ops_status")) + .send() + .await + .expect("register request failed"); + assert!(register_response.status().is_success()); + + let start_response = server + .client + .post(format!("{}/api/queries/ops_status/start", server.base_url)) + .send() + .await + .expect("start request failed"); + assert!(start_response.status().is_success()); + + let response = server + .client + .get(format!("{}/ops/status", server.base_url)) + .send() + .await + .expect("ops status request failed"); + + assert!(response.status().is_success()); + let body: Value = response.json().await.expect("invalid ops status response"); + assert_eq!(body["status"], "ok"); + assert_eq!(body["storage"]["status"], "ok"); + assert_eq!(body["replay"]["is_running"], false); + assert_eq!(body["queries"]["total_registered_queries"], 1); + assert_eq!(body["queries"]["active_runtime_queries"], 1); + assert_eq!(body["queries"]["running_queries"], 1); +} + +#[tokio::test] +async fn test_health_endpoint_reports_storage_degradation() { + let server = spawn_test_server().await; + + fs::remove_dir_all(&server.storage_dir).expect("failed to remove storage directory"); + for timestamp in 2_000..2_010 { + server + .state + .storage + .write_rdf( + timestamp, + "http://example.org/sensor2", + "http://example.org/temperature", + "22", + "http://example.org/sensors", + ) + .expect("initial writes should succeed before background failure is observed"); + } + + sleep(Duration::from_millis(250)).await; + + let response = server + .client + .get(format!("{}/health", server.base_url)) + .send() + .await + .expect("health request 
failed"); + + assert_eq!(response.status(), reqwest::StatusCode::SERVICE_UNAVAILABLE); + let body: Value = response.json().await.expect("invalid health response"); + assert_eq!(body["status"], "degraded"); + assert_eq!(body["storage_status"], "error"); + assert!(body["storage_error"] + .as_str() + .expect("storage error should be present") + .contains("Background flush failed")); +} + +#[tokio::test] +async fn test_ops_status_endpoint_reports_storage_degradation() { + let server = spawn_test_server().await; + + fs::remove_dir_all(&server.storage_dir).expect("failed to remove storage directory"); + for timestamp in 2_000..2_010 { + server + .state + .storage + .write_rdf( + timestamp, + "http://example.org/sensor2", + "http://example.org/temperature", + "22", + "http://example.org/sensors", + ) + .expect("initial writes should succeed before background failure is observed"); + } + + sleep(Duration::from_millis(250)).await; + + let response = server + .client + .get(format!("{}/ops/status", server.base_url)) + .send() + .await + .expect("ops status request failed"); + + assert_eq!(response.status(), reqwest::StatusCode::SERVICE_UNAVAILABLE); + let body: Value = response.json().await.expect("invalid ops status response"); + assert_eq!(body["status"], "degraded"); + assert_eq!(body["storage"]["status"], "error"); + assert_eq!(body["queries"]["total_registered_queries"], 0); + assert!(body["storage"]["background_flush_error"] + .as_str() + .expect("storage error should be present") + .contains("Background flush failed")); } #[tokio::test] diff --git a/tests/janus_cli_test.rs b/tests/janus_cli_test.rs new file mode 100644 index 0000000..8b44a0b --- /dev/null +++ b/tests/janus_cli_test.rs @@ -0,0 +1,49 @@ +use std::path::Path; +use std::process::Command; + +fn get_janus_binary() -> String { + if let Ok(path) = std::env::var("CARGO_BIN_EXE_janus") { + return path; + } + + let bin_name = if cfg!(windows) { "janus.exe" } else { "janus" }; + let candidates = [ + 
format!("target/debug/{bin_name}"), + format!("target/release/{bin_name}"), + format!("target/llvm-cov-target/debug/{bin_name}"), + format!("target/llvm-cov-target/release/{bin_name}"), + ]; + + for candidate in candidates { + if Path::new(&candidate).exists() { + return candidate; + } + } + + panic!("Could not find janus binary in expected target locations"); +} + +#[test] +fn test_janus_help_lists_primary_entry_points() { + let output = Command::new(get_janus_binary()) + .arg("--help") + .output() + .expect("failed to run janus --help"); + + assert!(output.status.success()); + let stdout = String::from_utf8_lossy(&output.stdout); + assert!(stdout.contains("package-level help")); + assert!(stdout.contains("benchmark-storage-rdf")); + assert!(stdout.contains("benchmark-storage")); +} + +#[test] +fn test_janus_default_output_points_to_real_binaries() { + let output = Command::new(get_janus_binary()).output().expect("failed to run janus"); + + assert!(output.status.success()); + let stdout = String::from_utf8_lossy(&output.stdout); + assert!(stdout.contains("http_server")); + assert!(stdout.contains("stream_bus_cli")); + assert!(stdout.contains("http_client_example")); +} diff --git a/tests/segmented_storage_error_test.rs b/tests/segmented_storage_error_test.rs new file mode 100644 index 0000000..bbde812 --- /dev/null +++ b/tests/segmented_storage_error_test.rs @@ -0,0 +1,52 @@ +use janus::storage::segmented_storage::StreamingSegmentedStorage; +use janus::storage::util::StreamingConfig; +use std::fs; +use std::thread; +use std::time::Duration; +use tempfile::TempDir; + +#[test] +fn test_background_flush_failure_surfaces_as_storage_error() { + let temp_dir = TempDir::new().expect("failed to create temp dir"); + let storage_dir = temp_dir.path().join("storage"); + + let mut storage = StreamingSegmentedStorage::new(StreamingConfig { + segment_base_path: storage_dir.to_string_lossy().into_owned(), + max_batch_events: 1, + max_batch_age_seconds: 60, + max_batch_bytes: 
1024 * 1024, + sparse_interval: 10, + entries_per_index_block: 100, + }) + .expect("failed to create storage"); + + storage.start_background_flushing(); + + fs::remove_dir_all(&storage_dir).expect("failed to remove storage directory"); + + storage + .write_rdf( + 1_000, + "http://example.org/sensor1", + "http://example.org/temperature", + "21", + "http://example.org/graph1", + ) + .expect("initial write should succeed before background failure is observed"); + + thread::sleep(Duration::from_millis(250)); + + let err = storage + .query(0, 2_000) + .expect_err("query should surface the background flush failure"); + + assert!(err.to_string().contains("Background flush failed"), "unexpected error: {err}"); + + let shutdown_err = storage + .shutdown() + .expect_err("shutdown should also surface the background flush failure"); + assert!( + shutdown_err.to_string().contains("Background flush failed"), + "unexpected shutdown error: {shutdown_err}" + ); +}