Skip to content

Commit efe3b5f

Browse files
feat(ai): add OpenTelemetry integration for AI span export
Add PostHogSpanProcessor and PostHogTraceExporter that filter AI-related OTel spans and forward them to PostHog's OTLP endpoint. Generated-By: PostHog Code Task-Id: 1ba1f07a-1453-4162-90a8-665958c5fe46
1 parent ecd197a commit efe3b5f

10 files changed

Lines changed: 599 additions & 3 deletions

File tree

posthog/ai/otel/__init__.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
"""PostHog OpenTelemetry integration for AI tracing.
2+
3+
Provides components to route AI-related OpenTelemetry spans to PostHog's
4+
OTLP endpoint. Only spans matching known AI semantic convention prefixes
5+
(gen_ai, llm, ai, traceloop) are forwarded; all other spans are silently
6+
dropped.
7+
8+
Two integration patterns are supported:
9+
10+
1. **PostHogSpanProcessor** (recommended) - Self-contained processor that
11+
handles batching and export internally::
12+
13+
provider = TracerProvider()
14+
provider.add_span_processor(
15+
PostHogSpanProcessor(api_key="phc_...")
16+
)
17+
18+
2. **PostHogTraceExporter** - Exporter for use with your own
19+
BatchSpanProcessor or frameworks that only accept a SpanExporter::
20+
21+
provider = TracerProvider()
22+
provider.add_span_processor(
23+
BatchSpanProcessor(
24+
PostHogTraceExporter(api_key="phc_...")
25+
)
26+
)
27+
"""
28+
29+
from posthog.ai.otel.exporter import PostHogTraceExporter
30+
from posthog.ai.otel.processor import PostHogSpanProcessor
31+
from posthog.ai.otel.spans import is_ai_span
32+
33+
__all__ = ["PostHogSpanProcessor", "PostHogTraceExporter", "is_ai_span"]

posthog/ai/otel/exporter.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
"""PostHog trace exporter for OpenTelemetry.
2+
3+
Provides a SpanExporter that filters AI-related spans before forwarding them
4+
to PostHog's OTLP endpoint. Use this when your setup only accepts a
5+
SpanExporter (e.g. as an argument to BatchSpanProcessor).
6+
"""
7+
8+
from typing import Optional, Sequence
9+
10+
from opentelemetry.sdk.trace import ReadableSpan
11+
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
12+
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
13+
14+
from .spans import is_ai_span
15+
16+
DEFAULT_HOST = "https://us.i.posthog.com"
17+
18+
19+
class PostHogTraceExporter(SpanExporter):
20+
"""Span exporter that filters AI spans and forwards them to PostHog.
21+
22+
Wraps an OTLPSpanExporter configured for PostHog's OTLP endpoint. Spans
23+
that are not AI-related are silently dropped, returning SUCCESS immediately.
24+
25+
Usage::
26+
27+
from opentelemetry.sdk.trace import TracerProvider
28+
from opentelemetry.sdk.trace.export import BatchSpanProcessor
29+
from posthog.ai.otel import PostHogTraceExporter
30+
31+
provider = TracerProvider()
32+
provider.add_span_processor(
33+
BatchSpanProcessor(
34+
PostHogTraceExporter(api_key="phc_...")
35+
)
36+
)
37+
"""
38+
39+
def __init__(
40+
self,
41+
api_key: str,
42+
host: str = DEFAULT_HOST,
43+
):
44+
"""
45+
Args:
46+
api_key: PostHog project API key.
47+
host: PostHog host URL. Defaults to US cloud.
48+
"""
49+
self._api_key = api_key
50+
self._host = host.rstrip("/")
51+
52+
self._exporter = OTLPSpanExporter(
53+
endpoint=f"{self._host}/v1/traces",
54+
headers={"Authorization": f"Bearer {self._api_key}"},
55+
)
56+
57+
def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
58+
ai_spans = [span for span in spans if is_ai_span(span)]
59+
if not ai_spans:
60+
return SpanExportResult.SUCCESS
61+
return self._exporter.export(ai_spans)
62+
63+
def shutdown(self) -> None:
64+
self._exporter.shutdown()
65+
66+
def force_flush(self, timeout_millis: Optional[int] = None) -> bool:
67+
if timeout_millis is not None:
68+
return self._exporter.force_flush(timeout_millis)
69+
return self._exporter.force_flush()

