diff --git a/dataframely/_plugin.py b/dataframely/_plugin.py index 7616af3..95486dd 100644 --- a/dataframely/_plugin.py +++ b/dataframely/_plugin.py @@ -30,6 +30,7 @@ def all_rules_horizontal(rules: IntoExpr | Iterable[IntoExpr]) -> pl.Expr: function_name="all_rules_horizontal", args=rules, use_abs_path=True, + is_elementwise=True, ) @@ -61,9 +62,14 @@ def all_rules_required( ) -> pl.Expr: """Execute :mod:`~polars.all_horizontal` and `.all` for a set of rules. - Contrary to :meth:`all_rules`, this method raises a - :mod:`~polars.exceptions.ComputeError` at execution time if any rule indicates a - validation failure. The `ComputeError` includes a helpful error message. + This method differs from :meth:`all_rules` in two ways: + + - It raises a :mod:`~polars.exceptions.ComputeError` at execution time if any + rule indicates a validation failure. The `ComputeError` includes a helpful error + message. + - It broadcasts the resulting boolean series to the length of the input. This allows + element-wise evaluation and makes this a non-blocking operation on the streaming + engine. Args: rules: The rules to evaluate. @@ -80,5 +86,10 @@ def all_rules_required( args=rules, kwargs={"null_is_valid": null_is_valid, "schema_name": schema_name}, use_abs_path=True, - returns_scalar=True, + # NOTE: Conceptually, we're reducing the input to a single boolean value here. + # However, we set this option to ensure that the plugin does not become + # blocking on the streaming engine. A single boolean value is simply + # broadcast and we're indifferent to actually finding all validation failures + # during `validate` (and simply fail fast). + is_elementwise=True, ) diff --git a/dataframely/schema.py b/dataframely/schema.py index 0836974..213925f 100644 --- a/dataframely/schema.py +++ b/dataframely/schema.py @@ -549,6 +549,12 @@ def validate( should raise upon failure. If `False`, the returned lazy frame will fail to collect if the validation does not pass. 
+ Note: + If running on the streaming engine, lazy validation will potentially + not surface *all* validation issues as the validation is aborted + once the first failure is encountered. Likewise, the reported + validation failure can be non-deterministic. + Returns: The input eager or lazy frame, wrapped in a generic version of the input's data frame type to reflect schema adherence. Columns not defined diff --git a/src/polars_plugin/mod.rs b/src/polars_plugin/mod.rs index 06cb9b9..c42d4f1 100644 --- a/src/polars_plugin/mod.rs +++ b/src/polars_plugin/mod.rs @@ -81,7 +81,8 @@ pub fn all_rules_required( // neither actually runs the filter logic, nor does it copy any data. It's essentially a no-op // that is not optimized away in a lazy frame. if failures.is_empty() { - return Ok(BooleanChunked::new(PlSmallStr::EMPTY, [true]).into_series()); + let column = Column::new_scalar(PlSmallStr::EMPTY, Scalar::from(true), 1); + return Ok(column.take_materialized_series()); } // Aggregate failure counts into a validation error. 
diff --git a/tests/schema/test_validate.py b/tests/schema/test_validate.py index 76af480..7ff383b 100644 --- a/tests/schema/test_validate.py +++ b/tests/schema/test_validate.py @@ -115,7 +115,7 @@ def test_invalid_column_contents( df = df_type({"a": [1, 2, 3], "b": ["x", "longtext", None], "c": ["1", None, "3"]}) with pytest.raises( ValidationError if eager else plexc.ComputeError, - match=r"2 rules failed validation", + match=r"2 rules failed validation" if eager else None, ): _validate_and_collect(MySchema, df, eager=eager) assert not MySchema.is_valid(df) @@ -143,7 +143,7 @@ def test_violated_custom_rule( df = df_type({"a": [1, 1, 2, 3, 3], "b": [2, 2, 2, 4, 5]}) with pytest.raises( ValidationError if eager else plexc.ComputeError, - match=r"2 rules failed validation", + match=r"2 rules failed validation" if eager else None, ): _validate_and_collect(MyComplexSchema, df, eager=eager) assert not MyComplexSchema.is_valid(df) @@ -285,7 +285,19 @@ def test_multiple_unique_columns_both_invalid( df = df_type({"a": [1, 1, 3], "b": ["x", "y", "y"]}) with pytest.raises( ValidationError if eager else plexc.ComputeError, - match=r"2 rules failed validation", + match=r"2 rules failed validation" if eager else None, ): _validate_and_collect(MultiUniqueSchema, df, eager=eager) assert not MultiUniqueSchema.is_valid(df) + + +# ----------------------------------- PERFORMANCE ------------------------------------ # + + +def test_lazy_validate_does_not_block_streaming_engine() -> None: + schema = create_schema("test", {"a": dy.Int64(), "b": dy.Int64()}) + lf = pl.LazyFrame({"a": [1, 2, 3], "b": [2, 3, 4]}).lazy() + out = schema.validate(lf, eager=False) + graph = out.show_graph(engine="streaming", plan_stage="physical", raw_output=True) + assert graph is not None + assert "in-memory-map" not in graph