Skip to content

Commit adb01b9

Browse files
github-actions[bot], holmuk
authored and committed
[v3-2-test] Fix missing dag_id in get_task_instance (#64957) (#64968) (#65067)
(cherry picked from commit 4ecbd59) Co-authored-by: holmuk <20281580+holmuk@users.noreply.github.com>
1 parent 0979f14 commit adb01b9

2 files changed

Lines changed: 76 additions & 0 deletions

File tree

airflow-core/src/airflow/models/taskinstance.py

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -792,6 +792,7 @@ def get_task_instance(
792792
select(TaskInstance)
793793
.options(lazyload(TaskInstance.dag_run)) # lazy load dag run to avoid locking it
794794
.filter_by(
795+
dag_id=dag_id,
795796
run_id=run_id,
796797
task_id=task_id,
797798
map_index=map_index,

airflow-core/tests/unit/models/test_taskinstance.py

Lines changed: 75 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -2605,6 +2605,81 @@ def test_task_instance_history_is_created_when_ti_goes_for_retry(self, dag_maker
26052605
# the new try_id should be different from what's recorded in tih
26062606
assert tih[0].task_instance_id == try_id
26072607

2608+
@pytest.mark.parametrize(
2609+
("first_ti", "second_ti"),
2610+
[
2611+
pytest.param(
2612+
("dag_1", "run_1", "task_1", -1),
2613+
("dag_2", "run_1", "task_1", -1),
2614+
id="tasks_with_different_dags",
2615+
),
2616+
pytest.param(
2617+
("dag_1", "run_1", "task_1", -1),
2618+
("dag_1", "run_2", "task_1", -1),
2619+
id="tasks_with_different_runs",
2620+
),
2621+
# There are no cases with equal dag_id/run_id because create_task_instance()
2622+
# creates a DagRun each time, and DagRun has a unique (dag_id, run_id) constraint.
2623+
],
2624+
)
2625+
def test_get_task_instance_disambiguates_by_dag_id_and_run_id(
2626+
self, create_task_instance, session, first_ti, second_ti
2627+
):
2628+
dag_id_1, run_id_1, task_id_1, map_index_1 = first_ti
2629+
dag_id_2, run_id_2, task_id_2, map_index_2 = second_ti
2630+
2631+
ti1 = create_task_instance(
2632+
dag_id=dag_id_1,
2633+
run_id=run_id_1,
2634+
task_id=task_id_1,
2635+
map_index=map_index_1,
2636+
session=session,
2637+
)
2638+
ti2 = create_task_instance(
2639+
dag_id=dag_id_2,
2640+
run_id=run_id_2,
2641+
task_id=task_id_2,
2642+
map_index=map_index_2,
2643+
session=session,
2644+
)
2645+
2646+
# Regression setup for #64957: if dag_id is ignored, this lookup key becomes ambiguous.
2647+
if dag_id_1 != dag_id_2:
2648+
ambiguous_count = session.scalar(
2649+
select(func.count())
2650+
.select_from(TI)
2651+
.filter_by(run_id=run_id_1, task_id=task_id_1, map_index=map_index_1)
2652+
)
2653+
assert ambiguous_count == 2, "Setup failure: expected two TIs matching without dag_id filter"
2654+
2655+
# This case does not target the original regression directly (run_id was already filtered),
2656+
# but we keep it as defense-in-depth against future changes.
2657+
found_1 = TI.get_task_instance(
2658+
dag_id=dag_id_1,
2659+
run_id=run_id_1,
2660+
task_id=task_id_1,
2661+
map_index=map_index_1,
2662+
session=session,
2663+
)
2664+
found_2 = TI.get_task_instance(
2665+
dag_id=dag_id_2,
2666+
run_id=run_id_2,
2667+
task_id=task_id_2,
2668+
map_index=map_index_2,
2669+
session=session,
2670+
)
2671+
2672+
assert found_1 is not None
2673+
assert found_2 is not None
2674+
2675+
assert found_1.id == ti1.id
2676+
assert found_2.id == ti2.id
2677+
2678+
# Keep dag_id assertions explicit to document the regression intent (#64957):
2679+
# get_task_instance() must disambiguate identical run/task/map_index by dag_id.
2680+
assert found_1.dag_id == dag_id_1
2681+
assert found_2.dag_id == dag_id_2
2682+
26082683

26092684
@pytest.mark.parametrize("pool_override", [None, "test_pool2"])
26102685
@pytest.mark.parametrize("queue_by_policy", [None, "forced_queue"])

0 commit comments

Comments
 (0)