posthog/ai/otel/processor.py

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
"""PostHog span processor for OpenTelemetry.
2+
3+
Provides a self-contained SpanProcessor that filters AI-related spans and
4+
exports them to PostHog's OTLP endpoint. This is the recommended integration
5+
for setups using TracerProvider.add_span_processor().
6+
"""
7+
8+
from typing import Optional
9+
10+
from opentelemetry.context import Context
11+
from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor
12+
from opentelemetry.sdk.trace.export import BatchSpanProcessor
13+
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
14+
15+
from .spans import is_ai_span
16+
17+
DEFAULT_HOST = "https://us.i.posthog.com"
18+
19+
20+
class PostHogSpanProcessor(SpanProcessor):
21+
"""Span processor that filters AI spans and exports them to PostHog.
22+
23+
Wraps a BatchSpanProcessor and OTLPSpanExporter internally, configured
24+
to send to PostHog's OTLP traces endpoint. Only spans identified as
25+
AI-related (by name or attribute prefix) are forwarded for export.
26+
27+
Usage::
28+
29+
from opentelemetry.sdk.trace import TracerProvider
30+
from posthog.ai.otel import PostHogSpanProcessor
31+
32+
provider = TracerProvider()
33+
provider.add_span_processor(
34+
PostHogSpanProcessor(api_key="phc_...")
35+
)
36+
"""
37+
38+
def __init__(
39+
self,
40+
api_key: str,
41+
host: str = DEFAULT_HOST,
42+
):
43+
"""
44+
Args:
45+
api_key: PostHog project API key.
46+
host: PostHog host URL. Defaults to US cloud.
47+
"""
48+
self._api_key = api_key
49+
self._host = host.rstrip("/")
50+
51+
exporter = OTLPSpanExporter(
52+
endpoint=f"{self._host}/v1/traces",
53+
headers={"Authorization": f"Bearer {self._api_key}"},
54+
)
55+
self._processor = BatchSpanProcessor(exporter)
56+
57+
def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None:
58+
pass
59+
60+
def on_end(self, span: ReadableSpan) -> None:
61+
if not is_ai_span(span):
62+
return
63+
self._processor.on_end(span)
64+
65+
def shutdown(self) -> None:
66+
self._processor.shutdown()
67+
68+
def force_flush(self, timeout_millis: Optional[int] = None) -> bool:
69+
if timeout_millis is not None:
70+
return self._processor.force_flush(timeout_millis)
71+
return self._processor.force_flush()

posthog/ai/otel/spans.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
"""Shared AI span filtering logic for OpenTelemetry integration."""
2+
3+
from typing import TYPE_CHECKING
4+
5+
if TYPE_CHECKING:
6+
from opentelemetry.sdk.trace import ReadableSpan
7+
8+
AI_SPAN_PREFIXES = ("gen_ai.", "llm.", "ai.", "traceloop.")
9+
10+
11+
def is_ai_span(span: "ReadableSpan") -> bool:
12+
"""Check if a span is AI-related by examining its name and attribute keys.
13+
14+
Matches spans whose name or any attribute key starts with one of the
15+
known AI semantic convention prefixes (gen_ai, llm, ai, traceloop).
16+
"""
17+
name = span.name
18+
if any(name.startswith(prefix) for prefix in AI_SPAN_PREFIXES):
19+
return True
20+
21+
attributes = span.attributes or {}
22+
for key in attributes:
23+
if any(key.startswith(prefix) for prefix in AI_SPAN_PREFIXES):
24+
return True
25+
26+
return False

posthog/test/ai/otel/__init__.py

