|
1 | 1 | # SPDX-License-Identifier: Apache-2.0 |
2 | 2 | # Copyright 2022 Atlan Pte. Ltd. |
3 | 3 | import json |
4 | | -from typing import Optional |
| 4 | +from typing import Any, Optional |
5 | 5 |
|
6 | 6 | from pyatlan.client.atlan import AtlanClient |
7 | 7 | from pyatlan.error import LogicError, NotFoundError |
| 8 | +from pyatlan.model.core import BusinessAttributes |
8 | 9 | from pyatlan.model.enums import AtlanTypeCategory |
9 | 10 | from pyatlan.model.typedef import AttributeDef, CustomMetadataDef |
10 | 11 |
|
@@ -202,3 +203,30 @@ def get_attributes_for_search_results(cls, set_name: str) -> Optional[list[str]] |
202 | 203 | cls._refresh_cache() |
203 | 204 | return cls._get_attributes_for_search_results(set_id) |
204 | 205 | return None |
| 206 | + |
| 207 | + @classmethod |
| 208 | + def get_custom_metadata( |
| 209 | + cls, |
| 210 | + name: str, |
| 211 | + asset_type: type, |
| 212 | + business_attributes: Optional[dict[str, Any]] = None, |
| 213 | + ) -> BusinessAttributes: |
| 214 | + type_name = asset_type.__name__ |
| 215 | + ba_id = cls.get_id_for_name(name) |
| 216 | + if ba_id is None: |
| 217 | + raise ValueError(f"No custom metadata with the name: {name} exist") |
| 218 | + for a_type in CustomMetadataCache.types_by_asset[type_name]: |
| 219 | + if ( |
| 220 | + hasattr(a_type, "_meta_data_type_name") |
| 221 | + and a_type._meta_data_type_name == name |
| 222 | + ): |
| 223 | + break |
| 224 | + else: |
| 225 | + raise ValueError(f"Custom metadata {name} is not applicable to {type_name}") |
| 226 | + if ba_type := CustomMetadataCache.get_type_for_id(ba_id): |
| 227 | + return ( |
| 228 | + ba_type(business_attributes[ba_id]) |
| 229 | + if business_attributes and ba_id in business_attributes |
| 230 | + else ba_type() |
| 231 | + ) |
| 232 | + raise ValueError(f"Custom metadata {name} is not applicable to {type_name}") |
0 commit comments