Skip to content

Commit 15518c4

Browse files
committed
graph: Add FileLogStore and LokiLogStore for querying subgraph logs
- Also add LogStoreFactory for creating log store instances from config
1 parent 8c13b5f commit 15518c4

3 files changed

Lines changed: 747 additions & 1 deletion

File tree

Lines changed: 308 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,308 @@
1+
use async_trait::async_trait;
2+
use serde::{Deserialize, Serialize};
3+
use std::fs::File;
4+
use std::io::{BufRead, BufReader};
5+
use std::path::PathBuf;
6+
7+
use crate::prelude::DeploymentHash;
8+
9+
use super::{LogEntry, LogLevel, LogMeta, LogQuery, LogStore, LogStoreError};
10+
11+
pub struct FileLogStore {
12+
directory: PathBuf,
13+
max_file_size: u64,
14+
retention_days: u32,
15+
}
16+
17+
impl FileLogStore {
18+
pub fn new(
19+
directory: PathBuf,
20+
max_file_size: u64,
21+
retention_days: u32,
22+
) -> Result<Self, LogStoreError> {
23+
// Create directory if it doesn't exist
24+
std::fs::create_dir_all(&directory)
25+
.map_err(|e| LogStoreError::InitializationFailed(e.into()))?;
26+
27+
Ok(Self {
28+
directory,
29+
max_file_size,
30+
retention_days,
31+
})
32+
}
33+
34+
/// Get log file path for a subgraph
35+
fn log_file_path(&self, subgraph_id: &DeploymentHash) -> PathBuf {
36+
self.directory.join(format!("{}.jsonl", subgraph_id))
37+
}
38+
39+
/// Parse a JSON line into a LogEntry
40+
fn parse_line(&self, line: &str) -> Option<LogEntry> {
41+
let doc: FileLogDocument = serde_json::from_str(line).ok()?;
42+
43+
let level = LogLevel::from_str(&doc.level)?;
44+
let subgraph_id = DeploymentHash::new(&doc.subgraph_id).ok()?;
45+
46+
Some(LogEntry {
47+
id: doc.id,
48+
subgraph_id,
49+
timestamp: doc.timestamp,
50+
level,
51+
text: doc.text,
52+
arguments: doc.arguments,
53+
meta: LogMeta {
54+
module: doc.meta.module,
55+
line: doc.meta.line,
56+
column: doc.meta.column,
57+
},
58+
})
59+
}
60+
61+
/// Check if an entry matches the query filters
62+
fn matches_filters(&self, entry: &LogEntry, query: &LogQuery) -> bool {
63+
// Level filter
64+
if let Some(level) = query.level {
65+
if entry.level != level {
66+
return false;
67+
}
68+
}
69+
70+
// Time range filters
71+
if let Some(ref from) = query.from {
72+
if entry.timestamp < *from {
73+
return false;
74+
}
75+
}
76+
77+
if let Some(ref to) = query.to {
78+
if entry.timestamp > *to {
79+
return false;
80+
}
81+
}
82+
83+
// Text search (case-insensitive)
84+
if let Some(ref text) = query.text {
85+
if !entry.text.to_lowercase().contains(&text.to_lowercase()) {
86+
return false;
87+
}
88+
}
89+
90+
true
91+
}
92+
}
93+
94+
#[async_trait]
95+
impl LogStore for FileLogStore {
96+
async fn query_logs(&self, query: LogQuery) -> Result<Vec<LogEntry>, LogStoreError> {
97+
let file_path = self.log_file_path(&query.subgraph_id);
98+
99+
if !file_path.exists() {
100+
return Ok(vec![]);
101+
}
102+
103+
let file = File::open(&file_path).map_err(|e| LogStoreError::QueryFailed(e.into()))?;
104+
105+
let reader = BufReader::new(file);
106+
let mut entries = Vec::new();
107+
let mut skipped = 0;
108+
109+
// Read all lines and collect matching entries
110+
// Note: For large files, this loads everything into memory
111+
// A production implementation would use reverse iteration or indexing
112+
let all_entries: Vec<LogEntry> = reader
113+
.lines()
114+
.filter_map(|line| line.ok())
115+
.filter_map(|line| self.parse_line(&line))
116+
.collect();
117+
118+
// Sort by timestamp descending (most recent first)
119+
let mut sorted_entries = all_entries;
120+
sorted_entries.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
121+
122+
// Apply filters and pagination
123+
for entry in sorted_entries {
124+
if !self.matches_filters(&entry, &query) {
125+
continue;
126+
}
127+
128+
// Skip the first N entries
129+
if skipped < query.skip {
130+
skipped += 1;
131+
continue;
132+
}
133+
134+
entries.push(entry);
135+
136+
// Stop once we have enough entries
137+
if entries.len() >= query.first as usize {
138+
break;
139+
}
140+
}
141+
142+
Ok(entries)
143+
}
144+
145+
fn is_available(&self) -> bool {
146+
self.directory.exists() && self.directory.is_dir()
147+
}
148+
}
149+
150+
// File log document format (JSON Lines)
151+
#[derive(Debug, Serialize, Deserialize)]
152+
struct FileLogDocument {
153+
id: String,
154+
#[serde(rename = "subgraphId")]
155+
subgraph_id: String,
156+
timestamp: String,
157+
level: String,
158+
text: String,
159+
arguments: Vec<(String, String)>,
160+
meta: FileLogMeta,
161+
}
162+
163+
#[derive(Debug, Serialize, Deserialize)]
164+
struct FileLogMeta {
165+
module: String,
166+
line: i64,
167+
column: i64,
168+
}
169+
170+
#[cfg(test)]
171+
mod tests {
172+
use super::*;
173+
use std::io::Write;
174+
use tempfile::TempDir;
175+
176+
#[test]
177+
fn test_file_log_store_initialization() {
178+
let temp_dir = TempDir::new().unwrap();
179+
let store = FileLogStore::new(temp_dir.path().to_path_buf(), 1024 * 1024, 30);
180+
assert!(store.is_ok());
181+
182+
let store = store.unwrap();
183+
assert!(store.is_available());
184+
}
185+
186+
#[test]
187+
fn test_log_file_path() {
188+
let temp_dir = TempDir::new().unwrap();
189+
let store = FileLogStore::new(temp_dir.path().to_path_buf(), 1024 * 1024, 30).unwrap();
190+
191+
let subgraph_id = DeploymentHash::new("QmTest").unwrap();
192+
let path = store.log_file_path(&subgraph_id);
193+
194+
assert_eq!(path, temp_dir.path().join("QmTest.jsonl"));
195+
}
196+
197+
#[tokio::test]
198+
async fn test_query_nonexistent_file() {
199+
let temp_dir = TempDir::new().unwrap();
200+
let store = FileLogStore::new(temp_dir.path().to_path_buf(), 1024 * 1024, 30).unwrap();
201+
202+
let query = LogQuery {
203+
subgraph_id: DeploymentHash::new("QmNonexistent").unwrap(),
204+
level: None,
205+
from: None,
206+
to: None,
207+
text: None,
208+
first: 100,
209+
skip: 0,
210+
};
211+
212+
let result = store.query_logs(query).await;
213+
assert!(result.is_ok());
214+
assert_eq!(result.unwrap().len(), 0);
215+
}
216+
217+
#[tokio::test]
218+
async fn test_query_with_sample_data() {
219+
let temp_dir = TempDir::new().unwrap();
220+
let store = FileLogStore::new(temp_dir.path().to_path_buf(), 1024 * 1024, 30).unwrap();
221+
222+
let subgraph_id = DeploymentHash::new("QmTest").unwrap();
223+
let file_path = store.log_file_path(&subgraph_id);
224+
225+
// Write some test data
226+
let mut file = File::create(&file_path).unwrap();
227+
let log_entry = FileLogDocument {
228+
id: "log-1".to_string(),
229+
subgraph_id: "QmTest".to_string(),
230+
timestamp: "2024-01-15T10:30:00Z".to_string(),
231+
level: "error".to_string(),
232+
text: "Test error message".to_string(),
233+
arguments: vec![],
234+
meta: FileLogMeta {
235+
module: "test.ts".to_string(),
236+
line: 42,
237+
column: 10,
238+
},
239+
};
240+
writeln!(file, "{}", serde_json::to_string(&log_entry).unwrap()).unwrap();
241+
242+
// Query
243+
let query = LogQuery {
244+
subgraph_id,
245+
level: None,
246+
from: None,
247+
to: None,
248+
text: None,
249+
first: 100,
250+
skip: 0,
251+
};
252+
253+
let result = store.query_logs(query).await;
254+
assert!(result.is_ok());
255+
256+
let entries = result.unwrap();
257+
assert_eq!(entries.len(), 1);
258+
assert_eq!(entries[0].id, "log-1");
259+
assert_eq!(entries[0].text, "Test error message");
260+
assert_eq!(entries[0].level, LogLevel::Error);
261+
}
262+
263+
#[tokio::test]
264+
async fn test_query_with_level_filter() {
265+
let temp_dir = TempDir::new().unwrap();
266+
let store = FileLogStore::new(temp_dir.path().to_path_buf(), 1024 * 1024, 30).unwrap();
267+
268+
let subgraph_id = DeploymentHash::new("QmTest").unwrap();
269+
let file_path = store.log_file_path(&subgraph_id);
270+
271+
// Write test data with different levels
272+
let mut file = File::create(&file_path).unwrap();
273+
for (id, level) in [("log-1", "error"), ("log-2", "info"), ("log-3", "error")] {
274+
let log_entry = FileLogDocument {
275+
id: id.to_string(),
276+
subgraph_id: "QmTest".to_string(),
277+
timestamp: format!("2024-01-15T10:30:{}Z", id),
278+
level: level.to_string(),
279+
text: format!("Test {} message", level),
280+
arguments: vec![],
281+
meta: FileLogMeta {
282+
module: "test.ts".to_string(),
283+
line: 42,
284+
column: 10,
285+
},
286+
};
287+
writeln!(file, "{}", serde_json::to_string(&log_entry).unwrap()).unwrap();
288+
}
289+
290+
// Query for errors only
291+
let query = LogQuery {
292+
subgraph_id,
293+
level: Some(LogLevel::Error),
294+
from: None,
295+
to: None,
296+
text: None,
297+
first: 100,
298+
skip: 0,
299+
};
300+
301+
let result = store.query_logs(query).await;
302+
assert!(result.is_ok());
303+
304+
let entries = result.unwrap();
305+
assert_eq!(entries.len(), 2);
306+
assert!(entries.iter().all(|e| e.level == LogLevel::Error));
307+
}
308+
}

0 commit comments

Comments
 (0)