Skip to content

Commit c181d51

Browse files
authored
Merge pull request #723 from atlanhq/APP-8724
APP-8724: Add support for Databricks system tables in the Databricks crawler
2 parents 3ca5560 + 3da56fd commit c181d51

4 files changed

Lines changed: 432 additions & 2 deletions

File tree

pyatlan/model/packages/base/crawler.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,41 @@ def build_flat_hierarchical_filter(raw_filter: Optional[list]) -> str:
114114
except (AttributeError, TypeError):
115115
raise ErrorCode.UNABLE_TO_TRANSLATE_FILTERS.exception_with_parameters()
116116

117+
@staticmethod
118+
def build_selective_hierarchical_filter(raw_filter: Optional[dict]) -> str:
119+
"""
120+
Build a selective hierarchical filter from the provided map of databases and schemas.
121+
122+
Transforms a mapping of database names to schema lists into a nested dictionary
123+
structure where each database contains a dictionary of its schemas.
124+
125+
:param raw_filter: Dictionary keyed by database name with each value being a list of schemas.
126+
Example: {"samples": ["ti1", "ti2"], "ex-system": []}
127+
:returns: A JSON string representing the hierarchical filter structure.
128+
Example: "{"samples":{"ti1":{},"ti2":{}},"ex-system":{}}"
129+
:raises InvalidRequestException: If the provided filter cannot be translated
130+
"""
131+
if not raw_filter:
132+
return ""
133+
134+
try:
135+
to_include: Dict[str, Dict[str, Dict]] = {}
136+
137+
for db_name, schemas in raw_filter.items():
138+
# Create a dictionary for this database
139+
schema_dict: Dict[str, Any] = {}
140+
141+
# Add each schema as a key with an empty dict as value
142+
for schema in schemas:
143+
schema_dict[schema] = {}
144+
145+
# Add the database with its schemas (empty dict if no schemas)
146+
to_include[db_name] = schema_dict
147+
148+
return dumps(to_include)
149+
except (AttributeError, TypeError):
150+
raise ErrorCode.UNABLE_TO_TRANSLATE_FILTERS.exception_with_parameters()
151+
117152
@staticmethod
118153
def build_flat_filter(raw_filter: Optional[list]) -> str:
119154
"""

pyatlan/model/packages/databricks_crawler.py

Lines changed: 154 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
from __future__ import annotations
22

33
from enum import Enum
4-
from typing import List, Optional
4+
from typing import Any, List, Optional
5+
from warnings import warn
56

67
from pyatlan.client.atlan import AtlanClient
8+
from pyatlan.model.core import AtlanObject
79
from pyatlan.model.enums import AtlanConnectorType, WorkflowPackage
810
from pyatlan.model.packages.base.crawler import AbstractCrawler
911
from pyatlan.model.workflow import WorkflowMetadata
@@ -36,6 +38,22 @@ class DatabricksCrawler(AbstractCrawler):
3638
class ExtractionMethod(str, Enum):
    # Metadata-extraction strategy for the Databricks crawler; the chosen
    # member's value is passed as the "extract-strategy" workflow parameter.
    JDBC = "jdbc"
    REST = "rest"
    SYSTEM_TABLES = "system-tables"
42+
43+
class RegexAssetTypes(str, Enum):
    # Asset types that regex-based include/exclude filters for system
    # tables can target; values are embedded in parameter names such as
    # "include-database-regex".
    DATABASES = "database"
    SCHEMAS = "schema"
    TABLE_VIEWS = "table"
47+
48+
class AssetsSelectionCriteria(str, Enum):
    # Kinds of system-table asset selection; each value doubles as the
    # workflow parameter name for the hierarchy-based filters.
    INCLUDE_BY_HIERARCHY = "include-filter-system-tables"
    EXCLUDE_BY_HIERARCHY = "exclude-filter-system-tables"
    INCLUDE_BY_REGEX = "include-regex-system-tables"
    EXCLUDE_BY_REGEX = "exclude-regex-system-tables"
53+
54+
class AssetsSelection(AtlanObject):
    # One selection rule for system-table assets, consumed by
    # asset_selection_for_system_tables():
    # - `type` picks the filter kind;
    # - `values` carries its payload — a dict of database -> schema list
    #   for hierarchy filters, or a dict with "asset_type" and "regex"
    #   keys for regex filters (shape inferred from usage; not enforced here).
    type: DatabricksCrawler.AssetsSelectionCriteria
    values: Any
3957

4058
def __init__(
4159
self,
@@ -112,12 +130,17 @@ def basic_auth(
112130
self, personal_access_token: str, http_path: str
113131
) -> DatabricksCrawler:
114132
"""
115-
Set up the crawler to use basic authentication.
133+
(DEPRECATED) Set up the crawler to use basic authentication.
116134
117135
:param personal_access_token: through which to access Databricks instance
118136
:param http_path: HTTP path of your Databricks instance
119137
:returns: crawler, set up to use basic authentication
120138
"""
139+
warn(
140+
"This method is deprecated, please use 'pat()' instead, which offers identical functionality.",
141+
DeprecationWarning,
142+
stacklevel=2,
143+
)
121144
local_creds = {
122145
"authType": "basic",
123146
"username": "",
@@ -130,6 +153,28 @@ def basic_auth(
130153
self._credentials_body.update(local_creds)
131154
return self
132155

156+
def pat(self, access_token: str, sql_warehouse_id: str) -> DatabricksCrawler:
    """
    Set up the crawler to use PAT authentication.

    :param access_token: through which to access Databricks instance
    :param sql_warehouse_id: ID of the associated SQL warehouse
        if this data source is backed by a SQL warehouse. eg: `3d939b0cc668be06`
        ref: https://docs.databricks.com/api/workspace/datasources/list#warehouse_id
    :returns: crawler, set up to use PAT
    """
    # A PAT is transported as the password half of a basic-auth credential
    # pair, with an empty username.
    warehouse_path = f"/sql/1.0/warehouses/{sql_warehouse_id}"
    credentials = {
        "authType": "basic",
        "username": "",
        "password": access_token,
        "connector_type": "dual",
        "extra": {"__http_path": warehouse_path},
    }
    self._credentials_body.update(credentials)
    return self
177+
133178
def aws_service(self, client_id: str, client_secret: str) -> DatabricksCrawler:
134179
"""
135180
Set up the crawler to use AWS service principal.
@@ -188,6 +233,35 @@ def metadata_extraction_method(
188233
self._parameters.append({"name": "extract-strategy", "value": type.value})
189234
return self
190235

236+
def enable_cross_workspace_discovery(
    self, include: bool = False
) -> DatabricksCrawler:
    """
    Whether to enable cross-workspace discovery to discover assets from other workspaces.

    :param include: if True, cross-workspace discovery will be included while crawling Databricks, default: False
    :returns: crawler, set to include or exclude cross-workspace discovery
    """
    # Workflow parameters are string-typed, so the flag is serialized
    # as "true"/"false" rather than a Python bool.
    flag = "true" if include else "false"
    parameter = {"name": "enable-cross-workspace-discovery", "value": flag}
    self._parameters.append(parameter)
    return self
253+
def enable_incremental_extraction(self, include: bool = False) -> DatabricksCrawler:
    """
    Whether to enable or disable schema incremental extraction on source.

    :param include: if True, incremental extraction will be included while crawling Databricks, default: False
    :returns: crawler, set to include or exclude incremental extraction
    """
    # Serialize the flag as "true"/"false" — workflow parameters are strings.
    setting = {
        "name": "incremental-extraction",
        "value": "true" if include else "false",
    }
    self._parameters.append(setting)
    return self
264+
191265
def enable_view_lineage(self, include: bool = True) -> DatabricksCrawler:
192266
"""
193267
Whether to enable view lineage as part of crawling Databricks.
@@ -280,6 +354,81 @@ def exclude_for_rest_api(self, assets: List[str]) -> DatabricksCrawler:
280354
)
281355
return self
282356

357+
def asset_selection_for_system_tables(
    self, selection_criteria: List[DatabricksCrawler.AssetsSelection]
) -> DatabricksCrawler:
    """
    Defines the filter for system table assets to include or exclude when crawling.

    This method allows you to configure asset selection specifically for Databricks
    system tables using various selection criteria including hierarchical filtering
    and regex-based filtering.

    :param selection_criteria: List of selection criteria objects containing
        the type of selection (include/exclude) and the corresponding values
        for filtering system table assets
    :returns: crawler, configured with system table asset selection filters
    :raises ValueError: if a regex criteria's "asset_type" is not a valid
        RegexAssetTypes member or value
    """
    hierarchy_types = (
        DatabricksCrawler.AssetsSelectionCriteria.INCLUDE_BY_HIERARCHY,
        DatabricksCrawler.AssetsSelectionCriteria.EXCLUDE_BY_HIERARCHY,
    )
    regex_types = (
        DatabricksCrawler.AssetsSelectionCriteria.INCLUDE_BY_REGEX,
        DatabricksCrawler.AssetsSelectionCriteria.EXCLUDE_BY_REGEX,
    )
    for criteria in selection_criteria:
        if criteria.type in hierarchy_types:
            # The criteria type's value doubles as the workflow parameter
            # name; an empty filter is sent as the literal "{}".
            hierarchical = self.build_selective_hierarchical_filter(
                criteria.values or {}
            )
            self._parameters.append(
                dict(name=criteria.type.value, value=hierarchical or "{}")
            )
        elif criteria.type in regex_types:
            self._parameters.append(
                self._regex_parameter_for_system_tables(
                    criteria.type, criteria.values
                )
            )
    return self

@staticmethod
def _regex_parameter_for_system_tables(
    criteria_type: DatabricksCrawler.AssetsSelectionCriteria, values: Any
) -> dict:
    """Translate one regex selection criteria into a workflow parameter dict."""
    raw_asset_type = values.get("asset_type")
    # Coerce via the enum constructor so both RegexAssetTypes members and
    # their raw string values are accepted. The previous membership test
    # (`x not in RegexAssetTypes`) raised TypeError on Python < 3.12 for
    # plain strings, and on 3.12+ let raw strings through only to crash
    # on `.value` afterwards.
    try:
        asset_type = DatabricksCrawler.RegexAssetTypes(raw_asset_type)
    except ValueError:
        raise ValueError(
            f"Invalid asset_type: {raw_asset_type}. Must be one of {[e.value for e in DatabricksCrawler.RegexAssetTypes]}"
        )
    if (
        criteria_type
        == DatabricksCrawler.AssetsSelectionCriteria.INCLUDE_BY_REGEX
    ):
        name = f"include-{asset_type.value}-regex"
    elif asset_type == DatabricksCrawler.RegexAssetTypes.TABLE_VIEWS:
        # NOTE: temp-table-regex-system-tables is the name
        # of the parameter for exclude regex for system tables (TABLE_VIEWS)
        name = "temp-table-regex-system-tables"
    else:
        name = f"exclude-{asset_type.value}-regex"
    return dict(name=name, value=values.get("regex", ""))
431+
283432
def sql_warehouse(self, warehouse_ids: List[str]) -> DatabricksCrawler:
284433
"""
285434
Defines the filter for SQL warehouses to include when crawling.
@@ -380,3 +529,6 @@ def _get_metadata(self) -> WorkflowMetadata:
380529
name=f"{self._PACKAGE_PREFIX}-{self._epoch}",
381530
namespace="default",
382531
)
532+
533+
534+
# Resolve the postponed annotation on AssetsSelection.type (it forward-references
# DatabricksCrawler, which is only fully defined at this point) — presumably the
# pydantic-v1 forward-ref resolution hook on AtlanObject; verify against the model base.
DatabricksCrawler.AssetsSelection.update_forward_refs()

0 commit comments

Comments
 (0)