diff --git a/apps/ingest/benches/ingest_bench.rs b/apps/ingest/benches/ingest_bench.rs index d8cfb900..a0da117e 100644 --- a/apps/ingest/benches/ingest_bench.rs +++ b/apps/ingest/benches/ingest_bench.rs @@ -108,6 +108,7 @@ impl BenchFixture { batch_max_wait: Duration::from_millis(10), export_concurrency_per_shard: 1, export_max_attempts: 20, + clickhouse_export_timeout: Duration::from_secs(5), clickhouse_breaker: ClickHouseBreakerConfig::default(), datasources: DatasourceNames::defaults(), datasource_session_replays: "session_replays".to_string(), diff --git a/apps/ingest/src/main.rs b/apps/ingest/src/main.rs index 5d6b06db..bb445b00 100644 --- a/apps/ingest/src/main.rs +++ b/apps/ingest/src/main.rs @@ -36,7 +36,7 @@ use maple_ingest::otel::{build_resource, forward_client_span, ResourceConfig}; use maple_ingest::telemetry::{ AttributeMappingRule, ClickHouseBreakerConfig, ClickHouseTarget, ClickHouseTargetProvider, DatasourceNames, ExportDestination, MappingOperation, MappingSourceContext, PipelineError, - SamplingPolicy, TelemetryPipeline, TinybirdConfig, + SamplingPolicy, TelemetryPipeline, TelemetrySignal, TinybirdConfig, }; use moka::future::Cache; use opentelemetry::trace::TracerProvider as _; @@ -255,6 +255,11 @@ impl AppConfig { std::env::var("INGEST_EXPORT_MAX_ATTEMPTS").ok(), 20, )?, + clickhouse_export_timeout: Duration::from_millis(parse_u64( + "INGEST_CLICKHOUSE_EXPORT_TIMEOUT_MS", + std::env::var("INGEST_CLICKHOUSE_EXPORT_TIMEOUT_MS").ok(), + 3_000, + )?), clickhouse_breaker: ClickHouseBreakerConfig { // 0 disables the breaker (full retry budget on every batch). failure_threshold: parse_u32( @@ -916,6 +921,33 @@ fn otel_status_for_rejection(status: u16) -> &'static str { } } +/// Map a telemetry pipeline rejection to the client-facing HTTP error. 
+/// +/// Transient queue conditions (`Throttled` = per-org byte cap, `Backpressure` = +/// lane channel full) are retryable and surface as **429** — via +/// `otel_status_for_rejection` these become `Ok` spans, so a customer's slow +/// BYO-ClickHouse target (which backs the lane up) does not flood our error +/// dashboards as "Unknown Error" 503s. Genuine backend failures +/// (`QueueUnavailable` = WAL I/O, `Encode`) stay **503** and are labeled via +/// `otel.status_description` on the handler span. +/// +/// Single source of truth for both the OTLP signal path and the session-replay +/// handlers, which previously diverged (the replay paths blanket-mapped every +/// variant to 503). +fn api_error_from_pipeline(error: &PipelineError) -> ApiError { + match error { + PipelineError::Throttled(_) => { + ApiError::too_many_requests("Ingest queue full for org, retry shortly") + } + PipelineError::Backpressure(_) => { + ApiError::too_many_requests("Ingest export lane full, retry shortly") + } + PipelineError::QueueUnavailable(_) | PipelineError::Encode(_) => { + ApiError::service_unavailable("Telemetry backend unavailable") + } + } +} + /// Resolve the deployment environment in maple's canonical priority order. 
/// MAPLE_ENVIRONMENT is what apps/api/alchemy.run.ts and friends set via /// resolveDeploymentEnvironment(stage); RAILWAY_ENVIRONMENT_NAME is Railway's @@ -1645,6 +1677,7 @@ async fn handle_replay_meta( otel.name = "POST /v1/sessionReplays/meta", otel.kind = "server", otel.status_code = tracing::field::Empty, + otel.status_description = tracing::field::Empty, "http.request.method" = "POST", "http.route" = "/v1/sessionReplays/meta", "http.request.body.size" = body.len(), @@ -1670,6 +1703,7 @@ async fn handle_replay_meta( span_handle.record("http.response.status_code", status); span_handle.record("error.type", error.error_kind()); span_handle.record("otel.status_code", otel_status_for_rejection(status)); + if otel_status_for_rejection(status) == "Error" { span_handle.record("otel.status_description", error.message.as_str()); } error.into_response() } } @@ -1743,11 +1777,13 @@ async fn handle_replay_meta_inner( &org_id, state.config.tinybird.datasource_session_replays.clone(), rows, + TelemetrySignal::SessionReplays, destination, ) .await .map_err(|e| { - ApiError::service_unavailable(format!("failed to enqueue session metadata: {e}")) + warn!(org_id = %org_id, error = %e, "session metadata enqueue rejected"); + api_error_from_pipeline(&e) })?; // Meter browser sessions to Autumn after the rows are safely enqueued, mirroring @@ -1774,6 +1810,7 @@ async fn handle_session_events( otel.name = "POST /v1/sessionEvents", otel.kind = "server", otel.status_code = tracing::field::Empty, + otel.status_description = tracing::field::Empty, "http.request.method" = "POST", "http.route" = "/v1/sessionEvents", "http.request.body.size" = body.len(), @@ -1799,6 +1836,7 @@ async fn handle_session_events( span_handle.record("http.response.status_code", status); span_handle.record("error.type", error.error_kind()); span_handle.record("otel.status_code", otel_status_for_rejection(status)); + if otel_status_for_rejection(status) == "Error" { span_handle.record("otel.status_description", error.message.as_str()); } error.into_response() } } @@ -1859,11 +1897,13 @@ async fn 
handle_session_events_inner( &org_id, state.config.tinybird.datasource_session_events.clone(), rows, + TelemetrySignal::SessionEvents, destination, ) .await .map_err(|e| { - ApiError::service_unavailable(format!("failed to enqueue session events: {e}")) + warn!(org_id = %org_id, error = %e, "session events enqueue rejected"); + api_error_from_pipeline(&e) })?; Ok(count) } @@ -1880,6 +1920,7 @@ async fn handle_replay_blob( otel.name = "POST /v1/sessionReplays/blob", otel.kind = "server", otel.status_code = tracing::field::Empty, + otel.status_description = tracing::field::Empty, "http.request.method" = "POST", "http.route" = "/v1/sessionReplays/blob", "http.request.body.size" = body.len(), @@ -1905,6 +1946,7 @@ async fn handle_replay_blob( span_handle.record("http.response.status_code", status); span_handle.record("error.type", error.error_kind()); span_handle.record("otel.status_code", otel_status_for_rejection(status)); + if otel_status_for_rejection(status) == "Error" { span_handle.record("otel.status_description", error.message.as_str()); } error.into_response() } } @@ -1991,11 +2033,13 @@ async fn handle_replay_blob_inner( .datasource_session_replay_events .clone(), vec![serialized], + TelemetrySignal::SessionReplays, destination, ) .await .map_err(|e| { - ApiError::service_unavailable(format!("failed to enqueue replay events: {e}")) + warn!(org_id = %org_id, error = %e, "session replay blob enqueue rejected"); + api_error_from_pipeline(&e) })?; Ok(()) } @@ -2039,6 +2083,7 @@ async fn handle_signal( otel.name = %otel_name, otel.kind = "server", otel.status_code = tracing::field::Empty, + otel.status_description = tracing::field::Empty, "http.request.method" = "POST", "http.route" = %route, "http.request.body.size" = body_bytes, @@ -2087,6 +2132,7 @@ async fn handle_signal( span_handle.record("http.response.status_code", status); span_handle.record("error.type", error_kind); span_handle.record("otel.status_code", otel_status_for_rejection(status)); + if otel_status_for_rejection(status) == "Error" { span_handle.record("otel.status_description", 
error.message.as_str()); } metrics::request_completed(signal.path(), "error", error_kind, duration.as_secs_f64()); error.into_response() } @@ -2113,6 +2159,7 @@ async fn handle_cloudflare_logpush( otel.name = %otel_name, otel.kind = "server", otel.status_code = tracing::field::Empty, + otel.status_description = tracing::field::Empty, "http.request.method" = "POST", "http.route" = "/v1/logpush/cloudflare/http_requests/{connector_id}", "http.request.body.size" = body_bytes, @@ -2159,6 +2206,7 @@ async fn handle_cloudflare_logpush( span_handle.record("http.response.status_code", status); span_handle.record("error.type", error_kind); span_handle.record("otel.status_code", otel_status_for_rejection(status)); + if otel_status_for_rejection(status) == "Error" { span_handle.record("otel.status_description", error.message.as_str()); } metrics::request_completed("logs", "error", error_kind, duration.as_secs_f64()); if error_kind == "auth" { metrics::cloudflare_auth_failure("http_requests"); @@ -3138,7 +3186,7 @@ async fn forward_to_collector( url = %url, "Collector forwarding failed" ); - ApiError::service_unavailable("Telemetry backend unavailable") + ApiError::service_unavailable("Collector forwarding failed: transport error") })?; let forward_duration = forward_start.elapsed(); @@ -3176,7 +3224,7 @@ async fn forward_to_collector( "Collector returned error" ); return Err(ApiError::service_unavailable( - "Telemetry backend unavailable", + "Collector returned server error", )); } @@ -3286,17 +3334,7 @@ async fn accept_native_decoded_payload( } } .map_err(|error| { - let api_error = match &error { - PipelineError::Throttled(_) => { - ApiError::too_many_requests("Per-org ingest queue limit exceeded") - } - PipelineError::Backpressure(_) => { - ApiError::service_unavailable("Telemetry backend unavailable") - } - PipelineError::QueueUnavailable(_) | PipelineError::Encode(_) => { - ApiError::service_unavailable("Telemetry backend unavailable") - } - }; + let api_error = api_error_from_pipeline(&error); error!( error = %error, signal 
= signal.path(), @@ -4189,6 +4227,33 @@ mod tests { ); } + #[test] + fn api_error_from_pipeline_maps_variants_to_status() { + // Transient queue conditions are retryable → 429 (classified Ok via + // otel_status_for_rejection, so a stalled BYO-ClickHouse target backing + // the lane up does not flood the error dashboards as "Unknown Error"). + assert_eq!( + api_error_from_pipeline(&PipelineError::Throttled("x")).status, + StatusCode::TOO_MANY_REQUESTS + ); + assert_eq!( + api_error_from_pipeline(&PipelineError::Backpressure("x")).status, + StatusCode::TOO_MANY_REQUESTS + ); + // Genuine backend failures stay 503 (Error), now labeled via the span's + // otel.status_description rather than surfacing as "Unknown Error". + assert_eq!( + api_error_from_pipeline(&PipelineError::QueueUnavailable("x".into())).status, + StatusCode::SERVICE_UNAVAILABLE + ); + assert_eq!( + api_error_from_pipeline(&PipelineError::Encode("x".into())).status, + StatusCode::SERVICE_UNAVAILABLE + ); + assert_eq!(otel_status_for_rejection(429), "Ok"); + assert_eq!(otel_status_for_rejection(503), "Error"); + } + #[test] fn sentinel_token_matches_only_exact_literal() { assert!(is_sentinel_token("MAPLE_TEST")); @@ -4663,6 +4728,7 @@ mod tests { batch_max_wait: Duration::from_millis(1), export_concurrency_per_shard: 1, export_max_attempts: 1, + clickhouse_export_timeout: Duration::from_secs(5), clickhouse_breaker: ClickHouseBreakerConfig::default(), datasources: DatasourceNames::defaults(), datasource_session_replays: "session_replays".to_string(), diff --git a/apps/ingest/src/metrics.rs b/apps/ingest/src/metrics.rs index bdbb0b7e..66e67b78 100644 --- a/apps/ingest/src/metrics.rs +++ b/apps/ingest/src/metrics.rs @@ -41,6 +41,13 @@ static ORG_THROTTLED_TOTAL: LazyLock<Counter<u64>> = LazyLock::new(|| { .build() }); +static BACKPRESSURE_SHED_TOTAL: LazyLock<Counter<u64>> = LazyLock::new(|| { + METER + .u64_counter("ingest_backpressure_shed_total") + .with_description("Frames shed because a lane's bounded export channel was 
full") + .build() +}); + static CLOUDFLARE_BATCHES_TOTAL: LazyLock<Counter<u64>> = LazyLock::new(|| { METER .u64_counter("ingest_cloudflare_batches_total") @@ -353,6 +360,20 @@ pub fn org_throttled(org_id: &str, reason: &'static str) { ); } +/// A frame was shed because its export lane's bounded channel was full +/// (backpressure) — typically a stalled destination that cannot drain fast +/// enough. Counted per shed event, symmetric with `org_throttled`. +pub fn backpressure_shed(org_id: &str, destination: &str, signal: &str) { + BACKPRESSURE_SHED_TOTAL.add( + 1, + &[ + KeyValue::new("org_id", org_id.to_string()), + KeyValue::new("destination", destination.to_string()), + KeyValue::new("signal", signal.to_string()), + ], + ); +} + /// Current in-flight request count for an org. pub fn org_requests_in_flight(org_id: &str, value: u64) { ORG_REQUESTS_IN_FLIGHT.record(value, &[KeyValue::new("org_id", org_id.to_string())]); diff --git a/apps/ingest/src/telemetry.rs b/apps/ingest/src/telemetry.rs index f5cfed7f..b40d7d55 100644 --- a/apps/ingest/src/telemetry.rs +++ b/apps/ingest/src/telemetry.rs @@ -112,7 +112,11 @@ pub struct ClickHouseBreakerConfig { impl Default for ClickHouseBreakerConfig { fn default() -> Self { Self { - failure_threshold: 4, + // Trip after 2 consecutive failures (not 4) so a stalled BYO-ClickHouse + // target enters fast-shed quickly, keeping the single-threaded lane + // worker draining instead of holding batches through the retry budget + // (which backs the bounded channel up into accept-path backpressure 429s). + failure_threshold: 2, cooldown: Duration::from_secs(30), } } @@ -224,9 +228,30 @@ pub enum TelemetrySignal { Traces, Logs, Metrics, - /// Session-replay metadata + rrweb event rows (NDJSON written directly by + /// Session-replay metadata + rrweb event chunks (NDJSON written directly by /// the ingest gateway, not derived from OTLP). Events land in ClickHouse. SessionReplays, + /// Distilled session-event rows (NDJSON written directly by the gateway). 
+ /// Carried separately from `SessionReplays` so per-signal metrics label it + /// as `session_events` (matching its `maple.signal` span attribute). + SessionEvents, +} + +impl TelemetrySignal { + /// Canonical lowercase label for metric/log dimensions, matching the HTTP + /// `Signal::path()` vocabulary ("traces"/"logs"/"metrics") and the + /// `maple.signal` span attribute ("session_replays"/"session_events") so + /// per-signal metrics correlate across the codebase. Prefer this over the + /// `Debug` derive, which yields PascalCase. + pub fn as_str(self) -> &'static str { + match self { + Self::Traces => "traces", + Self::Logs => "logs", + Self::Metrics => "metrics", + Self::SessionReplays => "session_replays", + Self::SessionEvents => "session_events", + } + } } /// Datasource (ClickHouse/Tinybird table) names the OTLP→NDJSON encoders write @@ -292,6 +317,12 @@ pub struct TinybirdConfig { pub batch_max_wait: Duration, pub export_concurrency_per_shard: usize, pub export_max_attempts: u32, + /// Per-attempt timeout for a direct ClickHouse export POST. Kept short and + /// separate from `forward_timeout` so a hanging BYO-ClickHouse target fails + /// fast and the breaker trips quickly — otherwise the single-threaded lane + /// worker holds the batch through the retry budget while new frames back the + /// bounded channel up into accept-path backpressure. 
+ pub clickhouse_export_timeout: Duration, pub clickhouse_breaker: ClickHouseBreakerConfig, pub datasources: DatasourceNames, pub datasource_session_replays: String, @@ -644,8 +675,9 @@ impl TelemetryPipeline { org_id: &str, datasource: String, rows: Vec>, + signal: TelemetrySignal, ) -> Result { - self.accept_rows_to(org_id, datasource, rows, ExportDestination::Tinybird) + self.accept_rows_to(org_id, datasource, rows, signal, ExportDestination::Tinybird) .await } @@ -654,19 +686,14 @@ impl TelemetryPipeline { org_id: &str, datasource: String, rows: Vec>, + signal: TelemetrySignal, destination: ExportDestination, ) -> Result { let stats = AcceptStats { rows: rows.len(), dropped: 0, }; - let frames = rows_to_frames( - org_id, - hash64(org_id), - TelemetrySignal::SessionReplays, - datasource, - rows, - ); + let frames = rows_to_frames(org_id, hash64(org_id), signal, datasource, rows); self.commit_frames(frames, destination).await?; Ok(stats) } @@ -687,9 +714,17 @@ impl TelemetryPipeline { // cannot fill the Tinybird channel for the same shard (and vice versa). 
let lane = lane_index(shard, destination); let sender = self.inner.lane_senders[lane].clone(); - let permit = sender - .try_reserve_owned() - .map_err(|_| PipelineError::Backpressure("Telemetry queue is full"))?; + let permit = match sender.try_reserve_owned() { + Ok(permit) => permit, + Err(_) => { + metrics::backpressure_shed( + &frame.org_id, + frame.destination.as_str(), + frame.signal.as_str(), + ); + return Err(PipelineError::Backpressure("Telemetry queue is full")); + } + }; let queued_bytes = frame.payload.len() as u64; self.reserve_org_queue_bytes(&frame.org_id, queued_bytes)?; let (start, end) = self.inner.wal.append(lane, &frame).await.map_err(|error| { @@ -1232,6 +1267,7 @@ fn signal_tag(signal: TelemetrySignal) -> u8 { TelemetrySignal::Logs => 2, TelemetrySignal::Metrics => 3, TelemetrySignal::SessionReplays => 4, + TelemetrySignal::SessionEvents => 5, } } @@ -1241,6 +1277,7 @@ fn signal_from_tag(tag: u8) -> Option { 2 => Some(TelemetrySignal::Logs), 3 => Some(TelemetrySignal::Metrics), 4 => Some(TelemetrySignal::SessionReplays), + 5 => Some(TelemetrySignal::SessionEvents), _ => None, } } @@ -1622,6 +1659,7 @@ impl ExportWorker { let mut request = self .http .post(request_url) + .timeout(self.cfg.clickhouse_export_timeout) .header("X-ClickHouse-User", target.user.as_str()) .header("X-ClickHouse-Database", target.database.as_str()) .header(reqwest::header::CONTENT_TYPE, "application/x-ndjson") @@ -2578,6 +2616,7 @@ mod tests { batch_max_wait: Duration::from_millis(10), export_concurrency_per_shard: 1, export_max_attempts: 20, + clickhouse_export_timeout: Duration::from_secs(5), clickhouse_breaker: ClickHouseBreakerConfig::default(), datasources: DatasourceNames::defaults(), datasource_session_replays: "session_replays".to_string(), @@ -3529,6 +3568,42 @@ mod tests { let _ = std::fs::remove_dir_all(queue_dir); } + #[test] + fn telemetry_signal_as_str_is_canonical_lowercase() { + // Metric/log dimensions must match the lowercase `signal.path()` / + // 
`maple.signal` vocabulary so `ingest_backpressure_shed_total` correlates + // with other per-signal metrics (Debug derive would give PascalCase). + assert_eq!(TelemetrySignal::Traces.as_str(), "traces"); + assert_eq!(TelemetrySignal::Logs.as_str(), "logs"); + assert_eq!(TelemetrySignal::Metrics.as_str(), "metrics"); + assert_eq!(TelemetrySignal::SessionReplays.as_str(), "session_replays"); + assert_eq!(TelemetrySignal::SessionEvents.as_str(), "session_events"); + } + + #[test] + fn signal_tag_round_trips_all_variants() { + // WAL on-disk format: every signal must survive tag encode/decode so + // committed frames replay to the right signal after a restart. + for signal in [ + TelemetrySignal::Traces, + TelemetrySignal::Logs, + TelemetrySignal::Metrics, + TelemetrySignal::SessionReplays, + TelemetrySignal::SessionEvents, + ] { + assert_eq!(signal_from_tag(signal_tag(signal)), Some(signal)); + } + } + + #[test] + fn breaker_default_failure_threshold_is_two() { + // Lowered from 4 → 2 so a stalled BYO-ClickHouse target enters fast-shed + // sooner, keeping the single-threaded lane worker draining instead of + // holding batches through the retry budget (which backs the bounded + // channel up into accept-path backpressure 429s). + assert_eq!(ClickHouseBreakerConfig::default().failure_threshold, 2); + } + #[test] fn clickhouse_breaker_trips_sheds_and_recovers() { let registry = ClickHouseBreakerRegistry::new(ClickHouseBreakerConfig {