Skip to content

Commit 7348e09

Browse files
authored
Remove unused/dead code in migration utils (#64947)
1 parent bcb489c commit 7348e09

1 file changed

Lines changed: 0 additions & 246 deletions

File tree

  • airflow-core/src/airflow/migrations

airflow-core/src/airflow/migrations/utils.py

Lines changed: 0 additions & 246 deletions
Original file line numberDiff line numberDiff line change
@@ -17,39 +17,8 @@
1717
from __future__ import annotations
1818

1919
import contextlib
20-
from collections import defaultdict
2120
from contextlib import contextmanager
2221

23-
from sqlalchemy import text
24-
25-
26-
def get_mssql_table_constraints(conn, table_name) -> dict[str, dict[str, list[str]]]:
27-
"""
28-
Return the primary and unique constraint along with column name.
29-
30-
Some tables like `task_instance` are missing the primary key constraint
31-
name and the name is auto-generated by the SQL server, so this function
32-
helps to retrieve any primary or unique constraint name.
33-
34-
:param conn: sql connection object
35-
:param table_name: table name
36-
:return: a dictionary of ((constraint name, constraint type), column name) of table
37-
"""
38-
query = text(
39-
f"""SELECT tc.CONSTRAINT_NAME , tc.CONSTRAINT_TYPE, ccu.COLUMN_NAME
40-
FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS tc
41-
JOIN INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE AS ccu ON ccu.CONSTRAINT_NAME = tc.CONSTRAINT_NAME
42-
WHERE tc.TABLE_NAME = '{table_name}' AND
43-
(tc.CONSTRAINT_TYPE = 'PRIMARY KEY' or UPPER(tc.CONSTRAINT_TYPE) = 'UNIQUE'
44-
or UPPER(tc.CONSTRAINT_TYPE) = 'FOREIGN KEY')
45-
"""
46-
)
47-
result = conn.execute(query).fetchall()
48-
constraint_dict = defaultdict(lambda: defaultdict(list))
49-
for constraint, constraint_type, col_name in result:
50-
constraint_dict[constraint_type][constraint].append(col_name)
51-
return constraint_dict
52-
5322

5423
@contextmanager
5524
def disable_sqlite_fkeys(op):
@@ -86,224 +55,9 @@ def mysql_drop_foreignkey_if_exists(constraint_name, table_name, op):
8655
""")
8756

8857

89-
def mysql_drop_index_if_exists(index_name, table_name, op):
90-
"""Older Mysql versions do not support DROP INDEX IF EXISTS."""
91-
op.execute(f"""
92-
CREATE PROCEDURE DropIndexIfExists()
93-
BEGIN
94-
IF EXISTS (
95-
SELECT 1
96-
FROM information_schema.STATISTICS
97-
WHERE
98-
TABLE_SCHEMA = DATABASE() AND
99-
TABLE_NAME = '{table_name}' AND
100-
INDEX_NAME = '{index_name}'
101-
) THEN
102-
DROP INDEX `{index_name}` ON `{table_name}`;
103-
END IF;
104-
END;
105-
CALL DropIndexIfExists();
106-
DROP PROCEDURE DropIndexIfExists;
107-
""")
108-
109-
11058
def ignore_sqlite_value_error():
11159
from alembic import op
11260

11361
if op.get_bind().dialect.name == "sqlite":
11462
return contextlib.suppress(ValueError)
11563
return contextlib.nullcontext()
116-
117-
118-
def get_dialect_name(op) -> str:
119-
conn = op.get_bind()
120-
return conn.dialect.name if conn is not None else op.get_context().dialect.name
121-
122-
123-
def create_index_if_not_exists(op, index_name, table_name, columns, unique=False) -> None:
124-
"""
125-
Create an index if it does not already exist.
126-
127-
MySQL does not support CREATE INDEX IF NOT EXISTS, so a stored procedure is used.
128-
PostgreSQL and SQLite support it natively.
129-
"""
130-
dialect_name = get_dialect_name(op)
131-
132-
if dialect_name == "mysql":
133-
unique_kw = "UNIQUE " if unique else ""
134-
col_list = ", ".join(f"`{c}`" for c in columns)
135-
op.execute(
136-
text(f"""
137-
DROP PROCEDURE IF EXISTS CreateIndexIfNotExists;
138-
CREATE PROCEDURE CreateIndexIfNotExists()
139-
BEGIN
140-
IF NOT EXISTS (
141-
SELECT 1
142-
FROM information_schema.STATISTICS
143-
WHERE
144-
TABLE_SCHEMA = DATABASE() AND
145-
TABLE_NAME = '{table_name}' AND
146-
INDEX_NAME = '{index_name}'
147-
) THEN
148-
CREATE {unique_kw}INDEX `{index_name}` ON `{table_name}` ({col_list});
149-
END IF;
150-
END;
151-
CALL CreateIndexIfNotExists();
152-
DROP PROCEDURE IF EXISTS CreateIndexIfNotExists;
153-
""")
154-
)
155-
else:
156-
op.create_index(index_name, table_name, columns, unique=unique, if_not_exists=True)
157-
158-
159-
def drop_index_if_exists(op, index_name, table_name) -> None:
160-
"""
161-
Drop an index if it exists.
162-
163-
Works in both online and offline mode by using raw SQL for PostgreSQL and MySQL.
164-
SQLite and PostgreSQL support DROP INDEX IF EXISTS natively.
165-
MySQL requires a stored procedure since it does not support IF EXISTS for DROP INDEX.
166-
"""
167-
dialect_name = get_dialect_name(op)
168-
169-
if dialect_name == "mysql":
170-
op.execute(
171-
text(f"""
172-
CREATE PROCEDURE DropIndexIfExists()
173-
BEGIN
174-
IF EXISTS (
175-
SELECT 1
176-
FROM information_schema.STATISTICS
177-
WHERE
178-
TABLE_SCHEMA = DATABASE() AND
179-
TABLE_NAME = '{table_name}' AND
180-
INDEX_NAME = '{index_name}'
181-
) THEN
182-
DROP INDEX `{index_name}` ON `{table_name}`;
183-
END IF;
184-
END;
185-
CALL DropIndexIfExists();
186-
DROP PROCEDURE DropIndexIfExists;
187-
""")
188-
)
189-
else:
190-
# PostgreSQL and SQLite both support DROP INDEX IF EXISTS
191-
op.drop_index(index_name, table_name=table_name, if_exists=True)
192-
193-
194-
def drop_unique_constraints_on_columns(op, table_name, columns) -> None:
195-
"""
196-
Drop all unique constraints covering any of the given columns, regardless of constraint name.
197-
198-
Works in both online and offline mode by using raw SQL for PostgreSQL and MySQL.
199-
SQLite falls back to batch mode and requires a live connection.
200-
"""
201-
import sqlalchemy as sa
202-
203-
dialect_name = get_dialect_name(op)
204-
205-
if dialect_name == "postgresql":
206-
cols_array = ", ".join(f"'{c}'" for c in columns)
207-
op.execute(
208-
text(f"""
209-
DO $$
210-
DECLARE r record;
211-
BEGIN
212-
FOR r IN
213-
SELECT DISTINCT tc.constraint_name
214-
FROM information_schema.table_constraints tc
215-
JOIN information_schema.key_column_usage kcu
216-
ON tc.constraint_name = kcu.constraint_name
217-
AND tc.table_schema = kcu.table_schema
218-
WHERE tc.table_name = '{table_name}'
219-
AND tc.constraint_type = 'UNIQUE'
220-
AND kcu.column_name = ANY(ARRAY[{cols_array}]::text[])
221-
LOOP
222-
EXECUTE 'ALTER TABLE ' || quote_ident('{table_name}') || ' DROP CONSTRAINT IF EXISTS '
223-
|| quote_ident(r.constraint_name);
224-
END LOOP;
225-
END $$
226-
""")
227-
)
228-
elif dialect_name == "mysql":
229-
cols_in = ", ".join(f"'{c}'" for c in columns)
230-
op.execute(
231-
text(f"""
232-
CREATE PROCEDURE DropUniqueOnColumns()
233-
BEGIN
234-
DECLARE done INT DEFAULT FALSE;
235-
DECLARE v_name VARCHAR(255);
236-
DECLARE cur CURSOR FOR
237-
SELECT DISTINCT kcu.CONSTRAINT_NAME
238-
FROM information_schema.KEY_COLUMN_USAGE kcu
239-
JOIN information_schema.TABLE_CONSTRAINTS tc
240-
ON kcu.CONSTRAINT_NAME = tc.CONSTRAINT_NAME
241-
AND kcu.TABLE_SCHEMA = tc.TABLE_SCHEMA
242-
AND kcu.TABLE_NAME = tc.TABLE_NAME
243-
WHERE kcu.TABLE_NAME = '{table_name}'
244-
AND kcu.TABLE_SCHEMA = DATABASE()
245-
AND tc.CONSTRAINT_TYPE = 'UNIQUE'
246-
AND kcu.COLUMN_NAME IN ({cols_in});
247-
DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE;
248-
OPEN cur;
249-
drop_loop: LOOP
250-
FETCH cur INTO v_name;
251-
IF done THEN LEAVE drop_loop; END IF;
252-
SET @stmt = CONCAT('ALTER TABLE `{table_name}` DROP INDEX `', v_name, '`');
253-
PREPARE s FROM @stmt;
254-
EXECUTE s;
255-
DEALLOCATE PREPARE s;
256-
END LOOP;
257-
CLOSE cur;
258-
END;
259-
CALL DropUniqueOnColumns();
260-
DROP PROCEDURE DropUniqueOnColumns;
261-
""")
262-
)
263-
else:
264-
# SQLite — batch mode rewrites the table; requires a live connection
265-
with op.batch_alter_table(table_name, schema=None) as batch_op:
266-
for uq in sa.inspect(op.get_bind()).get_unique_constraints(table_name):
267-
if any(col in uq["column_names"] for col in columns):
268-
batch_op.drop_constraint(uq["name"], type_="unique")
269-
270-
271-
def drop_unique_constraint_if_exists(op, table_name, constraint_name) -> None:
272-
"""
273-
Drop a unique constraint by name if it exists.
274-
275-
Works in both online and offline mode by using raw SQL for PostgreSQL and MySQL.
276-
SQLite falls back to batch mode and requires a live connection.
277-
"""
278-
dialect_name = get_dialect_name(op)
279-
280-
if dialect_name == "postgresql":
281-
op.execute(text(f'ALTER TABLE "{table_name}" DROP CONSTRAINT IF EXISTS "{constraint_name}"'))
282-
elif dialect_name == "mysql":
283-
op.execute(
284-
text(f"""
285-
CREATE PROCEDURE DropUniqueIfExists()
286-
BEGIN
287-
IF EXISTS (
288-
SELECT 1
289-
FROM information_schema.TABLE_CONSTRAINTS
290-
WHERE
291-
CONSTRAINT_SCHEMA = DATABASE() AND
292-
TABLE_NAME = '{table_name}' AND
293-
CONSTRAINT_NAME = '{constraint_name}' AND
294-
CONSTRAINT_TYPE = 'UNIQUE'
295-
) THEN
296-
ALTER TABLE `{table_name}` DROP INDEX `{constraint_name}`;
297-
ELSE
298-
SELECT 1;
299-
END IF;
300-
END;
301-
CALL DropUniqueIfExists();
302-
DROP PROCEDURE DropUniqueIfExists;
303-
""")
304-
)
305-
else:
306-
# SQLite — batch mode rewrites the table; requires a live connection
307-
with op.batch_alter_table(table_name, schema=None) as batch_op:
308-
with contextlib.suppress(ValueError):
309-
batch_op.drop_constraint(constraint_name, type_="unique")

0 commit comments

Comments
 (0)