Skip to content

Commit f0b223d

Browse files
committed
graph,node: Refactor LoggerFactory for multi-backend support
Refactors LoggerFactory to use LogStoreConfig instead of an Elasticsearch-only configuration:

- Replaced the elastic_config parameter with a log_store_config parameter
- Build ElasticLoggingConfig on demand from LogStoreConfig::Elasticsearch
- Support all log drain types (File, Loki, Elasticsearch)
- Maintain backward compatibility with the existing Elasticsearch configuration

This enables the factory to create drains for any configured backend while preserving the existing component logger patterns.
1 parent e015112 commit f0b223d

2 files changed

Lines changed: 126 additions & 45 deletions

File tree

graph/src/log/factory.rs

Lines changed: 119 additions & 44 deletions
Original file line number | Diff line number | Diff line change
@@ -1,11 +1,15 @@
11
use std::sync::Arc;
2+
use std::time::Duration;
23

34
use prometheus::Counter;
45
use slog::*;
56

7+
use crate::components::log_store::LogStoreConfig;
68
use crate::components::metrics::MetricsRegistry;
79
use crate::components::store::DeploymentLocator;
810
use crate::log::elastic::*;
11+
use crate::log::file::{file_logger, FileDrainConfig};
12+
use crate::log::loki::{loki_logger, LokiDrainConfig};
913
use crate::log::split::*;
1014
use crate::prelude::ENV_VARS;
1115

