Skip to content

Commit 330da12

Browse files
committed
feat: add support for incremental for reference rules
1 parent 5c32f47 commit 330da12

1 file changed

Lines changed: 98 additions & 17 deletions

File tree

pyatlan/model/assets/core/data_quality_rule.py

Lines changed: 98 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -188,20 +188,11 @@ def column_level_rule_creator(
188188
)
189189
template_config = client.dq_template_config_cache.get_template_config(rule_type)
190190

191-
asset_for_validation = asset
192-
if row_scope_filtering_enabled and asset.qualified_name:
193-
from pyatlan.model.fluent_search import FluentSearch
194-
195-
search_request = (
196-
FluentSearch()
197-
.where(Asset.QUALIFIED_NAME.eq(asset.qualified_name))
198-
.include_on_results(
199-
Asset.ASSET_DQ_ROW_SCOPE_FILTER_COLUMN_QUALIFIED_NAME
200-
)
201-
).to_request()
202-
results = client.asset.search(search_request)
203-
if results.count == 1:
204-
asset_for_validation = results.current_page()[0]
191+
asset_for_validation, target_table_asset = (
192+
DataQualityRule.Attributes._fetch_assets_for_row_scope_validation(
193+
client, asset, rule_conditions, row_scope_filtering_enabled
194+
)
195+
)
205196

206197
validated_threshold_operator = (
207198
DataQualityRule.Attributes._validate_template_features(
@@ -211,6 +202,7 @@ def column_level_rule_creator(
211202
template_config,
212203
threshold_compare_operator,
213204
asset_for_validation,
205+
target_table_asset,
214206
)
215207
)
216208

@@ -334,14 +326,37 @@ def updater(
334326
template_config = client.dq_template_config_cache.get_template_config(
335327
retrieved_rule_type
336328
)
329+
330+
final_rule_conditions = rule_conditions or (
331+
search_result.dq_rule_config_arguments.dq_rule_config_rule_conditions # type: ignore[attr-defined]
332+
if search_result.dq_rule_config_arguments is not None # type: ignore[attr-defined]
333+
else None
334+
)
335+
336+
final_row_scope_filtering_enabled = (
337+
row_scope_filtering_enabled or retrieved_row_scope_filtering_enabled
338+
)
339+
if retrieved_asset:
340+
retrieved_asset, target_table_asset = (
341+
DataQualityRule.Attributes._fetch_assets_for_row_scope_validation(
342+
client,
343+
retrieved_asset,
344+
final_rule_conditions,
345+
final_row_scope_filtering_enabled,
346+
)
347+
)
348+
else:
349+
target_table_asset = None
350+
337351
validated_threshold_operator = (
338352
DataQualityRule.Attributes._validate_template_features(
339353
retrieved_rule_type,
340354
rule_conditions,
341-
row_scope_filtering_enabled,
355+
final_row_scope_filtering_enabled,
342356
template_config,
343357
threshold_compare_operator or retrieved_threshold_compare_operator,
344358
retrieved_asset,
359+
target_table_asset,
345360
)
346361
)
347362

@@ -365,8 +380,7 @@ def updater(
365380
),
366381
dq_rule_base_dataset_qualified_name=retrieved_asset.qualified_name,
367382
dq_rule_alert_priority=alert_priority or retrieved_alert_priority,
368-
dq_rule_row_scope_filtering_enabled=row_scope_filtering_enabled
369-
or retrieved_row_scope_filtering_enabled,
383+
dq_rule_row_scope_filtering_enabled=final_row_scope_filtering_enabled,
370384
dq_rule_base_dataset=retrieved_asset,
371385
qualified_name=qualified_name,
372386
dq_rule_dimension=dimension or retrieved_dimension,
@@ -1097,6 +1111,62 @@ class Attributes(DataQuality.Attributes):
10971111
default=None, description=""
10981112
) # relationship
10991113

1114+
@staticmethod
1115+
def _fetch_assets_for_row_scope_validation(
1116+
client: AtlanClient,
1117+
base_asset: Asset,
1118+
rule_conditions: Optional[str],
1119+
row_scope_filtering_enabled: bool,
1120+
) -> tuple[Asset, Optional[Asset]]:
1121+
asset_for_validation = base_asset
1122+
target_table_asset = None
1123+
1124+
if not row_scope_filtering_enabled:
1125+
return asset_for_validation, target_table_asset
1126+
1127+
# Extract target_table from rule_conditions
1128+
target_table_qualified_name = None
1129+
if rule_conditions:
1130+
try:
1131+
rule_conditions_json = json.loads(rule_conditions)
1132+
conditions = rule_conditions_json.get("conditions", [])
1133+
if conditions:
1134+
condition_value = conditions[0].get("value", {})
1135+
target_table_qualified_name = condition_value.get(
1136+
"target_table"
1137+
)
1138+
except (json.JSONDecodeError, (KeyError, TypeError)):
1139+
pass
1140+
1141+
qualified_names_to_search = []
1142+
if base_asset.qualified_name:
1143+
qualified_names_to_search.append(base_asset.qualified_name)
1144+
if target_table_qualified_name:
1145+
qualified_names_to_search.append(target_table_qualified_name)
1146+
1147+
if qualified_names_to_search:
1148+
from pyatlan.model.fluent_search import FluentSearch
1149+
1150+
search_request = (
1151+
FluentSearch()
1152+
.where(Asset.QUALIFIED_NAME.within(qualified_names_to_search))
1153+
.include_on_results(
1154+
Asset.ASSET_DQ_ROW_SCOPE_FILTER_COLUMN_QUALIFIED_NAME
1155+
)
1156+
).to_request()
1157+
results = client.asset.search(search_request)
1158+
1159+
for result in results.current_page():
1160+
if result.qualified_name == base_asset.qualified_name:
1161+
asset_for_validation = result
1162+
elif (
1163+
target_table_qualified_name
1164+
and result.qualified_name == target_table_qualified_name
1165+
):
1166+
target_table_asset = result
1167+
1168+
return asset_for_validation, target_table_asset
1169+
11001170
@staticmethod
11011171
def _get_template_config_value(
11021172
config_value: str,
@@ -1128,6 +1198,7 @@ def _validate_template_features(
11281198
DataQualityRuleThresholdCompareOperator
11291199
] = None,
11301200
asset: Optional[Asset] = None,
1201+
target_table_asset: Optional[Asset] = None,
11311202
) -> Optional[DataQualityRuleThresholdCompareOperator]:
11321203
if not template_config or not template_config.get("config"):
11331204
return None
@@ -1160,6 +1231,16 @@ def _validate_template_features(
11601231
getattr(asset, "qualified_name", "unknown")
11611232
)
11621233

1234+
if target_table_asset:
1235+
if not getattr(
1236+
target_table_asset,
1237+
"asset_d_q_row_scope_filter_column_qualified_name",
1238+
None,
1239+
):
1240+
raise ErrorCode.DQ_ROW_SCOPE_FILTER_COLUMN_MISSING.exception_with_parameters(
1241+
getattr(target_table_asset, "qualified_name", "unknown")
1242+
)
1243+
11631244
if rule_conditions:
11641245
allowed_rule_conditions = (
11651246
DataQualityRule.Attributes._get_template_config_value(

0 commit comments

Comments
 (0)