|
| 1 | +use async_trait::async_trait; |
| 2 | +use reqwest::Client; |
| 3 | +use serde::Deserialize; |
| 4 | +use serde_json::json; |
| 5 | +use std::collections::HashMap; |
| 6 | +use std::time::Duration; |
| 7 | + |
| 8 | +use crate::log::elastic::ElasticLoggingConfig; |
| 9 | +use crate::prelude::DeploymentHash; |
| 10 | + |
| 11 | +use super::{LogEntry, LogMeta, LogQuery, LogStore, LogStoreError}; |
| 12 | + |
| 13 | +pub struct ElasticsearchLogStore { |
| 14 | + endpoint: String, |
| 15 | + username: Option<String>, |
| 16 | + password: Option<String>, |
| 17 | + client: Client, |
| 18 | + index: String, |
| 19 | + timeout: Duration, |
| 20 | +} |
| 21 | + |
| 22 | +impl ElasticsearchLogStore { |
| 23 | + pub fn new(config: ElasticLoggingConfig, index: String, timeout: Duration) -> Self { |
| 24 | + Self { |
| 25 | + endpoint: config.endpoint, |
| 26 | + username: config.username, |
| 27 | + password: config.password, |
| 28 | + client: config.client, |
| 29 | + index, |
| 30 | + timeout, |
| 31 | + } |
| 32 | + } |
| 33 | + |
| 34 | + fn build_query(&self, query: &LogQuery) -> serde_json::Value { |
| 35 | + let mut must_clauses = Vec::new(); |
| 36 | + |
| 37 | + // Filter by subgraph ID |
| 38 | + must_clauses.push(json!({ |
| 39 | + "term": { |
| 40 | + "subgraphId": query.subgraph_id.to_string() |
| 41 | + } |
| 42 | + })); |
| 43 | + |
| 44 | + // Filter by log level |
| 45 | + if let Some(level) = &query.level { |
| 46 | + must_clauses.push(json!({ |
| 47 | + "term": { |
| 48 | + "level": level.as_str() |
| 49 | + } |
| 50 | + })); |
| 51 | + } |
| 52 | + |
| 53 | + // Filter by time range |
| 54 | + if query.from.is_some() || query.to.is_some() { |
| 55 | + let mut range = serde_json::Map::new(); |
| 56 | + if let Some(from) = &query.from { |
| 57 | + range.insert("gte".to_string(), json!(from)); |
| 58 | + } |
| 59 | + if let Some(to) = &query.to { |
| 60 | + range.insert("lte".to_string(), json!(to)); |
| 61 | + } |
| 62 | + must_clauses.push(json!({ |
| 63 | + "range": { |
| 64 | + "timestamp": range |
| 65 | + } |
| 66 | + })); |
| 67 | + } |
| 68 | + |
| 69 | + // Filter by text search |
| 70 | + if let Some(search) = &query.search { |
| 71 | + must_clauses.push(json!({ |
| 72 | + "match": { |
| 73 | + "text": search |
| 74 | + } |
| 75 | + })); |
| 76 | + } |
| 77 | + |
| 78 | + json!({ |
| 79 | + "query": { |
| 80 | + "bool": { |
| 81 | + "must": must_clauses |
| 82 | + } |
| 83 | + }, |
| 84 | + "from": query.skip, |
| 85 | + "size": query.first, |
| 86 | + "sort": [ |
| 87 | + { "timestamp": { "order": "desc" } } |
| 88 | + ] |
| 89 | + }) |
| 90 | + } |
| 91 | + |
| 92 | + async fn execute_search( |
| 93 | + &self, |
| 94 | + query_body: serde_json::Value, |
| 95 | + ) -> Result<Vec<LogEntry>, LogStoreError> { |
| 96 | + let url = format!("{}/{}/_search", self.endpoint, self.index); |
| 97 | + |
| 98 | + let mut request = self |
| 99 | + .client |
| 100 | + .post(&url) |
| 101 | + .json(&query_body) |
| 102 | + .timeout(self.timeout); |
| 103 | + |
| 104 | + // Add basic auth if credentials provided |
| 105 | + if let (Some(username), Some(password)) = (&self.username, &self.password) { |
| 106 | + request = request.basic_auth(username, Some(password)); |
| 107 | + } |
| 108 | + |
| 109 | + let response = request.send().await.map_err(|e| { |
| 110 | + LogStoreError::QueryFailed( |
| 111 | + anyhow::Error::from(e).context("Elasticsearch request failed"), |
| 112 | + ) |
| 113 | + })?; |
| 114 | + |
| 115 | + if !response.status().is_success() { |
| 116 | + let status = response.status(); |
| 117 | + // Include response body in error context for debugging |
| 118 | + // The body is part of the error chain but not the main error message to avoid |
| 119 | + // leaking sensitive Elasticsearch internals in logs |
| 120 | + let body_text = response |
| 121 | + .text() |
| 122 | + .await |
| 123 | + .unwrap_or_else(|_| "<failed to read response body>".to_string()); |
| 124 | + return Err(LogStoreError::QueryFailed( |
| 125 | + anyhow::anyhow!("Elasticsearch query failed with status {}", status) |
| 126 | + .context(format!("Response body: {}", body_text)), |
| 127 | + )); |
| 128 | + } |
| 129 | + |
| 130 | + let response_body: ElasticsearchResponse = response.json().await.map_err(|e| { |
| 131 | + LogStoreError::QueryFailed( |
| 132 | + anyhow::Error::from(e).context( |
| 133 | + "failed to parse Elasticsearch search response: response format may have changed or be invalid", |
| 134 | + ), |
| 135 | + ) |
| 136 | + })?; |
| 137 | + |
| 138 | + let entries = response_body |
| 139 | + .hits |
| 140 | + .hits |
| 141 | + .into_iter() |
| 142 | + .filter_map(|hit| self.parse_log_entry(hit.source)) |
| 143 | + .collect(); |
| 144 | + |
| 145 | + Ok(entries) |
| 146 | + } |
| 147 | + |
| 148 | + fn parse_log_entry(&self, source: ElasticsearchLogDocument) -> Option<LogEntry> { |
| 149 | + let level = source.level.parse().ok()?; |
| 150 | + let subgraph_id = DeploymentHash::new(&source.subgraph_id).ok()?; |
| 151 | + |
| 152 | + // Convert arguments HashMap to Vec<(String, String)> |
| 153 | + let arguments: Vec<(String, String)> = source.arguments.into_iter().collect(); |
| 154 | + |
| 155 | + Some(LogEntry { |
| 156 | + id: source.id, |
| 157 | + subgraph_id, |
| 158 | + timestamp: source.timestamp, |
| 159 | + level, |
| 160 | + text: source.text, |
| 161 | + arguments, |
| 162 | + meta: LogMeta { |
| 163 | + module: source.meta.module, |
| 164 | + line: source.meta.line, |
| 165 | + column: source.meta.column, |
| 166 | + }, |
| 167 | + }) |
| 168 | + } |
| 169 | +} |
| 170 | + |
| 171 | +#[async_trait] |
| 172 | +impl LogStore for ElasticsearchLogStore { |
| 173 | + async fn query_logs(&self, query: LogQuery) -> Result<Vec<LogEntry>, LogStoreError> { |
| 174 | + let query_body = self.build_query(&query); |
| 175 | + self.execute_search(query_body).await |
| 176 | + } |
| 177 | + |
| 178 | + fn is_available(&self) -> bool { |
| 179 | + true |
| 180 | + } |
| 181 | +} |
| 182 | + |
| 183 | +// Elasticsearch response types |
| 184 | +#[derive(Debug, Deserialize)] |
| 185 | +struct ElasticsearchResponse { |
| 186 | + hits: ElasticsearchHits, |
| 187 | +} |
| 188 | + |
| 189 | +#[derive(Debug, Deserialize)] |
| 190 | +struct ElasticsearchHits { |
| 191 | + hits: Vec<ElasticsearchHit>, |
| 192 | +} |
| 193 | + |
| 194 | +#[derive(Debug, Deserialize)] |
| 195 | +struct ElasticsearchHit { |
| 196 | + #[serde(rename = "_source")] |
| 197 | + source: ElasticsearchLogDocument, |
| 198 | +} |
| 199 | + |
| 200 | +#[derive(Debug, Deserialize)] |
| 201 | +struct ElasticsearchLogDocument { |
| 202 | + id: String, |
| 203 | + #[serde(rename = "subgraphId")] |
| 204 | + subgraph_id: String, |
| 205 | + timestamp: String, |
| 206 | + level: String, |
| 207 | + text: String, |
| 208 | + arguments: HashMap<String, String>, |
| 209 | + meta: ElasticsearchLogMeta, |
| 210 | +} |
| 211 | + |
| 212 | +#[derive(Debug, Deserialize)] |
| 213 | +struct ElasticsearchLogMeta { |
| 214 | + module: String, |
| 215 | + line: i64, |
| 216 | + column: i64, |
| 217 | +} |
0 commit comments