Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/ingest/benches/ingest_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ impl BenchFixture {
batch_max_wait: Duration::from_millis(10),
export_concurrency_per_shard: 1,
export_max_attempts: 20,
clickhouse_export_timeout: Duration::from_secs(5),
clickhouse_breaker: ClickHouseBreakerConfig::default(),
datasources: DatasourceNames::defaults(),
datasource_session_replays: "session_replays".to_string(),
Expand Down
100 changes: 83 additions & 17 deletions apps/ingest/src/main.rs

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 OTLP signal path labels all process_decoded_payload errors as error_kind="forward"

In handle_signal_inner (main.rs:2400-2401), errors from process_decoded_payload — including native pipeline rejections (throttle, backpressure, WAL I/O) — are all mapped to error_kind="forward". This means ingest_requests_total{error_kind="forward"} conflates actual collector forward failures with native pipeline rejections. The same pre-existing issue exists in the Cloudflare logpush path (main.rs:2593). The PR doesn't change this mapping, but the status code shift (backpressure 503→429) makes it more important to distinguish these in metrics since the error_kind label no longer correlates 1:1 with a status bucket.

(Refers to lines 2400-2401)

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use maple_ingest::otel::{build_resource, forward_client_span, ResourceConfig};
use maple_ingest::telemetry::{
AttributeMappingRule, ClickHouseBreakerConfig, ClickHouseTarget, ClickHouseTargetProvider,
DatasourceNames, ExportDestination, MappingOperation, MappingSourceContext, PipelineError,
SamplingPolicy, TelemetryPipeline, TinybirdConfig,
SamplingPolicy, TelemetryPipeline, TelemetrySignal, TinybirdConfig,
};
use moka::future::Cache;
use opentelemetry::trace::TracerProvider as _;
Expand Down Expand Up @@ -255,6 +255,11 @@ impl AppConfig {
std::env::var("INGEST_EXPORT_MAX_ATTEMPTS").ok(),
20,
)?,
clickhouse_export_timeout: Duration::from_millis(parse_u64(
"INGEST_CLICKHOUSE_EXPORT_TIMEOUT_MS",
std::env::var("INGEST_CLICKHOUSE_EXPORT_TIMEOUT_MS").ok(),
3_000,
)?),
clickhouse_breaker: ClickHouseBreakerConfig {
// 0 disables the breaker (full retry budget on every batch).
failure_threshold: parse_u32(
Expand Down Expand Up @@ -916,6 +921,33 @@ fn otel_status_for_rejection(status: u16) -> &'static str {
}
}

/// Map a telemetry pipeline rejection to the client-facing HTTP error.
///
/// Transient queue conditions (`Throttled` = per-org byte cap, `Backpressure` =
/// lane channel full) are retryable and surface as **429** — via
/// `otel_status_for_rejection` these become `Ok` spans, so a customer's slow
/// BYO-ClickHouse target (which backs the lane up) does not flood our error
/// dashboards as "Unknown Error" 503s. Genuine backend failures
/// (`QueueUnavailable` = WAL I/O, `Encode`) stay **503** and are labeled via
/// `otel.status_description` on the handler span.
///
/// Single source of truth for both the OTLP signal path and the session-replay
/// handlers, which previously diverged (the replay paths blanket-mapped every
/// variant to 503).
fn api_error_from_pipeline(error: &PipelineError) -> ApiError {
match error {
PipelineError::Throttled(_) => {
ApiError::too_many_requests("Ingest queue full for org, retry shortly")
}
PipelineError::Backpressure(_) => {
ApiError::too_many_requests("Ingest export lane full, retry shortly")
}
PipelineError::QueueUnavailable(_) | PipelineError::Encode(_) => {
ApiError::service_unavailable("Telemetry backend unavailable")
}
}
}

/// Resolve the deployment environment in maple's canonical priority order.
/// MAPLE_ENVIRONMENT is what apps/api/alchemy.run.ts and friends set via
/// resolveDeploymentEnvironment(stage); RAILWAY_ENVIRONMENT_NAME is Railway's
Expand Down Expand Up @@ -1645,6 +1677,7 @@ async fn handle_replay_meta(
otel.name = "POST /v1/sessionReplays/meta",
otel.kind = "server",
otel.status_code = tracing::field::Empty,
otel.status_description = tracing::field::Empty,
"http.request.method" = "POST",
"http.route" = "/v1/sessionReplays/meta",
"http.request.body.size" = body.len(),
Expand All @@ -1670,6 +1703,7 @@ async fn handle_replay_meta(
span_handle.record("http.response.status_code", status);
span_handle.record("error.type", error.error_kind());
span_handle.record("otel.status_code", otel_status_for_rejection(status));
span_handle.record("otel.status_description", error.message.as_str());
error.into_response()
}
}
Expand Down Expand Up @@ -1743,11 +1777,13 @@ async fn handle_replay_meta_inner(
&org_id,
state.config.tinybird.datasource_session_replays.clone(),
rows,
TelemetrySignal::SessionReplays,
destination,
)
.await
.map_err(|e| {
ApiError::service_unavailable(format!("failed to enqueue session metadata: {e}"))
warn!(org_id = %org_id, error = %e, "session metadata enqueue rejected");
api_error_from_pipeline(&e)
})?;

// Meter browser sessions to Autumn after the rows are safely enqueued, mirroring
Expand All @@ -1774,6 +1810,7 @@ async fn handle_session_events(
otel.name = "POST /v1/sessionEvents",
otel.kind = "server",
otel.status_code = tracing::field::Empty,
otel.status_description = tracing::field::Empty,
"http.request.method" = "POST",
"http.route" = "/v1/sessionEvents",
"http.request.body.size" = body.len(),
Expand All @@ -1799,6 +1836,7 @@ async fn handle_session_events(
span_handle.record("http.response.status_code", status);
span_handle.record("error.type", error.error_kind());
span_handle.record("otel.status_code", otel_status_for_rejection(status));
span_handle.record("otel.status_description", error.message.as_str());
error.into_response()
}
}
Expand Down Expand Up @@ -1859,11 +1897,13 @@ async fn handle_session_events_inner(
&org_id,
state.config.tinybird.datasource_session_events.clone(),
rows,
TelemetrySignal::SessionEvents,
destination,
)
.await
.map_err(|e| {
ApiError::service_unavailable(format!("failed to enqueue session events: {e}"))
warn!(org_id = %org_id, error = %e, "session events enqueue rejected");
api_error_from_pipeline(&e)
})?;
Ok(count)
}
Expand All @@ -1880,6 +1920,7 @@ async fn handle_replay_blob(
otel.name = "POST /v1/sessionReplays/blob",
otel.kind = "server",
otel.status_code = tracing::field::Empty,
otel.status_description = tracing::field::Empty,
"http.request.method" = "POST",
"http.route" = "/v1/sessionReplays/blob",
"http.request.body.size" = body.len(),
Expand All @@ -1905,6 +1946,7 @@ async fn handle_replay_blob(
span_handle.record("http.response.status_code", status);
span_handle.record("error.type", error.error_kind());
span_handle.record("otel.status_code", otel_status_for_rejection(status));
span_handle.record("otel.status_description", error.message.as_str());
error.into_response()
}
}
Expand Down Expand Up @@ -1991,11 +2033,13 @@ async fn handle_replay_blob_inner(
.datasource_session_replay_events
.clone(),
vec![serialized],
TelemetrySignal::SessionReplays,
destination,
)
.await
.map_err(|e| {
ApiError::service_unavailable(format!("failed to enqueue replay events: {e}"))
warn!(org_id = %org_id, error = %e, "session replay blob enqueue rejected");
api_error_from_pipeline(&e)
})?;
Ok(())
}
Expand Down Expand Up @@ -2039,6 +2083,7 @@ async fn handle_signal(
otel.name = %otel_name,
otel.kind = "server",
otel.status_code = tracing::field::Empty,
otel.status_description = tracing::field::Empty,
"http.request.method" = "POST",
"http.route" = %route,
"http.request.body.size" = body_bytes,
Expand Down Expand Up @@ -2087,6 +2132,7 @@ async fn handle_signal(
span_handle.record("http.response.status_code", status);
span_handle.record("error.type", error_kind);
span_handle.record("otel.status_code", otel_status_for_rejection(status));
span_handle.record("otel.status_description", error.message.as_str());
metrics::request_completed(signal.path(), "error", error_kind, duration.as_secs_f64());
error.into_response()
}
Expand All @@ -2113,6 +2159,7 @@ async fn handle_cloudflare_logpush(
otel.name = %otel_name,
otel.kind = "server",
otel.status_code = tracing::field::Empty,
otel.status_description = tracing::field::Empty,
"http.request.method" = "POST",
"http.route" = "/v1/logpush/cloudflare/http_requests/{connector_id}",
"http.request.body.size" = body_bytes,
Expand Down Expand Up @@ -2159,6 +2206,7 @@ async fn handle_cloudflare_logpush(
span_handle.record("http.response.status_code", status);
span_handle.record("error.type", error_kind);
span_handle.record("otel.status_code", otel_status_for_rejection(status));
span_handle.record("otel.status_description", error.message.as_str());
metrics::request_completed("logs", "error", error_kind, duration.as_secs_f64());
if error_kind == "auth" {
metrics::cloudflare_auth_failure("http_requests");
Expand Down Expand Up @@ -3138,7 +3186,7 @@ async fn forward_to_collector(
url = %url,
"Collector forwarding failed"
);
ApiError::service_unavailable("Telemetry backend unavailable")
ApiError::service_unavailable("Collector forwarding failed: transport error")
})?;

let forward_duration = forward_start.elapsed();
Expand Down Expand Up @@ -3176,7 +3224,7 @@ async fn forward_to_collector(
"Collector returned error"
);
return Err(ApiError::service_unavailable(
"Telemetry backend unavailable",
"Collector returned server error",
));
}

Expand Down Expand Up @@ -3286,17 +3334,7 @@ async fn accept_native_decoded_payload(
}
}
.map_err(|error| {
let api_error = match &error {
PipelineError::Throttled(_) => {
ApiError::too_many_requests("Per-org ingest queue limit exceeded")
}
PipelineError::Backpressure(_) => {
ApiError::service_unavailable("Telemetry backend unavailable")
}
PipelineError::QueueUnavailable(_) | PipelineError::Encode(_) => {
ApiError::service_unavailable("Telemetry backend unavailable")
}
};
let api_error = api_error_from_pipeline(&error);
error!(
error = %error,
signal = signal.path(),
Expand Down Expand Up @@ -4189,6 +4227,33 @@ mod tests {
);
}

#[test]
fn api_error_from_pipeline_maps_variants_to_status() {
// Transient queue conditions are retryable → 429 (classified Ok via
// otel_status_for_rejection, so a stalled BYO-ClickHouse target backing
// the lane up does not flood the error dashboards as "Unknown Error").
assert_eq!(
api_error_from_pipeline(&PipelineError::Throttled("x")).status,
StatusCode::TOO_MANY_REQUESTS
);
assert_eq!(
api_error_from_pipeline(&PipelineError::Backpressure("x")).status,
StatusCode::TOO_MANY_REQUESTS
);
// Genuine backend failures stay 503 (Error), now labeled via the span's
// otel.status_description rather than surfacing as "Unknown Error".
assert_eq!(
api_error_from_pipeline(&PipelineError::QueueUnavailable("x".into())).status,
StatusCode::SERVICE_UNAVAILABLE
);
assert_eq!(
api_error_from_pipeline(&PipelineError::Encode("x".into())).status,
StatusCode::SERVICE_UNAVAILABLE
);
assert_eq!(otel_status_for_rejection(429), "Ok");
assert_eq!(otel_status_for_rejection(503), "Error");
}

#[test]
fn sentinel_token_matches_only_exact_literal() {
assert!(is_sentinel_token("MAPLE_TEST"));
Expand Down Expand Up @@ -4663,6 +4728,7 @@ mod tests {
batch_max_wait: Duration::from_millis(1),
export_concurrency_per_shard: 1,
export_max_attempts: 1,
clickhouse_export_timeout: Duration::from_secs(5),
clickhouse_breaker: ClickHouseBreakerConfig::default(),
datasources: DatasourceNames::defaults(),
datasource_session_replays: "session_replays".to_string(),
Expand Down
21 changes: 21 additions & 0 deletions apps/ingest/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ static ORG_THROTTLED_TOTAL: LazyLock<Counter<u64>> = LazyLock::new(|| {
.build()
});

static BACKPRESSURE_SHED_TOTAL: LazyLock<Counter<u64>> = LazyLock::new(|| {
METER
.u64_counter("ingest_backpressure_shed_total")
.with_description("Frames shed because a lane's bounded export channel was full")
.build()
});

static CLOUDFLARE_BATCHES_TOTAL: LazyLock<Counter<u64>> = LazyLock::new(|| {
METER
.u64_counter("ingest_cloudflare_batches_total")
Expand Down Expand Up @@ -353,6 +360,20 @@ pub fn org_throttled(org_id: &str, reason: &'static str) {
);
}

/// A frame was shed because its export lane's bounded channel was full
/// (backpressure) — typically a stalled destination that cannot drain fast
/// enough. Counted per shed event, symmetric with `org_throttled`.
pub fn backpressure_shed(org_id: &str, destination: &str, signal: &str) {
BACKPRESSURE_SHED_TOTAL.add(
1,
&[
KeyValue::new("org_id", org_id.to_string()),
KeyValue::new("destination", destination.to_string()),
KeyValue::new("signal", signal.to_string()),
],
);
}

/// Current in-flight request count for an org.
pub fn org_requests_in_flight(org_id: &str, value: u64) {
ORG_REQUESTS_IN_FLIGHT.record(value, &[KeyValue::new("org_id", org_id.to_string())]);
Expand Down
Loading
Loading