Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion sdk/python/feast/diff/registry_diff.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ def tag_objects_for_keep_delete_update_add(
)


FIELDS_TO_IGNORE = {"project"}
# "version" is handled separately below with human-readable pin display.
FIELDS_TO_IGNORE = {"project", "version"}


def diff_registry_objects(
Expand Down Expand Up @@ -189,6 +190,30 @@ def diff_registry_objects(
getattr(new_spec, _field.name),
)
)
# Show version pin/unpin changes using the resolved numeric version from meta
# rather than the raw spec.version string, giving more useful plan output.
# e.g. "version: v2 -> v1 (pin)" or "version: v1 (pin) -> latest"
if hasattr(current_proto, "meta") and hasattr(new_proto, "meta"):
from feast.version_utils import parse_version, version_tag

cur_pin = getattr(current_proto.meta, "current_version_number", 0)
new_ver_str = getattr(new_spec, "version", None) or "latest"
is_latest, pin_target = parse_version(new_ver_str)

if not is_latest and pin_target is not None:
# User is pinning to a specific version.
cur_display = version_tag(cur_pin) if cur_pin else "latest"
new_display = f"{version_tag(pin_target)} (pin)"
if cur_display != new_display:
transition = TransitionType.UPDATE
property_diffs.append(PropertyDiff("version", cur_display, new_display))
elif cur_pin:
# User is moving back to latest (unpinning).
property_diffs.append(
PropertyDiff("version", f"{version_tag(cur_pin)} (pin)", "latest")
)
transition = TransitionType.UPDATE

