Skip to content

Commit d3a9519

Browse files
committed
graph: Add log drains for writing to backends
Implements slog drains for capturing and writing logs: - FileDrain: Writes logs to JSON Lines files (one file per subgraph) - LokiDrain: Writes logs to Grafana Loki via HTTP push API Both drains: - Capture structured log entries with metadata (module, line, column) - Format logs with timestamp, level, text, and arguments - Support all log levels (critical, error, warning, info, debug) - Use efficient serialization with custom KVSerializers Code quality improvements: - Simplified serializers (removed Default::default(), use Vec::new()) - Use val.to_string() instead of format!("{}", val)
1 parent 0f2674a commit d3a9519

3 files changed

Lines changed: 789 additions & 0 deletions

File tree

graph/src/log/file.rs

Lines changed: 340 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,340 @@
1+
use std::fmt;
2+
use std::fmt::Write as FmtWrite;
3+
use std::fs::{File, OpenOptions};
4+
use std::io::{BufWriter, Write};
5+
use std::path::PathBuf;
6+
use std::sync::{Arc, Mutex};
7+
8+
use chrono::prelude::{SecondsFormat, Utc};
9+
use serde::Serialize;
10+
use slog::*;
11+
12+
/// Configuration for `FileDrain`.
13+
#[derive(Clone, Debug)]
14+
pub struct FileDrainConfig {
15+
/// Directory where log files will be stored
16+
pub directory: PathBuf,
17+
/// The subgraph ID used for the log filename
18+
pub subgraph_id: String,
19+
/// Maximum file size in bytes
20+
pub max_file_size: u64,
21+
/// Retention period in days
22+
pub retention_days: u32,
23+
}
24+
25+
/// Log document structure for JSON Lines format
26+
#[derive(Clone, Debug, Serialize)]
27+
#[serde(rename_all = "camelCase")]
28+
struct FileLogDocument {
29+
id: String,
30+
subgraph_id: String,
31+
timestamp: String,
32+
level: String,
33+
text: String,
34+
arguments: Vec<(String, String)>,
35+
meta: FileLogMeta,
36+
}
37+
38+
#[derive(Clone, Debug, Serialize)]
39+
#[serde(rename_all = "camelCase")]
40+
struct FileLogMeta {
41+
module: String,
42+
line: i64,
43+
column: i64,
44+
}
45+
46+
/// Serializer for extracting key-value pairs into a Vec
47+
struct VecKVSerializer {
48+
kvs: Vec<(String, String)>,
49+
}
50+
51+
impl VecKVSerializer {
52+
fn new() -> Self {
53+
Self { kvs: Vec::new() }
54+
}
55+
56+
fn finish(self) -> Vec<(String, String)> {
57+
self.kvs
58+
}
59+
}
60+
61+
impl Serializer for VecKVSerializer {
62+
fn emit_arguments(&mut self, key: Key, val: &fmt::Arguments) -> slog::Result {
63+
self.kvs.push((key.into(), val.to_string()));
64+
Ok(())
65+
}
66+
}
67+
68+
/// Serializer for concatenating key-value arguments into a string
69+
struct SimpleKVSerializer {
70+
kvs: Vec<(String, String)>,
71+
}
72+
73+
impl SimpleKVSerializer {
74+
fn new() -> Self {
75+
Self { kvs: Vec::new() }
76+
}
77+
78+
fn finish(self) -> (usize, String) {
79+
(
80+
self.kvs.len(),
81+
self.kvs
82+
.iter()
83+
.map(|(k, v)| format!("{}: {}", k, v))
84+
.collect::<Vec<_>>()
85+
.join(", "),
86+
)
87+
}
88+
}
89+
90+
impl Serializer for SimpleKVSerializer {
91+
fn emit_arguments(&mut self, key: Key, val: &fmt::Arguments) -> slog::Result {
92+
self.kvs.push((key.into(), val.to_string()));
93+
Ok(())
94+
}
95+
}
96+
97+
/// An slog `Drain` for logging to local files in JSON Lines format.
98+
///
99+
/// Each subgraph gets its own .jsonl file with log entries.
100+
/// Format: One JSON object per line
101+
/// ```jsonl
102+
/// {"id":"QmXxx-2024-01-15T10:30:00Z","subgraphId":"QmXxx","timestamp":"2024-01-15T10:30:00Z","level":"error","text":"Error message","arguments":[],"meta":{"module":"test.rs","line":42,"column":10}}
103+
/// ```
104+
pub struct FileDrain {
105+
config: FileDrainConfig,
106+
error_logger: Logger,
107+
writer: Arc<Mutex<BufWriter<File>>>,
108+
}
109+
110+
impl FileDrain {
111+
/// Creates a new `FileDrain`.
112+
pub fn new(config: FileDrainConfig, error_logger: Logger) -> std::io::Result<Self> {
113+
// Create directory if it doesn't exist
114+
std::fs::create_dir_all(&config.directory)?;
115+
116+
// Open file for subgraph (append mode)
117+
let path = config
118+
.directory
119+
.join(format!("{}.jsonl", config.subgraph_id));
120+
let file = OpenOptions::new().create(true).append(true).open(path)?;
121+
122+
Ok(FileDrain {
123+
config,
124+
error_logger,
125+
writer: Arc::new(Mutex::new(BufWriter::new(file))),
126+
})
127+
}
128+
}
129+
130+
impl Drain for FileDrain {
131+
type Ok = ();
132+
type Err = Never;
133+
134+
fn log(&self, record: &Record, values: &OwnedKVList) -> std::result::Result<(), Never> {
135+
// Don't write `trace` logs to file
136+
if record.level() == Level::Trace {
137+
return Ok(());
138+
}
139+
140+
let timestamp = Utc::now().to_rfc3339_opts(SecondsFormat::Nanos, true);
141+
let id = format!("{}-{}", self.config.subgraph_id, timestamp);
142+
143+
// Get log level as string
144+
let level = match record.level() {
145+
Level::Critical => "critical",
146+
Level::Error => "error",
147+
Level::Warning => "warning",
148+
Level::Info => "info",
149+
Level::Debug => "debug",
150+
Level::Trace => "trace",
151+
};
152+
153+
// Serialize logger arguments
154+
let mut serializer = SimpleKVSerializer::new();
155+
record
156+
.kv()
157+
.serialize(record, &mut serializer)
158+
.expect("failed to serialize logger arguments");
159+
let (n_logger_kvs, logger_kvs) = serializer.finish();
160+
161+
// Serialize log message arguments
162+
let mut serializer = SimpleKVSerializer::new();
163+
values
164+
.serialize(record, &mut serializer)
165+
.expect("failed to serialize log message arguments");
166+
let (n_value_kvs, value_kvs) = serializer.finish();
167+
168+
// Serialize arguments into vec for storage
169+
let mut serializer = VecKVSerializer::new();
170+
record
171+
.kv()
172+
.serialize(record, &mut serializer)
173+
.expect("failed to serialize log message arguments into vec");
174+
let arguments = serializer.finish();
175+
176+
// Build text with all key-value pairs
177+
let mut text = format!("{}", record.msg());
178+
if n_logger_kvs > 0 {
179+
write!(text, ", {}", logger_kvs).unwrap();
180+
}
181+
if n_value_kvs > 0 {
182+
write!(text, ", {}", value_kvs).unwrap();
183+
}
184+
185+
// Build log document
186+
let log_doc = FileLogDocument {
187+
id,
188+
subgraph_id: self.config.subgraph_id.clone(),
189+
timestamp,
190+
level: level.to_string(),
191+
text,
192+
arguments,
193+
meta: FileLogMeta {
194+
module: record.module().into(),
195+
line: record.line() as i64,
196+
column: record.column() as i64,
197+
},
198+
};
199+
200+
// Write JSON line (synchronous, buffered)
201+
let mut writer = self.writer.lock().unwrap();
202+
if let Err(e) = serde_json::to_writer(&mut *writer, &log_doc) {
203+
error!(self.error_logger, "Failed to serialize log to JSON: {}", e);
204+
return Ok(());
205+
}
206+
207+
if let Err(e) = writeln!(&mut *writer) {
208+
error!(self.error_logger, "Failed to write newline: {}", e);
209+
return Ok(());
210+
}
211+
212+
// Flush to ensure durability
213+
if let Err(e) = writer.flush() {
214+
error!(self.error_logger, "Failed to flush log file: {}", e);
215+
}
216+
217+
Ok(())
218+
}
219+
}
220+
221+
/// Creates a new asynchronous file logger.
222+
///
223+
/// Uses `error_logger` to print any file logging errors,
224+
/// so they don't go unnoticed.
225+
pub fn file_logger(config: FileDrainConfig, error_logger: Logger) -> Logger {
226+
let file_drain = match FileDrain::new(config, error_logger.clone()) {
227+
Ok(drain) => drain,
228+
Err(e) => {
229+
error!(error_logger, "Failed to create FileDrain: {}", e);
230+
// Return a logger that discards all logs
231+
return Logger::root(slog::Discard, o!());
232+
}
233+
};
234+
235+
let async_drain = slog_async::Async::new(file_drain.fuse())
236+
.chan_size(20000)
237+
.overflow_strategy(slog_async::OverflowStrategy::Block)
238+
.build()
239+
.fuse();
240+
Logger::root(async_drain, o!())
241+
}
242+
243+
#[cfg(test)]
244+
mod tests {
245+
use super::*;
246+
use tempfile::TempDir;
247+
248+
#[test]
249+
fn test_file_drain_creation() {
250+
let temp_dir = TempDir::new().unwrap();
251+
let error_logger = Logger::root(slog::Discard, o!());
252+
253+
let config = FileDrainConfig {
254+
directory: temp_dir.path().to_path_buf(),
255+
subgraph_id: "QmTest".to_string(),
256+
max_file_size: 1024 * 1024,
257+
retention_days: 30,
258+
};
259+
260+
let drain = FileDrain::new(config, error_logger);
261+
assert!(drain.is_ok());
262+
263+
// Verify file was created
264+
let file_path = temp_dir.path().join("QmTest.jsonl");
265+
assert!(file_path.exists());
266+
}
267+
268+
#[test]
269+
fn test_log_entry_format() {
270+
let arguments = vec![
271+
("key1".to_string(), "value1".to_string()),
272+
("key2".to_string(), "value2".to_string()),
273+
];
274+
275+
let doc = FileLogDocument {
276+
id: "test-id".to_string(),
277+
subgraph_id: "QmTest".to_string(),
278+
timestamp: "2024-01-15T10:30:00Z".to_string(),
279+
level: "error".to_string(),
280+
text: "Test error message".to_string(),
281+
arguments,
282+
meta: FileLogMeta {
283+
module: "test.rs".to_string(),
284+
line: 42,
285+
column: 10,
286+
},
287+
};
288+
289+
let json = serde_json::to_string(&doc).unwrap();
290+
assert!(json.contains("\"id\":\"test-id\""));
291+
assert!(json.contains("\"subgraphId\":\"QmTest\""));
292+
assert!(json.contains("\"level\":\"error\""));
293+
assert!(json.contains("\"text\":\"Test error message\""));
294+
assert!(json.contains("\"arguments\""));
295+
}
296+
297+
#[test]
298+
fn test_file_drain_writes_jsonl() {
299+
use std::io::{BufRead, BufReader};
300+
301+
let temp_dir = TempDir::new().unwrap();
302+
let error_logger = Logger::root(slog::Discard, o!());
303+
304+
let config = FileDrainConfig {
305+
directory: temp_dir.path().to_path_buf(),
306+
subgraph_id: "QmTest".to_string(),
307+
max_file_size: 1024 * 1024,
308+
retention_days: 30,
309+
};
310+
311+
let drain = FileDrain::new(config.clone(), error_logger).unwrap();
312+
313+
// Create a test record
314+
let logger = Logger::root(drain, o!());
315+
info!(logger, "Test message"; "key" => "value");
316+
317+
// Give async drain time to write (in real test we'd use proper sync)
318+
std::thread::sleep(std::time::Duration::from_millis(100));
319+
320+
// Read the file
321+
let file_path = temp_dir.path().join("QmTest.jsonl");
322+
let file = File::open(file_path).unwrap();
323+
let reader = BufReader::new(file);
324+
325+
let lines: Vec<String> = reader.lines().map_while(|r| r.ok()).collect();
326+
327+
// Should have written at least one line
328+
assert!(!lines.is_empty());
329+
330+
// Each line should be valid JSON
331+
for line in lines {
332+
let parsed: serde_json::Value = serde_json::from_str(&line).unwrap();
333+
assert!(parsed.get("id").is_some());
334+
assert!(parsed.get("subgraphId").is_some());
335+
assert!(parsed.get("timestamp").is_some());
336+
assert!(parsed.get("level").is_some());
337+
assert!(parsed.get("text").is_some());
338+
}
339+
}
340+
}

0 commit comments

Comments
 (0)