@@ -23,20 +27,20 @@ pub struct ComponentLoggerConfig {
2327
#[derive(Clone)]
2428
pub struct LoggerFactory {
2529
parent: Logger,
26-
elastic_config: Option<ElasticLoggingConfig>,
30+
log_store_config: Option<LogStoreConfig>,
2731
metrics_registry: Arc<MetricsRegistry>,
2832
}
2933

3034
impl LoggerFactory {
31-
/// Creates a new factory using a parent logger and optional Elasticsearch configuration.
35+
/// Creates a new factory using a parent logger and optional log store configuration.
3236
pub fn new(
3337
logger: Logger,
34-
elastic_config: Option<ElasticLoggingConfig>,
38+
log_store_config: Option<LogStoreConfig>,
3539
metrics_registry: Arc<MetricsRegistry>,
3640
) -> Self {
3741
Self {
3842
parent: logger,
39-
elastic_config,
43+
log_store_config,
4044
metrics_registry,
4145
}
4246
}
@@ -45,7 +49,7 @@ impl LoggerFactory {
4549
pub fn with_parent(&self, parent: Logger) -> Self {
4650
Self {
4751
parent,
48-
elastic_config: self.elastic_config.clone(),
52+
log_store_config: self.log_store_config.clone(),
4953
metrics_registry: self.metrics_registry.clone(),
5054
}
5155
}
@@ -62,56 +66,127 @@ impl LoggerFactory {
6266
None => term_logger,
6367
Some(config) => match config.elastic {
6468
None => term_logger,
65-
Some(config) => self
66-
.elastic_config
67-
.clone()
68-
.map(|elastic_config| {
69-
split_logger(
70-
term_logger.clone(),
71-
elastic_logger(
72-
ElasticDrainConfig {
73-
general: elastic_config,
74-
index: config.index,
75-
custom_id_key: String::from("componentId"),
76-
custom_id_value: component.to_string(),
77-
flush_interval: ENV_VARS.elastic_search_flush_interval,
78-
max_retries: ENV_VARS.elastic_search_max_retries,
79-
},
69+
Some(elastic_component_config) => {
70+
// Check if we have Elasticsearch configured in log_store_config
71+
match &self.log_store_config {
72+
Some(LogStoreConfig::Elasticsearch {
73+
endpoint,
74+
username,
75+
password,
76+
..
77+
}) => {
78+
// Build ElasticLoggingConfig on-demand
79+
let elastic_config = ElasticLoggingConfig {
80+
endpoint: endpoint.clone(),
81+
username: username.clone(),
82+
password: password.clone(),
83+
client: reqwest::Client::new(),
84+
};
85+
86+
split_logger(
8087
term_logger.clone(),
81-
self.logs_sent_counter(None),
82-
),
83-
)
84-
})
85-
.unwrap_or(term_logger),
88+
elastic_logger(
89+
ElasticDrainConfig {
90+
general: elastic_config,
91+
index: elastic_component_config.index,
92+
custom_id_key: String::from("componentId"),
93+
custom_id_value: component.to_string(),
94+
flush_interval: ENV_VARS.elastic_search_flush_interval,
95+
max_retries: ENV_VARS.elastic_search_max_retries,
96+
},
97+
term_logger.clone(),
98+
self.logs_sent_counter(None),
99+
),
100+
)
101+
}
102+
_ => {
103+
// No Elasticsearch configured, just use terminal logger
104+
term_logger
105+
}
106+
}
107+
}
86108
},
87109
}
88110
}
89111

90-
/// Creates a subgraph logger with Elasticsearch support.
112+
/// Creates a subgraph logger with multi-backend support.
91113
pub fn subgraph_logger(&self, loc: &DeploymentLocator) -> Logger {
92114
let term_logger = self
93115
.parent
94116
.new(o!("subgraph_id" => loc.hash.to_string(), "sgd" => loc.id.to_string()));
95117

96-
self.elastic_config
97-
.clone()
98-
.map(|elastic_config| {
99-
split_logger(
118+
// Determine which drain to use based on log_store_config
119+
let drain = match &self.log_store_config {
120+
Some(LogStoreConfig::Elasticsearch {
121+
endpoint,
122+
username,
123+
password,
124+
index,
125+
..
126+
}) => {
127+
// Build ElasticLoggingConfig on-demand
128+
let elastic_config = ElasticLoggingConfig {
129+
endpoint: endpoint.clone(),
130+
username: username.clone(),
131+
password: password.clone(),
132+
client: reqwest::Client::new(),
133+
};
134+
135+
Some(elastic_logger(
136+
ElasticDrainConfig {
137+
general: elastic_config,
138+
index: index.clone(),
139+
custom_id_key: String::from("subgraphId"),
140+
custom_id_value: loc.hash.to_string(),
141+
flush_interval: ENV_VARS.elastic_search_flush_interval,
142+
max_retries: ENV_VARS.elastic_search_max_retries,
143+
},
144+
term_logger.clone(),
145+
self.logs_sent_counter(Some(loc.hash.as_str())),
146+
))
147+
}
148+
149+
None => None,
150+
151+
Some(LogStoreConfig::Loki {
152+
endpoint,
153+
tenant_id,
154+
}) => {
155+
// Use Loki
156+
Some(loki_logger(
157+
LokiDrainConfig {
158+
endpoint: endpoint.clone(),
159+
tenant_id: tenant_id.clone(),
160+
flush_interval: Duration::from_secs(5),
161+
subgraph_id: loc.hash.to_string(),
162+
},
100163
term_logger.clone(),
101-
elastic_logger(
102-
ElasticDrainConfig {
103-
general: elastic_config,
104-
index: ENV_VARS.elastic_search_index.clone(),
105-
custom_id_key: String::from("subgraphId"),
106-
custom_id_value: loc.hash.to_string(),
107-
flush_interval: ENV_VARS.elastic_search_flush_interval,
108-
max_retries: ENV_VARS.elastic_search_max_retries,
109-
},
110-
term_logger.clone(),
111-
self.logs_sent_counter(Some(loc.hash.as_str())),
112-
),
113-
)
114-
})
164+
))
165+
}
166+
167+
Some(LogStoreConfig::File {
168+
directory,
169+
max_file_size,
170+
retention_days,
171+
}) => {
172+
// Use File
173+
Some(file_logger(
174+
FileDrainConfig {
175+
directory: directory.clone(),
176+
subgraph_id: loc.hash.to_string(),
177+
max_file_size: *max_file_size,
178+
retention_days: *retention_days,
179+
},
180+
term_logger.clone(),
181+
))
182+
}
183+
184+
Some(LogStoreConfig::Disabled) => None,
185+
};
186+
187+
// Combine terminal and storage drain
188+
drain
189+
.map(|storage_drain| split_logger(term_logger.clone(), storage_drain))
115190
.unwrap_or(term_logger)
116191
}
117192

node/src/bin/manager.rs

Lines changed: 7 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1033,7 +1033,13 @@ impl Context {
10331033

10341034
let load_manager = Arc::new(LoadManager::new(&logger, vec![], vec![], registry.clone()));
10351035

1036-
Arc::new(GraphQlRunner::new(&logger, store, load_manager, registry))
1036+
Arc::new(GraphQlRunner::new(
1037+
&logger,
1038+
store,
1039+
load_manager,
1040+
registry,
1041+
Arc::new(graph::components::log_store::NoOpLogStore),
1042+
))
10371043
}
10381044

10391045
async fn networks(&self) -> anyhow::Result<Networks> {

0 commit comments

Comments (0)