Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions dataframely/_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def all_rules_horizontal(rules: IntoExpr | Iterable[IntoExpr]) -> pl.Expr:
function_name="all_rules_horizontal",
args=rules,
use_abs_path=True,
is_elementwise=True,
Comment thread
borchero marked this conversation as resolved.
)


Expand Down Expand Up @@ -61,9 +62,14 @@ def all_rules_required(
) -> pl.Expr:
"""Execute :mod:`~polars.all_horizontal` and `.all` for a set of rules.

Contrary to :meth:`all_rules`, this method raises a
:mod:`~polars.exceptions.ComputeError` at execution time if any rule indicates a
validation failure. The `ComputeError` includes a helpful error message.
This method differs from :meth:`all_rules` in two ways:

- It raises a :mod:`~polars.exceptions.ComputeError` at execution time if any
rule indicates a validation failure. The `ComputeError` includes a helpful error
message.
- It broadcasts the resulting boolean series to the length of the input. This allows
element-wise evaluation and makes this a non-blocking operation on the streaming
engine.

Args:
rules: The rules to evaluate.
Expand All @@ -80,5 +86,10 @@ def all_rules_required(
args=rules,
kwargs={"null_is_valid": null_is_valid, "schema_name": schema_name},
use_abs_path=True,
returns_scalar=True,
# NOTE: Conceptually, we're reducing the input to a single boolean value here.
# However, we set this option to ensure that the plugin does not become
# blocking on the streaming engine. A single boolean value is simply
# broadcast and we're indifferent to actually finding all validation failures
# during `validate` (and simply fail-fast).
is_elementwise=True,
)
6 changes: 6 additions & 0 deletions dataframely/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,12 @@ def validate(
should raise upon failure. If `False`, the returned lazy frame will
fail to collect if the validation does not pass.
Note:
If running on the streaming engine, lazy validation may not surface *all*
validation issues, as the validation is aborted once the first failure is
encountered. Likewise, the reported validation failure can be
non-deterministic.
Returns:
The input eager or lazy frame, wrapped in a generic version of the
input's data frame type to reflect schema adherence. Columns not defined
Expand Down
3 changes: 2 additions & 1 deletion src/polars_plugin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ pub fn all_rules_required(
// neither actually runs the filter logic, nor does it copy any data. It's essentially a no-op
// that is not optimized away in a lazy frame.
if failures.is_empty() {
return Ok(BooleanChunked::new(PlSmallStr::EMPTY, [true]).into_series());
let column = Column::new_scalar(PlSmallStr::EMPTY, Scalar::from(true), 1);
return Ok(column.take_materialized_series());
Comment thread
borchero marked this conversation as resolved.
}

// Aggregate failure counts into a validation error.
Expand Down
18 changes: 15 additions & 3 deletions tests/schema/test_validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def test_invalid_column_contents(
df = df_type({"a": [1, 2, 3], "b": ["x", "longtext", None], "c": ["1", None, "3"]})
with pytest.raises(
ValidationError if eager else plexc.ComputeError,
match=r"2 rules failed validation",
match=r"2 rules failed validation" if eager else None,
):
_validate_and_collect(MySchema, df, eager=eager)
assert not MySchema.is_valid(df)
Expand Down Expand Up @@ -143,7 +143,7 @@ def test_violated_custom_rule(
df = df_type({"a": [1, 1, 2, 3, 3], "b": [2, 2, 2, 4, 5]})
with pytest.raises(
ValidationError if eager else plexc.ComputeError,
match=r"2 rules failed validation",
match=r"2 rules failed validation" if eager else None,
):
_validate_and_collect(MyComplexSchema, df, eager=eager)
assert not MyComplexSchema.is_valid(df)
Expand Down Expand Up @@ -285,7 +285,19 @@ def test_multiple_unique_columns_both_invalid(
df = df_type({"a": [1, 1, 3], "b": ["x", "y", "y"]})
with pytest.raises(
ValidationError if eager else plexc.ComputeError,
match=r"2 rules failed validation",
match=r"2 rules failed validation" if eager else None,
):
_validate_and_collect(MultiUniqueSchema, df, eager=eager)
assert not MultiUniqueSchema.is_valid(df)


# ----------------------------------- PERFORMANCE ------------------------------------ #


def test_lazy_validate_does_not_block_streaming_engine() -> None:
    """Ensure lazy validation adds no `in-memory-map` node to the streaming plan."""
    schema = create_schema("test", {"a": dy.Int64(), "b": dy.Int64()})
    # `pl.LazyFrame(...)` is already lazy, so no extra `.lazy()` call is needed.
    lf = pl.LazyFrame({"a": [1, 2, 3], "b": [2, 3, 4]})
    out = schema.validate(lf, eager=False)
    # Request the raw textual representation of the physical streaming plan so that
    # it can be inspected with plain string matching.
    graph = out.show_graph(engine="streaming", plan_stage="physical", raw_output=True)
    assert graph is not None
    # NOTE(review): presumably an `in-memory-map` node marks an operation that the
    # streaming engine has to run on fully materialized data, i.e. a blocking
    # step -- confirm against the polars streaming engine documentation.
    assert "in-memory-map" not in graph
Loading