Skip to content

Commit 46a6203

Browse files
feat: add Temporal metrics collection with Prometheus integration
- Add TemporalRuntimeManager singleton for Runtime lifecycle management - Add prometheus_utils helper for shared Prometheus server management - Add Temporal metrics configuration to BaseConfig and .env.test - Integrate Runtime with metrics into TemporalAdapter and Worker - Add BDD tests for Temporal metrics collection scenarios - Fix Redis spop() type handling for async edge cases
1 parent f32ba3b commit 46a6203

14 files changed

Lines changed: 954 additions & 123 deletions

File tree

.env.test

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,12 @@ TEMPORAL__HOST=localhost
2424
TEMPORAL__PORT=7233
2525
TEMPORAL__NAMESPACE=default
2626
TEMPORAL__TASK_QUEUE=test-task-queue
27+
TEMPORAL__ENABLE_METRICS=true
28+
TEMPORAL__METRICS_PORT=8201
29+
30+
# Prometheus Configuration
31+
PROMETHEUS__IS_ENABLED=true
32+
PROMETHEUS__SERVER_PORT=8200
2733

2834
# Redis Configuration
2935
REDIS__IMAGE=redis:8.4.0-alpine

archipy/adapters/redis/adapters.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -677,7 +677,10 @@ def spop(self, name: str, count: int | None = None) -> bytes | float | int | str
677677
Returns:
678678
bytes | float | int | str | list | None: Popped member(s) or None.
679679
"""
680-
return self.client.spop(name, count)
680+
result = self.client.spop(name, count)
681+
if isinstance(result, Awaitable):
682+
raise TypeError("Unexpected awaitable from sync Redis client")
683+
return result
681684

682685
@override
683686
def srem(self, name: str, *values: bytes | str | float) -> RedisIntegerResponseType:

archipy/adapters/temporal/adapters.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
from archipy.models.errors.base_error import BaseError
3030

3131
from .ports import TemporalPort
32+
from .runtime import TemporalRuntimeManager
3233

3334
T = TypeVar("T")
3435

@@ -86,6 +87,16 @@ async def get_client(self) -> Client:
8687
tls_config = self._build_tls_config()
8788
connect_kwargs["tls"] = tls_config
8889

90+
# Configure Runtime with Prometheus telemetry if enabled
91+
if self.config.ENABLE_METRICS:
92+
runtime_manager = TemporalRuntimeManager()
93+
runtime = runtime_manager.get_runtime(
94+
prometheus_enabled=True,
95+
prometheus_port=self.config.METRICS_PORT,
96+
)
97+
if runtime is not None:
98+
connect_kwargs["runtime"] = runtime
99+
89100
self._client = await Client.connect(
90101
f"{self.config.HOST}:{self.config.PORT}",
91102
**connect_kwargs,
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
"""Temporal Runtime singleton for managing Runtime instances with telemetry.
2+
3+
This module provides a singleton class for creating and managing Temporal Runtime
4+
instances with Prometheus metrics integration.
5+
"""
6+
7+
import logging
import threading

from temporalio.runtime import PrometheusConfig, Runtime, TelemetryConfig

from archipy.helpers.metaclasses.singleton import Singleton
12+
13+
logger = logging.getLogger(__name__)
14+
15+
16+
class TemporalRuntimeManager(metaclass=Singleton, thread_safe=True):
    """Singleton manager for Temporal Runtime instances with telemetry configuration.

    This class ensures only one Runtime instance is created and reused across all
    Temporal clients and workers. Once created with metrics enabled, the Runtime
    cannot be changed (Temporal SDK limitation).

    Example:
        ```python
        from archipy.adapters.temporal.runtime import TemporalRuntimeManager

        # Get the singleton manager
        manager = TemporalRuntimeManager()

        # Get Runtime with Prometheus enabled
        runtime = manager.get_runtime(prometheus_enabled=True, prometheus_port=18201)
        ```
    """

    def __init__(self) -> None:
        """Initialize the TemporalRuntimeManager singleton."""
        # Lazily-created Runtime; stays None until the first successful
        # get_runtime() call with prometheus_enabled=True.
        self._runtime: Runtime | None = None
        # Port the existing Runtime is bound to; used to warn on mismatched
        # subsequent requests (the Runtime cannot be rebound).
        self._bound_port: int | None = None
        # Guards the check-then-create sequence in get_runtime(). The Singleton
        # metaclass only serializes *instantiation*; without this lock two
        # threads racing through get_runtime() could each construct a Runtime
        # and attempt to double-bind the Prometheus port.
        self._lock = threading.Lock()

    def get_runtime(self, prometheus_enabled: bool = False, prometheus_port: int = 18201) -> Runtime | None:
        """Get or create a Runtime with Prometheus telemetry.

        Args:
            prometheus_enabled (bool): Whether to enable Prometheus metrics collection.
            prometheus_port (int): Port for the Prometheus metrics endpoint.

        Returns:
            Runtime | None: The configured Runtime instance if metrics are enabled,
                None otherwise (uses default Runtime).

        Note:
            Once a Runtime is created with metrics enabled, it cannot be disabled
            or recreated on a different port due to Temporal SDK limitations.
            Subsequent calls will return the existing Runtime regardless of
            parameters; requesting a different port logs a warning.
        """
        if not prometheus_enabled:
            logger.debug("Prometheus metrics disabled for Temporal, using default runtime")
            return None

        with self._lock:
            # If Runtime already created, return it (can't change once bound to port)
            if self._runtime is not None:
                if self._bound_port is not None and self._bound_port != prometheus_port:
                    logger.warning(
                        "Temporal Runtime already bound to port %d; ignoring requested port %d",
                        self._bound_port,
                        prometheus_port,
                    )
                logger.debug("Returning existing Temporal Runtime instance")
                return self._runtime

            logger.info("Creating Temporal Runtime with Prometheus metrics on port %d", prometheus_port)

            try:
                # NOTE(review): binds the metrics endpoint on all interfaces —
                # confirm exposing it on 0.0.0.0 is acceptable for the deployment.
                self._runtime = Runtime(
                    telemetry=TelemetryConfig(
                        metrics=PrometheusConfig(bind_address=f"0.0.0.0:{prometheus_port}"),
                    ),
                )
            except Exception:
                logger.exception("Failed to create Temporal Runtime with Prometheus config")
                # Return None so Temporal uses default Runtime
                return None

            self._bound_port = prometheus_port
            logger.info("Temporal Runtime created successfully with Prometheus telemetry")
            return self._runtime

    def reset_runtime(self) -> None:
        """Reset the Runtime instance.

        Warning:
            This does NOT actually close the Runtime or release the port binding.
            The Temporal SDK does not support Runtime cleanup. This method only
            resets internal references for testing purposes. The port will remain
            bound until the process exits.
        """
        logger.warning("Resetting Temporal Runtime reference (port remains bound until process exit)")
        with self._lock:
            self._runtime = None
            self._bound_port = None

archipy/adapters/temporal/worker.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,7 @@ async def start_worker(
297297

298298
try:
299299
# Create the Temporal worker
300+
# Note: Worker inherits Runtime (including Prometheus config) from the Client
300301
worker = Worker(
301302
client,
302303
task_queue=task_queue,

archipy/configs/config_template.py

Lines changed: 12 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -31,30 +31,6 @@ class ElasticsearchConfig(BaseModel):
3131
3232
Contains settings related to Elasticsearch server connectivity, authentication,
3333
TLS/SSL, request handling, node status management, and batch operation parameters.
34-
35-
Attributes:
36-
HOSTS (list[str]): List of Elasticsearch server hosts (e.g., ['https://localhost:9200']).
37-
HTTP_USER_NAME (str | None): Username for HTTP authentication.
38-
HTTP_PASSWORD (SecretStr | None): Password for HTTP authentication.
39-
CA_CERTS (str | None): Path to CA bundle for SSL verification.
40-
SSL_ASSERT_FINGERPRINT (str | None): SSL certificate fingerprint for verification.
41-
VERIFY_CERTS (bool): Whether to verify SSL certificates.
42-
CLIENT_CERT (str | None): Path to client certificate for TLS authentication.
43-
CLIENT_KEY (str | None): Path to client key for TLS authentication.
44-
HTTP_COMPRESS (bool): Whether to enable HTTP compression (gzip).
45-
REQUEST_TIMEOUT (float | None): Timeout for HTTP requests in seconds.
46-
MAX_RETRIES (int): Maximum number of retries per request.
47-
RETRY_ON_TIMEOUT (bool): Whether to retry on connection timeouts.
48-
RETRY_ON_STATUS (tuple[int, ...]): HTTP status codes to retry on.
49-
IGNORE_STATUS (tuple[int, ...]): HTTP status codes to ignore as errors.
50-
SNIFF_ON_START (bool): Whether to sniff nodes on client instantiation.
51-
SNIFF_BEFORE_REQUESTS (bool): Whether to sniff nodes before requests.
52-
SNIFF_ON_NODE_FAILURE (bool): Whether to sniff nodes on node failure.
53-
MIN_DELAY_BETWEEN_SNIFFING (float): Minimum delay between sniffing attempts in seconds.
54-
NODE_SELECTOR_CLASS (str): Node selector strategy ('round_robin' or 'random').
55-
CONNECTIONS_PER_NODE (int): Number of HTTP connections per node.
56-
DEAD_NODE_BACKOFF_FACTOR (float): Factor for calculating node timeout duration after failures.
57-
MAX_DEAD_NODE_BACKOFF (float): Maximum timeout duration for a dead node in seconds.
5834
"""
5935

