Skip to content

Commit ac9a292

Browse files
authored
Merge pull request #2392 from atlanhq/feature/rab-complex-type-nested-columns
Add STRUCT/ARRAY/MAP nested column support to RAB (CSA-371)
2 parents 3ba453e + 0ef7b87 commit ac9a292

7 files changed

Lines changed: 705 additions & 6 deletions

File tree

.github/CODEOWNERS

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
# Full repository
22
* @cmgrote
33
# Specific packages
4-
/samples/packages/lake-formation-tag-sync/ @ErnestoLoma
4+
/samples/packages/lake-formation-tag-sync/ @ErnestoLoma
5+
/samples/packages/relational-assets-builder/ @bladata1990

samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/AssetXformer.kt

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,17 +31,24 @@ abstract class AssetXformer(
3131
fieldSeparator = ctx.config.assetsFieldSeparator[0],
3232
) {
3333
/** {@inheritDoc} */
override fun mapRow(inputRow: Map<String, String>): List<List<String>> {
    // Transform the row, then line the result up with the target header. Any header the
    // transformation did not produce is passed through from the raw input row.
    val transformed = mapAsset(inputRow)
    val alignedValues = assetMapToValueList(transformed, inputRow)
    return listOf(alignedValues)
}
35+
36+
/**
 * Project an asset field map onto the column order defined by [targetHeader].
 *
 * For every non-null header the value is taken from [assetMap]; when the header is absent
 * there it is taken from [fallback]; when it is absent from both, an empty string is emitted.
 *
 * @param assetMap transformed field values, keyed by header name
 * @param fallback secondary source of values, consulted only for headers missing from [assetMap]
 * @return one value per non-null entry of [targetHeader], in header order
 */
protected fun assetMapToValueList(
    assetMap: Map<String, String>,
    fallback: Map<String, String> = emptyMap(),
): List<String> {
    val aligned = ArrayList<String>()
    targetHeader!!.forEach { column ->
        // Null header entries contribute no cell at all
        column ?: return@forEach
        // Transformed value wins, then the fallback value, then an empty cell
        aligned += assetMap[column] ?: fallback[column] ?: ""
    }
    return aligned
}
4653

4754
/** {@inheritDoc} */
@@ -87,6 +94,13 @@ abstract class AssetXformer(
8794
RowSerde.getHeaderForField(Column.PRECISION, Column::class.java),
8895
RowSerde.getHeaderForField(Column.NUMERIC_SCALE, Column::class.java),
8996
RowSerde.getHeaderForField(Column.MAX_LENGTH, Column::class.java),
97+
RowSerde.getHeaderForField(Column.PARENT_COLUMN_QUALIFIED_NAME, Column::class.java),
98+
RowSerde.getHeaderForField(Column.PARENT_COLUMN, Column::class.java),
99+
RowSerde.getHeaderForField(Column.PARENT_COLUMN_NAME, Column::class.java),
100+
RowSerde.getHeaderForField(Column.NESTED_COLUMN_ORDER, Column::class.java),
101+
RowSerde.getHeaderForField(Column.COLUMN_DEPTH_LEVEL, Column::class.java),
102+
RowSerde.getHeaderForField(Column.COLUMN_HIERARCHY, Column::class.java),
103+
RowSerde.getHeaderForField(Asset.SUB_TYPE),
90104
)
91105

92106
fun getConnectorType(inputRow: Map<String, String>): String =

samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/ColumnXformer.kt

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,12 @@ class ColumnXformer(
3636
companion object {
3737
const val COLUMN_PARENT_QN = "columnParentQualifiedName"
3838
const val COLUMN_NAME = "columnName"
39+
val PARENT_COLUMN_QN_HEADER = RowSerde.getHeaderForField(Column.PARENT_COLUMN_QUALIFIED_NAME, Column::class.java)
40+
val PARENT_COLUMN_HEADER = RowSerde.getHeaderForField(Column.PARENT_COLUMN, Column::class.java)
41+
val PARENT_COLUMN_NAME_HEADER = RowSerde.getHeaderForField(Column.PARENT_COLUMN_NAME, Column::class.java)
42+
val NESTED_COLUMN_ORDER_HEADER = RowSerde.getHeaderForField(Column.NESTED_COLUMN_ORDER, Column::class.java)
43+
val COLUMN_DEPTH_LEVEL_HEADER = RowSerde.getHeaderForField(Column.COLUMN_DEPTH_LEVEL, Column::class.java)
44+
val COLUMN_HIERARCHY_HEADER = RowSerde.getHeaderForField(Column.COLUMN_HIERARCHY, Column::class.java)
3945
val REQUIRED_HEADERS =
4046
mapOf<String, Set<String>>(
4147
Asset.TYPE_NAME.atlanFieldName to setOf(),
@@ -66,6 +72,7 @@ class ColumnXformer(
6672
val assetQN = "$connectionQN/${details.partialQN}"
6773
val parentQN = "$connectionQN/${details.parentPartialQN}"
6874
val rawDataType = trimWhitespace(inputRow.getOrElse(Column.DATA_TYPE.atlanFieldName) { "" })
75+
val displayDataType = baseTypeName(rawDataType)
6976
var precision: Int? = null
7077
var scale: Double? = null
7178
var maxLength: Long? = null
@@ -95,6 +102,7 @@ class ColumnXformer(
95102
RowSerde.getHeaderForField(Column.VIEW_QUALIFIED_NAME, Column::class.java) to if (details.viewPQN.isNotBlank()) "$connectionQN/${details.viewPQN}" else "",
96103
RowSerde.getHeaderForField(Column.VIEW, Column::class.java) to if (details.parentTypeName == View.TYPE_NAME) "${details.parentTypeName}@$parentQN" else "",
97104
RowSerde.getHeaderForField(Column.MATERIALIZED_VIEW, Column::class.java) to if (details.parentTypeName == MaterializedView.TYPE_NAME) "${details.parentTypeName}@$parentQN" else "",
105+
RowSerde.getHeaderForField(Column.DATA_TYPE, Column::class.java) to displayDataType,
98106
RowSerde.getHeaderForField(Column.ORDER, Column::class.java) to inputRow.getOrElse(Column.ORDER.atlanFieldName) { "" },
99107
RowSerde.getHeaderForField(Column.RAW_DATA_TYPE_DEFINITION, Column::class.java) to rawDataType,
100108
RowSerde.getHeaderForField(Column.PRECISION, Column::class.java) to (precision?.toString() ?: ""),
@@ -106,6 +114,116 @@ class ColumnXformer(
106114
}
107115
}
108116

117+
/**
 * Reduce a raw type definition to its base type name.
 *
 * When the definition carries angle-bracket type parameters, everything from the first `<`
 * onwards is dropped and the remainder is trimmed and upper-cased
 * (e.g. "STRUCT<a:INT,b:DOUBLE>" → "STRUCT"). A definition without `<` is returned
 * exactly as given (e.g. "INT" → "INT").
 */
private fun baseTypeName(rawType: String): String {
    val bracketIdx = rawType.indexOf('<')
    return if (bracketIdx < 0) rawType else rawType.substring(0, bracketIdx).trim().uppercase()
}
120+
121+
/**
 * {@inheritDoc}
 *
 * Overridden to emit additional child column rows when the column's data type is a complex type
 * whose fields [ComplexTypeParser.extractStructFields] can parse (STRUCT, ARRAY<STRUCT>, or
 * MAP<K, STRUCT>). Child columns are generated recursively for deeply nested types.
 *
 * @param inputRow the raw input row, keyed by header name
 * @return the row for the column itself, followed by one row per generated child column (if any)
 */
override fun mapRow(inputRow: Map<String, String>): List<List<String>> {
    // Transform the input exactly once and reuse the result for both the column's own row and
    // its children, rather than transforming again after delegating to the inherited mapRow.
    // NOTE(review): assumes the inherited mapRow does nothing beyond
    // assetMapToValueList(mapAsset(inputRow), inputRow) — confirm against the superclass.
    val parentAssetMap = mapAsset(inputRow)
    val parentRow = assetMapToValueList(parentAssetMap, inputRow)

    val rawType = trimWhitespace(inputRow.getOrElse(Column.DATA_TYPE.atlanFieldName) { "" })
    // Not a complex type with parseable fields: the column's own row is the whole result
    val parseResult = ComplexTypeParser.extractStructFields(rawType) ?: return listOf(parentRow)

    val connectionQN = getConnectionQN(inputRow)
    val details = getSQLHierarchyDetails(inputRow, typeNameFilter, preprocessedDetails.entityQualifiedNameToType)
    val parentColumnQN = "$connectionQN/${details.partialQN}"
    return listOf(parentRow) + buildSubColumnRows(parentAssetMap, parentColumnQN, parseResult)
}
140+
141+
/**
 * Produce the rows for every field in [parseResult], descending into fields that are
 * themselves complex types. Each field's own row is emitted before the rows of its descendants.
 *
 * @param baseAssetMap field map of the immediate parent column (child maps start as a copy of it)
 * @param parentColumnQN qualified name of the parent column asset (used for [PARENT_COLUMN_QN_HEADER])
 * @param parseResult parsed complex type fields and optional synthetic QN node (e.g. "items" for ARRAY)
 * @param depth nesting depth of the child columns (1 for direct children of a top-level column, 2 for grandchildren, etc.)
 * @return the generated child rows, in field order, each immediately followed by its own nested rows
 */
private fun buildSubColumnRows(
    baseAssetMap: Map<String, String>,
    parentColumnQN: String,
    parseResult: ComplexTypeParser.ParseResult,
    depth: Int = 1,
): List<List<String>> {
    // The synthetic node (ARRAY / MAP) becomes part of the child's qualified name path only;
    // the parent reference handed to the children stays the plain parentColumnQN.
    val pathPrefix = parseResult.syntheticNode?.let { node -> "$parentColumnQN/$node" } ?: parentColumnQN
    return parseResult.fields.withIndex().flatMap { (position, field) ->
        val childQN = "$pathPrefix/${field.name}"
        // Child ordinals are 1-based
        val childAssetMap = buildChildAssetMap(baseAssetMap, parentColumnQN, childQN, field, position + 1, depth)
        val ownRow = assetMapToValueList(childAssetMap)
        // A field that is itself a complex type contributes its own children one level deeper
        val descendants =
            ComplexTypeParser
                .extractStructFields(field.rawType)
                ?.let { nested -> buildSubColumnRows(childAssetMap, childQN, nested, depth + 1) }
                .orEmpty()
        listOf(ownRow) + descendants
    }
}
170+
171+
/**
 * Build the asset map for a single child column, inheriting all context fields from
 * [parentAssetMap] and overriding the column-specific fields.
 *
 * The qualified name and name written into the columnHierarchy JSON entry are JSON-escaped, so
 * that quotes, backslashes or control characters in a name cannot produce malformed JSON or
 * break the newline-delimited list of entries.
 *
 * @param parentAssetMap asset map of the immediate parent column
 * @param parentColumnQN qualified name of the parent column (for [PARENT_COLUMN_QN_HEADER])
 * @param childQN qualified name for the child column
 * @param field field definition (name and raw type) for the child column
 * @param order ordinal position of the child column within its parent
 * @param depth nesting depth of this child column (1 for direct children of a top-level column, 2 for grandchildren, etc.)
 * @return a new map holding the child's field values; [parentAssetMap] itself is not modified
 */
private fun buildChildAssetMap(
    parentAssetMap: Map<String, String>,
    parentColumnQN: String,
    childQN: String,
    field: ComplexTypeParser.FieldDefinition,
    order: Int,
    depth: Int,
): Map<String, String> {
    /** Escape [value] for embedding between double quotes in a JSON document. */
    fun escapeJson(value: String): String =
        buildString(value.length) {
            value.forEach { ch ->
                when (ch) {
                    '\\' -> append("\\\\")
                    '"' -> append("\\\"")
                    '\n' -> append("\\n")
                    '\r' -> append("\\r")
                    '\t' -> append("\\t")
                    else -> if (ch < ' ') append("\\u%04x".format(ch.code)) else append(ch)
                }
            }
        }

    // The parent's name is taken as the last path segment of its qualified name
    val parentName = parentColumnQN.substringAfterLast('/')

    val childMap = parentAssetMap.toMutableMap()
    childMap[RowSerde.getHeaderForField(Asset.QUALIFIED_NAME)] = childQN
    childMap[RowSerde.getHeaderForField(Asset.NAME)] = field.name
    childMap[RowSerde.getHeaderForField(Column.DATA_TYPE, Column::class.java)] = baseTypeName(field.rawType)
    childMap[RowSerde.getHeaderForField(Column.RAW_DATA_TYPE_DEFINITION, Column::class.java)] = field.rawType
    childMap[RowSerde.getHeaderForField(Column.ORDER, Column::class.java)] = order.toString()
    // Clear numeric type-specific fields — they're not meaningful for the child's raw type
    childMap[RowSerde.getHeaderForField(Column.PRECISION, Column::class.java)] = ""
    childMap[RowSerde.getHeaderForField(Column.NUMERIC_SCALE, Column::class.java)] = ""
    childMap[RowSerde.getHeaderForField(Column.MAX_LENGTH, Column::class.java)] = ""
    // Clear table/view references on sub-columns so they do not appear in the table's flat
    // column list (table_columns relationship). Navigation is via parentColumn chain instead.
    childMap[RowSerde.getHeaderForField(Column.TABLE_QUALIFIED_NAME, Column::class.java)] = ""
    childMap[RowSerde.getHeaderForField(Column.TABLE_NAME, Column::class.java)] = ""
    childMap[RowSerde.getHeaderForField(Column.TABLE, Column::class.java)] = ""
    childMap[RowSerde.getHeaderForField(Column.VIEW_QUALIFIED_NAME, Column::class.java)] = ""
    childMap[RowSerde.getHeaderForField(Column.VIEW_NAME, Column::class.java)] = ""
    childMap[RowSerde.getHeaderForField(Column.VIEW, Column::class.java)] = ""
    childMap[RowSerde.getHeaderForField(Column.MATERIALIZED_VIEW, Column::class.java)] = ""
    childMap[PARENT_COLUMN_QN_HEADER] = parentColumnQN
    childMap[PARENT_COLUMN_HEADER] = "${Column.TYPE_NAME}@$parentColumnQN"
    childMap[PARENT_COLUMN_NAME_HEADER] = parentName
    childMap[NESTED_COLUMN_ORDER_HEADER] = order.toString()
    // columnDepthLevel tells Atlan this is a nested sub-column (not a top-level table column).
    childMap[COLUMN_DEPTH_LEVEL_HEADER] = depth.toString()
    // columnHierarchy accumulates one entry per ancestor column: the parent's own hierarchy
    // (if any) followed by a new entry describing the immediate parent.
    // Each entry is a JSON object: {"depth":"<n>","qualifiedName":"<qn>","name":"<name>"}.
    // Multiple entries are newline-delimited (CellXformer.LIST_DELIMITER).
    // Matches the format used in AIM nested_columns.csv reference and Databricks connector.
    // NOTE(review): the entry describes the parent but carries this child's depth — confirm
    // this is the numbering the consumer expects.
    val parentHierarchyStr = parentAssetMap.getOrElse(COLUMN_HIERARCHY_HEADER) { "" }
    val newEntry = """{"depth": "$depth","qualifiedName": "${escapeJson(parentColumnQN)}","name": "${escapeJson(parentName)}"}"""
    childMap[COLUMN_HIERARCHY_HEADER] = if (parentHierarchyStr.isBlank()) newEntry else "$parentHierarchyStr\n$newEntry"
    childMap[RowSerde.getHeaderForField(Asset.SUB_TYPE)] = "nested"
    return childMap
}
226+
109227
class Preprocessor(
110228
originalFile: String,
111229
fieldSeparator: Char,
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/* SPDX-License-Identifier: Apache-2.0
2+
Copyright 2023 Atlan Pte. Ltd. */
3+
package com.atlan.pkg.rab
4+
5+
/**
 * Utility for parsing complex SQL type definitions (STRUCT, ARRAY, MAP) into nested field information.
 * Supports Hive-style colon-separated fields ("name:TYPE") and space-separated fields ("name TYPE").
 */
object ComplexTypeParser {
    /**
     * A single named field of a STRUCT.
     * @param name field name, as written in the type definition
     * @param rawType the field's type definition, as written (may itself be a complex type)
     */
    data class FieldDefinition(
        val name: String,
        val rawType: String,
    )

    /**
     * Result of parsing a complex type definition.
     * @param fields list of named fields extracted from the type
     * @param syntheticNode path segment(s) to insert between a parent column QN and child field names
     *                      in the qualified name. Null for STRUCT (no synthetic node), "items" for ARRAY,
     *                      "values" for MAP. Can be combined, e.g. "items/items" for ARRAY<ARRAY<STRUCT<...>>>.
     */
    data class ParseResult(
        val fields: List<FieldDefinition>,
        val syntheticNode: String?,
    )

    /**
     * Attempt to parse a raw type string into structured nested field information.
     * Handles STRUCT<...>, ARRAY<STRUCT<...>>, and MAP<K, STRUCT<...>> recursively.
     * The type keyword is matched case-insensitively and surrounding whitespace is ignored.
     *
     * @param rawType the raw type definition to parse
     * @return the named fields found (with any synthetic path node), or null if the type is not a
     *         recognized complex type or yields no parseable named fields
     */
    fun extractStructFields(rawType: String): ParseResult? {
        val trimmed = rawType.trim()
        val upper = trimmed.uppercase()
        return when {
            upper.startsWith("STRUCT<") && trimmed.endsWith(">") -> {
                val content = trimmed.substring("STRUCT<".length, trimmed.length - 1)
                val fields = parseStructContent(content)
                // A STRUCT without any parseable named field is treated as "not complex"
                fields.takeIf { it.isNotEmpty() }?.let { ParseResult(it, null) }
            }

            upper.startsWith("ARRAY<") && trimmed.endsWith(">") -> {
                val inner = trimmed.substring("ARRAY<".length, trimmed.length - 1).trim()
                val innerResult = extractStructFields(inner) ?: return null
                ParseResult(innerResult.fields, combineSyntheticNodes("items", innerResult.syntheticNode))
            }

            upper.startsWith("MAP<") && trimmed.endsWith(">") -> {
                val inner = trimmed.substring("MAP<".length, trimmed.length - 1).trim()
                // Only the value type can contribute nested fields; the key type is ignored
                val valueType = extractMapValueType(inner) ?: return null
                val innerResult = extractStructFields(valueType.trim()) ?: return null
                ParseResult(innerResult.fields, combineSyntheticNodes("values", innerResult.syntheticNode))
            }

            else -> {
                null
            }
        }
    }

    /**
     * Parse the content between the outermost STRUCT<...> brackets into field definitions.
     * Splits on commas at depth 0 (not inside nested angle brackets or parentheses).
     * Segments that cannot be parsed as a named field are skipped.
     */
    private fun parseStructContent(content: String): List<FieldDefinition> {
        val fields = mutableListOf<FieldDefinition>()
        var depth = 0
        var start = 0
        for (i in content.indices) {
            when (content[i]) {
                '<', '(' -> {
                    depth++
                }

                '>', ')' -> {
                    depth--
                }

                ',' -> {
                    if (depth == 0) {
                        parseField(content.substring(start, i).trim())?.let { fields.add(it) }
                        start = i + 1
                    }
                }
            }
        }
        // Whatever follows the last top-level comma is the final field
        parseField(content.substring(start).trim())?.let { fields.add(it) }
        return fields
    }

    /**
     * Parse a single "name:TYPE" or "name TYPE" field definition string.
     * Prefers colon separator (Hive/BigQuery style), falls back to first whitespace.
     *
     * Both separators are only recognized outside angle brackets and parentheses, so whitespace
     * or colons that belong to a type's own parameters (e.g. "DECIMAL(10, 2)") never split a field.
     *
     * @return the parsed field, or null if the string is blank or has no usable name/type separator
     */
    private fun parseField(fieldStr: String): FieldDefinition? {
        if (fieldStr.isBlank()) return null
        val colonIdx = firstColonAtDepthZero(fieldStr)
        if (colonIdx > 0) {
            val name = fieldStr.substring(0, colonIdx).trim()
            val type = fieldStr.substring(colonIdx + 1).trim()
            if (name.isNotBlank() && type.isNotBlank()) return FieldDefinition(name, type)
        }
        // Fallback: "fieldName TYPE" (space-separated). The search must be depth-aware: an unnamed
        // field such as "DECIMAL(10, 2)" contains whitespace only inside its parentheses and must
        // not be split into a bogus name/type pair.
        val spaceIdx = indexOfFirstAtDepthZero(fieldStr) { it.isWhitespace() }
        if (spaceIdx > 0) {
            val name = fieldStr.substring(0, spaceIdx).trim()
            val type = fieldStr.substring(spaceIdx + 1).trim()
            if (name.isNotBlank() && type.isNotBlank()) return FieldDefinition(name, type)
        }
        return null
    }

    /**
     * Find the index of the first colon that is not inside angle brackets or parentheses.
     * @return the index, or -1 if there is no such colon
     */
    private fun firstColonAtDepthZero(s: String): Int = indexOfFirstAtDepthZero(s) { it == ':' }

    /**
     * Extract the value type from MAP<keyType, valueType> content,
     * splitting on the first comma at depth 0.
     * @return the trimmed text after that comma, or null if there is no top-level comma
     */
    private fun extractMapValueType(mapContent: String): String? {
        val commaIdx = indexOfFirstAtDepthZero(mapContent) { it == ',' }
        return if (commaIdx >= 0) mapContent.substring(commaIdx + 1).trim() else null
    }

    /**
     * Find the index of the first character accepted by [matches] that is not inside angle
     * brackets or parentheses. Bracket characters themselves only adjust the nesting depth and
     * are never offered to [matches].
     * @return the index, or -1 if no character at depth 0 is accepted
     */
    private fun indexOfFirstAtDepthZero(
        s: String,
        matches: (Char) -> Boolean,
    ): Int {
        var depth = 0
        for (i in s.indices) {
            when (val ch = s[i]) {
                '<', '(' -> depth++
                '>', ')' -> depth--
                else -> if (depth == 0 && matches(ch)) return i
            }
        }
        return -1
    }

    /** Join an outer synthetic node with an optional inner one, e.g. "items" + "items" → "items/items". */
    private fun combineSyntheticNodes(
        outer: String,
        inner: String?,
    ): String = if (inner != null) "$outer/$inner" else outer
}

0 commit comments

Comments
 (0)