Skip to content

Commit 8c13b5f

Browse files
committed
graph,graphql,node,server,store,tests: Make subgraph logs queryable
1 parent dc48c6c commit 8c13b5f

19 files changed

Lines changed: 1147 additions & 16 deletions

File tree

Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
use async_trait::async_trait;
2+
use reqwest::Client;
3+
use serde::Deserialize;
4+
use serde_json::json;
5+
use std::collections::HashMap;
6+
use std::time::Duration;
7+
8+
use crate::log::elastic::ElasticLoggingConfig;
9+
use crate::prelude::DeploymentHash;
10+
11+
use super::{LogEntry, LogLevel, LogMeta, LogQuery, LogStore, LogStoreError};
12+
13+
/// A log store backed by an Elasticsearch index, queried over HTTP.
pub struct ElasticsearchLogStore {
    /// Base URL of the Elasticsearch server (no trailing `/_search`).
    endpoint: String,
    /// Optional basic-auth username; auth is only attached to requests
    /// when BOTH username and password are present.
    username: Option<String>,
    /// Optional basic-auth password (see `username`).
    password: Option<String>,
    /// HTTP client taken from the logging config so connections are shared.
    client: Client,
    /// Name of the index holding the subgraph log documents.
    index: String,
}
20+
21+
impl ElasticsearchLogStore {
22+
pub fn new(config: ElasticLoggingConfig, index: String) -> Self {
23+
Self {
24+
endpoint: config.endpoint,
25+
username: config.username,
26+
password: config.password,
27+
client: config.client,
28+
index,
29+
}
30+
}
31+
32+
fn build_query(&self, query: &LogQuery) -> serde_json::Value {
33+
let mut must_clauses = Vec::new();
34+
35+
// Filter by subgraph ID
36+
must_clauses.push(json!({
37+
"term": {
38+
"subgraphId": query.subgraph_id.to_string()
39+
}
40+
}));
41+
42+
// Filter by log level
43+
if let Some(level) = &query.level {
44+
must_clauses.push(json!({
45+
"term": {
46+
"level": level.as_str()
47+
}
48+
}));
49+
}
50+
51+
// Filter by time range
52+
if query.from.is_some() || query.to.is_some() {
53+
let mut range = serde_json::Map::new();
54+
if let Some(from) = &query.from {
55+
range.insert("gte".to_string(), json!(from));
56+
}
57+
if let Some(to) = &query.to {
58+
range.insert("lte".to_string(), json!(to));
59+
}
60+
must_clauses.push(json!({
61+
"range": {
62+
"timestamp": range
63+
}
64+
}));
65+
}
66+
67+
// Filter by text search
68+
if let Some(text) = &query.text {
69+
must_clauses.push(json!({
70+
"match": {
71+
"text": text
72+
}
73+
}));
74+
}
75+
76+
json!({
77+
"query": {
78+
"bool": {
79+
"must": must_clauses
80+
}
81+
},
82+
"from": query.skip,
83+
"size": query.first,
84+
"sort": [
85+
{ "timestamp": { "order": "desc" } }
86+
]
87+
})
88+
}
89+
90+
async fn execute_search(
91+
&self,
92+
query_body: serde_json::Value,
93+
) -> Result<Vec<LogEntry>, LogStoreError> {
94+
let url = format!("{}/{}/_search", self.endpoint, self.index);
95+
96+
let mut request = self
97+
.client
98+
.post(&url)
99+
.json(&query_body)
100+
.timeout(Duration::from_secs(10));
101+
102+
// Add basic auth if credentials provided
103+
if let (Some(username), Some(password)) = (&self.username, &self.password) {
104+
request = request.basic_auth(username, Some(password));
105+
}
106+
107+
let response = request.send().await.map_err(|e| {
108+
LogStoreError::QueryFailed(
109+
anyhow::Error::from(e).context("Elasticsearch request failed"),
110+
)
111+
})?;
112+
113+
if !response.status().is_success() {
114+
let status = response.status();
115+
// Do not include the response body in the error to avoid leaking
116+
// sensitive Elasticsearch internals
117+
return Err(LogStoreError::QueryFailed(anyhow::anyhow!(
118+
"Elasticsearch query failed with status {}",
119+
status
120+
)));
121+
}
122+
123+
let response_body: ElasticsearchResponse = response.json().await.map_err(|e| {
124+
LogStoreError::QueryFailed(
125+
anyhow::Error::from(e).context(
126+
"failed to parse Elasticsearch search response: response format may have changed or be invalid",
127+
),
128+
)
129+
})?;
130+
131+
let entries = response_body
132+
.hits
133+
.hits
134+
.into_iter()
135+
.filter_map(|hit| self.parse_log_entry(hit.source))
136+
.collect();
137+
138+
Ok(entries)
139+
}
140+
141+
fn parse_log_entry(&self, source: ElasticsearchLogDocument) -> Option<LogEntry> {
142+
let level = LogLevel::from_str(&source.level)?;
143+
let subgraph_id = DeploymentHash::new(&source.subgraph_id).ok()?;
144+
145+
// Convert arguments HashMap to Vec<(String, String)>
146+
let arguments: Vec<(String, String)> = source.arguments.into_iter().collect();
147+
148+
Some(LogEntry {
149+
id: source.id,
150+
subgraph_id,
151+
timestamp: source.timestamp,
152+
level,
153+
text: source.text,
154+
arguments,
155+
meta: LogMeta {
156+
module: source.meta.module,
157+
line: source.meta.line,
158+
column: source.meta.column,
159+
},
160+
})
161+
}
162+
}
163+
164+
#[async_trait]
impl LogStore for ElasticsearchLogStore {
    /// Build the Elasticsearch request for `query` and return the matching
    /// entries, newest first.
    async fn query_logs(&self, query: LogQuery) -> Result<Vec<LogEntry>, LogStoreError> {
        let query_body = self.build_query(&query);
        self.execute_search(query_body).await
    }

