
Commit 553c33f

Add static analysis tests for migration anti-patterns
Prevent migration bugs like #64876 (DML before disable_sqlite_fkeys silently breaking the SQLite PRAGMA) from reaching production by adding:

1. AST-based static analysis tests (test_migration_patterns.py) that detect three anti-patterns across all migration files:
   - MIG001: DML before disable_sqlite_fkeys (triggers autobegin)
   - MIG002: DDL before disable_sqlite_fkeys (triggers autobegin)
   - MIG003: DML without a context.is_offline_mode() guard
2. Unit tests for the migration utility functions (test_migration_utils.py), covering disable_sqlite_fkeys, mysql_drop_foreignkey_if_exists, and ignore_sqlite_value_error against real database backends.
3. Fixes for the confirmed SQLite PRAGMA bugs in migrations 0100 and 0106, moving all operations inside the disable_sqlite_fkeys block (a sketch of the broken vs. fixed pattern follows the change summary below).
4. "# noqa: MIG0XX" suppression comments (with reasons) on 20 old migrations with acknowledged violations, plus external = ["MIG"] in the ruff configuration so the suppressions are preserved.
1 parent ee27b29 commit 553c33f

27 files changed

Lines changed: 769 additions & 86 deletions
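
Why ordering matters (MIG001/MIG002): SQLite's PRAGMA foreign_keys is a no-op once a transaction is open, and with SQLAlchemy's autobegin the first DML or DDL statement on the connection opens one implicitly. The sketch below contrasts the broken and fixed shapes of a migration. It is illustrative only: the table names are hypothetical, and disable_sqlite_fkeys is assumed to be the context-manager helper from airflow.migrations.utils that toggles the PRAGMA on SQLite; the confirmed fixes in migrations 0100 and 0106 follow the second shape.

# Illustrative sketch only -- table names are hypothetical; the real fixes are in
# migrations 0100 and 0106.
from alembic import op

from airflow.migrations.utils import disable_sqlite_fkeys  # assumed import path


def upgrade_broken():
    # MIG001: this DML opens an implicit transaction (SQLAlchemy autobegin),
    # so the PRAGMA foreign_keys=off issued by the helper below is silently
    # ignored by SQLite and FK enforcement stays on during the table rebuild.
    op.execute("DELETE FROM example_table WHERE stale = 1")
    with disable_sqlite_fkeys(op):
        op.drop_table("example_child_table")


def upgrade_fixed():
    # All statements live inside the block, so the PRAGMA is issued before
    # anything can start a transaction and actually takes effect.
    with disable_sqlite_fkeys(op):
        op.execute("DELETE FROM example_table WHERE stale = 1")
        op.drop_table("example_child_table")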

.pre-commit-config.yaml

Lines changed: 7 additions & 0 deletions
@@ -1034,6 +1034,13 @@ repos:
         pass_filenames: false
         files: ^.*\.py$
         require_serial: true
+      - id: check-migration-patterns
+        name: Check migration files for anti-patterns (MIG001/MIG002/MIG003)
+        entry: ./scripts/ci/prek/check_migration_patterns.py
+        language: python
+        pass_filenames: true
+        files: ^airflow-core/src/airflow/migrations/versions/.*\.py$
+        require_serial: true
       ## ADD MOST PREK HOOK ABOVE THAT LINE
       # The below prek hooks are those requiring CI image to be built
       ## ONLY ADD PREK HOOKS HERE THAT REQUIRE CI IMAGE
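
Because the hook sets pass_filenames: true, prek invokes check_migration_patterns.py with the matching migration file paths as arguments. The snippet below is a deliberately simplified sketch of the kind of AST walk such a checker performs: it only flags op.execute() calls whose first argument is a DML string literal (the MIG003 family) and exits non-zero if anything is found. The real script added in this commit additionally enforces the MIG001/MIG002 ordering rules, recognises context.is_offline_mode() guards, and honours # noqa: MIG0XX suppressions; the message printed here is illustrative, not its actual output.

# Simplified, illustrative detector -- not the actual check_migration_patterns.py.
import ast
import sys

DML_PREFIXES = ("insert", "update", "delete")


def find_dml_executes(source: str) -> list[int]:
    """Return line numbers of op.execute(...) calls whose first argument is a DML string literal."""
    violations = []
    for node in ast.walk(ast.parse(source)):
        if not isinstance(node, ast.Call):
            continue
        func = node.func
        # Only calls spelled exactly as op.execute(...) are considered.
        if not (
            isinstance(func, ast.Attribute)
            and func.attr == "execute"
            and isinstance(func.value, ast.Name)
            and func.value.id == "op"
        ):
            continue
        if node.args and isinstance(node.args[0], ast.Constant) and isinstance(node.args[0].value, str):
            if node.args[0].value.lstrip().lower().startswith(DML_PREFIXES):
                violations.append(node.lineno)
    return violations


if __name__ == "__main__":
    exit_code = 0
    for path in sys.argv[1:]:
        with open(path, encoding="utf-8") as f:
            for lineno in find_dml_executes(f.read()):
                print(f"{path}:{lineno}: DML passed to op.execute() -- candidate MIG003 violation")
                exit_code = 1
    sys.exit(exit_code)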

airflow-core/src/airflow/migrations/versions/0003_2_7_0_add_include_deferred_column_to_pool.py

Lines changed: 1 addition & 1 deletion
@@ -43,7 +43,7 @@ def upgrade():
     with op.batch_alter_table("slot_pool") as batch_op:
         batch_op.add_column(sa.Column("include_deferred", sa.Boolean))
     # Different databases support different literal for FALSE. This is fine.
-    op.execute(sa.text(f"UPDATE slot_pool SET include_deferred = {sa.false().compile(op.get_bind())}"))
+    op.execute(sa.text(f"UPDATE slot_pool SET include_deferred = {sa.false().compile(op.get_bind())}"))  # noqa: MIG003 -- backfill default for NOT NULL
     with op.batch_alter_table("slot_pool") as batch_op:
         batch_op.alter_column("include_deferred", existing_type=sa.Boolean, nullable=False)

airflow-core/src/airflow/migrations/versions/0027_2_10_3_fix_dag_schedule_dataset_alias_reference_naming.py

Lines changed: 2 additions & 2 deletions
@@ -116,7 +116,7 @@ def upgrade():
         ),
         sa.PrimaryKeyConstraint("alias_id", "dag_id", name="dsdar_pkey"),
     )
-    op.execute("INSERT INTO new_table SELECT * FROM dag_schedule_dataset_alias_reference")
+    op.execute("INSERT INTO new_table SELECT * FROM dag_schedule_dataset_alias_reference")  # noqa: MIG003 -- data copy between tables during rename
     op.drop_table("dag_schedule_dataset_alias_reference")
     op.rename_table("new_table", "dag_schedule_dataset_alias_reference")
     op.create_index(
@@ -227,7 +227,7 @@ def downgrade():
         ),
         sa.PrimaryKeyConstraint("alias_id", "dag_id", name="dsdar_pkey"),
     )
-    op.execute("INSERT INTO new_table SELECT * FROM dag_schedule_dataset_alias_reference")
+    op.execute("INSERT INTO new_table SELECT * FROM dag_schedule_dataset_alias_reference")  # noqa: MIG003 -- data copy between tables during rename
     op.drop_table("dag_schedule_dataset_alias_reference")
     op.rename_table("new_table", "dag_schedule_dataset_alias_reference")
     op.create_index(

airflow-core/src/airflow/migrations/versions/0030_3_0_0_rename_schedule_interval_to_timetable_.py

Lines changed: 1 addition & 1 deletion
@@ -58,4 +58,4 @@ def downgrade():
         type_=sa.Text,
         nullable=True,
     )
-    op.execute("UPDATE dag SET schedule_interval=NULL;")
+    op.execute("UPDATE dag SET schedule_interval=NULL;")  # noqa: MIG003 -- cleanup during downgrade

airflow-core/src/airflow/migrations/versions/0035_3_0_0_update_user_id_type.py

Lines changed: 6 additions & 6 deletions
@@ -57,15 +57,15 @@ def downgrade():
     # Clean up non-numeric user_id values before type conversion to prevent errors
     if dialect_name == "postgresql":
         # For PostgreSQL, use regex to identify non-integer values and set them to NULL
-        op.execute(
+        op.execute(  # noqa: MIG003 -- user ID type reversion during downgrade
             text("""
                 UPDATE dag_run_note
                 SET user_id = NULL
                 WHERE user_id IS NOT NULL
                 AND user_id !~ '^[0-9]+$'
             """)
         )
-        op.execute(
+        op.execute(  # noqa: MIG003 -- user ID type reversion during downgrade
             text("""
                 UPDATE task_instance_note
                 SET user_id = NULL
@@ -75,15 +75,15 @@ def downgrade():
         )
     elif dialect_name == "mysql":
         # For MySQL, use REGEXP to identify non-integer values
-        op.execute(
+        op.execute(  # noqa: MIG003 -- user ID type reversion during downgrade
             text("""
                 UPDATE dag_run_note
                 SET user_id = NULL
                 WHERE user_id IS NOT NULL
                 AND user_id NOT REGEXP '^[0-9]+$'
             """)
         )
-        op.execute(
+        op.execute(  # noqa: MIG003 -- user ID type reversion during downgrade
             text("""
                 UPDATE task_instance_note
                 SET user_id = NULL
@@ -93,15 +93,15 @@ def downgrade():
         )
     elif dialect_name == "sqlite":
         # SQLite doesn't have regex, so use GLOB pattern matching
-        op.execute(
+        op.execute(  # noqa: MIG003 -- user ID type reversion during downgrade
             text("""
                 UPDATE dag_run_note
                 SET user_id = NULL
                 WHERE user_id IS NOT NULL
                 AND (user_id = '' OR user_id GLOB '*[^0-9]*')
             """)
         )
-        op.execute(
+        op.execute(  # noqa: MIG003 -- user ID type reversion during downgrade
             text("""
                 UPDATE task_instance_note
                 SET user_id = NULL

airflow-core/src/airflow/migrations/versions/0036_3_0_0_add_name_field_to_dataset_model.py

Lines changed: 2 additions & 2 deletions
@@ -89,7 +89,7 @@ def downgrade():
         # - Join to find if each row has a non-orphaned row with the same URI. If there is, select its id.
         # - Modifying the table currently used for selection is not allowed; the additional layer
         # SELECT id FROM (...) AS d3 makes MySQL think the selecting table is different.
-        op.execute(
+        op.execute(  # noqa: MIG003 -- data cleanup during downgrade
            "delete from dataset where id in ("
            " select id from ("
            " select d1.id from dataset d1"
@@ -100,7 +100,7 @@
            ")"
        )
    else:
-        op.execute(
+        op.execute(  # noqa: MIG003 -- data cleanup during downgrade
            "delete from dataset as d1 where d1.is_orphaned = true "
            "and exists (select 1 from dataset as d2 where d1.uri = d2.uri and d2.is_orphaned = false)"
        )

airflow-core/src/airflow/migrations/versions/0038_3_0_0_add_asset_active.py

Lines changed: 2 additions & 2 deletions
@@ -58,7 +58,7 @@ def upgrade():
         sa.Index("idx_asset_active_name_unique", "name", unique=True),
         sa.Index("idx_asset_active_uri_unique", "uri", unique=True),
     )
-    op.execute("insert into asset_active (name, uri) select name, uri from dataset where is_orphaned = false")
+    op.execute("insert into asset_active (name, uri) select name, uri from dataset where is_orphaned = false")  # noqa: MIG003 -- orphan flag data migration
     with op.batch_alter_table("dataset", schema=None) as batch_op:
         batch_op.drop_column("is_orphaned")

@@ -68,7 +68,7 @@ def downgrade():
         batch_op.add_column(
             sa.Column("is_orphaned", sa.Boolean, default=False, nullable=False, server_default="0")
         )
-    op.execute(
+    op.execute(  # noqa: MIG003 -- orphan flag data migration
         "update dataset set is_orphaned = true "
         "where not exists (select 1 from asset_active "
         "where dataset.name = asset_active.name and dataset.uri = asset_active.uri)"

airflow-core/src/airflow/migrations/versions/0042_3_0_0_add_uuid_primary_key_to_task_instance_.py

Lines changed: 4 additions & 2 deletions
@@ -240,11 +240,13 @@
     op.execute(mysql_uuid7_fn)

     # Migrate existing rows with UUID v7
-    op.execute("""
+    op.execute(  # noqa: MIG003 -- UUID primary key population
+        """
         UPDATE task_instance
         SET id = uuid_generate_v7(coalesce(queued_dttm, start_date, NOW(3)))
         WHERE id IS NULL
-    """)
+        """
+    )

     # Drop this function as it is no longer needed
     op.execute(mysql_uuid7_fn_drop)

airflow-core/src/airflow/migrations/versions/0047_3_0_0_add_dag_versioning.py

Lines changed: 4 additions & 4 deletions
@@ -47,8 +47,8 @@
     """Apply add dag versioning."""
     dialect_name = op.get_bind().dialect.name

-    op.execute("delete from dag_code;")
-    op.execute("delete from serialized_dag;")
+    op.execute("delete from dag_code;")  # noqa: MIG003 -- dag versioning data migration
+    op.execute("delete from serialized_dag;")  # noqa: MIG003 -- dag versioning data migration

     op.create_table(
         "dag_version",
@@ -160,8 +160,8 @@
         batch_op.drop_constraint("created_dag_version_id_fkey", type_="foreignkey")
         batch_op.drop_column("created_dag_version_id")

-    op.execute("delete from dag_code;")
-    op.execute("delete from serialized_dag;")
+    op.execute("delete from dag_code;")  # noqa: MIG003 -- dag versioning data migration
+    op.execute("delete from serialized_dag;")  # noqa: MIG003 -- dag versioning data migration

     with op.batch_alter_table("dag_code", schema=None) as batch_op:
         batch_op.drop_constraint("dag_code_pkey", type_="primary")

airflow-core/src/airflow/migrations/versions/0049_3_0_0_remove_pickled_data_from_xcom_table.py

Lines changed: 1 addition & 1 deletion
@@ -243,7 +243,7 @@ def downgrade():
         )
     elif dialect == "mysql":
         op.add_column("xcom", sa.Column("value_blob", LONGBLOB, nullable=True))
-        op.execute("UPDATE xcom SET value_blob = CAST(value AS BINARY);")
+        op.execute("UPDATE xcom SET value_blob = CAST(value AS BINARY);")  # noqa: MIG003 -- type conversion during downgrade
         op.drop_column("xcom", "value")
        op.alter_column("xcom", "value_blob", existing_type=LONGBLOB, new_column_name="value")
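
The new test_migration_utils.py mentioned in the commit message exercises disable_sqlite_fkeys, mysql_drop_foreignkey_if_exists, and ignore_sqlite_value_error against real backends. Below is a minimal sketch of what the SQLite case can look like; it assumes disable_sqlite_fkeys is importable from airflow.migrations.utils and accepts the Alembic op object, and it may be structured differently from the committed tests.

# Minimal sketch, not the committed test.
import sqlalchemy as sa
from alembic.operations import Operations
from alembic.runtime.migration import MigrationContext

from airflow.migrations.utils import disable_sqlite_fkeys  # assumed import path


def test_disable_sqlite_fkeys_toggles_pragma():
    engine = sa.create_engine("sqlite://")
    with engine.connect() as conn:
        # Enable FK enforcement first so the toggle is observable.
        conn.exec_driver_sql("PRAGMA foreign_keys=ON")
        op = Operations(MigrationContext.configure(conn))
        # No DML has run yet, so no SQLite transaction is open and the PRAGMA
        # issued by the helper takes effect immediately.
        with disable_sqlite_fkeys(op):
            assert conn.exec_driver_sql("PRAGMA foreign_keys").scalar() == 0
        assert conn.exec_driver_sql("PRAGMA foreign_keys").scalar() == 1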