Skip to content

Commit 1dbf961

Browse files
committed
graph, node: Refactor LoggerFactory for multi-backend log support
Refactor LoggerFactory to support multiple log storage backends (Elasticsearch, Loki, and File) through a unified configuration approach.
1 parent 7d2852f commit 1dbf961

3 files changed

Lines changed: 210 additions & 56 deletions

File tree

graph/src/log/factory.rs

Lines changed: 118 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
use std::sync::Arc;
2+
use std::time::Duration;
23

34
use prometheus::Counter;
45
use slog::*;
56

7+
use crate::components::log_store::LogStoreConfig;
68
use crate::components::metrics::MetricsRegistry;
79
use crate::components::store::DeploymentLocator;
810
use crate::log::elastic::*;
11+
use crate::log::file::{file_logger, FileDrainConfig};
12+
use crate::log::loki::{loki_logger, LokiDrainConfig};
913
use crate::log::split::*;
1014
use crate::prelude::ENV_VARS;
1115

@@ -23,20 +27,20 @@ pub struct ComponentLoggerConfig {
2327
#[derive(Clone)]
2428
pub struct LoggerFactory {
2529
parent: Logger,
26-
elastic_config: Option<ElasticLoggingConfig>,
30+
log_store_config: Option<LogStoreConfig>,
2731
metrics_registry: Arc<MetricsRegistry>,
2832
}
2933

3034
impl LoggerFactory {
31-
/// Creates a new factory using a parent logger and optional Elasticsearch configuration.
35+
/// Creates a new factory using a parent logger and optional log store configuration.
3236
pub fn new(
3337
logger: Logger,
34-
elastic_config: Option<ElasticLoggingConfig>,
38+
log_store_config: Option<LogStoreConfig>,
3539
metrics_registry: Arc<MetricsRegistry>,
3640
) -> Self {
3741
Self {
3842
parent: logger,
39-
elastic_config,
43+
log_store_config,
4044
metrics_registry,
4145
}
4246
}
@@ -45,7 +49,7 @@ impl LoggerFactory {
4549
pub fn with_parent(&self, parent: Logger) -> Self {
4650
Self {
4751
parent,
48-
elastic_config: self.elastic_config.clone(),
52+
log_store_config: self.log_store_config.clone(),
4953
metrics_registry: self.metrics_registry.clone(),
5054
}
5155
}
@@ -62,56 +66,126 @@ impl LoggerFactory {
6266
None => term_logger,
6367
Some(config) => match config.elastic {
6468
None => term_logger,
65-
Some(config) => self
66-
.elastic_config
67-
.clone()
68-
.map(|elastic_config| {
69-
split_logger(
70-
term_logger.clone(),
71-
elastic_logger(
72-
ElasticDrainConfig {
73-
general: elastic_config,
74-
index: config.index,
75-
custom_id_key: String::from("componentId"),
76-
custom_id_value: component.to_string(),
77-
flush_interval: ENV_VARS.elastic_search_flush_interval,
78-
max_retries: ENV_VARS.elastic_search_max_retries,
79-
},
69+
Some(elastic_component_config) => {
70+
// Check if we have Elasticsearch configured in log_store_config
71+
match &self.log_store_config {
72+
Some(LogStoreConfig::Elasticsearch {
73+
endpoint,
74+
username,
75+
password,
76+
..
77+
}) => {
78+
// Build ElasticLoggingConfig on-demand
79+
let elastic_config = ElasticLoggingConfig {
80+
endpoint: endpoint.clone(),
81+
username: username.clone(),
82+
password: password.clone(),
83+
client: reqwest::Client::new(),
84+
};
85+
86+
split_logger(
8087
term_logger.clone(),
81-
self.logs_sent_counter(None),
82-
),
83-
)
84-
})
85-
.unwrap_or(term_logger),
88+
elastic_logger(
89+
ElasticDrainConfig {
90+
general: elastic_config,
91+
index: elastic_component_config.index,
92+
custom_id_key: String::from("componentId"),
93+
custom_id_value: component.to_string(),
94+
flush_interval: ENV_VARS.elastic_search_flush_interval,
95+
max_retries: ENV_VARS.elastic_search_max_retries,
96+
},
97+
term_logger.clone(),
98+
self.logs_sent_counter(None),
99+
),
100+
)
101+
}
102+
_ => {
103+
// No Elasticsearch configured, just use terminal logger
104+
term_logger
105+
}
106+
}
107+
}
86108
},
87109
}
88110
}
89111

90-
/// Creates a subgraph logger with Elasticsearch support.
112+
/// Creates a subgraph logger with multi-backend support.
91113
pub fn subgraph_logger(&self, loc: &DeploymentLocator) -> Logger {
92114
let term_logger = self
93115
.parent
94116
.new(o!("subgraph_id" => loc.hash.to_string(), "sgd" => loc.id.to_string()));
95117

96-
self.elastic_config
97-
.clone()
98-
.map(|elastic_config| {
99-
split_logger(
118+
// Determine which drain to use based on log_store_config
119+
let drain = match &self.log_store_config {
120+
Some(LogStoreConfig::Elasticsearch {
121+
endpoint,
122+
username,
123+
password,
124+
index,
125+
}) => {
126+
// Build ElasticLoggingConfig on-demand
127+
let elastic_config = ElasticLoggingConfig {
128+
endpoint: endpoint.clone(),
129+
username: username.clone(),
130+
password: password.clone(),
131+
client: reqwest::Client::new(),
132+
};
133+
134+
Some(elastic_logger(
135+
ElasticDrainConfig {
136+
general: elastic_config,
137+
index: index.clone(),
138+
custom_id_key: String::from("subgraphId"),
139+
custom_id_value: loc.hash.to_string(),
140+
flush_interval: ENV_VARS.elastic_search_flush_interval,
141+
max_retries: ENV_VARS.elastic_search_max_retries,
142+
},
143+
term_logger.clone(),
144+
self.logs_sent_counter(Some(loc.hash.as_str())),
145+
))
146+
}
147+
148+
None => None,
149+
150+
Some(LogStoreConfig::Loki {
151+
endpoint,
152+
tenant_id,
153+
}) => {
154+
// Use Loki
155+
Some(loki_logger(
156+
LokiDrainConfig {
157+
endpoint: endpoint.clone(),
158+
tenant_id: tenant_id.clone(),
159+
flush_interval: Duration::from_secs(5),
160+
subgraph_id: loc.hash.to_string(),
161+
},
100162
term_logger.clone(),
101-
elastic_logger(
102-
ElasticDrainConfig {
103-
general: elastic_config,
104-
index: ENV_VARS.elastic_search_index.clone(),
105-
custom_id_key: String::from("subgraphId"),
106-
custom_id_value: loc.hash.to_string(),
107-
flush_interval: ENV_VARS.elastic_search_flush_interval,
108-
max_retries: ENV_VARS.elastic_search_max_retries,
109-
},
110-
term_logger.clone(),
111-
self.logs_sent_counter(Some(loc.hash.as_str())),
112-
),
113-
)
114-
})
163+
))
164+
}
165+
166+
Some(LogStoreConfig::File {
167+
directory,
168+
max_file_size,
169+
retention_days,
170+
}) => {
171+
// Use File
172+
Some(file_logger(
173+
FileDrainConfig {
174+
directory: directory.clone(),
175+
subgraph_id: loc.hash.to_string(),
176+
max_file_size: *max_file_size,
177+
retention_days: *retention_days,
178+
},
179+
term_logger.clone(),
180+
))
181+
}
182+
183+
Some(LogStoreConfig::Disabled) => None,
184+
};
185+
186+
// Combine terminal and storage drain
187+
drain
188+
.map(|storage_drain| split_logger(term_logger.clone(), storage_drain))
115189
.unwrap_or(term_logger)
116190
}
117191

graph/src/log/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ use crate::prelude::ENV_VARS;
2929
pub mod codes;
3030
pub mod elastic;
3131
pub mod factory;
32+
pub mod file;
33+
pub mod loki;
3234
pub mod split;
3335

3436
pub fn logger(show_debug: bool) -> Logger {

node/src/launcher.rs

Lines changed: 90 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -454,23 +454,101 @@ pub async fn run(
454454
});
455455

456456
// Create log store for querying logs
457+
// Priority: GRAPH_LOG_STORE env var, then Elasticsearch config from CLI, then disabled
457458
let log_store: Arc<dyn graph::components::log_store::LogStore> =
458-
if let Some(ref elastic_config) = elastic_config {
459-
let index = std::env::var("GRAPH_ELASTIC_SEARCH_INDEX")
460-
.unwrap_or_else(|_| "subgraph".to_string());
461-
Arc::new(
462-
graph::components::log_store::elasticsearch::ElasticsearchLogStore::new(
463-
elastic_config.clone(),
464-
index,
465-
),
466-
)
467-
} else {
468-
Arc::new(graph::components::log_store::NoOpLogStore)
459+
match graph::components::log_store::LogStoreFactory::from_env() {
460+
Ok(config) => {
461+
match graph::components::log_store::LogStoreFactory::from_config(config) {
462+
Ok(store) => {
463+
info!(
464+
logger,
465+
"Log store initialized from GRAPH_LOG_STORE environment variable"
466+
);
467+
store
468+
}
469+
Err(e) => {
470+
warn!(logger, "Failed to initialize log store from GRAPH_LOG_STORE: {}, falling back to Elasticsearch config", e);
471+
// Fall back to Elasticsearch if env config fails
472+
if let Some(ref elastic_config) = elastic_config {
473+
let config =
474+
graph::components::log_store::LogStoreConfig::Elasticsearch {
475+
endpoint: elastic_config.endpoint.clone(),
476+
username: elastic_config.username.clone(),
477+
password: elastic_config.password.clone(),
478+
index: std::env::var("GRAPH_ELASTIC_SEARCH_INDEX")
479+
.unwrap_or_else(|_| "subgraph".to_string()),
480+
};
481+
graph::components::log_store::LogStoreFactory::from_config(config)
482+
.unwrap_or_else(|_| {
483+
Arc::new(graph::components::log_store::NoOpLogStore)
484+
})
485+
} else {
486+
Arc::new(graph::components::log_store::NoOpLogStore)
487+
}
488+
}
489+
}
490+
}
491+
Err(_) => {
492+
// No GRAPH_LOG_STORE env var, fall back to Elasticsearch from CLI args
493+
if let Some(ref elastic_config) = elastic_config {
494+
let config = graph::components::log_store::LogStoreConfig::Elasticsearch {
495+
endpoint: elastic_config.endpoint.clone(),
496+
username: elastic_config.username.clone(),
497+
password: elastic_config.password.clone(),
498+
index: std::env::var("GRAPH_ELASTIC_SEARCH_INDEX")
499+
.unwrap_or_else(|_| "subgraph".to_string()),
500+
};
501+
match graph::components::log_store::LogStoreFactory::from_config(config) {
502+
Ok(store) => {
503+
info!(
504+
logger,
505+
"Log store initialized from Elasticsearch CLI configuration"
506+
);
507+
store
508+
}
509+
Err(e) => {
510+
warn!(
511+
logger,
512+
"Failed to initialize Elasticsearch log store: {}, using NoOp", e
513+
);
514+
Arc::new(graph::components::log_store::NoOpLogStore)
515+
}
516+
}
517+
} else {
518+
info!(
519+
logger,
520+
"No log store configured, queries will return empty results"
521+
);
522+
Arc::new(graph::components::log_store::NoOpLogStore)
523+
}
524+
}
469525
};
470526

527+
// Parse log store config for drain selection
528+
// Priority: GRAPH_LOG_STORE env var, then Elasticsearch CLI config, then disabled
529+
let log_store_config = graph::components::log_store::LogStoreFactory::from_env()
530+
.ok()
531+
.or_else(|| {
532+
// Fallback to Elasticsearch if configured via CLI
533+
elastic_config.as_ref().map(|ec| {
534+
graph::components::log_store::LogStoreConfig::Elasticsearch {
535+
endpoint: ec.endpoint.clone(),
536+
username: ec.username.clone(),
537+
password: ec.password.clone(),
538+
index: std::env::var("GRAPH_ELASTIC_SEARCH_INDEX")
539+
.unwrap_or_else(|_| "subgraph".to_string()),
540+
}
541+
})
542+
});
543+
544+
// Log which backend is being used for log drains
545+
if let Some(ref config) = log_store_config {
546+
info!(logger, "Log drain initialized"; "backend" => format!("{:?}", config));
547+
}
548+
471549
// Create a component and subgraph logger factory
472550
let logger_factory =
473-
LoggerFactory::new(logger.clone(), elastic_config, metrics_registry.clone());
551+
LoggerFactory::new(logger.clone(), log_store_config, metrics_registry.clone());
474552

475553
let arweave_resolver = Arc::new(ArweaveClient::new(
476554
logger.cheap_clone(),

0 commit comments

Comments (0)