Skip to content

Commit f47e987

Browse files
committed
graphql,node,server,store: Wire up _logs query in execution
Integrates the _logs query into the GraphQL execution pipeline.

Execution layer:
- Execute _logs queries via log_store.query_logs()
- Convert LogEntry results to GraphQL response objects
- Handle log store errors gracefully

Query parsing:
- Recognize _logs as a special query field
- Build LogQuery from GraphQL arguments
- Pass log_store to the execution context

Service wiring:
- Create the log store from configuration in the launcher
- Provide the log store to the GraphQL runner
- Use NoOpLogStore in test environments

This completes the read path from GraphQL query to the log storage backend.
1 parent 2bfcccf commit f47e987

6 files changed

Lines changed: 195 additions & 15 deletions

File tree

graphql/src/execution/execution.rs

Lines changed: 68 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -8,7 +8,7 @@ use graph::{
88
},
99
futures03::future::TryFutureExt,
1010
prelude::{s, CheapClone},
11-
schema::{is_introspection_field, INTROSPECTION_QUERY_TYPE, META_FIELD_NAME},
11+
schema::{is_introspection_field, INTROSPECTION_QUERY_TYPE, LOGS_FIELD_NAME, META_FIELD_NAME},
1212
util::{herd_cache::HerdCache, lfu_cache::EvictStats, timed_rw_lock::TimedMutex},
1313
};
1414
use lazy_static::lazy_static;
@@ -231,6 +231,9 @@ where
231231

232232
/// Whether to include an execution trace in the result
233233
pub trace: bool,
234+
235+
/// The log store to use for querying logs.
236+
pub log_store: Arc<dyn graph::components::log_store::LogStore>,
234237
}
235238

236239
pub(crate) fn get_field<'a>(
@@ -264,6 +267,7 @@ where
264267
// `cache_status` is a dead value for the introspection context.
265268
cache_status: AtomicCell::new(CacheStatus::Miss),
266269
trace: ENV_VARS.log_sql_timing(),
270+
log_store: self.log_store.cheap_clone(),
267271
}
268272
}
269273
}
@@ -273,18 +277,21 @@ pub(crate) async fn execute_root_selection_set_uncached(
273277
selection_set: &a::SelectionSet,
274278
root_type: &sast::ObjectType,
275279
) -> Result<(Object, Trace), Vec<QueryExecutionError>> {
276-
// Split the top-level fields into introspection fields and
277-
// regular data fields
280+
// Split the top-level fields into introspection fields,
281+
// logs fields, meta fields, and regular data fields
278282
let mut data_set = a::SelectionSet::empty_from(selection_set);
279283
let mut intro_set = a::SelectionSet::empty_from(selection_set);
280284
let mut meta_items = Vec::new();
285+
let mut logs_fields = Vec::new();
281286

282287
for field in selection_set.fields_for(root_type)? {
283288
// See if this is an introspection or data field. We don't worry about
284289
// non-existent fields; those will cause an error later when we execute
285290
// the data_set SelectionSet
286291
if is_introspection_field(&field.name) {
287292
intro_set.push(field)?
293+
} else if field.name == LOGS_FIELD_NAME {
294+
logs_fields.push(field)
288295
} else if field.name == META_FIELD_NAME || field.name == "__typename" {
289296
meta_items.push(field)
290297
} else {
@@ -314,6 +321,64 @@ pub(crate) async fn execute_root_selection_set_uncached(
314321
);
315322
}
316323

324+
// Resolve logs fields, if there are any
325+
for field in logs_fields {
326+
use graph::data::graphql::object;
327+
328+
// Build log query from field arguments
329+
let log_query = crate::store::logs::build_log_query(field, ctx.query.schema.id())
330+
.map_err(|e| vec![e])?;
331+
332+
// Query the log store
333+
let log_entries = ctx.log_store.query_logs(log_query).await.map_err(|e| {
334+
vec![QueryExecutionError::StoreError(
335+
anyhow::Error::from(e).into(),
336+
)]
337+
})?;
338+
339+
// Convert log entries to GraphQL values
340+
let log_values: Vec<r::Value> = log_entries
341+
.into_iter()
342+
.map(|entry| {
343+
// Convert arguments Vec<(String, String)> to GraphQL objects
344+
let arguments: Vec<r::Value> = entry
345+
.arguments
346+
.into_iter()
347+
.map(|(key, value)| {
348+
object! {
349+
key: key,
350+
value: value,
351+
__typename: "_LogArgument_"
352+
}
353+
})
354+
.collect();
355+
356+
// Convert log level to string
357+
let level_str = entry.level.as_str().to_uppercase();
358+
359+
object! {
360+
id: entry.id,
361+
subgraphId: entry.subgraph_id.to_string(),
362+
timestamp: entry.timestamp,
363+
level: level_str,
364+
text: entry.text,
365+
arguments: arguments,
366+
meta: object! {
367+
module: entry.meta.module,
368+
line: r::Value::Int(entry.meta.line),
369+
column: r::Value::Int(entry.meta.column),
370+
__typename: "_LogMeta_"
371+
},
372+
__typename: "_Log_"
373+
}
374+
})
375+
.collect();
376+
377+
let response_key = Word::from(field.response_key());
378+
let logs_object = Object::from_iter(vec![(response_key, r::Value::List(log_values))]);
379+
values.append(logs_object);
380+
}
381+
317382
Ok((values, trace))
318383
}
319384

graphql/src/query/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -29,6 +29,9 @@ pub struct QueryExecutionOptions<R> {
2929

3030
/// Whether to include an execution trace in the result
3131
pub trace: bool,
32+
33+
/// The log store to use for querying logs.
34+
pub log_store: Arc<dyn graph::components::log_store::LogStore>,
3235
}
3336

3437
/// Executes a query and returns a result.
@@ -52,6 +55,7 @@ where
5255
max_skip: options.max_skip,
5356
cache_status: Default::default(),
5457
trace: options.trace,
58+
log_store: options.log_store,
5559
});
5660

