@@ -140,9 +140,21 @@ class AtlanConnectionCategory(str, Enum):
140140 CUSTOM = "custom"
141141
142142
143- class AtlanConnectorType (str , Enum ):
143+ class AtlanConnectorType (str , Enum , metaclass = utils . ExtendableEnumMeta ):
144144 category : AtlanConnectionCategory
145145
146+ @classmethod
147+ def get_values (cls ):
148+ return [member .value for member in cls ._member_map_ .values ()]
149+
150+ @classmethod
151+ def get_names (cls ):
152+ return list (cls ._member_map_ .keys ())
153+
154+ @classmethod
155+ def get_items (cls ):
156+ return [(name , member .value ) for name , member in cls ._member_map_ .items ()]
157+
146158 @classmethod
147159 def _get_connector_type_from_qualified_name (
148160 cls , qualified_name : str
@@ -152,12 +164,17 @@ def _get_connector_type_from_qualified_name(
152164 raise ValueError (
153165 f"Qualified name '{ qualified_name } ' does not contain enough segments."
154166 )
155- connector_type_key = tokens [1 ].upper ()
156- # Check if the connector_type_key exists in AtlanConnectorType
167+
168+ connector_value = tokens [1 ]
169+ # Ensure the enum name is converted to UPPER_SNAKE_CASE from kebab-case
170+ connector_type_key = tokens [1 ].replace ("-" , "_" ).upper ()
171+
172+ # Check if the connector_type_key exists in AtlanConnectorType;
173+ # if so, return it directly. Otherwise, it may be a custom type.
157174 if connector_type_key not in AtlanConnectorType .__members__ :
158- raise ValueError (
159- f"Could not determine AtlanConnectorType from ' { qualified_name } '; "
160- f"' { connector_type_key } ' is not a valid connector type."
175+ return AtlanConnectorType . CREATE_CUSTOM (
176+ name = connector_type_key ,
177+ value = connector_value ,
161178 )
162179 return AtlanConnectorType [connector_type_key ]
163180
@@ -169,6 +186,12 @@ def __new__(
169186 obj .category = category
170187 return obj
171188
189+ @classmethod
190+ def CREATE_CUSTOM (
191+ cls , name : str , value : str , category = AtlanConnectionCategory .CUSTOM
192+ ) -> "AtlanConnectorType" :
193+ return cls .add_value (name , value , category )
194+
172195 def to_qualified_name (self ):
173196 return f"default/{ self .value } /{ int (utils .get_epoch_timestamp ())} "
174197
@@ -193,14 +216,22 @@ def get_connector_name(
193216 fields = qualified_name .split ("/" )
194217 if len (fields ) != qualified_name_len :
195218 raise ValueError (err )
219+
220+ connector_value = fields [1 ]
221+ # Try enum conversion; fallback to custom connector if it fails
196222 try :
197- connector_name = AtlanConnectorType (fields [1 ]).value # type:ignore
198- if attribute_name != "connection_qualified_name" :
199- connection_qn = f"{ fields [0 ]} /{ fields [1 ]} /{ fields [2 ]} "
200- return connection_qn , connector_name
201- return connector_name
202- except ValueError as e :
203- raise ValueError (err ) from e
223+ connector_name = AtlanConnectorType (connector_value ).value # type: ignore
224+ except ValueError :
225+ custom_connection = AtlanConnectorType .CREATE_CUSTOM (
226+ # Ensure the enum name is converted to UPPER_SNAKE_CASE from kebab-case
227+ name = connector_value .replace ("-" , "_" ).upper (),
228+ value = connector_value ,
229+ )
230+ connector_name = custom_connection .value
231+ if attribute_name != "connection_qualified_name" :
232+ connection_qn = f"{ fields [0 ]} /{ fields [1 ]} /{ fields [2 ]} "
233+ return connection_qn , connector_name
234+ return connector_name
204235
205236 SNOWFLAKE = ("snowflake" , AtlanConnectionCategory .WAREHOUSE )
206237 TABLEAU = ("tableau" , AtlanConnectionCategory .BI )
0 commit comments