diff --git a/sdk/python/feast/feature_server.py b/sdk/python/feast/feature_server.py
index f60eeb9d87d..1daf59fa2ec 100644
--- a/sdk/python/feast/feature_server.py
+++ b/sdk/python/feast/feature_server.py
@@ -148,28 +148,72 @@ class ChatRequest(BaseModel):
     messages: List[ChatMessage]
 
 
-def _resolve_feature_counts(
+def _parse_feature_info(
     features: Union[List[str], "feast.FeatureService"],
 ) -> tuple:
-    """Return (feature_count, feature_view_count) from the resolved features.
+    """Return ``(feature_view_names, feature_count)`` from resolved features.
 
     ``features`` is either a list of ``"feature_view:feature"`` strings or a
     ``FeatureService`` with ``feature_view_projections``.
+
+    Returns:
+        (fv_names, feat_count) where fv_names is a list of unique feature
+        view name strings and feat_count is the total number of features.
     """
     from feast.feature_service import FeatureService
 
     if isinstance(features, FeatureService):
         projections = features.feature_view_projections
-        fv_count = len(projections)
+        fv_names = [p.name for p in projections]
         feat_count = sum(len(p.features) for p in projections)
     elif isinstance(features, list):
         feat_count = len(features)
-        fv_names = {ref.split(":")[0].split("@")[0] for ref in features if ":" in ref}
-        fv_count = len(fv_names)
+        fv_names = list(
+            {ref.split(":")[0].split("@")[0] for ref in features if ":" in ref}
+        )
     else:
+        fv_names = []
         feat_count = 0
-        fv_count = 0
-    return str(feat_count), str(fv_count)
+    return fv_names, feat_count
+
+
+def _resolve_feature_counts(
+    features: Union[List[str], "feast.FeatureService"],
+) -> tuple:
+    """Return ``(feature_count_str, feature_view_count_str)`` for Prometheus labels."""
+    fv_names, feat_count = _parse_feature_info(features)
+    return str(feat_count), str(len(fv_names))
+
+
+def _emit_online_audit(
+    request: GetOnlineFeaturesRequest,
+    features: Union[List[str], "feast.FeatureService"],
+    entity_count: int,
+    status: str,
+    latency_ms: float,
+):
+    """Best-effort audit log emission for online feature requests."""
+    try:
+        from feast.permissions.security_manager import get_security_manager
+
+        requestor_id = "anonymous"
+        sm = get_security_manager()
+        if sm and sm.current_user:
+            requestor_id = sm.current_user.username or "anonymous"
+
+        fv_names, feat_count = _parse_feature_info(features)
+
+        feast_metrics.emit_online_audit_log(
+            requestor_id=requestor_id,
+            entity_keys=list(request.entities.keys()),
+            entity_count=entity_count,
+            feature_views=fv_names,
+            feature_count=feat_count,
+            status=status,
+            latency_ms=latency_ms,
+        )
+    except Exception:
+        logger.warning("Failed to emit online audit log", exc_info=True)
 
 
 async def _get_features(
@@ -387,11 +431,22 @@ async def get_online_features(request: GetOnlineFeaturesRequest) -> Any:
         include_feature_view_version_metadata=request.include_feature_view_version_metadata,
     )
 
-    if store._get_provider().async_supported.online.read:
-        response = await store.get_online_features_async(**read_params)  # type: ignore
-    else:
-        response = await run_in_threadpool(
-            lambda: store.get_online_features(**read_params)  # type: ignore
+    audit_start_ms = time.monotonic() * 1000
+    audit_status = "success"
+    try:
+        if store._get_provider().async_supported.online.read:
+            response = await store.get_online_features_async(**read_params)  # type: ignore
+        else:
+            response = await run_in_threadpool(
+                lambda: store.get_online_features(**read_params)  # type: ignore
+            )
+    except Exception:
+        audit_status = "error"
+        raise
+    finally:
+        audit_latency_ms = time.monotonic() * 1000 - audit_start_ms
+        _emit_online_audit(
+            request, features, entity_count, audit_status, audit_latency_ms
         )
 
     response_dict = await run_in_threadpool(
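The split into _parse_feature_info plus the thin _resolve_feature_counts
wrapper keeps Prometheus label formatting separate from ref parsing. A quick
sketch of the parser's contract (the feature refs below are hypothetical):

    from feast.feature_server import _parse_feature_info

    # "@v2" version suffixes are stripped from the feature view name, and
    # duplicate views collapse into a single entry.
    fv_names, feat_count = _parse_feature_info(
        ["driver_fv:conv_rate", "driver_fv@v2:acc_rate"]
    )
    assert set(fv_names) == {"driver_fv"} and feat_count == 2
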
diff --git a/sdk/python/feast/infra/feature_servers/base_config.py b/sdk/python/feast/infra/feature_servers/base_config.py
index df324dc57d3..14ad2fe505e 100644
--- a/sdk/python/feast/infra/feature_servers/base_config.py
+++ b/sdk/python/feast/infra/feature_servers/base_config.py
@@ -82,6 +82,17 @@ class MetricsConfig(FeastConfigBaseModel):
     """Emit per-feature-view freshness gauges (feast_feature_freshness_seconds)."""
 
+    offline_features: StrictBool = True
+    """Emit offline store retrieval metrics
+    (feast_offline_store_request_total,
+    feast_offline_store_request_latency_seconds,
+    feast_offline_store_row_count)."""
+
+    audit_logging: StrictBool = False
+    """Emit structured JSON audit log entries for online and offline
+    feature requests via the ``feast.audit`` logger. Captures requestor
+    identity, entity keys, feature views, row counts, and latency."""
+
 
 class BaseFeatureServerConfig(FeastConfigBaseModel):
     """Base Feature Server config that should be extended"""
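The new flags keep offline metrics on by default while audit logging stays
opt-in; a minimal sketch of the defaults (assuming the remaining
MetricsConfig fields also carry defaults, as the freshness field above does):

    from feast.infra.feature_servers.base_config import MetricsConfig

    mc = MetricsConfig()
    assert mc.offline_features is True   # offline RED metrics on by default
    assert mc.audit_logging is False     # audit logging must be enabled explicitly
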
""" - features_table = self._to_arrow_internal(timeout=timeout) + start_wall = time.monotonic() + status_label = "success" + row_count = 0 + try: + features_table = self._to_arrow_internal(timeout=timeout) + row_count = features_table.num_rows + except Exception: + status_label = "error" + raise + finally: + try: + from feast import metrics as feast_metrics + + elapsed = time.monotonic() - start_wall + + if feast_metrics._config.offline_features: + feast_metrics.offline_store_request_total.labels( + method="to_arrow", status=status_label + ).inc() + feast_metrics.offline_store_request_latency_seconds.labels( + method="to_arrow" + ).observe(elapsed) + if row_count > 0: + feast_metrics.offline_store_row_count.labels( + method="to_arrow" + ).observe(row_count) + + if feast_metrics._config.audit_logging: + feature_views, feature_count = _extract_retrieval_metadata(self) + now_iso = datetime.now(tz=timezone.utc).isoformat() + feast_metrics.emit_offline_audit_log( + method="to_arrow", + feature_views=feature_views, + feature_count=feature_count, + row_count=row_count, + status=status_label, + start_time=now_iso, + end_time=now_iso, + duration_ms=elapsed * 1000, + ) + except Exception: + logging.getLogger(__name__).debug( + "Failed to record offline store metrics", exc_info=True + ) + if self.on_demand_feature_views: # Build a mapping of ODFV name to requested feature names # This ensures we only return the features that were explicitly requested diff --git a/sdk/python/feast/metrics.py b/sdk/python/feast/metrics.py index 694f25a687e..f827f6e31ee 100644 --- a/sdk/python/feast/metrics.py +++ b/sdk/python/feast/metrics.py @@ -42,6 +42,7 @@ """ import atexit +import json import logging import os import shutil @@ -51,7 +52,7 @@ from contextlib import contextmanager from dataclasses import dataclass from datetime import datetime, timezone -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING, List, Optional import psutil @@ -123,6 +124,8 @@ class _MetricsFlags: push: bool = False materialization: bool = False freshness: bool = False + offline_features: bool = False + audit_logging: bool = False _config = _MetricsFlags() @@ -144,6 +147,8 @@ def build_metrics_flags(metrics_config: Optional[object] = None) -> _MetricsFlag push=True, materialization=True, freshness=True, + offline_features=True, + audit_logging=False, ) return _MetricsFlags( enabled=True, @@ -153,6 +158,8 @@ def build_metrics_flags(metrics_config: Optional[object] = None) -> _MetricsFlag push=getattr(metrics_config, "push", True), materialization=getattr(metrics_config, "materialization", True), freshness=getattr(metrics_config, "freshness", True), + offline_features=getattr(metrics_config, "offline_features", True), + audit_logging=getattr(metrics_config, "audit_logging", False), ) @@ -260,6 +267,33 @@ def build_metrics_flags(metrics_config: Optional[object] = None) -> _MetricsFlag multiprocess_mode="max", ) +# --------------------------------------------------------------------------- +# Offline store retrieval metrics +# --------------------------------------------------------------------------- +offline_store_request_total = Counter( + "feast_offline_store_request_total", + "Total offline store retrieval requests", + ["method", "status"], +) +offline_store_request_latency_seconds = Histogram( + "feast_offline_store_request_latency_seconds", + "Latency of offline store retrieval operations in seconds", + ["method"], + buckets=(0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0, 600.0), +) 
diff --git a/sdk/python/feast/metrics.py b/sdk/python/feast/metrics.py
index 694f25a687e..f827f6e31ee 100644
--- a/sdk/python/feast/metrics.py
+++ b/sdk/python/feast/metrics.py
@@ -42,6 +42,7 @@
 """
 
 import atexit
+import json
 import logging
 import os
 import shutil
@@ -51,7 +52,7 @@
 from contextlib import contextmanager
 from dataclasses import dataclass
 from datetime import datetime, timezone
-from typing import TYPE_CHECKING, Optional
+from typing import TYPE_CHECKING, List, Optional
 
 import psutil
 
@@ -123,6 +124,8 @@ class _MetricsFlags:
     push: bool = False
     materialization: bool = False
     freshness: bool = False
+    offline_features: bool = False
+    audit_logging: bool = False
 
 
 _config = _MetricsFlags()
@@ -144,6 +147,8 @@ def build_metrics_flags(metrics_config: Optional[object] = None) -> _MetricsFlag
             push=True,
             materialization=True,
             freshness=True,
+            offline_features=True,
+            audit_logging=False,
         )
     return _MetricsFlags(
         enabled=True,
@@ -153,6 +158,8 @@ def build_metrics_flags(metrics_config: Optional[object] = None) -> _MetricsFlag
         push=getattr(metrics_config, "push", True),
         materialization=getattr(metrics_config, "materialization", True),
         freshness=getattr(metrics_config, "freshness", True),
+        offline_features=getattr(metrics_config, "offline_features", True),
+        audit_logging=getattr(metrics_config, "audit_logging", False),
     )
 
@@ -260,6 +267,33 @@ def build_metrics_flags(metrics_config: Optional[object] = None) -> _MetricsFlag
         multiprocess_mode="max",
     )
 
+# ---------------------------------------------------------------------------
+# Offline store retrieval metrics
+# ---------------------------------------------------------------------------
+offline_store_request_total = Counter(
+    "feast_offline_store_request_total",
+    "Total offline store retrieval requests",
+    ["method", "status"],
+)
+offline_store_request_latency_seconds = Histogram(
+    "feast_offline_store_request_latency_seconds",
+    "Latency of offline store retrieval operations in seconds",
+    ["method"],
+    buckets=(0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0, 600.0),
+)
+offline_store_row_count = Histogram(
+    "feast_offline_store_row_count",
+    "Number of rows returned by offline store retrieval",
+    ["method"],
+    buckets=(100, 1000, 10000, 100000, 500000, 1000000, 5000000),
+)
+
+# ---------------------------------------------------------------------------
+# Audit logger: separate from the main feast logger so operators can
+# route SOX-style audit entries to a dedicated sink.
+# ---------------------------------------------------------------------------
+audit_logger = logging.getLogger("feast.audit")
+
 # ---------------------------------------------------------------------------
 # Helpers
 # ---------------------------------------------------------------------------
@@ -388,6 +422,71 @@ def track_materialization(
     )
 
 
+def emit_online_audit_log(
+    *,
+    requestor_id: str,
+    entity_keys: List[str],
+    entity_count: int,
+    feature_views: List[str],
+    feature_count: int,
+    status: str,
+    latency_ms: float,
+):
+    """Emit a structured JSON audit log entry for an online feature request."""
+    if not _config.audit_logging:
+        return
+    audit_logger.info(
+        _json_dumps(
+            {
+                "event": "online_feature_request",
+                "timestamp": datetime.now(tz=timezone.utc).isoformat(),
+                "requestor_id": requestor_id,
+                "entity_keys": entity_keys,
+                "entity_count": entity_count,
+                "feature_views": feature_views,
+                "feature_count": feature_count,
+                "status": status,
+                "latency_ms": round(latency_ms, 2),
+            }
+        )
+    )
+
+
+def emit_offline_audit_log(
+    *,
+    method: str,
+    feature_views: List[str],
+    feature_count: int,
+    row_count: int,
+    status: str,
+    start_time: str,
+    end_time: str,
+    duration_ms: float,
+):
+    """Emit a structured JSON audit log entry for an offline feature retrieval."""
+    if not _config.audit_logging:
+        return
+    audit_logger.info(
+        _json_dumps(
+            {
+                "event": "offline_feature_retrieval",
+                "method": method,
+                "start_time": start_time,
+                "end_time": end_time,
+                "feature_views": feature_views,
+                "feature_count": feature_count,
+                "row_count": row_count,
+                "status": status,
+                "duration_ms": round(duration_ms, 2),
+            }
+        )
+    )
+
+
+def _json_dumps(obj: dict) -> str:
+    return json.dumps(obj, separators=(",", ":"))
+
+
 def update_feature_freshness(
     store: "FeatureStore",
 ) -> None:
@@ -507,6 +606,8 @@ def start_metrics_server(
             push=True,
             materialization=True,
             freshness=True,
+            offline_features=True,
+            audit_logging=False,
         )
 
     from prometheus_client import CollectorRegistry, make_wsgi_app
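Because audit entries flow through the dedicated feast.audit logger,
operators can route them to their own sink with stock logging configuration;
a minimal sketch (the handler choice and file path are assumptions):

    import logging

    audit_logger = logging.getLogger("feast.audit")
    audit_logger.addHandler(logging.FileHandler("/var/log/feast/audit.jsonl"))
    audit_logger.setLevel(logging.INFO)
    audit_logger.propagate = False  # keep audit lines out of the application log

    # Each entry is one compact JSON object per line, e.g.:
    # {"event":"online_feature_request","timestamp":"...","requestor_id":"anonymous",...}
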
diff --git a/sdk/python/tests/unit/test_metrics.py b/sdk/python/tests/unit/test_metrics.py
index bffde73dd91..2750757f67a 100644
--- a/sdk/python/tests/unit/test_metrics.py
+++ b/sdk/python/tests/unit/test_metrics.py
@@ -18,9 +18,14 @@
 import pytest
 
 from feast.metrics import (
+    emit_offline_audit_log,
+    emit_online_audit_log,
     feature_freshness_seconds,
     materialization_duration_seconds,
     materialization_result_total,
+    offline_store_request_latency_seconds,
+    offline_store_request_total,
+    offline_store_row_count,
     online_features_entity_count,
     online_features_request_count,
     online_features_status_total,
@@ -42,13 +47,11 @@
 )
 
 
-@pytest.fixture(autouse=True)
-def _enable_metrics():
-    """Enable all metric categories for each test, then restore."""
+def _all_enabled_flags():
+    """Return a _MetricsFlags with every category enabled."""
     import feast.metrics as m
 
-    original = m._config
-    m._config = m._MetricsFlags(
+    return m._MetricsFlags(
         enabled=True,
         resource=True,
         request=True,
@@ -56,7 +59,18 @@
         push=True,
         materialization=True,
         freshness=True,
+        offline_features=True,
+        audit_logging=True,
     )
+
+
+@pytest.fixture(autouse=True)
+def _enable_metrics():
+    """Enable all metric categories for each test, then restore."""
+    import feast.metrics as m
+
+    original = m._config
+    m._config = _all_enabled_flags()
     yield
     m._config = original
 
@@ -1081,3 +1095,640 @@ def test_separate_from_read_transform_metric(self):
 
         assert abs(read_delta - 0.01) < 0.001
         assert abs(write_delta - 0.05) < 0.001
+
+
+class TestOfflineStoreMetrics:
+    """Tests for the offline store Prometheus metrics (RED pattern)."""
+
+    def test_request_total_increments_on_success(self):
+        before = offline_store_request_total.labels(
+            method="to_arrow", status="success"
+        )._value.get()
+
+        offline_store_request_total.labels(method="to_arrow", status="success").inc()
+
+        assert (
+            offline_store_request_total.labels(
+                method="to_arrow", status="success"
+            )._value.get()
+            == before + 1
+        )
+
+    def test_request_total_increments_on_error(self):
+        before = offline_store_request_total.labels(
+            method="to_arrow", status="error"
+        )._value.get()
+
+        offline_store_request_total.labels(method="to_arrow", status="error").inc()
+
+        assert (
+            offline_store_request_total.labels(
+                method="to_arrow", status="error"
+            )._value.get()
+            == before + 1
+        )
+
+    def test_latency_histogram_records(self):
+        before_sum = offline_store_request_latency_seconds.labels(
+            method="to_arrow"
+        )._sum.get()
+
+        offline_store_request_latency_seconds.labels(method="to_arrow").observe(2.5)
+
+        after_sum = offline_store_request_latency_seconds.labels(
+            method="to_arrow"
+        )._sum.get()
+        assert pytest.approx(after_sum - before_sum, abs=0.01) == 2.5
+
+    def test_row_count_histogram_records(self):
+        before_sum = offline_store_row_count.labels(method="to_arrow")._sum.get()
+
+        offline_store_row_count.labels(method="to_arrow").observe(1000)
+
+        after_sum = offline_store_row_count.labels(method="to_arrow")._sum.get()
+        assert pytest.approx(after_sum - before_sum, abs=1) == 1000
+
+    def test_different_methods_tracked_independently(self):
+        before_a = offline_store_request_total.labels(
+            method="to_arrow", status="success"
+        )._value.get()
+        before_b = offline_store_request_total.labels(
+            method="other", status="success"
+        )._value.get()
+
+        offline_store_request_total.labels(method="to_arrow", status="success").inc()
+
+        assert (
+            offline_store_request_total.labels(
+                method="to_arrow", status="success"
+            )._value.get()
+            == before_a + 1
+        )
+        assert (
+            offline_store_request_total.labels(
+                method="other", status="success"
+            )._value.get()
+            == before_b
+        )
+
+
+class TestEmitAuditLogs:
+    """Tests for structured JSON audit log emission."""
+
+    def test_emit_online_audit_log_writes_json(self):
+        import json
+        import logging
+
+        _audit_logger = logging.getLogger("feast.audit")
+        with patch.object(_audit_logger, "info") as mock_info:
+            emit_online_audit_log(
+                requestor_id="user@example.com",
+                entity_keys=["driver_id", "customer_id"],
+                entity_count=10,
+                feature_views=["driver_fv", "order_fv"],
+                feature_count=5,
+                status="success",
+                latency_ms=42.0,
+            )
+
+        mock_info.assert_called_once()
+        logged_json = mock_info.call_args[0][0]
+        record = json.loads(logged_json)
+
+        assert record["event"] == "online_feature_request"
+        assert record["requestor_id"] == "user@example.com"
+        assert record["entity_keys"] == ["driver_id", "customer_id"]
+        assert record["entity_count"] == 10
+        assert record["feature_views"] == ["driver_fv", "order_fv"]
+        assert record["feature_count"] == 5
+        assert record["status"] == "success"
+        assert record["latency_ms"] == pytest.approx(42.0)
+        assert "timestamp" in record
+
+    def test_emit_online_audit_log_noop_when_disabled(self):
+        import logging
+
+        import feast.metrics as m
+
+        m._config = m._MetricsFlags(enabled=True, audit_logging=False)
+        _audit_logger = logging.getLogger("feast.audit")
+        with patch.object(_audit_logger, "info") as mock_info:
+            emit_online_audit_log(
+                requestor_id="user@example.com",
+                entity_keys=["driver_id"],
+                entity_count=1,
+                feature_views=["driver_fv"],
+                feature_count=1,
+                status="success",
+                latency_ms=10.0,
+            )
+        mock_info.assert_not_called()
+
+    def test_emit_offline_audit_log_writes_json(self):
+        import json
+        import logging
+
+        _audit_logger = logging.getLogger("feast.audit")
+        with patch.object(_audit_logger, "info") as mock_info:
+            emit_offline_audit_log(
+                method="to_arrow",
+                feature_views=["driver_fv"],
+                feature_count=3,
+                row_count=500,
+                status="success",
+                start_time="2026-04-27T12:00:00+00:00",
+                end_time="2026-04-27T12:00:01+00:00",
+                duration_ms=1230.0,
+            )
+
+        mock_info.assert_called_once()
+        logged_json = mock_info.call_args[0][0]
+        record = json.loads(logged_json)
+
+        assert record["event"] == "offline_feature_retrieval"
+        assert record["method"] == "to_arrow"
+        assert record["feature_views"] == ["driver_fv"]
+        assert record["feature_count"] == 3
+        assert record["row_count"] == 500
+        assert record["status"] == "success"
+        assert record["duration_ms"] == pytest.approx(1230.0)
+        assert record["start_time"] == "2026-04-27T12:00:00+00:00"
+        assert record["end_time"] == "2026-04-27T12:00:01+00:00"
+
+    def test_emit_offline_audit_log_noop_when_disabled(self):
+        import logging
+
+        import feast.metrics as m
+
+        m._config = m._MetricsFlags(enabled=True, audit_logging=False)
+        _audit_logger = logging.getLogger("feast.audit")
+        with patch.object(_audit_logger, "info") as mock_info:
+            emit_offline_audit_log(
+                method="to_arrow",
+                feature_views=["fv"],
+                feature_count=1,
+                row_count=10,
+                status="success",
+                start_time="t0",
+                end_time="t1",
+                duration_ms=500.0,
+            )
+        mock_info.assert_not_called()
+
+    def test_emit_online_audit_log_with_error_status(self):
+        import json
+        import logging
+
+        _audit_logger = logging.getLogger("feast.audit")
+        with patch.object(_audit_logger, "info") as mock_info:
+            emit_online_audit_log(
+                requestor_id="unknown",
+                entity_keys=[],
+                entity_count=0,
+                feature_views=[],
+                feature_count=0,
+                status="error",
+                latency_ms=1.0,
+            )
+
+        record = json.loads(mock_info.call_args[0][0])
+        assert record["status"] == "error"
+
+
+class TestBuildMetricsFlagsOfflineAndAudit:
+    """Tests for the new offline_features and audit_logging flags."""
+
+    def test_no_config_defaults_for_new_flags(self):
+        from feast.metrics import build_metrics_flags
+
+        flags = build_metrics_flags(None)
+        assert flags.offline_features is True
+        assert flags.audit_logging is False
+
+    def test_explicit_enable(self):
+        from types import SimpleNamespace
+
+        from feast.metrics import build_metrics_flags
+
+        mc = SimpleNamespace(
+            enabled=True,
+            resource=True,
+            request=True,
+            online_features=True,
+            push=True,
+            materialization=True,
+            freshness=True,
+            offline_features=True,
+            audit_logging=True,
+        )
+        flags = build_metrics_flags(mc)
+        assert flags.offline_features is True
+        assert flags.audit_logging is True
+
+    def test_explicit_disable(self):
+        from types import SimpleNamespace
+
+        from feast.metrics import build_metrics_flags
+
+        mc = SimpleNamespace(
+            enabled=True,
+            resource=True,
+            request=True,
+            online_features=True,
+            push=True,
+            materialization=True,
+            freshness=True,
+            offline_features=False,
+            audit_logging=False,
+        )
+        flags = build_metrics_flags(mc)
+        assert flags.offline_features is False
+        assert flags.audit_logging is False
+
+    def test_missing_new_attrs_fall_back_to_defaults(self):
+        from types import SimpleNamespace
+
+        from feast.metrics import build_metrics_flags
+
+        mc = SimpleNamespace(
+            enabled=True,
+            resource=True,
+            request=True,
+            online_features=True,
+            push=True,
+            materialization=True,
+            freshness=True,
+        )
+        flags = build_metrics_flags(mc)
+        assert flags.offline_features is True
+        assert flags.audit_logging is False
+
+
+class TestExtractRetrievalMetadata:
+    """Tests for the _extract_retrieval_metadata helper."""
+
+    def test_extracts_feature_views_and_count(self):
+        from feast.infra.offline_stores.offline_store import (
+            RetrievalMetadata,
+            _extract_retrieval_metadata,
+        )
+
+        job = MagicMock()
+        job.metadata = RetrievalMetadata(
+            features=[
+                "driver_fv:conv_rate",
+                "driver_fv:acc_rate",
+                "vehicle_fv:mileage",
+            ],
+            keys=["driver_id"],
+        )
+
+        fv_names, feat_count = _extract_retrieval_metadata(job)
+        assert feat_count == 3
+        assert set(fv_names) == {"driver_fv", "vehicle_fv"}
+
+    def test_returns_empty_when_no_metadata(self):
+        from feast.infra.offline_stores.offline_store import (
+            _extract_retrieval_metadata,
+        )
+
+        job = MagicMock()
+        job.metadata = None
+
+        fv_names, feat_count = _extract_retrieval_metadata(job)
+        assert fv_names == []
+        assert feat_count == 0
+
+    def test_handles_not_implemented_metadata(self):
+        from feast.infra.offline_stores.offline_store import (
+            _extract_retrieval_metadata,
+        )
+
+        # A plain stub class: assigning a raising property onto
+        # type(MagicMock()) would mutate the shared MagicMock class
+        # and leak into other tests.
+        class _NoMetadataJob:
+            @property
+            def metadata(self):
+                raise NotImplementedError
+
+        fv_names, feat_count = _extract_retrieval_metadata(_NoMetadataJob())
+        assert fv_names == []
+        assert feat_count == 0
+
+
+class TestRetrievalJobToArrowInstrumentation:
+    """Tests for the metrics/audit instrumentation in RetrievalJob.to_arrow()."""
+
+    def _make_job(
+        self, table, on_demand_fvs=None, metadata=None, raise_on_internal=None
+    ):
+        """Create a concrete RetrievalJob subclass for testing."""
+        from feast.infra.offline_stores.offline_store import RetrievalJob
+
+        class _TestJob(RetrievalJob):
+            def __init__(self):
+                self._table = table
+                self._odfvs = on_demand_fvs or []
+                self._metadata = metadata
+                self._raise = raise_on_internal
+
+            def _to_arrow_internal(self, timeout=None):
+                if self._raise:
+                    raise self._raise
+                return self._table
+
+            @property
+            def full_feature_names(self):
+                return False
+
+            @property
+            def on_demand_feature_views(self):
+                return self._odfvs
+
+            @property
+            def metadata(self):
+                return self._metadata
+
+        return _TestJob()
+
+    def test_success_increments_counter_and_records_latency(self):
+        import pyarrow as pa
+
+        table = pa.table({"col": [1, 2, 3]})
+        job = self._make_job(table)
+
+        before_count = offline_store_request_total.labels(
+            method="to_arrow", status="success"
+        )._value.get()
+        before_latency = offline_store_request_latency_seconds.labels(
+            method="to_arrow"
+        )._sum.get()
+
+        result = job.to_arrow()
+
+        assert result.num_rows == 3
+        assert (
+            offline_store_request_total.labels(
+                method="to_arrow", status="success"
+            )._value.get()
+            == before_count + 1
+        )
+        assert (
+            offline_store_request_latency_seconds.labels(method="to_arrow")._sum.get()
+            > before_latency
+        )
+
+    def test_error_increments_error_counter(self):
+        job = self._make_job(None, raise_on_internal=RuntimeError("query failed"))
+
+        before_error = offline_store_request_total.labels(
+            method="to_arrow", status="error"
+        )._value.get()
+
+        with pytest.raises(RuntimeError, match="query failed"):
+            job.to_arrow()
+
+        assert (
+            offline_store_request_total.labels(
+                method="to_arrow", status="error"
+            )._value.get()
+            == before_error + 1
+        )
+
+    def test_row_count_recorded_on_success(self):
+        import pyarrow as pa
+
+        table = pa.table({"a": list(range(500))})
+        job = self._make_job(table)
+
+        before_sum = offline_store_row_count.labels(method="to_arrow")._sum.get()
+
+        job.to_arrow()
+
+        assert (
+            offline_store_row_count.labels(method="to_arrow")._sum.get()
+            >= before_sum + 500
+        )
+
+    def test_row_count_not_recorded_when_zero(self):
+        import pyarrow as pa
+
+        table = pa.table({"a": pa.array([], type=pa.int64())})
+        job = self._make_job(table)
+
+        before_sum = offline_store_row_count.labels(method="to_arrow")._sum.get()
+
+        job.to_arrow()
+
+        assert (
+            offline_store_row_count.labels(method="to_arrow")._sum.get() == before_sum
+        )
+
+    def test_metrics_skipped_when_offline_features_disabled(self):
+        import pyarrow as pa
+
+        import feast.metrics as m
+
+        m._config = m._MetricsFlags(
+            enabled=True, offline_features=False, audit_logging=False
+        )
+
+        table = pa.table({"col": [1, 2]})
+        job = self._make_job(table)
+
+        before_count = offline_store_request_total.labels(
+            method="to_arrow", status="success"
+        )._value.get()
+
+        job.to_arrow()
+
+        assert (
+            offline_store_request_total.labels(
+                method="to_arrow", status="success"
+            )._value.get()
+            == before_count
+        )
+
+    def test_audit_log_emitted_on_success(self):
+        import pyarrow as pa
+
+        from feast.infra.offline_stores.offline_store import RetrievalMetadata
+
+        meta = RetrievalMetadata(
+            features=["driver_fv:conv_rate", "driver_fv:acc_rate"],
+            keys=["driver_id"],
+        )
+        table = pa.table({"col": [1, 2, 3]})
+        job = self._make_job(table, metadata=meta)
+
+        with patch("feast.metrics.emit_offline_audit_log") as mock_audit:
+            job.to_arrow()
+
+        mock_audit.assert_called_once()
+        call_kwargs = mock_audit.call_args[1]
+        assert call_kwargs["method"] == "to_arrow"
+        assert call_kwargs["status"] == "success"
+        assert call_kwargs["row_count"] == 3
+        assert call_kwargs["feature_count"] == 2
+        assert set(call_kwargs["feature_views"]) == {"driver_fv"}
+
+    def test_audit_log_skipped_when_disabled(self):
+        import pyarrow as pa
+
+        import feast.metrics as m
+
+        m._config = m._MetricsFlags(
+            enabled=True, offline_features=True, audit_logging=False
+        )
+
+        table = pa.table({"col": [1]})
+        job = self._make_job(table)
+
+        with patch("feast.metrics.emit_offline_audit_log") as mock_audit:
+            job.to_arrow()
+        mock_audit.assert_not_called()
+
+    def test_instrumentation_failure_does_not_break_retrieval(self):
+        """If the metrics code itself throws, the retrieved table is still returned."""
+        import pyarrow as pa
+
+        table = pa.table({"col": [1]})
+        job = self._make_job(table)
+
+        # Swap the flags object for one whose attribute access fails; the
+        # inner try/except in to_arrow() must swallow the error.
+        with patch("feast.metrics._config", new=object()):
+            result = job.to_arrow()
+        assert result.num_rows == 1
method="to_arrow", status="error" + )._value.get() + + with pytest.raises(RuntimeError, match="query failed"): + job.to_arrow() + + assert ( + offline_store_request_total.labels( + method="to_arrow", status="error" + )._value.get() + == before_error + 1 + ) + + def test_row_count_recorded_on_success(self): + import pyarrow as pa + + table = pa.table({"a": list(range(500))}) + job = self._make_job(table) + + before_sum = offline_store_row_count.labels(method="to_arrow")._sum.get() + + job.to_arrow() + + assert ( + offline_store_row_count.labels(method="to_arrow")._sum.get() + >= before_sum + 500 + ) + + def test_row_count_not_recorded_when_zero(self): + import pyarrow as pa + + table = pa.table({"a": pa.array([], type=pa.int64())}) + job = self._make_job(table) + + before_count = offline_store_row_count.labels(method="to_arrow")._sum.get() + + job.to_arrow() + + assert ( + offline_store_row_count.labels(method="to_arrow")._sum.get() == before_count + ) + + def test_metrics_skipped_when_offline_features_disabled(self): + import pyarrow as pa + + import feast.metrics as m + + m._config = m._MetricsFlags( + enabled=True, offline_features=False, audit_logging=False + ) + + table = pa.table({"col": [1, 2]}) + job = self._make_job(table) + + before_count = offline_store_request_total.labels( + method="to_arrow", status="success" + )._value.get() + + job.to_arrow() + + assert ( + offline_store_request_total.labels( + method="to_arrow", status="success" + )._value.get() + == before_count + ) + + def test_audit_log_emitted_on_success(self): + import pyarrow as pa + + from feast.infra.offline_stores.offline_store import RetrievalMetadata + + meta = RetrievalMetadata( + features=["driver_fv:conv_rate", "driver_fv:acc_rate"], + keys=["driver_id"], + ) + table = pa.table({"col": [1, 2, 3]}) + job = self._make_job(table, metadata=meta) + + with patch("feast.metrics.emit_offline_audit_log") as mock_audit: + job.to_arrow() + + mock_audit.assert_called_once() + call_kwargs = mock_audit.call_args[1] + assert call_kwargs["method"] == "to_arrow" + assert call_kwargs["status"] == "success" + assert call_kwargs["row_count"] == 3 + assert call_kwargs["feature_count"] == 2 + assert set(call_kwargs["feature_views"]) == {"driver_fv"} + + def test_audit_log_skipped_when_disabled(self): + import pyarrow as pa + + import feast.metrics as m + + m._config = m._MetricsFlags( + enabled=True, offline_features=True, audit_logging=False + ) + + table = pa.table({"col": [1]}) + job = self._make_job(table) + + with patch("feast.metrics.emit_offline_audit_log") as mock_audit: + job.to_arrow() + mock_audit.assert_not_called() + + def test_instrumentation_failure_does_not_mask_query_error(self): + """If metrics code itself throws, the original query error still propagates.""" + import pyarrow as pa + + table = pa.table({"col": [1]}) + job = self._make_job(table) + + with patch( + "feast.metrics._config", + new_callable=lambda: property( + lambda self: (_ for _ in ()).throw(RuntimeError("metrics broken")) + ), + ): + result = job.to_arrow() + assert result.num_rows == 1 + + +class TestParseFeatureInfo: + """Tests for _parse_feature_info in feature_server.""" + + def test_feature_ref_list(self): + from feast.feature_server import _parse_feature_info + + refs = ["driver_fv:conv_rate", "driver_fv:acc_rate", "vehicle_fv:mileage"] + fv_names, feat_count = _parse_feature_info(refs) + assert feat_count == 3 + assert set(fv_names) == {"driver_fv", "vehicle_fv"} + + def test_empty_list(self): + from feast.feature_server import 
+
+
+class TestEmitOnlineAudit:
+    """Tests for the _emit_online_audit helper in feature_server."""
+
+    def test_emits_audit_log_with_anonymous_user(self):
+        from feast.feature_server import GetOnlineFeaturesRequest, _emit_online_audit
+
+        request = GetOnlineFeaturesRequest(
+            entities={"driver_id": [1, 2]},
+            features=["driver_fv:conv_rate"],
+        )
+
+        with (
+            patch("feast.feature_server.feast_metrics") as mock_metrics,
+            patch(
+                "feast.permissions.security_manager.get_security_manager",
+                return_value=None,
+            ),
+        ):
+            _emit_online_audit(
+                request=request,
+                features=request.features,
+                entity_count=2,
+                status="success",
+                latency_ms=15.0,
+            )
+
+        mock_metrics.emit_online_audit_log.assert_called_once()
+        kwargs = mock_metrics.emit_online_audit_log.call_args[1]
+        assert kwargs["requestor_id"] == "anonymous"
+        assert kwargs["entity_keys"] == ["driver_id"]
+        assert kwargs["entity_count"] == 2
+        assert kwargs["status"] == "success"
+
+    def test_emits_audit_log_with_authenticated_user(self):
+        from feast.feature_server import GetOnlineFeaturesRequest, _emit_online_audit
+
+        request = GetOnlineFeaturesRequest(
+            entities={"driver_id": [1]},
+            features=["driver_fv:conv_rate"],
+        )
+
+        mock_sm = MagicMock()
+        mock_sm.current_user.username = "jdoe"
+
+        with (
+            patch("feast.feature_server.feast_metrics") as mock_metrics,
+            patch(
+                "feast.permissions.security_manager.get_security_manager",
+                return_value=mock_sm,
+            ),
+        ):
+            _emit_online_audit(
+                request=request,
+                features=request.features,
+                entity_count=1,
+                status="success",
+                latency_ms=10.0,
+            )
+
+        kwargs = mock_metrics.emit_online_audit_log.call_args[1]
+        assert kwargs["requestor_id"] == "jdoe"
+
+    def test_does_not_raise_on_failure(self):
+        from feast.feature_server import GetOnlineFeaturesRequest, _emit_online_audit
+
+        request = GetOnlineFeaturesRequest(
+            entities={"driver_id": [1]},
+            features=["driver_fv:conv_rate"],
+        )
+
+        with patch(
+            "feast.permissions.security_manager.get_security_manager",
+            side_effect=RuntimeError("auth broken"),
+        ):
+            _emit_online_audit(
+                request=request,
+                features=request.features,
+                entity_count=1,
+                status="error",
+                latency_ms=5.0,
+            )
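
End to end, enabling both new categories only requires an object carrying the
flag attributes, since build_metrics_flags falls back to defaults for
anything missing. A sketch mirroring how the tests drive the flags
(SimpleNamespace stands in for a real MetricsConfig):

    from types import SimpleNamespace

    import feast.metrics as m

    mc = SimpleNamespace(offline_features=True, audit_logging=True)
    m._config = m.build_metrics_flags(mc)
    assert m._config.offline_features is True and m._config.audit_logging is True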