Skip to content

Commit 7d2852f

Browse files
committed
graph: Implement FileDrain for writing subgraph logs to file system
- JSON Lines format (one JSON object per line) - One file per subgraph: {directory}/{subgraphId}.jsonl - Structured log entries with metadata (module, line, column) - Configurable directory, max file size, and retention days - Unit tests for file creation, format validation, and filtering # Conflicts: # Cargo.lock
1 parent 45b833a commit 7d2852f

3 files changed

Lines changed: 346 additions & 0 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

graph/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ clap.workspace = true
108108
maplit = "1.0.2"
109109
hex-literal = "1.1"
110110
wiremock = "0.6.5"
111+
tempfile = "3.8"
111112

112113
[build-dependencies]
113114
tonic-build = { workspace = true }

graph/src/log/file.rs

Lines changed: 344 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,344 @@
1+
use std::fmt;
2+
use std::fmt::Write as FmtWrite;
3+
use std::fs::{File, OpenOptions};
4+
use std::io::{BufWriter, Write};
5+
use std::path::PathBuf;
6+
use std::sync::{Arc, Mutex};
7+
8+
use chrono::prelude::{SecondsFormat, Utc};
9+
use serde::Serialize;
10+
use slog::*;
11+
12+
/// Configuration for `FileDrain`.
13+
#[derive(Clone, Debug)]
14+
pub struct FileDrainConfig {
15+
/// Directory where log files will be stored
16+
pub directory: PathBuf,
17+
/// The subgraph ID (used for filename)
18+
pub subgraph_id: String,
19+
/// Maximum file size in bytes (for future rotation feature)
20+
pub max_file_size: u64,
21+
/// Retention period in days (for future cleanup feature)
22+
pub retention_days: u32,
23+
}
24+
25+
/// Log document structure for JSON Lines format
26+
#[derive(Clone, Debug, Serialize)]
27+
#[serde(rename_all = "camelCase")]
28+
struct FileLogDocument {
29+
id: String,
30+
subgraph_id: String,
31+
timestamp: String,
32+
level: String,
33+
text: String,
34+
arguments: Vec<(String, String)>,
35+
meta: FileLogMeta,
36+
}
37+
38+
#[derive(Clone, Debug, Serialize)]
39+
#[serde(rename_all = "camelCase")]
40+
struct FileLogMeta {
41+
module: String,
42+
line: i64,
43+
column: i64,
44+
}
45+
46+
/// Serializer for extracting key-value pairs into a Vec
47+
struct VecKVSerializer {
48+
kvs: Vec<(String, String)>,
49+
}
50+
51+
impl VecKVSerializer {
52+
fn new() -> Self {
53+
VecKVSerializer {
54+
kvs: Default::default(),
55+
}
56+
}
57+
58+
fn finish(self) -> Vec<(String, String)> {
59+
self.kvs
60+
}
61+
}
62+
63+
impl Serializer for VecKVSerializer {
64+
fn emit_arguments(&mut self, key: Key, val: &fmt::Arguments) -> slog::Result {
65+
self.kvs.push((key.into(), format!("{}", val)));
66+
Ok(())
67+
}
68+
}
69+
70+
/// Serializer for concatenating key-value arguments into a string
71+
struct SimpleKVSerializer {
72+
kvs: Vec<(String, String)>,
73+
}
74+
75+
impl SimpleKVSerializer {
76+
fn new() -> Self {
77+
SimpleKVSerializer {
78+
kvs: Default::default(),
79+
}
80+
}
81+
82+
fn finish(self) -> (usize, String) {
83+
(
84+
self.kvs.len(),
85+
self.kvs
86+
.iter()
87+
.map(|(k, v)| format!("{}: {}", k, v))
88+
.collect::<Vec<_>>()
89+
.join(", "),
90+
)
91+
}
92+
}
93+
94+
impl Serializer for SimpleKVSerializer {
95+
fn emit_arguments(&mut self, key: Key, val: &fmt::Arguments) -> slog::Result {
96+
self.kvs.push((key.into(), format!("{}", val)));
97+
Ok(())
98+
}
99+
}
100+
101+
/// An slog `Drain` for logging to local files in JSON Lines format.
102+
///
103+
/// Each subgraph gets its own .jsonl file with log entries.
104+
/// Format: One JSON object per line
105+
/// ```jsonl
106+
/// {"id":"QmXxx-2024-01-15T10:30:00Z","subgraphId":"QmXxx","timestamp":"2024-01-15T10:30:00Z","level":"error","text":"Error message","arguments":[],"meta":{"module":"test.rs","line":42,"column":10}}
107+
/// ```
108+
pub struct FileDrain {
109+
config: FileDrainConfig,
110+
error_logger: Logger,
111+
writer: Arc<Mutex<BufWriter<File>>>,
112+
}
113+
114+
impl FileDrain {
115+
/// Creates a new `FileDrain`.
116+
pub fn new(config: FileDrainConfig, error_logger: Logger) -> std::io::Result<Self> {
117+
// Create directory if it doesn't exist
118+
std::fs::create_dir_all(&config.directory)?;
119+
120+
// Open file for subgraph (append mode)
121+
let path = config
122+
.directory
123+
.join(format!("{}.jsonl", config.subgraph_id));
124+
let file = OpenOptions::new().create(true).append(true).open(path)?;
125+
126+
Ok(FileDrain {
127+
config,
128+
error_logger,
129+
writer: Arc::new(Mutex::new(BufWriter::new(file))),
130+
})
131+
}
132+
}
133+
134+
impl Drain for FileDrain {
135+
type Ok = ();
136+
type Err = ();
137+
138+
fn log(&self, record: &Record, values: &OwnedKVList) -> std::result::Result<(), ()> {
139+
// Don't write `trace` logs to file
140+
if record.level() == Level::Trace {
141+
return Ok(());
142+
}
143+
144+
let timestamp = Utc::now().to_rfc3339_opts(SecondsFormat::Nanos, true);
145+
let id = format!("{}-{}", self.config.subgraph_id, timestamp);
146+
147+
// Get log level as string
148+
let level = match record.level() {
149+
Level::Critical => "critical",
150+
Level::Error => "error",
151+
Level::Warning => "warning",
152+
Level::Info => "info",
153+
Level::Debug => "debug",
154+
Level::Trace => "trace",
155+
};
156+
157+
// Serialize logger arguments
158+
let mut serializer = SimpleKVSerializer::new();
159+
record
160+
.kv()
161+
.serialize(record, &mut serializer)
162+
.expect("failed to serialize logger arguments");
163+
let (n_logger_kvs, logger_kvs) = serializer.finish();
164+
165+
// Serialize log message arguments
166+
let mut serializer = SimpleKVSerializer::new();
167+
values
168+
.serialize(record, &mut serializer)
169+
.expect("failed to serialize log message arguments");
170+
let (n_value_kvs, value_kvs) = serializer.finish();
171+
172+
// Serialize arguments into vec for storage
173+
let mut serializer = VecKVSerializer::new();
174+
record
175+
.kv()
176+
.serialize(record, &mut serializer)
177+
.expect("failed to serialize log message arguments into vec");
178+
let arguments = serializer.finish();
179+
180+
// Build text with all key-value pairs
181+
let mut text = format!("{}", record.msg());
182+
if n_logger_kvs > 0 {
183+
write!(text, ", {}", logger_kvs).unwrap();
184+
}
185+
if n_value_kvs > 0 {
186+
write!(text, ", {}", value_kvs).unwrap();
187+
}
188+
189+
// Build log document
190+
let log_doc = FileLogDocument {
191+
id,
192+
subgraph_id: self.config.subgraph_id.clone(),
193+
timestamp,
194+
level: level.to_string(),
195+
text,
196+
arguments,
197+
meta: FileLogMeta {
198+
module: record.module().into(),
199+
line: record.line() as i64,
200+
column: record.column() as i64,
201+
},
202+
};
203+
204+
// Write JSON line (synchronous, buffered)
205+
let mut writer = self.writer.lock().unwrap();
206+
if let Err(e) = serde_json::to_writer(&mut *writer, &log_doc) {
207+
error!(self.error_logger, "Failed to serialize log to JSON: {}", e);
208+
return Ok(());
209+
}
210+
211+
if let Err(e) = writeln!(&mut *writer) {
212+
error!(self.error_logger, "Failed to write newline: {}", e);
213+
return Ok(());
214+
}
215+
216+
// Flush to ensure durability
217+
if let Err(e) = writer.flush() {
218+
error!(self.error_logger, "Failed to flush log file: {}", e);
219+
}
220+
221+
Ok(())
222+
}
223+
}
224+
225+
/// Creates a new asynchronous file logger.
226+
///
227+
/// Uses `error_logger` to print any file logging errors,
228+
/// so they don't go unnoticed.
229+
pub fn file_logger(config: FileDrainConfig, error_logger: Logger) -> Logger {
230+
let file_drain = match FileDrain::new(config, error_logger.clone()) {
231+
Ok(drain) => drain,
232+
Err(e) => {
233+
error!(error_logger, "Failed to create FileDrain: {}", e);
234+
// Return a logger that discards all logs
235+
return Logger::root(slog::Discard, o!());
236+
}
237+
};
238+
239+
let async_drain = slog_async::Async::new(file_drain.fuse())
240+
.chan_size(20000)
241+
.overflow_strategy(slog_async::OverflowStrategy::Block)
242+
.build()
243+
.fuse();
244+
Logger::root(async_drain, o!())
245+
}
246+
247+
#[cfg(test)]
248+
mod tests {
249+
use super::*;
250+
use tempfile::TempDir;
251+
252+
#[test]
253+
fn test_file_drain_creation() {
254+
let temp_dir = TempDir::new().unwrap();
255+
let error_logger = Logger::root(slog::Discard, o!());
256+
257+
let config = FileDrainConfig {
258+
directory: temp_dir.path().to_path_buf(),
259+
subgraph_id: "QmTest".to_string(),
260+
max_file_size: 1024 * 1024,
261+
retention_days: 30,
262+
};
263+
264+
let drain = FileDrain::new(config, error_logger);
265+
assert!(drain.is_ok());
266+
267+
// Verify file was created
268+
let file_path = temp_dir.path().join("QmTest.jsonl");
269+
assert!(file_path.exists());
270+
}
271+
272+
#[test]
273+
fn test_log_entry_format() {
274+
let arguments = vec![
275+
("key1".to_string(), "value1".to_string()),
276+
("key2".to_string(), "value2".to_string()),
277+
];
278+
279+
let doc = FileLogDocument {
280+
id: "test-id".to_string(),
281+
subgraph_id: "QmTest".to_string(),
282+
timestamp: "2024-01-15T10:30:00Z".to_string(),
283+
level: "error".to_string(),
284+
text: "Test error message".to_string(),
285+
arguments,
286+
meta: FileLogMeta {
287+
module: "test.rs".to_string(),
288+
line: 42,
289+
column: 10,
290+
},
291+
};
292+
293+
let json = serde_json::to_string(&doc).unwrap();
294+
assert!(json.contains("\"id\":\"test-id\""));
295+
assert!(json.contains("\"subgraphId\":\"QmTest\""));
296+
assert!(json.contains("\"level\":\"error\""));
297+
assert!(json.contains("\"text\":\"Test error message\""));
298+
assert!(json.contains("\"arguments\""));
299+
}
300+
301+
#[test]
302+
fn test_file_drain_writes_jsonl() {
303+
use std::io::{BufRead, BufReader};
304+
305+
let temp_dir = TempDir::new().unwrap();
306+
let error_logger = Logger::root(slog::Discard, o!());
307+
308+
let config = FileDrainConfig {
309+
directory: temp_dir.path().to_path_buf(),
310+
subgraph_id: "QmTest".to_string(),
311+
max_file_size: 1024 * 1024,
312+
retention_days: 30,
313+
};
314+
315+
let drain = FileDrain::new(config.clone(), error_logger).unwrap();
316+
317+
// Create a test record
318+
let logger = Logger::root(drain, o!());
319+
info!(logger, "Test message"; "key" => "value");
320+
321+
// Give async drain time to write (in real test we'd use proper sync)
322+
std::thread::sleep(std::time::Duration::from_millis(100));
323+
324+
// Read the file
325+
let file_path = temp_dir.path().join("QmTest.jsonl");
326+
let file = File::open(file_path).unwrap();
327+
let reader = BufReader::new(file);
328+
329+
let lines: Vec<String> = reader.lines().filter_map(Result::ok).collect();
330+
331+
// Should have written at least one line
332+
assert!(lines.len() > 0);
333+
334+
// Each line should be valid JSON
335+
for line in lines {
336+
let parsed: serde_json::Value = serde_json::from_str(&line).unwrap();
337+
assert!(parsed.get("id").is_some());
338+
assert!(parsed.get("subgraphId").is_some());
339+
assert!(parsed.get("timestamp").is_some());
340+
assert!(parsed.get("level").is_some());
341+
assert!(parsed.get("text").is_some());
342+
}
343+
}
344+
}

0 commit comments

Comments
 (0)