|
6 | 6 | import sys |
7 | 7 | from typing import Any, Dict, List, Mapping, Optional, Sequence, TypeVar, Union |
8 | 8 |
|
9 | | -from pydantic.v1 import parse_obj_as, parse_raw_as |
| 9 | +from pydantic.v1 import BaseModel, parse_obj_as, parse_raw_as |
10 | 10 |
|
11 | 11 | from pyatlan.client.aio import AsyncAtlanClient |
12 | 12 | from pyatlan.client.atlan import AtlanClient |
|
17 | 17 | # Type variable for client types |
18 | 18 | ClientType = TypeVar("ClientType", AtlanClient, AsyncAtlanClient) |
19 | 19 |
|
| 20 | + |
| 21 | +class PackageHeaders(BaseModel): |
| 22 | + """Typed container for Atlan package HTTP headers.""" |
| 23 | + |
| 24 | + agent: str = "workflow" |
| 25 | + workflow_id: Optional[str] = None |
| 26 | + app_name: Optional[str] = None |
| 27 | + run_id: Optional[str] = None |
| 28 | + |
| 29 | + class Config: |
| 30 | + allow_mutation = False |
| 31 | + |
| 32 | + def to_headers(self) -> Dict[str, str]: |
| 33 | + return { |
| 34 | + "x-atlan-agent": self.agent, |
| 35 | + "x-atlan-agent-id": self.workflow_id or "", |
| 36 | + "x-atlan-agent-app-name": self.app_name or "", |
| 37 | + "x-atlan-agent-workflow-id": self.run_id or "", |
| 38 | + } |
| 39 | + |
| 40 | + @classmethod |
| 41 | + def from_env(cls) -> Optional["PackageHeaders"]: |
| 42 | + workflow_id = os.environ.get("X_ATLAN_AGENT_ID") |
| 43 | + if not workflow_id: |
| 44 | + return None |
| 45 | + return cls( |
| 46 | + workflow_id=workflow_id, |
| 47 | + app_name=os.environ.get("X_ATLAN_AGENT_PACKAGE_NAME", ""), |
| 48 | + run_id=os.environ.get("X_ATLAN_AGENT_WORKFLOW_ID", ""), |
| 49 | + ) |
| 50 | + |
| 51 | + |
20 | 52 | # Try to import OpenTelemetry libraries |
21 | 53 | try: |
22 | 54 | from opentelemetry.exporter.otlp.proto.grpc._log_exporter import ( # type:ignore |
@@ -186,45 +218,33 @@ def set_package_ops(run_time_config: RuntimeConfig) -> AtlanClient: |
186 | 218 | :returns: an intialized AtlanClient that should be used for any calls to the SDK |
187 | 219 | """ |
188 | 220 | client = get_client(run_time_config.user_id or "") |
189 | | - if run_time_config.agent == "workflow": |
190 | | - client = set_package_headers(client) |
| 221 | + if run_time_config.agent: |
| 222 | + client = set_package_headers( |
| 223 | + client, |
| 224 | + headers=PackageHeaders( |
| 225 | + agent=run_time_config.agent, |
| 226 | + workflow_id=run_time_config.agent_id, |
| 227 | + app_name=run_time_config.agent_pkg, |
| 228 | + run_id=run_time_config.agent_wfl, |
| 229 | + ), |
| 230 | + ) |
191 | 231 | return client |
192 | 232 |
|
193 | 233 |
|
194 | 234 | def set_package_headers( |
195 | 235 | client: ClientType, |
196 | | - agent: str = "workflow", |
197 | | - workflow_id: Optional[str] = None, |
198 | | - app_name: Optional[str] = None, |
199 | | - run_id: Optional[str] = None, |
| 236 | + headers: Optional[PackageHeaders] = None, |
200 | 237 | ) -> ClientType: |
201 | 238 | """ |
202 | 239 | Configure the AtlanClient or AsyncAtlanClient with package headers. |
203 | 240 |
|
204 | | - Each header value can be passed explicitly; if omitted, falls back to the |
205 | | - corresponding environment variable (X_ATLAN_AGENT_ID, |
206 | | - X_ATLAN_AGENT_PACKAGE_NAME, X_ATLAN_AGENT_WORKFLOW_ID). |
207 | | -
|
208 | 241 | :param client: AtlanClient or AsyncAtlanClient instance to configure |
209 | | - :param agent: value for the x-atlan-agent header (default: "workflow") |
210 | | - :param workflow_id: value for the x-atlan-agent-id header (default: X_ATLAN_AGENT_ID env var) |
211 | | - :param app_name: value for the x-atlan-agent-package-name header |
212 | | - (default: X_ATLAN_AGENT_PACKAGE_NAME env var) |
213 | | - :param run_id: value for the x-atlan-agent-workflow-id header |
214 | | - (default: X_ATLAN_AGENT_WORKFLOW_ID env var) |
| 242 | + :param headers: PackageHeaders instance; if None, reads from environment variables |
215 | 243 | :returns: updated client instance of the same type. |
216 | 244 | """ |
217 | | - resolved_workflow_id = workflow_id or os.environ.get("X_ATLAN_AGENT_ID") |
218 | | - if agent and resolved_workflow_id: |
219 | | - headers: Dict[str, str] = { |
220 | | - "x-atlan-agent": agent, |
221 | | - "x-atlan-agent-id": resolved_workflow_id, |
222 | | - "x-atlan-agent-package-name": app_name |
223 | | - or os.environ.get("X_ATLAN_AGENT_PACKAGE_NAME", ""), |
224 | | - "x-atlan-agent-workflow-id": run_id |
225 | | - or os.environ.get("X_ATLAN_AGENT_WORKFLOW_ID", ""), |
226 | | - } |
227 | | - client.update_headers(headers) |
| 245 | + pkg_headers = headers or PackageHeaders.from_env() |
| 246 | + if pkg_headers and pkg_headers.workflow_id: |
| 247 | + client.update_headers(pkg_headers.to_headers()) |
228 | 248 | return client |
229 | 249 |
|
230 | 250 |
|
|
0 commit comments