    /// Always reports available: the endpoint is not probed here, so actual
    /// reachability is only discovered when a query runs.
    fn is_available(&self) -> bool {
        true
    }
}
175+
176+
// Elasticsearch response types
//
// Minimal mirror of the `_search` response shape: only the fields this
// module reads are declared; serde ignores everything else in the payload.

#[derive(Debug, Deserialize)]
struct ElasticsearchResponse {
    hits: ElasticsearchHits,
}

#[derive(Debug, Deserialize)]
struct ElasticsearchHits {
    hits: Vec<ElasticsearchHit>,
}

#[derive(Debug, Deserialize)]
struct ElasticsearchHit {
    // Each hit carries the indexed document under the `_source` key.
    #[serde(rename = "_source")]
    source: ElasticsearchLogDocument,
}

// Shape of one log document as indexed in Elasticsearch; converted to a
// `LogEntry` by `ElasticsearchLogStore::parse_log_entry`.
#[derive(Debug, Deserialize)]
struct ElasticsearchLogDocument {
    id: String,
    #[serde(rename = "subgraphId")]
    subgraph_id: String,
    // Kept as an opaque string; the format is whatever the log writer
    // indexed (presumably an ISO-8601 timestamp — TODO confirm).
    timestamp: String,
    level: String,
    text: String,
    arguments: HashMap<String, String>,
    meta: ElasticsearchLogMeta,
}

// Source-location metadata embedded in each log document.
#[derive(Debug, Deserialize)]
struct ElasticsearchLogMeta {
    module: String,
    line: i64,
    column: i64,
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
pub mod elasticsearch;
2+
3+
use async_trait::async_trait;
4+
use thiserror::Error;
5+
6+
use crate::prelude::DeploymentHash;
7+
8+
// Errors a `LogStore` implementation can surface to callers.
#[derive(Error, Debug)]
pub enum LogStoreError {
    /// The backing store rejected or failed the query; the wrapped error
    /// carries the underlying cause and its context chain.
    #[error("log store query failed: {0}")]
    QueryFailed(#[from] anyhow::Error),

    /// No log store is configured or the store cannot serve queries.
    #[error("log store is unavailable")]
    Unavailable,
}
16+
17+
/// Severity of a subgraph log entry, ordered from most to least severe.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LogLevel {
    Critical,
    Error,
    Warning,
    Info,
    Debug,
}

impl LogLevel {
    /// Every level, in severity order; drives the name lookup in
    /// [`LogLevel::from_str`] so the two conversions cannot drift apart.
    const ALL: [LogLevel; 5] = [
        LogLevel::Critical,
        LogLevel::Error,
        LogLevel::Warning,
        LogLevel::Info,
        LogLevel::Debug,
    ];