Whitespace-only changes.
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
import unittest
2+
from unittest.mock import MagicMock, patch
3+
4+
from opentelemetry.sdk.trace.export import SpanExportResult
5+
6+
from posthog.ai.otel.exporter import PostHogTraceExporter
7+
8+
9+
def _make_span(name: str = "test", attributes: dict | None = None) -> MagicMock:
10+
span = MagicMock()
11+
span.name = name
12+
span.attributes = attributes or {}
13+
return span
14+
15+
16+
class TestPostHogTraceExporter(unittest.TestCase):
17+
@patch("posthog.ai.otel.exporter.OTLPSpanExporter")
18+
def test_configures_exporter_with_correct_endpoint(self, mock_otlp_cls):
19+
PostHogTraceExporter(api_key="phc_test123")
20+
mock_otlp_cls.assert_called_once_with(
21+
endpoint="https://us.i.posthog.com/v1/traces",
22+
headers={"Authorization": "Bearer phc_test123"},
23+
)
24+
25+
@patch("posthog.ai.otel.exporter.OTLPSpanExporter")
26+
def test_configures_custom_host(self, mock_otlp_cls):
27+
PostHogTraceExporter(api_key="phc_test", host="https://eu.i.posthog.com")
28+
mock_otlp_cls.assert_called_once_with(
29+
endpoint="https://eu.i.posthog.com/v1/traces",
30+
headers={"Authorization": "Bearer phc_test"},
31+
)
32+
33+
@patch("posthog.ai.otel.exporter.OTLPSpanExporter")
34+
def test_exports_ai_spans(self, mock_otlp_cls):
35+
exporter = PostHogTraceExporter(api_key="phc_test")
36+
inner = mock_otlp_cls.return_value
37+
inner.export.return_value = SpanExportResult.SUCCESS
38+
39+
spans = [_make_span("gen_ai.chat"), _make_span("llm.call")]
40+
result = exporter.export(spans)
41+
42+
self.assertEqual(result, SpanExportResult.SUCCESS)
43+
inner.export.assert_called_once_with(spans)
44+
45+
@patch("posthog.ai.otel.exporter.OTLPSpanExporter")
46+
def test_filters_out_non_ai_spans(self, mock_otlp_cls):
47+
exporter = PostHogTraceExporter(api_key="phc_test")
48+
inner = mock_otlp_cls.return_value
49+
inner.export.return_value = SpanExportResult.SUCCESS
50+
51+
ai_span = _make_span("gen_ai.chat")
52+
http_span = _make_span("http.request")
53+
result = exporter.export([ai_span, http_span])
54+
55+
self.assertEqual(result, SpanExportResult.SUCCESS)
56+
inner.export.assert_called_once_with([ai_span])
57+
58+
@patch("posthog.ai.otel.exporter.OTLPSpanExporter")
59+
def test_returns_success_when_no_ai_spans(self, mock_otlp_cls):
60+
exporter = PostHogTraceExporter(api_key="phc_test")
61+
inner = mock_otlp_cls.return_value
62+
63+
result = exporter.export([_make_span("http.request"), _make_span("db.query")])
64+
65+
self.assertEqual(result, SpanExportResult.SUCCESS)
66+
inner.export.assert_not_called()
67+
68+
@patch("posthog.ai.otel.exporter.OTLPSpanExporter")
69+
def test_returns_success_for_empty_batch(self, mock_otlp_cls):
70+
exporter = PostHogTraceExporter(api_key="phc_test")
71+
inner = mock_otlp_cls.return_value
72+
73+
result = exporter.export([])
74+
75+
self.assertEqual(result, SpanExportResult.SUCCESS)
76+
inner.export.assert_not_called()
77+
78+
@patch("posthog.ai.otel.exporter.OTLPSpanExporter")
79+
def test_exports_spans_with_ai_attributes(self, mock_otlp_cls):
80+
exporter = PostHogTraceExporter(api_key="phc_test")
81+
inner = mock_otlp_cls.return_value
82+
inner.export.return_value = SpanExportResult.SUCCESS
83+
84+
span = _make_span("http.request", {"gen_ai.system": "openai"})
85+
result = exporter.export([span])
86+
87+
self.assertEqual(result, SpanExportResult.SUCCESS)
88+
inner.export.assert_called_once_with([span])
89+
90+
@patch("posthog.ai.otel.exporter.OTLPSpanExporter")
91+
def test_shutdown_delegates(self, mock_otlp_cls):
92+
exporter = PostHogTraceExporter(api_key="phc_test")
93+
inner = mock_otlp_cls.return_value
94+
95+
exporter.shutdown()
96+
inner.shutdown.assert_called_once()
97+
98+
@patch("posthog.ai.otel.exporter.OTLPSpanExporter")
99+
def test_force_flush_delegates(self, mock_otlp_cls):
100+
exporter = PostHogTraceExporter(api_key="phc_test")
101+
inner = mock_otlp_cls.return_value
102+
103+
exporter.force_flush(timeout_millis=5000)
104+
inner.force_flush.assert_called_once_with(5000)

0 commit comments

Comments
 (0)