5761
let selection_set = selection_set

graphql/src/runner.rs

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -26,6 +26,7 @@ pub struct GraphQlRunner<S> {
2626
store: Arc<S>,
2727
load_manager: Arc<LoadManager>,
2828
graphql_metrics: Arc<GraphQLMetrics>,
29+
log_store: Arc<dyn graph::components::log_store::LogStore>,
2930
}
3031

3132
#[cfg(debug_assertions)]
@@ -44,6 +45,7 @@ where
4445
store: Arc<S>,
4546
load_manager: Arc<LoadManager>,
4647
registry: Arc<MetricsRegistry>,
48+
log_store: Arc<dyn graph::components::log_store::LogStore>,
4749
) -> Self {
4850
let logger = logger.new(o!("component" => "GraphQlRunner"));
4951
let graphql_metrics = Arc::new(GraphQLMetrics::new(registry));
@@ -52,6 +54,7 @@ where
5254
store,
5355
load_manager,
5456
graphql_metrics,
57+
log_store,
5558
}
5659
}
5760

@@ -186,6 +189,7 @@ where
186189
max_first: max_first.unwrap_or(ENV_VARS.graphql.max_first),
187190
max_skip: max_skip.unwrap_or(ENV_VARS.graphql.max_skip),
188191
trace: do_trace,
192+
log_store: self.log_store.cheap_clone(),
189193
},
190194
));
191195
}

node/src/launcher.rs

Lines changed: 116 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -37,6 +37,7 @@ use tokio_util::sync::CancellationToken;
3737