    /// Canonical lowercase name of the level, as stored in the log index.
    pub fn as_str(&self) -> &'static str {
        match self {
            LogLevel::Critical => "critical",
            LogLevel::Error => "error",
            LogLevel::Warning => "warning",
            LogLevel::Info => "info",
            LogLevel::Debug => "debug",
        }
    }

    /// Parse a level name, ignoring surrounding whitespace and ASCII case;
    /// returns `None` for unknown names.
    ///
    /// Matches with `eq_ignore_ascii_case` against the canonical names so
    /// no intermediate lowercased `String` is allocated per call.
    pub fn from_str(s: &str) -> Option<Self> {
        let s = s.trim();
        Self::ALL
            .iter()
            .copied()
            .find(|level| s.eq_ignore_ascii_case(level.as_str()))
    }
}
48+
49+
// Source-location metadata attached to a log entry.
#[derive(Debug, Clone)]
pub struct LogMeta {
    /// Module path that emitted the log line.
    pub module: String,
    /// Line number within the emitting source file.
    pub line: i64,
    /// Column number within that line.
    pub column: i64,
}
55+
56+
// A single log record produced by a subgraph, as returned from a
// `LogStore` query.
#[derive(Debug, Clone)]
pub struct LogEntry {
    /// Store-assigned identifier of the record.
    pub id: String,
    /// Deployment the entry belongs to.
    pub subgraph_id: DeploymentHash,
    /// Timestamp as stored by the backend; kept as an opaque string
    /// (format is not validated at this layer).
    pub timestamp: String,
    /// Severity of the entry.
    pub level: LogLevel,
    /// The log message text.
    pub text: String,
    /// Structured key/value arguments attached to the message.
    pub arguments: Vec<(String, String)>,
    /// Source-location metadata.
    pub meta: LogMeta,
}
66+
67+
// Filter and pagination parameters for querying subgraph logs.
#[derive(Debug, Clone)]
pub struct LogQuery {
    /// Deployment whose logs are requested (always required).
    pub subgraph_id: DeploymentHash,
    /// If set, only entries with exactly this level are returned.
    pub level: Option<LogLevel>,
    /// Inclusive lower timestamp bound, if any.
    pub from: Option<String>,
    /// Inclusive upper timestamp bound, if any.
    pub to: Option<String>,
    /// If set, full-text match against the message text.
    pub text: Option<String>,
    /// Maximum number of entries to return (page size).
    pub first: u32,
    /// Number of matching entries to skip (page offset).
    pub skip: u32,
}
77+
78+
// Backend-agnostic interface for querying stored subgraph logs.
#[async_trait]
pub trait LogStore: Send + Sync + 'static {
    /// Return the log entries matching `query`.
    async fn query_logs(&self, query: LogQuery) -> Result<Vec<LogEntry>, LogStoreError>;
    /// Whether this store can actually serve queries.
    fn is_available(&self) -> bool;
}
83+
84+
/// A no-op LogStore that returns empty results
/// Used when Elasticsearch is not configured
pub struct NoOpLogStore;

#[async_trait]
impl LogStore for NoOpLogStore {
    /// Always succeeds with an empty result set, regardless of the query.
    async fn query_logs(&self, _query: LogQuery) -> Result<Vec<LogEntry>, LogStoreError> {
        Ok(vec![])
    }

    /// Reports unavailable so callers can distinguish "no store configured"
    /// from a genuinely empty query result.
    fn is_available(&self) -> bool {
        false
    }
}

graph/src/components/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ pub mod server;
5050
/// Components dealing with storing entities.
5151
pub mod store;
5252

53+
/// Components dealing with log storage.
54+
pub mod log_store;
55+
5356
pub mod link_resolver;
5457

5558
pub mod trigger_processor;

0 commit comments

Comments
 (0)