Skip to content

Commit a52536c

Browse files
BLDX-1076: Add GENERIC_OPENLINEAGE connector type and make emit() accept connector_type
Add GENERIC_OPENLINEAGE = ("generic-openlineage", ELT) to AtlanConnectorType so PyAtlan can target the marketplace generic-openlineage (GOLC) endpoint at /events/openlineage/generic-openlineage/api/v1/lineage. Make OpenLineageEvent.emit() / emit_async() accept an optional connector_type kwarg (defaulting to SPARK for backwards compatibility), mirroring the existing pattern on emit_raw()/emit_raw_async(). Applied to both the current and pyatlan_v9 copies of event.py. Unit tests cover: - GENERIC_OPENLINEAGE enum value - Routing to /events/openlineage/generic-openlineage/api/v1/lineage - emit() honors the connector_type kwarg - emit() default is still SPARK
1 parent d32c766 commit a52536c

4 files changed

Lines changed: 77 additions & 8 deletions

File tree

pyatlan/model/enums.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,7 @@ def get_connector_name(
272272
SYNAPSE = ("synapse", AtlanConnectionCategory.WAREHOUSE)
273273
AIRFLOW = ("airflow", AtlanConnectionCategory.ELT)
274274
OPENLINEAGE = ("openlineage", AtlanConnectionCategory.ELT)
275+
GENERIC_OPENLINEAGE = ("generic-openlineage", AtlanConnectionCategory.ELT)
275276
DATAFLOW = ("dataflow", AtlanConnectionCategory.ELT)
276277
QLIKSENSE = ("qlik-sense", AtlanConnectionCategory.BI)
277278
KAFKA = ("kafka", AtlanConnectionCategory.EVENT_BUS)

pyatlan/model/open_lineage/event.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -108,26 +108,36 @@ def creator(
108108
event_time=datetime.now(tz=utc).isoformat(), # type:ignore[call-arg]
109109
)
110110

111-
def emit(self, client: AtlanClient) -> None:
111+
def emit(
112+
self,
113+
client: AtlanClient,
114+
connector_type: AtlanConnectorType = AtlanConnectorType.SPARK,
115+
) -> None:
112116
"""
113117
Send the OpenLineage event to Atlan to be processed.
114118
115119
:param client: connectivity to an Atlan tenant
120+
:param connector_type: connector type for the OpenLineage event
116121
:raises AtlanError: on any API communication issues
117122
"""
118123
return client.open_lineage.send(
119-
request=self, connector_type=AtlanConnectorType.SPARK
124+
request=self, connector_type=connector_type
120125
)
121126

122-
async def emit_async(self, client: AsyncAtlanClient) -> None:
127+
async def emit_async(
128+
self,
129+
client: AsyncAtlanClient,
130+
connector_type: AtlanConnectorType = AtlanConnectorType.SPARK,
131+
) -> None:
123132
"""
124133
Send the OpenLineage event to Atlan to be processed (async version).
125134
126135
:param client: async connectivity to an Atlan tenant
136+
:param connector_type: connector type for the OpenLineage event
127137
:raises AtlanError: on any API communication issues
128138
"""
129139
return await client.open_lineage.send(
130-
request=self, connector_type=AtlanConnectorType.SPARK
140+
request=self, connector_type=connector_type
131141
)
132142

133143
@classmethod

pyatlan_v9/model/open_lineage/event.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -118,26 +118,36 @@ def creator(
118118
event_time=datetime.now(tz=utc).isoformat(),
119119
)
120120

121-
def emit(self, client: AtlanClient) -> None:
121+
def emit(
122+
self,
123+
client: AtlanClient,
124+
connector_type: AtlanConnectorType = AtlanConnectorType.SPARK,
125+
) -> None:
122126
"""
123127
Send the OpenLineage event to Atlan to be processed.
124128
125129
:param client: connectivity to an Atlan tenant
130+
:param connector_type: connector type for the OpenLineage event
126131
:raises AtlanError: on any API communication issues
127132
"""
128133
return client.open_lineage.send(
129-
request=self, connector_type=AtlanConnectorType.SPARK
134+
request=self, connector_type=connector_type
130135
)
131136

132-
async def emit_async(self, client: "AsyncAtlanClient") -> None:
137+
async def emit_async(
138+
self,
139+
client: "AsyncAtlanClient",
140+
connector_type: AtlanConnectorType = AtlanConnectorType.SPARK,
141+
) -> None:
133142
"""
134143
Asynchronously send the OpenLineage event to Atlan to be processed.
135144
136145
:param client: async connectivity to an Atlan tenant
146+
:param connector_type: connector type for the OpenLineage event
137147
:raises AtlanError: on any API communication issues
138148
"""
139149
return await client.open_lineage.send(
140-
request=self, connector_type=AtlanConnectorType.SPARK
150+
request=self, connector_type=connector_type
141151
)
142152

143153
@classmethod

tests/unit/model/open_lineage/open_lineage_test.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,54 @@ def test_ol_client_send(
152152
mock_api_caller.reset_mock()
153153

154154

155+
def test_ol_generic_openlineage_enum_value():
156+
assert AtlanConnectorType.GENERIC_OPENLINEAGE.value == "generic-openlineage"
157+
158+
159+
def test_ol_client_send_generic_openlineage_routes_to_endpoint(mock_api_caller):
160+
mock_api_caller._call_api.return_value = "Event received"
161+
OpenLineageClient(client=mock_api_caller).send(
162+
request=OpenLineageEvent(),
163+
connector_type=AtlanConnectorType.GENERIC_OPENLINEAGE,
164+
)
165+
166+
assert mock_api_caller._call_api.call_count == 1
167+
api_arg = mock_api_caller._call_api.call_args.kwargs["api"]
168+
assert api_arg.path == "generic-openlineage/api/v1/lineage"
169+
mock_api_caller.reset_mock()
170+
171+
172+
def test_ol_event_emit_accepts_connector_type(mock_api_caller):
173+
mock_api_caller._call_api.return_value = "Event received"
174+
client = AtlanClient()
175+
with patch.object(
176+
client.open_lineage, "send", return_value=None
177+
) as mock_send:
178+
OpenLineageEvent().emit(
179+
client=client,
180+
connector_type=AtlanConnectorType.GENERIC_OPENLINEAGE,
181+
)
182+
mock_send.assert_called_once()
183+
assert (
184+
mock_send.call_args.kwargs["connector_type"]
185+
== AtlanConnectorType.GENERIC_OPENLINEAGE
186+
)
187+
188+
189+
def test_ol_event_emit_defaults_to_spark(mock_api_caller):
190+
mock_api_caller._call_api.return_value = "Event received"
191+
client = AtlanClient()
192+
with patch.object(
193+
client.open_lineage, "send", return_value=None
194+
) as mock_send:
195+
OpenLineageEvent().emit(client=client)
196+
mock_send.assert_called_once()
197+
assert (
198+
mock_send.call_args.kwargs["connector_type"]
199+
== AtlanConnectorType.SPARK
200+
)
201+
202+
155203
def test_ol_client_send_when_ol_not_configured(client, mock_session):
156204
expected_error = (
157205
"ATLAN-PYTHON-400-064 Requested OpenLineage "

0 commit comments

Comments
 (0)