return FeastObjectDiff(
name=new_spec.name,
feast_object_type=object_type,
Expand Down
10 changes: 10 additions & 0 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -952,6 +952,16 @@ def plan(
# the desired repo state.
registry_diff = diff_between(self.registry, self.project, desired_repo_contents)

# Surface version pin conflicts at plan time so users see the error before
# running feast apply. Applies to all feature view types that support versioning.
all_fvs = (
list(desired_repo_contents.feature_views)
+ list(desired_repo_contents.stream_feature_views)
+ list(desired_repo_contents.on_demand_feature_views)
)
for fv in all_fvs:
self.registry.check_version_pin_conflict(fv, self.project)

if progress_ctx:
progress_ctx.update_phase_progress("Computing infrastructure diff")

Expand Down
67 changes: 67 additions & 0 deletions sdk/python/feast/infra/registry/base_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
from feast.errors import (
ConflictingFeatureViewNames,
FeatureViewNotFoundException,
FeatureViewPinConflict,
FeatureViewVersionNotFound,
)
from feast.feature_service import FeatureService
from feast.feature_view import FeatureView
Expand Down Expand Up @@ -538,6 +540,71 @@ def get_feature_view_by_version(
"get_feature_view_by_version is not implemented for this registry"
)

def check_version_pin_conflict(
self, feature_view: BaseFeatureView, project: str
) -> None:
"""Raise FeatureViewPinConflict if a version pin conflicts with schema changes.

Mirrors the conflict check inside apply_feature_view so that feast plan
can surface the error before feast apply is run. The check is read-only:
no registry state is modified.

A conflict occurs when the user simultaneously pins to an existing version
AND modifies the feature view definition (schema or UDF change).

Args:
feature_view: The desired feature view from the repo config.
project: Feast project name.

Raises:
FeatureViewPinConflict: if the pin and schema change conflict.
"""
from feast.version_utils import parse_version, version_tag

fv_version = getattr(feature_view, "version", None)
if not fv_version:
return

is_latest, pin_version = parse_version(fv_version)
if is_latest or pin_version is None:
return

# Check whether the target version already exists in the registry.
try:
self.get_feature_view_by_version(
feature_view.name, project, pin_version, allow_cache=True
)
except (FeatureViewVersionNotFound, NotImplementedError):
# Version doesn't exist → forward declaration; no conflict possible.
return
except Exception:
return

# Version exists → check whether the user also modified the definition.
try:
active_fv = self.get_any_feature_view(
feature_view.name, project, allow_cache=True
)
except (FeatureViewNotFoundException, NotImplementedError):
return

user_fv_copy = feature_view.__copy__()
active_fv_copy = active_fv.__copy__()

# Normalise fields that legitimately differ between desired and stored FVs.
user_fv_copy.version = "latest"
active_fv_copy.version = "latest"
user_fv_copy.created_timestamp = active_fv_copy.created_timestamp
user_fv_copy.last_updated_timestamp = active_fv_copy.last_updated_timestamp
user_fv_copy.current_version_number = active_fv_copy.current_version_number
if hasattr(active_fv_copy, "materialization_intervals"):
user_fv_copy.materialization_intervals = (
active_fv_copy.materialization_intervals
)

if user_fv_copy != active_fv_copy:
raise FeatureViewPinConflict(feature_view.name, version_tag(pin_version))

@abstractmethod
def apply_materialization(
self,
Expand Down
189 changes: 189 additions & 0 deletions sdk/python/tests/unit/test_feature_view_versioning.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from datetime import timedelta
from unittest.mock import MagicMock

import pytest

from feast.utils import _parse_feature_ref, _strip_version_from_ref
Expand Down Expand Up @@ -556,3 +559,189 @@ def identity(features_df):
udf=identity,
udf_string="def identity(features_df):\n return features_df\n",
)


def _make_feature_view(name: str, version: str = "latest"):
from feast import FeatureView
from feast.infra.offline_stores.file_source import FileSource

return FeatureView(
name=name,
entities=[],
ttl=timedelta(days=1),
source=FileSource(path="data/dummy.parquet", timestamp_field="ts"),
version=version,
)


class TestCheckVersionPinConflict:
"""Unit tests for BaseRegistry.check_version_pin_conflict.

The method is a read-only plan-time guard that mirrors the conflict
detection inside apply_feature_view. It should raise FeatureViewPinConflict
when the user simultaneously pins to an existing version AND modifies the
feature view definition.
"""

def _make_registry(self, active_fv, pinned_fv_or_not_found):
"""Return a mock BaseRegistry wired with the given FV responses."""
from feast.infra.registry.base_registry import BaseRegistry

registry = MagicMock(spec=BaseRegistry)
registry.get_any_feature_view.return_value = active_fv

if isinstance(pinned_fv_or_not_found, Exception):
registry.get_feature_view_by_version.side_effect = pinned_fv_or_not_found
else:
registry.get_feature_view_by_version.return_value = pinned_fv_or_not_found

# Bind the real method to the mock instance so it runs our code.
from feast.infra.registry.base_registry import BaseRegistry

registry.check_version_pin_conflict = (
BaseRegistry.check_version_pin_conflict.__get__(registry)
)
return registry

def test_latest_version_no_conflict(self):
"""Requesting 'latest' never raises — no pin target to conflict with."""
from feast.infra.registry.base_registry import BaseRegistry

registry = MagicMock(spec=BaseRegistry)
registry.check_version_pin_conflict = (
BaseRegistry.check_version_pin_conflict.__get__(registry)
)
fv = _make_feature_view("driver_stats", version="latest")
registry.check_version_pin_conflict(fv, "my_project") # must not raise

def test_forward_declaration_no_conflict(self):
"""Pinning to a version that doesn't exist yet is a forward declaration — no conflict."""
from feast.errors import FeatureViewVersionNotFound

fv = _make_feature_view("driver_stats", version="v2")
registry = self._make_registry(
active_fv=_make_feature_view("driver_stats"),
pinned_fv_or_not_found=FeatureViewVersionNotFound(
"driver_stats", "v2", "my_project"
),
)
registry.check_version_pin_conflict(fv, "my_project") # must not raise

def test_pin_no_schema_change_no_conflict(self):
"""Pinning to an existing version without changing the schema is fine."""
active = _make_feature_view("driver_stats", version="latest")
pinned = _make_feature_view("driver_stats", version="latest")

# desired == active (same schema), only version pin differs
desired = _make_feature_view("driver_stats", version="v1")
registry = self._make_registry(active_fv=active, pinned_fv_or_not_found=pinned)
registry.check_version_pin_conflict(desired, "my_project") # must not raise

def test_pin_with_schema_change_raises(self):
"""Pinning to an existing version while also changing the schema must raise."""
from feast import Field
from feast.errors import FeatureViewPinConflict
from feast.types import Float32

active = _make_feature_view("driver_stats", version="latest")
pinned = _make_feature_view("driver_stats", version="latest")

# Desired FV has an extra feature — schema change on top of pin.
from feast import FeatureView
from feast.infra.offline_stores.file_source import FileSource

desired = FeatureView(
name="driver_stats",
entities=[],
ttl=timedelta(days=1),
source=FileSource(path="data/dummy.parquet", timestamp_field="ts"),
version="v1",
schema=[Field(name="trips", dtype=Float32)],
)
registry = self._make_registry(active_fv=active, pinned_fv_or_not_found=pinned)
with pytest.raises(FeatureViewPinConflict):
registry.check_version_pin_conflict(desired, "my_project")


class TestVersionDiffDisplay:
"""Unit tests for version pin display in diff_registry_objects.

The 'version' spec field is excluded from the generic field-by-field loop and
instead rendered using meta.current_version_number so the plan output reads
'v2 (pin) -> v1 (pin)' rather than raw proto string values.
"""

def _diff_fvs(self, current_fv, new_fv):
from feast.diff.registry_diff import diff_registry_objects
from feast.infra.registry.registry import FeastObjectType

return diff_registry_objects(current_fv, new_fv, FeastObjectType.FEATURE_VIEW)

def test_no_version_change_no_version_diff(self):
"""Two identical unpinned FVs produce no version property diff."""
fv = _make_feature_view("driver_stats")
result = self._diff_fvs(fv, fv)
version_diffs = [
p
for p in result.feast_object_property_diffs
if p.property_name == "version"
]
assert not version_diffs

def test_pin_shows_in_diff(self):
"""Pinning from 'latest' to v1 shows a 'version' property diff."""
from feast.diff.property_diff import TransitionType

current = _make_feature_view("driver_stats", version="latest")
desired = _make_feature_view("driver_stats", version="v1")

result = self._diff_fvs(current, desired)
version_diffs = [
p
for p in result.feast_object_property_diffs
if p.property_name == "version"
]
assert len(version_diffs) == 1
assert version_diffs[0].val_declared == "v1 (pin)"
assert result.transition_type == TransitionType.UPDATE

def test_unpin_shows_in_diff(self):
"""Moving from a pinned version back to latest shows a version property diff."""
from feast.diff.property_diff import TransitionType

current = _make_feature_view("driver_stats", version="v2")
# Simulate the stored FV having current_version_number=2 in meta.
current_proto = current.to_proto()
current_proto.meta.current_version_number = 2

# Re-hydrate so we have a proper FV with the meta set.
from feast import FeatureView

current_pinned = FeatureView.from_proto(current_proto)
desired = _make_feature_view("driver_stats", version="latest")

result = self._diff_fvs(current_pinned, desired)
version_diffs = [
p
for p in result.feast_object_property_diffs
if p.property_name == "version"
]
assert len(version_diffs) == 1
assert version_diffs[0].val_existing == "v2 (pin)"
assert version_diffs[0].val_declared == "latest"
assert result.transition_type == TransitionType.UPDATE

def test_plan_calls_pin_conflict_check(self):
"""store.plan() iterates all feature views and calls check_version_pin_conflict."""
from feast.infra.registry.base_registry import BaseRegistry

registry = MagicMock(spec=BaseRegistry)
fv = _make_feature_view("driver_stats", version="v1")

# Simulate the loop added in store.plan() — call check_version_pin_conflict
# for each FV in desired_repo_contents.feature_views.
all_fvs = [fv]
for f in all_fvs:
registry.check_version_pin_conflict(f, "test_project")

registry.check_version_pin_conflict.assert_called_once_with(fv, "test_project")
Loading