6036
HOSTS: list[str] = Field(default=["https://localhost:9200"], description="List of Elasticsearch server hosts")
@@ -821,30 +797,25 @@ class TemporalConfig(BaseModel):
821797
822798
Controls connection parameters, security settings, and timeout configurations
823799
for Temporal workflow orchestration services.
824-
825-
Attributes:
826-
HOST (str): Temporal server host address.
827-
PORT (int): Temporal server port number.
828-
NAMESPACE (str): Temporal namespace for workflow isolation.
829-
TASK_QUEUE (str): Default task queue for workflow and activity execution.
830-
TLS_CA_CERT (str | None): Path to TLS CA certificate for secure connections.
831-
TLS_CLIENT_CERT (str | None): Path to TLS client certificate for mutual authentication.
832-
TLS_CLIENT_KEY (str | None): Path to TLS client private key.
833-
WORKFLOW_EXECUTION_TIMEOUT (int): Maximum workflow execution time in seconds.
834-
WORKFLOW_RUN_TIMEOUT (int): Maximum single workflow run time in seconds.
835-
WORKFLOW_TASK_TIMEOUT (int): Maximum workflow task processing time in seconds.
836-
ACTIVITY_START_TO_CLOSE_TIMEOUT (int): Maximum activity execution time in seconds.
837-
ACTIVITY_HEARTBEAT_TIMEOUT (int): Activity heartbeat timeout in seconds.
838-
RETRY_MAXIMUM_ATTEMPTS (int): Maximum retry attempts for failed activities.
839-
RETRY_BACKOFF_COEFFICIENT (float): Backoff multiplier for retry delays.
840-
RETRY_MAXIMUM_INTERVAL (int): Maximum retry interval in seconds.
841800
"""
842801

843802
HOST: str = Field(default="localhost", description="Temporal server host address")
844803
PORT: int = Field(default=7233, ge=1, le=65535, description="Temporal server port number")
845804
NAMESPACE: str = Field(default="default", description="Temporal namespace for workflow isolation")
846805
TASK_QUEUE: str = Field(default="task-queue", description="Default task queue name")
847806

807+
# Metrics Configuration
808+
ENABLE_METRICS: bool = Field(
809+
default=False,
810+
description="Enable Prometheus metrics collection for Temporal workflows and activities",
811+
)
812+
METRICS_PORT: int = Field(
813+
default=8201,
814+
ge=1,
815+
le=65535,
816+
description="Port for Temporal Prometheus metrics endpoint (separate from main Prometheus port)",
817+
)
818+
848819
# TLS Configuration
849820
TLS_CA_CERT: str | None = Field(default=None, description="Path to TLS CA certificate")
850821
TLS_CLIENT_CERT: str | None = Field(default=None, description="Path to TLS client certificate")
@@ -925,29 +896,6 @@ class ScyllaDBConfig(BaseModel):
925896
Contains settings related to ScyllaDB cluster connectivity, authentication,
926897
compression, consistency levels, connection management, retry policies,
927898
prepared statement caching, and health checks.
928-
929-
Attributes:
930-
CONTACT_POINTS (list[str]): List of ScyllaDB node addresses.
931-
PORT (int): CQL native transport port number.
932-
KEYSPACE (str | None): Default keyspace name.
933-
USERNAME (str | None): Username for authentication.
934-
PASSWORD (SecretStr | None): Password for authentication.
935-
PROTOCOL_VERSION (int): Protocol version to use.
936-
COMPRESSION (bool): Enable LZ4 compression.
937-
CONNECT_TIMEOUT (int): Connection timeout in seconds.
938-
REQUEST_TIMEOUT (int): Request timeout in seconds.
939-
CONSISTENCY_LEVEL (Literal): Default consistency level.
940-
DISABLE_SHARD_AWARENESS (bool): Disable shard awareness (default: False).
941-
RETRY_POLICY (Literal): Retry policy type (default: "EXPONENTIAL_BACKOFF").
942-
Options: "EXPONENTIAL_BACKOFF", "FALLTHROUGH", "DOWNGRADING_CONSISTENCY".
943-
RETRY_MAX_NUM_RETRIES (float): Maximum number of retries for ExponentialBackoffRetryPolicy (default: 3.0).
944-
RETRY_MIN_INTERVAL (float): Minimum interval in seconds between retries (default: 0.1).
945-
RETRY_MAX_INTERVAL (float): Maximum interval in seconds between retries (default: 10.0).
946-
ENABLE_PREPARED_STATEMENT_CACHE (bool): Enable prepared statement caching (default: True).
947-
PREPARED_STATEMENT_CACHE_SIZE (int): Maximum cached prepared statements (default: 100).
948-
PREPARED_STATEMENT_CACHE_TTL_SECONDS (int): TTL for cache in seconds (default: 3600).
949-
HEALTH_CHECK_TIMEOUT (int): Timeout for health check queries in seconds (default: 5).
950-
ENABLE_CONNECTION_POOL_MONITORING (bool): Enable pool monitoring (default: False).
951899
"""
952900

