NOTE(review): line structure restored from a whitespace-collapsed copy of this patch.
NOTE(review): indentation of the two leading context lines in the CallbackHandler.py
hunk is inferred, and the index hashes predate the revised hunks -- confirm against
the target file before applying.

diff --git a/langfuse/langchain/CallbackHandler.py b/langfuse/langchain/CallbackHandler.py
index 2989ef216..3fac0a84d 100644
--- a/langfuse/langchain/CallbackHandler.py
+++ b/langfuse/langchain/CallbackHandler.py
@@ -1664,6 +1664,25 @@ def _parse_usage_model(usage: Union[pydantic.BaseModel, dict]) -> Any:
                             0, usage_model[f"input_modality_{item['modality']}"] - value
                         )
 
+    # Anthropic extended prompt caching: cache_creation is a dict keyed by cache tier.
+    # Example: {"ephemeral_1h_input_tokens": 500, "ephemeral_5m_input_tokens": 0}
+    # Flatten into individual keys and expose an aggregated total that mirrors the
+    # legacy cache_creation_input_tokens field for backward-compatible cost tracking.
+    if "cache_creation" in usage_model and isinstance(
+        usage_model["cache_creation"], dict
+    ):
+        cache_creation = usage_model.pop("cache_creation")
+        total = 0
+        for tier_key, tier_val in cache_creation.items():
+            if isinstance(tier_val, int):
+                usage_model[f"cache_creation_{tier_key}"] = tier_val
+                total += tier_val
+        # A legacy value that is present but not an int (e.g. None) would be removed by
+        # the int-only filter below, so only an int legacy value takes precedence.
+        legacy_total = usage_model.get("cache_creation_input_tokens")
+        if total > 0 and not isinstance(legacy_total, int):
+            usage_model["cache_creation_input_tokens"] = total
+
     usage_model = {k: v for k, v in usage_model.items() if isinstance(v, int)}
 
     return usage_model if usage_model else None
diff --git a/tests/unit/test_parse_usage_model.py b/tests/unit/test_parse_usage_model.py
index 764d6132b..071c5089b 100644
--- a/tests/unit/test_parse_usage_model.py
+++ b/tests/unit/test_parse_usage_model.py
@@ -1,6 +1,96 @@
 from langfuse.langchain.CallbackHandler import _parse_usage_model
 
 
+def test_anthropic_cache_creation_dict_flattened():
+    """Anthropic extended caching: cache_creation dict is flattened into per-tier keys
+    and an aggregated cache_creation_input_tokens total is added."""
+    usage = {
+        "input_tokens": 9454,
+        "output_tokens": 380,
+        "cache_read_input_tokens": 0,
+        "cache_creation": {
+            "ephemeral_1h_input_tokens": 500,
+            "ephemeral_5m_input_tokens": 200,
+        },
+    }
+    result = _parse_usage_model(usage)
+
+    # Core fields survive
+    assert result["input"] == 9454
+    assert result["output"] == 380
+    assert result["cache_read_input_tokens"] == 0
+
+    # Per-tier keys are present and individually correct
+    assert result["cache_creation_ephemeral_1h_input_tokens"] == 500
+    assert result["cache_creation_ephemeral_5m_input_tokens"] == 200
+
+    # Aggregated total equals sum of all tiers
+    assert result["cache_creation_input_tokens"] == 700
+
+    # The original nested dict must not be present
+    assert "cache_creation" not in result
+
+
+def test_anthropic_cache_creation_all_zeros_no_aggregate():
+    """When all cache_creation tier values are zero no aggregate key is added
+    (avoids noise in traces where caching did not fire)."""
+    usage = {
+        "input_tokens": 100,
+        "output_tokens": 50,
+        "cache_creation": {
+            "ephemeral_1h_input_tokens": 0,
+            "ephemeral_5m_input_tokens": 0,
+        },
+    }
+    result = _parse_usage_model(usage)
+
+    assert result["input"] == 100
+    assert result["output"] == 50
+    # Per-tier zero keys are still stored
+    assert result["cache_creation_ephemeral_1h_input_tokens"] == 0
+    assert result["cache_creation_ephemeral_5m_input_tokens"] == 0
+    # No aggregate added when total is zero
+    assert "cache_creation_input_tokens" not in result
+    assert "cache_creation" not in result
+
+
+def test_anthropic_cache_creation_legacy_field_not_overwritten():
+    """If both the legacy cache_creation_input_tokens (int) and the new cache_creation
+    (dict) are present, the legacy value is preserved and the dict total is not added."""
+    usage = {
+        "input_tokens": 100,
+        "output_tokens": 50,
+        "cache_creation_input_tokens": 999,  # legacy field already present; intentionally != tier sum (300)
+        "cache_creation": {
+            "ephemeral_1h_input_tokens": 200,
+            "ephemeral_5m_input_tokens": 100,
+        },
+    }
+    result = _parse_usage_model(usage)
+
+    # An int legacy value must not be overwritten by the tier sum
+    assert result["cache_creation_input_tokens"] == 999
+    assert "cache_creation" not in result
+
+
+def test_anthropic_cache_creation_non_int_legacy_field_replaced():
+    """A legacy cache_creation_input_tokens that is present but not an int (e.g. None)
+    is replaced by the tier total instead of being silently dropped."""
+    usage = {
+        "input_tokens": 100,
+        "output_tokens": 50,
+        "cache_creation_input_tokens": None,
+        "cache_creation": {
+            "ephemeral_1h_input_tokens": 200,
+            "ephemeral_5m_input_tokens": 100,
+        },
+    }
+    result = _parse_usage_model(usage)
+
+    assert result["cache_creation_input_tokens"] == 300
+    assert "cache_creation" not in result
+
+
 def test_standard_tier_input_token_details():
     """Standard tier: audio and cache_read are subtracted from input."""
     usage = {