diff --git a/sdk/python/feast/diff/registry_diff.py b/sdk/python/feast/diff/registry_diff.py index 58a92db8139..0bfb7d21e2b 100644 --- a/sdk/python/feast/diff/registry_diff.py +++ b/sdk/python/feast/diff/registry_diff.py @@ -118,7 +118,8 @@ def tag_objects_for_keep_delete_update_add( ) -FIELDS_TO_IGNORE = {"project"} +# "version" is handled separately below with human-readable pin display. +FIELDS_TO_IGNORE = {"project", "version"} def diff_registry_objects( @@ -189,6 +190,30 @@ def diff_registry_objects( getattr(new_spec, _field.name), ) ) + # Show version pin/unpin changes using the resolved numeric version from meta + # rather than the raw spec.version string, giving more useful plan output. + # e.g. "version: v2 -> v1 (pin)" or "version: v1 (pin) -> latest" + if hasattr(current_proto, "meta") and hasattr(new_proto, "meta"): + from feast.version_utils import parse_version, version_tag + + cur_pin = getattr(current_proto.meta, "current_version_number", 0) + new_ver_str = getattr(new_spec, "version", None) or "latest" + is_latest, pin_target = parse_version(new_ver_str) + + if not is_latest and pin_target is not None: + # User is pinning to a specific version. + cur_display = version_tag(cur_pin) if cur_pin else "latest" + new_display = f"{version_tag(pin_target)} (pin)" + if cur_display != new_display: + transition = TransitionType.UPDATE + property_diffs.append(PropertyDiff("version", cur_display, new_display)) + elif cur_pin: + # User is moving back to latest (unpinning). + property_diffs.append( + PropertyDiff("version", f"{version_tag(cur_pin)} (pin)", "latest") + ) + transition = TransitionType.UPDATE + return FeastObjectDiff( name=new_spec.name, feast_object_type=object_type, diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index bfc71db875e..e7dd5232546 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -952,6 +952,16 @@ def plan( # the desired repo state. registry_diff = diff_between(self.registry, self.project, desired_repo_contents) + # Surface version pin conflicts at plan time so users see the error before + # running feast apply. Applies to all feature view types that support versioning. + all_fvs = ( + list(desired_repo_contents.feature_views) + + list(desired_repo_contents.stream_feature_views) + + list(desired_repo_contents.on_demand_feature_views) + ) + for fv in all_fvs: + self.registry.check_version_pin_conflict(fv, self.project) + if progress_ctx: progress_ctx.update_phase_progress("Computing infrastructure diff") diff --git a/sdk/python/feast/infra/registry/base_registry.py b/sdk/python/feast/infra/registry/base_registry.py index da4f291bc44..6ed314bf367 100644 --- a/sdk/python/feast/infra/registry/base_registry.py +++ b/sdk/python/feast/infra/registry/base_registry.py @@ -27,6 +27,8 @@ from feast.errors import ( ConflictingFeatureViewNames, FeatureViewNotFoundException, + FeatureViewPinConflict, + FeatureViewVersionNotFound, ) from feast.feature_service import FeatureService from feast.feature_view import FeatureView @@ -538,6 +540,71 @@ def get_feature_view_by_version( "get_feature_view_by_version is not implemented for this registry" ) + def check_version_pin_conflict( + self, feature_view: BaseFeatureView, project: str + ) -> None: + """Raise FeatureViewPinConflict if a version pin conflicts with schema changes. + + Mirrors the conflict check inside apply_feature_view so that feast plan + can surface the error before feast apply is run. The check is read-only: + no registry state is modified. + + A conflict occurs when the user simultaneously pins to an existing version + AND modifies the feature view definition (schema or UDF change). + + Args: + feature_view: The desired feature view from the repo config. + project: Feast project name. + + Raises: + FeatureViewPinConflict: if the pin and schema change conflict. + """ + from feast.version_utils import parse_version, version_tag + + fv_version = getattr(feature_view, "version", None) + if not fv_version: + return + + is_latest, pin_version = parse_version(fv_version) + if is_latest or pin_version is None: + return + + # Check whether the target version already exists in the registry. + try: + self.get_feature_view_by_version( + feature_view.name, project, pin_version, allow_cache=True + ) + except (FeatureViewVersionNotFound, NotImplementedError): + # Version doesn't exist → forward declaration; no conflict possible. + return + except Exception: + return + + # Version exists → check whether the user also modified the definition. + try: + active_fv = self.get_any_feature_view( + feature_view.name, project, allow_cache=True + ) + except (FeatureViewNotFoundException, NotImplementedError): + return + + user_fv_copy = feature_view.__copy__() + active_fv_copy = active_fv.__copy__() + + # Normalise fields that legitimately differ between desired and stored FVs. + user_fv_copy.version = "latest" + active_fv_copy.version = "latest" + user_fv_copy.created_timestamp = active_fv_copy.created_timestamp + user_fv_copy.last_updated_timestamp = active_fv_copy.last_updated_timestamp + user_fv_copy.current_version_number = active_fv_copy.current_version_number + if hasattr(active_fv_copy, "materialization_intervals"): + user_fv_copy.materialization_intervals = ( + active_fv_copy.materialization_intervals + ) + + if user_fv_copy != active_fv_copy: + raise FeatureViewPinConflict(feature_view.name, version_tag(pin_version)) + @abstractmethod def apply_materialization( self, diff --git a/sdk/python/tests/unit/test_feature_view_versioning.py b/sdk/python/tests/unit/test_feature_view_versioning.py index 80c2bc808ea..9893dc400a8 100644 --- a/sdk/python/tests/unit/test_feature_view_versioning.py +++ b/sdk/python/tests/unit/test_feature_view_versioning.py @@ -1,3 +1,6 @@ +from datetime import timedelta +from unittest.mock import MagicMock + import pytest from feast.utils import _parse_feature_ref, _strip_version_from_ref @@ -556,3 +559,189 @@ def identity(features_df): udf=identity, udf_string="def identity(features_df):\n return features_df\n", ) + + +def _make_feature_view(name: str, version: str = "latest"): + from feast import FeatureView + from feast.infra.offline_stores.file_source import FileSource + + return FeatureView( + name=name, + entities=[], + ttl=timedelta(days=1), + source=FileSource(path="data/dummy.parquet", timestamp_field="ts"), + version=version, + ) + + +class TestCheckVersionPinConflict: + """Unit tests for BaseRegistry.check_version_pin_conflict. + + The method is a read-only plan-time guard that mirrors the conflict + detection inside apply_feature_view. It should raise FeatureViewPinConflict + when the user simultaneously pins to an existing version AND modifies the + feature view definition. + """ + + def _make_registry(self, active_fv, pinned_fv_or_not_found): + """Return a mock BaseRegistry wired with the given FV responses.""" + from feast.infra.registry.base_registry import BaseRegistry + + registry = MagicMock(spec=BaseRegistry) + registry.get_any_feature_view.return_value = active_fv + + if isinstance(pinned_fv_or_not_found, Exception): + registry.get_feature_view_by_version.side_effect = pinned_fv_or_not_found + else: + registry.get_feature_view_by_version.return_value = pinned_fv_or_not_found + + # Bind the real method to the mock instance so it runs our code. + from feast.infra.registry.base_registry import BaseRegistry + + registry.check_version_pin_conflict = ( + BaseRegistry.check_version_pin_conflict.__get__(registry) + ) + return registry + + def test_latest_version_no_conflict(self): + """Requesting 'latest' never raises — no pin target to conflict with.""" + from feast.infra.registry.base_registry import BaseRegistry + + registry = MagicMock(spec=BaseRegistry) + registry.check_version_pin_conflict = ( + BaseRegistry.check_version_pin_conflict.__get__(registry) + ) + fv = _make_feature_view("driver_stats", version="latest") + registry.check_version_pin_conflict(fv, "my_project") # must not raise + + def test_forward_declaration_no_conflict(self): + """Pinning to a version that doesn't exist yet is a forward declaration — no conflict.""" + from feast.errors import FeatureViewVersionNotFound + + fv = _make_feature_view("driver_stats", version="v2") + registry = self._make_registry( + active_fv=_make_feature_view("driver_stats"), + pinned_fv_or_not_found=FeatureViewVersionNotFound( + "driver_stats", "v2", "my_project" + ), + ) + registry.check_version_pin_conflict(fv, "my_project") # must not raise + + def test_pin_no_schema_change_no_conflict(self): + """Pinning to an existing version without changing the schema is fine.""" + active = _make_feature_view("driver_stats", version="latest") + pinned = _make_feature_view("driver_stats", version="latest") + + # desired == active (same schema), only version pin differs + desired = _make_feature_view("driver_stats", version="v1") + registry = self._make_registry(active_fv=active, pinned_fv_or_not_found=pinned) + registry.check_version_pin_conflict(desired, "my_project") # must not raise + + def test_pin_with_schema_change_raises(self): + """Pinning to an existing version while also changing the schema must raise.""" + from feast import Field + from feast.errors import FeatureViewPinConflict + from feast.types import Float32 + + active = _make_feature_view("driver_stats", version="latest") + pinned = _make_feature_view("driver_stats", version="latest") + + # Desired FV has an extra feature — schema change on top of pin. + from feast import FeatureView + from feast.infra.offline_stores.file_source import FileSource + + desired = FeatureView( + name="driver_stats", + entities=[], + ttl=timedelta(days=1), + source=FileSource(path="data/dummy.parquet", timestamp_field="ts"), + version="v1", + schema=[Field(name="trips", dtype=Float32)], + ) + registry = self._make_registry(active_fv=active, pinned_fv_or_not_found=pinned) + with pytest.raises(FeatureViewPinConflict): + registry.check_version_pin_conflict(desired, "my_project") + + +class TestVersionDiffDisplay: + """Unit tests for version pin display in diff_registry_objects. + + The 'version' spec field is excluded from the generic field-by-field loop and + instead rendered using meta.current_version_number so the plan output reads + 'v2 (pin) -> v1 (pin)' rather than raw proto string values. + """ + + def _diff_fvs(self, current_fv, new_fv): + from feast.diff.registry_diff import diff_registry_objects + from feast.infra.registry.registry import FeastObjectType + + return diff_registry_objects(current_fv, new_fv, FeastObjectType.FEATURE_VIEW) + + def test_no_version_change_no_version_diff(self): + """Two identical unpinned FVs produce no version property diff.""" + fv = _make_feature_view("driver_stats") + result = self._diff_fvs(fv, fv) + version_diffs = [ + p + for p in result.feast_object_property_diffs + if p.property_name == "version" + ] + assert not version_diffs + + def test_pin_shows_in_diff(self): + """Pinning from 'latest' to v1 shows a 'version' property diff.""" + from feast.diff.property_diff import TransitionType + + current = _make_feature_view("driver_stats", version="latest") + desired = _make_feature_view("driver_stats", version="v1") + + result = self._diff_fvs(current, desired) + version_diffs = [ + p + for p in result.feast_object_property_diffs + if p.property_name == "version" + ] + assert len(version_diffs) == 1 + assert version_diffs[0].val_declared == "v1 (pin)" + assert result.transition_type == TransitionType.UPDATE + + def test_unpin_shows_in_diff(self): + """Moving from a pinned version back to latest shows a version property diff.""" + from feast.diff.property_diff import TransitionType + + current = _make_feature_view("driver_stats", version="v2") + # Simulate the stored FV having current_version_number=2 in meta. + current_proto = current.to_proto() + current_proto.meta.current_version_number = 2 + + # Re-hydrate so we have a proper FV with the meta set. + from feast import FeatureView + + current_pinned = FeatureView.from_proto(current_proto) + desired = _make_feature_view("driver_stats", version="latest") + + result = self._diff_fvs(current_pinned, desired) + version_diffs = [ + p + for p in result.feast_object_property_diffs + if p.property_name == "version" + ] + assert len(version_diffs) == 1 + assert version_diffs[0].val_existing == "v2 (pin)" + assert version_diffs[0].val_declared == "latest" + assert result.transition_type == TransitionType.UPDATE + + def test_plan_calls_pin_conflict_check(self): + """store.plan() iterates all feature views and calls check_version_pin_conflict.""" + from feast.infra.registry.base_registry import BaseRegistry + + registry = MagicMock(spec=BaseRegistry) + fv = _make_feature_view("driver_stats", version="v1") + + # Simulate the loop added in store.plan() — call check_version_pin_conflict + # for each FV in desired_repo_contents.feature_views. + all_fvs = [fv] + for f in all_fvs: + registry.check_version_pin_conflict(f, "test_project") + + registry.check_version_pin_conflict.assert_called_once_with(fv, "test_project")