3838
use crate::config::Config;
3939
use crate::helpers::watch_subgraph_updates;
40+
use crate::log_config_provider::{LogStoreConfigProvider, LogStoreConfigSources};
4041
use crate::network_setup::Networks;
4142
use crate::opt::Opt;
4243
use crate::store_builder::StoreBuilder;
@@ -367,6 +368,7 @@ fn build_graphql_server(
367368
metrics_registry: Arc<MetricsRegistry>,
368369
network_store: &Arc<Store>,
369370
logger_factory: &LoggerFactory,
371+
log_store: Arc<dyn graph::components::log_store::LogStore>,
370372
) -> GraphQLQueryServer<GraphQlRunner<Store>> {
371373
let shards: Vec<_> = config.stores.keys().cloned().collect();
372374
let load_manager = Arc::new(LoadManager::new(
@@ -380,6 +382,7 @@ fn build_graphql_server(
380382
network_store.clone(),
381383
load_manager,
382384
metrics_registry,
385+
log_store,
383386
));
384387

385388
GraphQLQueryServer::new(logger_factory, graphql_runner.clone())
@@ -443,20 +446,121 @@ pub async fn run(
443446

444447
info!(logger, "Starting up"; "node_id" => &node_id);
445448

446-
// Optionally, identify the Elasticsearch logging configuration
447-
let elastic_config = opt
448-
.elasticsearch_url
449-
.clone()
450-
.map(|endpoint| ElasticLoggingConfig {
451-
endpoint,
452-
username: opt.elasticsearch_user.clone(),
453-
password: opt.elasticsearch_password.clone(),
454-
client: reqwest::Client::new(),
455-
});
449+
// Create log store configuration provider
450+
// Build LogStoreConfig from CLI args with backward compatibility
451+
let cli_config = if let Some(backend) = opt.log_store_backend.as_ref() {
452+
// New generic CLI args used
453+
match backend.to_lowercase().as_str() {
454+
"elasticsearch" | "elastic" | "es" => {
455+
let url = opt
456+
.log_store_elasticsearch_url
457+
.clone()
458+
.or_else(|| {
459+
if opt.elasticsearch_url.is_some() {
460+
warn!(
461+
logger,
462+
"Using deprecated --elasticsearch-url, use --log-store-elasticsearch-url instead"
463+
);
464+
}
465+
opt.elasticsearch_url.clone()
466+
});
467+
468+
url.map(|endpoint| {
469+
let index = opt
470+
.log_store_elasticsearch_index
471+
.clone()
472+
.or_else(|| std::env::var("GRAPH_LOG_STORE_ELASTICSEARCH_INDEX").ok())
473+
.or_else(|| std::env::var("GRAPH_ELASTIC_SEARCH_INDEX").ok())
474+
.unwrap_or_else(|| "subgraph".to_string());
475+
476+
let timeout_secs = std::env::var("GRAPH_LOG_STORE_ELASTICSEARCH_TIMEOUT")
477+
.or_else(|_| std::env::var("GRAPH_ELASTICSEARCH_TIMEOUT"))
478+
.ok()
479+
.and_then(|s| s.parse::<u64>().ok())
480+
.unwrap_or(10);
481+
482+
graph::components::log_store::LogStoreConfig::Elasticsearch {
483+
endpoint,
484+
username: opt
485+
.log_store_elasticsearch_user
486+
.clone()
487+
.or_else(|| opt.elasticsearch_user.clone()),
488+
password: opt
489+
.log_store_elasticsearch_password
490+
.clone()
491+
.or_else(|| opt.elasticsearch_password.clone()),
492+
index,
493+
timeout_secs,
494+
}
495+
})
496+
}
497+
498+
"loki" => opt.log_store_loki_url.clone().map(|endpoint| {
499+
graph::components::log_store::LogStoreConfig::Loki {
500+
endpoint,
501+
tenant_id: opt.log_store_loki_tenant_id.clone(),
502+
}
503+
}),
504+
505+
"file" | "files" => opt.log_store_file_dir.clone().map(|directory| {
506+
graph::components::log_store::LogStoreConfig::File {
507+
directory: std::path::PathBuf::from(directory),
508+
max_file_size: opt.log_store_file_max_size.unwrap_or(100 * 1024 * 1024),
509+
retention_days: opt.log_store_file_retention_days.unwrap_or(30),
510+
}
511+
}),
512+
513+
"disabled" | "none" => None,
514+
515+
other => {
516+
warn!(logger, "Invalid log store backend: {}", other);
517+
None
518+
}
519+
}
520+
} else if opt.elasticsearch_url.is_some() {
521+
// Old Elasticsearch-specific CLI args used (backward compatibility)
522+
warn!(
523+
logger,
524+
"Using deprecated --elasticsearch-url CLI argument, \
525+
please use --log-store-backend elasticsearch --log-store-elasticsearch-url instead"
526+
);
527+
528+
let index = opt
529+
.log_store_elasticsearch_index
530+
.clone()
531+
.or_else(|| std::env::var("GRAPH_LOG_STORE_ELASTICSEARCH_INDEX").ok())
532+
.or_else(|| std::env::var("GRAPH_ELASTIC_SEARCH_INDEX").ok())
533+
.unwrap_or_else(|| "subgraph".to_string());
534+
535+
let timeout_secs = std::env::var("GRAPH_LOG_STORE_ELASTICSEARCH_TIMEOUT")
536+
.or_else(|_| std::env::var("GRAPH_ELASTICSEARCH_TIMEOUT"))
537+
.ok()
538+
.and_then(|s| s.parse::<u64>().ok())
539+
.unwrap_or(10);
540+
541+
Some(
542+
graph::components::log_store::LogStoreConfig::Elasticsearch {
543+
endpoint: opt.elasticsearch_url.clone().unwrap(),
544+
username: opt.elasticsearch_user.clone(),
545+
password: opt.elasticsearch_password.clone(),
546+
index,
547+
timeout_secs,
548+
},
549+
)
550+
} else {
551+
// No CLI config provided
552+
None
553+
};
554+
555+
let log_config_provider = LogStoreConfigProvider::new(LogStoreConfigSources { cli_config });
556+
557+
// Resolve log store (for querying) and config (for drains)
558+
// Priority: GRAPH_LOG_STORE env var → CLI config → NoOp/None
559+
let (log_store, log_store_config) = log_config_provider.resolve(&logger);
456560

457561
// Create a component and subgraph logger factory
458562
let logger_factory =
459-
LoggerFactory::new(logger.clone(), elastic_config, metrics_registry.clone());
563+
LoggerFactory::new(logger.clone(), log_store_config, metrics_registry.clone());
460564

461565
let arweave_resolver = Arc::new(ArweaveClient::new(
462566
logger.cheap_clone(),
@@ -573,6 +677,7 @@ pub async fn run(
573677
metrics_registry.clone(),
574678
&network_store,
575679
&logger_factory,
680+
log_store.clone(),
576681
);
577682

578683
let index_node_server = IndexNodeServer::new(

server/index-node/src/service.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -153,6 +153,7 @@ where
153153
max_first: u32::MAX,
154154
max_skip: u32::MAX,
155155
trace: false,
156+
log_store: Arc::new(graph::components::log_store::NoOpLogStore),
156157
};
157158
let (result, _) = execute_query(query_clone.cheap_clone(), None, None, options).await;
158159
query_clone.log_execution(0);

store/test-store/src/store.rs

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -570,7 +570,7 @@ async fn execute_subgraph_query_internal(
570570
error_policy,
571571
query.schema.id().clone(),
572572
graphql_metrics(),
573-
LOAD_MANAGER.clone()
573+
LOAD_MANAGER.clone(),
574574
)
575575
.await
576576
);
@@ -584,6 +584,7 @@ async fn execute_subgraph_query_internal(
584584
max_first: u32::MAX,
585585
max_skip: u32::MAX,
586586
trace,
587+
log_store: std::sync::Arc::new(graph::components::log_store::NoOpLogStore),
587588
},
588589
)
589590
.await;

0 commit comments

Comments (0)