Defer heavy Google SDK imports to runtime in Dataproc and BigQuery operators #64938
alamashir wants to merge 4 commits into apache:main
Conversation
…erators (apache#62373)

Move heavy imports (`google.cloud.dataproc_v1`, `google.cloud.bigquery`, hooks, triggers) from module level to method bodies in the `dataproc.py` and `bigquery.py` operators. This prevents 15-26s import delays on small workers that cause DagBag timeout errors during DAG parsing.

Key changes:
- Use the `_UNSET` sentinel pattern for `DEFAULT`/`DEFAULT_RETRY` parameters
- Move `_BigQueryHookWithFlexibleProjectId` into the `get_db_hook()` method
- Add lazy imports in all execute/helper methods
- Update test mock paths to match the new import locations
cc @VladaZakharova can you review?
Can you please share the results of all the system tests for dataproc and bigquery? You changed a lot of templated fields, so I want to verify that all the tests are still green.
VladaZakharova left a comment
please provide the screenshots
@VladaZakharova do you just mean running the test suite and providing a screenshot of that?

@VladaZakharova
Pull request overview
This PR reduces DAG-parse import latency for the Google Dataproc and BigQuery operators by deferring heavy Google SDK and provider imports until execution-time code paths, addressing DagBag import timeouts on small workers (issue #62373).
Changes:
- Moved `google.*` SDK imports and Google provider hook/trigger imports out of module scope in `dataproc.py` and `bigquery.py`, and introduced an `_UNSET` sentinel to avoid importing `google.api_core` defaults at definition time.
- Refactored BigQuery's flexible-project hook wrapper to be defined inside `get_db_hook()` so `BigQueryHook` isn't imported at operator module import time.
- Updated unit tests' `mock.patch()` targets to patch the hook/SDK symbols at their new import locations.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| `providers/google/src/airflow/providers/google/cloud/operators/dataproc.py` | Defers Dataproc SDK/hook/trigger imports and replaces default retry args with `_UNSET` + runtime `DEFAULT` resolution. |
| `providers/google/src/airflow/providers/google/cloud/operators/bigquery.py` | Defers BigQuery SDK/hook/trigger imports and nests the flexible-project hook wrapper inside `get_db_hook()`. |
| `providers/google/tests/unit/google/cloud/operators/test_dataproc.py` | Updates patch paths to hook/SDK modules now that operators no longer re-export symbols at module scope. |
| `providers/google/tests/unit/google/cloud/operators/test_bigquery.py` | Updates patch paths (hooks/mixin method) to match the new import structure. |
Comments suppressed due to low confidence (1)
providers/google/src/airflow/providers/google/cloud/operators/dataproc.py:1390
`DataprocJobBaseOperator.__init__` imports and instantiates `DataprocHook`, which imports `google.cloud.dataproc_v1` at module import time (providers/google/cloud/hooks/dataproc.py:32). Operator instances are created during DAG parsing, so this can still trigger heavy Google SDK imports before task execution and undermine the goal of avoiding `DagBag` import timeouts. Consider deferring the hook import/creation until `execute()` (or via a lazy `cached_property`) and postponing the `project_id` fallback resolution (`project_id or hook.project_id`) to runtime.
```python
from airflow.providers.google.cloud.hooks.dataproc import DataprocHook

self.job_error_states = job_error_states or {"ERROR"}
self.impersonation_chain = impersonation_chain
self.hook = DataprocHook(gcp_conn_id=gcp_conn_id, impersonation_chain=impersonation_chain)
self.project_id = project_id or self.hook.project_id
self.job_template: DataProcJobBuilder | None = None
```
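One way to act on that suggestion, sketched with illustrative names (`ExampleJobBaseOperator` is not the real class), is a `functools.cached_property` hook plus a runtime `project_id` fallback. Note that accessing `project_id` without an explicit value would import the hook module at that moment:

```python
from __future__ import annotations

from functools import cached_property


class ExampleJobBaseOperator:
    """Illustrative sketch; not the actual DataprocJobBaseOperator."""

    def __init__(
        self,
        gcp_conn_id: str = "google_cloud_default",
        project_id: str | None = None,
    ) -> None:
        # Only cheap attribute assignments at DAG-parse time.
        self.gcp_conn_id = gcp_conn_id
        self._project_id = project_id

    @cached_property
    def hook(self):
        # Heavy import deferred until the hook is first accessed, which
        # happens inside execute(), not during DAG parsing.
        from airflow.providers.google.cloud.hooks.dataproc import DataprocHook

        return DataprocHook(gcp_conn_id=self.gcp_conn_id)

    @property
    def project_id(self) -> str | None:
        # Fallback resolution postponed to runtime; short-circuits when a
        # project was passed explicitly, so the hook is never built early.
        return self._project_id or self.hook.project_id
```

`cached_property` keeps the existing `self.hook` call sites working while building the hook exactly once, on first access.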
```python
def project_id(self, value: str) -> None:
    cached_creds, _ = self.get_credentials_and_project_id()
    self._cached_project_id = value or PROVIDE_PROJECT_ID
    self._cached_credntials = cached_creds
```
In _BigQueryHookWithFlexibleProjectId.project_id setter, this assigns to self._cached_credntials, but the base hook caching attribute is _cached_credentials (see airflow/providers/google/common/hooks/base_google.py). As written, the credentials cache is not updated/used and a new misspelled attribute is created instead. Update this to write to the correct _cached_credentials attribute (and consider keeping backwards-compat only if needed).
```diff
- self._cached_credntials = cached_creds
+ self._cached_credentials = cached_creds
```
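This class of typo slips through because Python attribute assignment silently creates a new attribute rather than failing. A minimal illustration (the names mirror the review comment, not the real hook):

```python
class BaseHook:
    """Stand-in for the base Google hook that owns the credentials cache."""

    def __init__(self) -> None:
        self._cached_credentials = None


class FlexibleHook(BaseHook):
    def remember(self, creds) -> None:
        # Typo: this creates a brand-new _cached_credntials attribute
        # instead of updating the base class's _cached_credentials cache.
        self._cached_credntials = creds


hook = FlexibleHook()
hook.remember("creds")
print(hook._cached_credentials)  # prints None: the real cache is untouched
print(hook._cached_credntials)   # prints creds: landed on the typo attribute
```

If the hierarchy used `__slots__` (or a test asserted on the cache attribute after the setter runs), the misspelled assignment would surface immediately instead of passing silently.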
No, I mean running in Breeze the system tests located in tests/system/dataproc and tests/system/bigquery, and attaching screenshots showing those tests green.


Summary
Fixes #62373
- Move heavy imports (`google.cloud.dataproc_v1`, `google.cloud.bigquery`, hooks, triggers) from module level to method bodies in the `dataproc.py` and `bigquery.py` operators
- Use the `_UNSET` sentinel pattern for `DEFAULT`/`DEFAULT_RETRY` default parameter values to avoid importing `google.api_core` at class definition time
- Move the `_BigQueryHookWithFlexibleProjectId` class definition inside the `get_db_hook()` method to avoid importing `BigQueryHook` at module level

This prevents 15-26s import delays on small workers (1 vCPU, 2 GiB) that cause DagBag timeout errors (30s limit) during DAG parsing. Heavy imports now only happen at task execution time.

Test plan
- `pytest providers/google/tests/unit/google/cloud/operators/test_dataproc.py`: 108 passed
- `pytest providers/google/tests/unit/google/cloud/operators/test_bigquery.py`: 128 passed
- `ruff check` passes on all changed files
- `python -c "from airflow.providers.google.cloud.operators.dataproc import DataprocCreateBatchOperator"` completes without loading `google.cloud.dataproc_v1`
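The last check in the test plan can be generalized into a regression guard: spawn a fresh interpreter, import the module, and fail if any heavy SDK module leaked into `sys.modules`. `assert_import_stays_light` is a hypothetical helper, not part of the PR, and the final line uses `json` purely as a harmless demo target; for this PR you would pass the operator module path and the `google.cloud.*` names:

```python
import subprocess
import sys


def assert_import_stays_light(module: str, forbidden: list[str]) -> None:
    """Import `module` in a fresh interpreter; fail if any `forbidden`
    module ends up in sys.modules as a side effect."""
    code = (
        "import importlib, sys; "
        f"importlib.import_module({module!r}); "
        f"leaked = [m for m in {forbidden!r} if m in sys.modules]; "
        "sys.exit(1 if leaked else 0)"
    )
    result = subprocess.run([sys.executable, "-c", code])
    if result.returncode != 0:
        raise AssertionError(f"importing {module} pulled in one of {forbidden}")


# Demo only: importing json must not pull in the Dataproc SDK.
assert_import_stays_light("json", ["google.cloud.dataproc_v1"])
```

Running this in a subprocess matters: the parent test process may already have the heavy modules imported, so checking its own `sys.modules` would give false positives.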