@@ -2605,6 +2605,81 @@ def test_task_instance_history_is_created_when_ti_goes_for_retry(self, dag_maker
26052605 # the new try_id should be different from what's recorded in tih
26062606 assert tih [0 ].task_instance_id == try_id
26072607
2608+ @pytest .mark .parametrize (
2609+ ("first_ti" , "second_ti" ),
2610+ [
2611+ pytest .param (
2612+ ("dag_1" , "run_1" , "task_1" , - 1 ),
2613+ ("dag_2" , "run_1" , "task_1" , - 1 ),
2614+ id = "tasks_with_different_dags" ,
2615+ ),
2616+ pytest .param (
2617+ ("dag_1" , "run_1" , "task_1" , - 1 ),
2618+ ("dag_1" , "run_2" , "task_1" , - 1 ),
2619+ id = "tasks_with_different_runs" ,
2620+ ),
2621+ # There are no cases with equal dag_id/run_id because create_task_instance()
2622+ # creates a DagRun each time, and DagRun has a unique (dag_id, run_id) constraint.
2623+ ],
2624+ )
2625+ def test_get_task_instance_disambiguates_by_dag_id_and_run_id (
2626+ self , create_task_instance , session , first_ti , second_ti
2627+ ):
2628+ dag_id_1 , run_id_1 , task_id_1 , map_index_1 = first_ti
2629+ dag_id_2 , run_id_2 , task_id_2 , map_index_2 = second_ti
2630+
2631+ ti1 = create_task_instance (
2632+ dag_id = dag_id_1 ,
2633+ run_id = run_id_1 ,
2634+ task_id = task_id_1 ,
2635+ map_index = map_index_1 ,
2636+ session = session ,
2637+ )
2638+ ti2 = create_task_instance (
2639+ dag_id = dag_id_2 ,
2640+ run_id = run_id_2 ,
2641+ task_id = task_id_2 ,
2642+ map_index = map_index_2 ,
2643+ session = session ,
2644+ )
2645+
2646+ # Regression setup for #64957: if dag_id is ignored, this lookup key becomes ambiguous.
2647+ if dag_id_1 != dag_id_2 :
2648+ ambiguous_count = session .scalar (
2649+ select (func .count ())
2650+ .select_from (TI )
2651+ .filter_by (run_id = run_id_1 , task_id = task_id_1 , map_index = map_index_1 )
2652+ )
2653+ assert ambiguous_count == 2 , "Setup failure: expected two TIs matching without dag_id filter"
2654+
2655+ # This case does not target the original regression directly (run_id was already filtered),
2656+ # but we keep it as defense-in-depth against future changes.
2657+ found_1 = TI .get_task_instance (
2658+ dag_id = dag_id_1 ,
2659+ run_id = run_id_1 ,
2660+ task_id = task_id_1 ,
2661+ map_index = map_index_1 ,
2662+ session = session ,
2663+ )
2664+ found_2 = TI .get_task_instance (
2665+ dag_id = dag_id_2 ,
2666+ run_id = run_id_2 ,
2667+ task_id = task_id_2 ,
2668+ map_index = map_index_2 ,
2669+ session = session ,
2670+ )
2671+
2672+ assert found_1 is not None
2673+ assert found_2 is not None
2674+
2675+ assert found_1 .id == ti1 .id
2676+ assert found_2 .id == ti2 .id
2677+
2678+ # Keep dag_id assertions explicit to document the regression intent (#64957):
2679+ # get_task_instance() must disambiguate identical run/task/map_index by dag_id.
2680+ assert found_1 .dag_id == dag_id_1
2681+ assert found_2 .dag_id == dag_id_2
2682+
26082683
26092684@pytest .mark .parametrize ("pool_override" , [None , "test_pool2" ])
26102685@pytest .mark .parametrize ("queue_by_policy" , [None , "forced_queue" ])
0 commit comments