-
Notifications
You must be signed in to change notification settings - Fork 473
Expand file tree
/
Copy pathpropagation.py
More file actions
76 lines (57 loc) · 1.92 KB
/
propagation.py
File metadata and controls
76 lines (57 loc) · 1.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
"""Context propagation helpers for OpenTelemetry tracing."""
from __future__ import annotations
import contextlib
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from collections.abc import Generator
from opentelemetry.trace import Span
def inject_trace_context(carrier: dict[str, Any]) -> dict[str, Any]:
"""Inject current trace context into a carrier dict.
Use this to propagate trace context through SQS messages.
Parameters
----------
carrier : dict
Dictionary to inject trace context into.
Returns
-------
dict
Carrier with trace context injected.
Example
-------
message = {"data": "payload"}
message = inject_trace_context(message)
sqs.send_message(QueueUrl=url, MessageBody=json.dumps(message))
"""
from opentelemetry.propagate import inject
inject(carrier)
return carrier
@contextlib.contextmanager
def create_span_from_context(
name: str,
carrier: dict[str, Any],
**kwargs,
) -> Generator[Span, None, None]:
"""Create a span with parent context extracted from carrier.
Use this to continue a trace from an SQS message.
Parameters
----------
name : str
Span name.
carrier : dict
Dictionary containing trace context (e.g., SQS message body).
**kwargs
Additional arguments passed to start_as_current_span.
Example
-------
message = json.loads(record["body"])
with create_span_from_context("process_message", message) as span:
process(message["data"])
"""
from opentelemetry import trace
from opentelemetry.propagate import extract
ctx = extract(carrier)
tracer = trace.get_tracer("aws_lambda_powertools")
kwargs.setdefault("record_exception", True)
kwargs.setdefault("set_status_on_exception", True)
with tracer.start_as_current_span(name=name, context=ctx, **kwargs) as span:
yield span