Skip to content

Commit cd2acd7

Browse files
committed
graph: Add LogStore trait and core types
Introduces the foundation for the log store system with: - LogStore trait for querying logs from backends - LogLevel enum with FromStr trait implementation - LogEntry and LogQuery types for structured log data - LogStoreFactory for creating backend instances - NoOpLogStore as default (disabled) implementation This establishes the core abstractions that all log storage backends will implement.
1 parent dc48c6c commit cd2acd7

2 files changed

Lines changed: 334 additions & 0 deletions

File tree

Lines changed: 331 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,331 @@
1+
pub mod config;
2+
pub mod elasticsearch;
3+
pub mod file;
4+
pub mod loki;
5+
6+
use async_trait::async_trait;
7+
use std::path::PathBuf;
8+
use std::str::FromStr;
9+
use std::sync::Arc;
10+
use thiserror::Error;
11+
12+
use crate::prelude::DeploymentHash;
13+
14+
#[derive(Error, Debug)]
15+
pub enum LogStoreError {
16+
#[error("log store query failed: {0}")]
17+
QueryFailed(#[from] anyhow::Error),
18+
19+
#[error("log store is unavailable")]
20+
Unavailable,
21+
22+
#[error("log store initialization failed: {0}")]
23+
InitializationFailed(anyhow::Error),
24+
25+
#[error("log store configuration error: {0}")]
26+
ConfigurationError(anyhow::Error),
27+
}
28+
29+
/// Configuration for different log store backends
30+
#[derive(Debug, Clone)]
31+
pub enum LogStoreConfig {
32+
/// No logging - returns empty results
33+
Disabled,
34+
35+
/// Elasticsearch backend
36+
Elasticsearch {
37+
endpoint: String,
38+
username: Option<String>,
39+
password: Option<String>,
40+
index: String,
41+
timeout_secs: u64,
42+
},
43+
44+
/// Loki (Grafana's log aggregation system)
45+
Loki {
46+
endpoint: String,
47+
tenant_id: Option<String>,
48+
},
49+
50+
/// File-based logs (JSON lines format)
51+
File {
52+
directory: PathBuf,
53+
max_file_size: u64,
54+
retention_days: u32,
55+
},
56+
}
57+
58+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
59+
pub enum LogLevel {
60+
Critical,
61+
Error,
62+
Warning,
63+
Info,
64+
Debug,
65+
}
66+
67+
impl LogLevel {
68+
pub fn as_str(&self) -> &'static str {
69+
match self {
70+
LogLevel::Critical => "critical",
71+
LogLevel::Error => "error",
72+
LogLevel::Warning => "warning",
73+
LogLevel::Info => "info",
74+
LogLevel::Debug => "debug",
75+
}
76+
}
77+
}
78+
79+
impl FromStr for LogLevel {
80+
type Err = String;
81+
82+
fn from_str(s: &str) -> Result<Self, Self::Err> {
83+
match s.trim().to_lowercase().as_str() {
84+
"critical" => Ok(LogLevel::Critical),
85+
"error" => Ok(LogLevel::Error),
86+
"warning" => Ok(LogLevel::Warning),
87+
"info" => Ok(LogLevel::Info),
88+
"debug" => Ok(LogLevel::Debug),
89+
_ => Err(format!("Invalid log level: {}", s)),
90+
}
91+
}
92+
}
93+
94+
#[derive(Debug, Clone)]
95+
pub struct LogMeta {
96+
pub module: String,
97+
pub line: i64,
98+
pub column: i64,
99+
}
100+
101+
#[derive(Debug, Clone)]
102+
pub struct LogEntry {
103+
pub id: String,
104+
pub subgraph_id: DeploymentHash,
105+
pub timestamp: String,
106+
pub level: LogLevel,
107+
pub text: String,
108+
pub arguments: Vec<(String, String)>,
109+
pub meta: LogMeta,
110+
}
111+
112+
#[derive(Debug, Clone)]
113+
pub struct LogQuery {
114+
pub subgraph_id: DeploymentHash,
115+
pub level: Option<LogLevel>,
116+
pub from: Option<String>,
117+
pub to: Option<String>,
118+
pub search: Option<String>,
119+
pub first: u32,
120+
pub skip: u32,
121+
}
122+
123+
#[async_trait]
124+
pub trait LogStore: Send + Sync + 'static {
125+
async fn query_logs(&self, query: LogQuery) -> Result<Vec<LogEntry>, LogStoreError>;
126+
fn is_available(&self) -> bool;
127+
}
128+
129+
/// Factory for creating LogStore instances from configuration
130+
pub struct LogStoreFactory;
131+
132+
impl LogStoreFactory {
133+
/// Create a LogStore from configuration
134+
pub fn from_config(config: LogStoreConfig) -> Result<Arc<dyn LogStore>, LogStoreError> {
135+
match config {
136+
LogStoreConfig::Disabled => Ok(Arc::new(NoOpLogStore)),
137+
138+
LogStoreConfig::Elasticsearch {
139+
endpoint,
140+
username,
141+
password,
142+
index,
143+
timeout_secs,
144+
} => {
145+
let timeout = std::time::Duration::from_secs(timeout_secs);
146+
let client = reqwest::Client::builder()
147+
.timeout(timeout)
148+
.build()
149+
.map_err(|e| LogStoreError::InitializationFailed(e.into()))?;
150+
151+
let config = crate::log::elastic::ElasticLoggingConfig {
152+
endpoint,
153+
username,
154+
password,
155+
client,
156+
};
157+
158+
Ok(Arc::new(elasticsearch::ElasticsearchLogStore::new(
159+
config, index, timeout,
160+
)))
161+
}
162+
163+
LogStoreConfig::Loki {
164+
endpoint,
165+
tenant_id,
166+
} => Ok(Arc::new(loki::LokiLogStore::new(endpoint, tenant_id)?)),
167+
168+
LogStoreConfig::File {
169+
directory,
170+
max_file_size,
171+
retention_days,
172+
} => Ok(Arc::new(file::FileLogStore::new(
173+
directory,
174+
max_file_size,
175+
retention_days,
176+
)?)),
177+
}
178+
}
179+
180+
/// Parse configuration from environment variables
181+
///
182+
/// Supports both new (GRAPH_LOG_STORE_*) and old (deprecated) environment variable names
183+
/// for backward compatibility. The new keys take precedence when both are set.
184+
pub fn from_env() -> Result<LogStoreConfig, LogStoreError> {
185+
// Logger for deprecation warnings
186+
let logger = crate::log::logger(false);
187+
188+
// Read backend selector with backward compatibility
189+
let backend = config::read_env_with_default(
190+
&logger,
191+
"GRAPH_LOG_STORE_BACKEND",
192+
"GRAPH_LOG_STORE",
193+
"disabled",
194+
);
195+
196+
match backend.to_lowercase().as_str() {
197+
"disabled" | "none" => Ok(LogStoreConfig::Disabled),
198+
199+
"elasticsearch" | "elastic" | "es" => {
200+
let endpoint = config::read_env_with_fallback(
201+
&logger,
202+
"GRAPH_LOG_STORE_ELASTICSEARCH_URL",
203+
"GRAPH_ELASTICSEARCH_URL",
204+
)
205+
.ok_or_else(|| {
206+
LogStoreError::ConfigurationError(anyhow::anyhow!(
207+
"Elasticsearch endpoint not set. Use GRAPH_LOG_STORE_ELASTICSEARCH_URL environment variable"
208+
))
209+
})?;
210+
211+
let username = config::read_env_with_fallback(
212+
&logger,
213+
"GRAPH_LOG_STORE_ELASTICSEARCH_USER",
214+
"GRAPH_ELASTICSEARCH_USER",
215+
);
216+
217+
let password = config::read_env_with_fallback(
218+
&logger,
219+
"GRAPH_LOG_STORE_ELASTICSEARCH_PASSWORD",
220+
"GRAPH_ELASTICSEARCH_PASSWORD",
221+
);
222+
223+
let index = config::read_env_with_default(
224+
&logger,
225+
"GRAPH_LOG_STORE_ELASTICSEARCH_INDEX",
226+
"GRAPH_ELASTIC_SEARCH_INDEX",
227+
"subgraph",
228+
);
229+
230+
// Default: 10 seconds query timeout
231+
// Configurable via GRAPH_LOG_STORE_ELASTICSEARCH_TIMEOUT environment variable
232+
let timeout_secs = config::read_u64_with_fallback(
233+
&logger,
234+
"GRAPH_LOG_STORE_ELASTICSEARCH_TIMEOUT",
235+
"GRAPH_ELASTICSEARCH_TIMEOUT",
236+
10,
237+
);
238+
239+
Ok(LogStoreConfig::Elasticsearch {
240+
endpoint,
241+
username,
242+
password,
243+
index,
244+
timeout_secs,
245+
})
246+
}
247+
248+
"loki" => {
249+
let endpoint = config::read_env_with_fallback(
250+
&logger,
251+
"GRAPH_LOG_STORE_LOKI_URL",
252+
"GRAPH_LOG_LOKI_ENDPOINT",
253+
)
254+
.ok_or_else(|| {
255+
LogStoreError::ConfigurationError(anyhow::anyhow!(
256+
"Loki endpoint not set. Use GRAPH_LOG_STORE_LOKI_URL environment variable"
257+
))
258+
})?;
259+
260+
let tenant_id = config::read_env_with_fallback(
261+
&logger,
262+
"GRAPH_LOG_STORE_LOKI_TENANT_ID",
263+
"GRAPH_LOG_LOKI_TENANT",
264+
);
265+
266+
Ok(LogStoreConfig::Loki {
267+
endpoint,
268+
tenant_id,
269+
})
270+
}
271+
272+
"file" | "files" => {
273+
let directory = config::read_env_with_fallback(
274+
&logger,
275+
"GRAPH_LOG_STORE_FILE_DIR",
276+
"GRAPH_LOG_FILE_DIR",
277+
)
278+
.ok_or_else(|| {
279+
LogStoreError::ConfigurationError(anyhow::anyhow!(
280+
"File log directory not set. Use GRAPH_LOG_STORE_FILE_DIR environment variable"
281+
))
282+
})
283+
.map(PathBuf::from)?;
284+
285+
// Default: 100MB per file (104857600 bytes)
286+
// Configurable via GRAPH_LOG_STORE_FILE_MAX_SIZE environment variable
287+
let max_file_size = config::read_u64_with_fallback(
288+
&logger,
289+
"GRAPH_LOG_STORE_FILE_MAX_SIZE",
290+
"GRAPH_LOG_FILE_MAX_SIZE",
291+
100 * 1024 * 1024,
292+
);
293+
294+
// Default: 30 days retention
295+
// Configurable via GRAPH_LOG_STORE_FILE_RETENTION_DAYS environment variable
296+
let retention_days = config::read_u32_with_fallback(
297+
&logger,
298+
"GRAPH_LOG_STORE_FILE_RETENTION_DAYS",
299+
"GRAPH_LOG_FILE_RETENTION_DAYS",
300+
30,
301+
);
302+
303+
Ok(LogStoreConfig::File {
304+
directory,
305+
max_file_size,
306+
retention_days,
307+
})
308+
}
309+
310+
_ => Err(LogStoreError::ConfigurationError(anyhow::anyhow!(
311+
"Unknown log store backend: {}. Valid options: disabled, elasticsearch, loki, file",
312+
backend
313+
))),
314+
}
315+
}
316+
}
317+
318+
/// A no-op LogStore that returns empty results
319+
/// Used when logging is disabled
320+
pub struct NoOpLogStore;
321+
322+
#[async_trait]
323+
impl LogStore for NoOpLogStore {
324+
async fn query_logs(&self, _query: LogQuery) -> Result<Vec<LogEntry>, LogStoreError> {
325+
Ok(vec![])
326+
}
327+
328+
fn is_available(&self) -> bool {
329+
false
330+
}
331+
}

graph/src/components/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ pub mod server;
5050
/// Components dealing with storing entities.
5151
pub mod store;
5252

53+
/// Components dealing with log storage.
54+
pub mod log_store;
55+
5356
pub mod link_resolver;
5457

5558
pub mod trigger_processor;

0 commit comments

Comments
 (0)