Skip to content

Commit 74ecf0f

Browse files
s3riusCopilot
andauthored
Added otlp for JS consumers and publishing. (#48)
Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: s3rius <18153319+s3rius@users.noreply.github.com> Co-authored-by: Copilot <198982749+Copilot@users.noreply.github.com>
1 parent 9928961 commit 74ecf0f

25 files changed

Lines changed: 1088 additions & 77 deletions

examples/consumers.py

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,18 +41,31 @@ async def main() -> None:
4141
# We publish a single message
4242
await js.publish("stream.example.test", "message for stream")
4343

44-
# We use messages() to get async iterator which we
45-
# use to get messages for push_consumer.
46-
async for push_message in await push_consumer.messages():
47-
print(f"[FROM_PUSH] {push_message.payload!r}") # noqa: T201
48-
await push_message.ack()
49-
break
44+
async with push_consumer.consume() as messages:
45+
async for push_message in messages:
46+
print(f"[FROM_PUSH] {push_message.payload!r}") # noqa: T201
47+
break
5048

51-
# Pull consumers have to request batches of messages.
49+
# Pull consumers have 2 different APIs.
50+
# 1. You can use fetch directly.
51+
# 2. Use async iterator API.
52+
53+
# Here's how to call pull-consumer fetch method.
54+
# It returns a batch of messages.
55+
# However, please be careful, this method has worse opentelemetry
56+
# instrumentation. Because essentailly it's the same as just calling a function.
57+
# with no scope.
5258
for pull_message in await pull_consumer.fetch(max_messages=10):
5359
print(f"[FROM_PULL] {pull_message.payload!r}") # noqa: T201
5460
await pull_message.ack()
5561

62+
# This API is more prefered, because it has better
63+
# Opentelemetry instrumentation.
64+
async with pull_consumer.consume() as messages:
65+
async for message in messages:
66+
print(f"[FROM_PULL] {message.payload!r}") # noqa: T201
67+
break
68+
5669
# Cleanup
5770
await stream.consumers.delete(push_consumer.name)
5871
await stream.consumers.delete(pull_consumer.name)

examples/opentelemetry.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
from natsrpy.instrumentation import NatsrpyInstrumentor
2+
3+
NatsrpyInstrumentor().instrument(
4+
# If true, then message payload will be attached
5+
# to some spans.
6+
capture_body=False,
7+
# If true, then message headers will be attached
8+
# to some spans.
9+
capture_headers=False,
10+
)
11+
12+
# We also support zero-code instrumentation.
13+
# In case if you're using it, you can specify those parameters
14+
# by setting the following environment variables:
15+
# * `OTEL_PYTHON_NATSRPY_CAPTURE_BODY=true`
16+
# * `OTEL_PYTHON_NATSRPY_CAPTURE_HEADERS=true`

python/natsrpy/_natsrpy_rs/__init__.pyi

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ class SubscriptionCtxManager(Generic[_T]):
102102
"""
103103

104104
def __aenter__(self) -> Future[_T]: ...
105-
async def __aexit__(
105+
def __aexit__(
106106
self,
107107
_exc_type: type[BaseException] | None = None,
108108
_exc_val: BaseException | None = None,

python/natsrpy/_natsrpy_rs/js/consumers.pyi

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from asyncio import Future
22
from datetime import timedelta
3+
from types import TracebackType
34
from typing import final
45

56
from natsrpy._natsrpy_rs.js import JetStreamMessage
@@ -12,8 +13,11 @@ __all__ = [
1213
"PriorityPolicy",
1314
"PullConsumer",
1415
"PullConsumerConfig",
16+
"PullConsumerContextManager",
17+
"PullConsumerFetcher",
1518
"PushConsumer",
1619
"PushConsumerConfig",
20+
"PushConsumerContextManager",
1721
"ReplayPolicy",
1822
]
1923

@@ -283,6 +287,28 @@ class MessagesIterator:
283287
def __aiter__(self) -> Self: ...
284288
def __anext__(self) -> Future[JetStreamMessage]: ...
285289

290+
@final
291+
class PushConsumerContextManager:
292+
"""
293+
Context manager for consuming messages from push-based consumer.
294+
295+
This class is used to scope the message consumption.
296+
Mostly used for opentelemetry support.
297+
"""
298+
299+
def __aenter__(self) -> Future[MessagesIterator]:
300+
"""Get an async iterator for consuming messages.
301+
302+
:return: an async iterator over JetStream messages.
303+
"""
304+
305+
def __aexit__(
306+
self,
307+
_exc_type: type[BaseException] | None = None,
308+
_exc_val: BaseException | None = None,
309+
_exc_tb: TracebackType | None = None,
310+
) -> Future[None]: ...
311+
286312
@final
287313
class PushConsumer:
288314
"""A push-based JetStream consumer.
@@ -298,11 +324,26 @@ class PushConsumer:
298324
def stream_name(self) -> str:
299325
"""Get stream name that this consumer attached to."""
300326

301-
def messages(self) -> Future[MessagesIterator]:
302-
"""Get an async iterator for consuming messages.
327+
def consume(self) -> PushConsumerContextManager:
328+
"""Start consuming messages."""
303329

304-
:return: an async iterator over JetStream messages.
305-
"""
330+
@final
331+
class PullConsumerFetcher:
332+
def __aiter__(self) -> Self:
333+
"""Returns this very object."""
334+
335+
def __anext__(self) -> Future[JetStreamMessage]:
336+
"""Get a next message from the stream."""
337+
338+
@final
339+
class PullConsumerContextManager:
340+
def __aenter__(self) -> Future[PullConsumerFetcher]: ...
341+
def __aexit__(
342+
self,
343+
_exc_type: type[BaseException] | None = None,
344+
_exc_val: BaseException | None = None,
345+
_exc_tb: TracebackType | None = None,
346+
) -> Future[None]: ...
306347

307348
@final
308349
class PullConsumer:
@@ -319,6 +360,9 @@ class PullConsumer:
319360
def stream_name(self) -> str:
320361
"""Get stream name that this consumer attached to."""
321362

363+
def consume(self) -> PullConsumerContextManager:
364+
"""Start consuming messages."""
365+
322366
def fetch(
323367
self,
324368
max_messages: int | None = None,

python/natsrpy/_natsrpy_rs/js/kv.pyi

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -122,10 +122,10 @@ class KVConfig:
122122
"""
123123

124124
bucket: str
125-
description: str
125+
description: str | None
126126
max_value_size: int | None
127127
history: int | None
128-
max_age: float | None
128+
max_age: timedelta | None
129129
max_bytes: int | None
130130
storage: StorageType | None
131131
num_replicas: int | None
@@ -135,7 +135,7 @@ class KVConfig:
135135
mirror_direct: bool | None
136136
compression: bool | None
137137
placement: Placement | None
138-
limit_markers: float | None
138+
limit_markers: timedelta | None
139139

140140
def __new__(
141141
cls,

python/natsrpy/_natsrpy_rs/js/stream.pyi

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,6 @@ class SubjectTransform:
132132
source: str
133133
destination: str
134134

135-
def __new__(cls, source: str, destination: str) -> Self: ...
136-
137135
@final
138136
class Source:
139137
"""Configuration for a stream source or mirror origin.
@@ -454,7 +452,7 @@ class StreamInfo:
454452
"""
455453

456454
config: StreamConfig
457-
created: float
455+
created: int
458456
state: StreamState
459457
cluster: ClusterInfo | None
460458
mirror: SourceInfo | None

python/natsrpy/instrumentation/__init__.py

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,13 @@ async def main() -> None:
3333
"""
3434

3535
import logging
36+
import os
3637
from collections.abc import Collection
3738
from importlib import metadata
3839
from typing import Any
3940

41+
from .js_consumer import JSConsumerInstrumentation
42+
from .js_publish import JSPublishInstrumentation
4043
from .nats_core import NatsCoreInstrumentator
4144

4245
try:
@@ -68,12 +71,42 @@ def instrumentation_dependencies(self) -> Collection[str]:
6871

6972
def _instrument(self, **kwargs: Any) -> None:
7073
tracer_provider = kwargs.get("tracer_provider")
74+
capture_body = (
75+
os.environ.get(
76+
"OTEL_PYTHON_NATSRPY_CAPTURE_BODY",
77+
str(kwargs.get("capture_body", False)),
78+
).lower()
79+
== "true"
80+
)
81+
capture_headers = (
82+
os.environ.get(
83+
"OTEL_PYTHON_NATSRPY_CAPTURE_HEADERS",
84+
str(kwargs.get("capture_headers", False)),
85+
).lower()
86+
== "true"
87+
)
7188
tracer = trace.get_tracer(
7289
_INSTRUMENTATION_MODULE_NAME,
7390
metadata.version("natsrpy"),
7491
tracer_provider,
7592
)
76-
NatsCoreInstrumentator(tracer).instrument()
93+
NatsCoreInstrumentator(
94+
tracer,
95+
capture_body=capture_body,
96+
capture_headers=capture_headers,
97+
).instrument()
98+
JSConsumerInstrumentation(
99+
tracer,
100+
capture_body=capture_body,
101+
capture_headers=capture_headers,
102+
).instrument()
103+
JSPublishInstrumentation(
104+
tracer,
105+
capture_body=capture_body,
106+
capture_headers=capture_headers,
107+
).instrument()
77108

78109
def _uninstrument(self, **kwargs: Any) -> None:
79110
NatsCoreInstrumentator.uninstrument()
111+
JSConsumerInstrumentation.uninstrument()
112+
JSPublishInstrumentation.uninstrument()
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
from collections.abc import Callable
2+
from typing import Any, Concatenate
3+
4+
from typing_extensions import ParamSpec
5+
from wrapt import ObjectProxy
6+
7+
_P = ParamSpec("_P")
8+
9+
10+
class AsyncCtxManagerProxy(ObjectProxy): # type: ignore
11+
"""
12+
Proxy object for context managers.
13+
14+
This class wraps a context manager,
15+
wrapping returned values on __aenter__,
16+
and calling __cancel_ctx__ at the exit.
17+
"""
18+
19+
def __init__(
20+
self,
21+
wrapped: Any,
22+
sub_wrappers: dict[type[Any], Callable[Concatenate[Any, _P], Any]],
23+
*args: _P.args,
24+
**kwargs: _P.kwargs,
25+
) -> None:
26+
super().__init__(wrapped)
27+
self._self_sub_args = args
28+
self._self_sub_kwargs = kwargs
29+
self._self_sub = None
30+
self._self_subwrappers = sub_wrappers
31+
32+
async def __aenter__(self) -> Any:
33+
sub: Any = await self.__wrapped__.__aenter__()
34+
sub_wrapper = self._self_subwrappers.get(type(sub))
35+
if sub_wrapper:
36+
sub = sub_wrapper(sub, *self._self_sub_args, **self._self_sub_kwargs)
37+
self._self_sub = sub
38+
return sub
39+
40+
async def __aexit__(self, *args: object, **kwargs: dict[Any, Any]) -> Any:
41+
if self._self_sub and hasattr(self._self_sub, "__cancel_ctx__"):
42+
self._self_sub.__cancel_ctx__(*args, **kwargs)

0 commit comments

Comments
 (0)