953901
CONTACT_POINTS: list[str] = Field(

archipy/helpers/utils/app_utils.py

Lines changed: 13 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
from __future__ import annotations
22

33
import logging
4-
import socket
54
from collections.abc import Callable
65
from concurrent import futures
76
from contextlib import AbstractAsyncContextManager
@@ -12,6 +11,7 @@
1211

1312
from archipy.configs.base_config import BaseConfig
1413
from archipy.helpers.utils.base_utils import BaseUtils
14+
from archipy.helpers.utils.prometheus_utils import is_prometheus_server_running
1515
from archipy.models.errors import (
1616
BaseError,
1717
InvalidArgumentError,
@@ -20,8 +20,8 @@
2020
)
2121

2222
if TYPE_CHECKING:
23-
from grpc.experimental import aio as grpc_aio
24-
from grpc.experimental.aio import Server as GrpcAioServer
23+
from grpc import aio as grpc_aio
24+
from grpc.aio import Server as GrpcAioServer
2525

2626
CreateGrpcServerType = Callable[..., GrpcAioServer]
2727
else:
@@ -32,7 +32,7 @@
3232

3333
try:
3434
import grpc
35-
from grpc.experimental import aio as grpc_aio
35+
from grpc import aio as grpc_aio
3636

3737
create_grpc_server: CreateGrpcServerType = grpc_aio.server
3838
GRPC_APP = True
@@ -53,19 +53,8 @@
5353
FASTAPI_APP = False
5454

5555

56-
def _is_prometheus_server_running(port: int) -> bool:
57-
"""Check if Prometheus server is already running on the specified port.
58-
59-
Args:
60-
port (int): The port number to check.
61-
62-
Returns:
63-
bool: True if server is running, False otherwise.
64-
"""
65-
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
66-
result = sock.connect_ex(("localhost", port))
67-
sock.close()
68-
return result == 0
56+
# Backward compatibility alias
57+
_is_prometheus_server_running = is_prometheus_server_running
6958

7059

7160
class FastAPIExceptionHandler:
@@ -225,12 +214,10 @@ def setup_metric_interceptor(app: FastAPI, config: BaseConfig) -> None:
225214
return
226215

227216
try:
228-
from prometheus_client import start_http_server
229-
230217
from archipy.helpers.interceptors.fastapi.metric.interceptor import FastAPIMetricInterceptor
218+
from archipy.helpers.utils.prometheus_utils import start_prometheus_server_if_needed
231219

232-
if not _is_prometheus_server_running(config.PROMETHEUS.SERVER_PORT):
233-
start_http_server(config.PROMETHEUS.SERVER_PORT)
220+
start_prometheus_server_if_needed(config.PROMETHEUS.SERVER_PORT)
234221

235222
app.add_middleware(FastAPIMetricInterceptor) # type: ignore[arg-type]
236223
except Exception:
@@ -317,12 +304,10 @@ def setup_metric_interceptor(config: BaseConfig, interceptors: list) -> None:
317304
return
318305

319306
try:
320-
from prometheus_client import start_http_server
321-
322307
from archipy.helpers.interceptors.grpc.metric.server_interceptor import AsyncGrpcServerMetricInterceptor
308+
from archipy.helpers.utils.prometheus_utils import start_prometheus_server_if_needed
323309

324-
if not _is_prometheus_server_running(config.PROMETHEUS.SERVER_PORT):
325-
start_http_server(config.PROMETHEUS.SERVER_PORT)
310+
start_prometheus_server_if_needed(config.PROMETHEUS.SERVER_PORT)
326311

327312
interceptors.append(AsyncGrpcServerMetricInterceptor())
328313

@@ -363,12 +348,10 @@ def setup_metric_interceptor(config: BaseConfig, interceptors: list) -> None:
363348
return
364349

365350
try:
366-
from prometheus_client import start_http_server
367-
368351
from archipy.helpers.interceptors.grpc.metric.server_interceptor import GrpcServerMetricInterceptor
352+
from archipy.helpers.utils.prometheus_utils import start_prometheus_server_if_needed
369353

370-
if not _is_prometheus_server_running(config.PROMETHEUS.SERVER_PORT):
371-
start_http_server(config.PROMETHEUS.SERVER_PORT)
354+
start_prometheus_server_if_needed(config.PROMETHEUS.SERVER_PORT)
372355

373356
interceptors.append(GrpcServerMetricInterceptor())
374357

@@ -448,7 +431,7 @@ def create_async_grpc_app(
448431
AsyncGrpcAPIUtils.setup_metric_interceptor(config, async_interceptors)
449432

450433
if create_grpc_server is None:
451-
raise ImportError("grpc.experimental.aio is not available")
434+
raise ImportError("grpc.aio is not available")
452435
app = create_grpc_server(
453436
futures.ThreadPoolExecutor(max_workers=config.GRPC.THREAD_WORKER_COUNT),
454437
interceptors=async_interceptors,

0 commit comments

Comments
 (0)