-
Notifications
You must be signed in to change notification settings - Fork 16.6k
[v3-1-test] Fix: TriggerDagRunOperator stuck in deferred state with reset_dag_run (#57756) (#57968)#58333
Conversation
When TriggerDagRunOperator is used with deferrable=True, wait_for_completion=True,
reset_dag_run=True, and a fixed trigger_run_id, the operator becomes permanently
stuck in deferred state after clearing and re-running.
Root cause:
When reset_dag_run=True is used with a fixed run_id, the database preserves the
original logical_date from the first run. However, on subsequent runs after clearing,
the operator calculates a NEW logical_date based on the current time. The DagStateTrigger
was being created with this newly calculated logical_date, causing a mismatch when
querying the database - the trigger looked for a DAG run with the new logical_date
but the database contained the original logical_date, causing the query to return
zero results indefinitely.
Solution:
- Modified _handle_trigger_dag_run() in task_runner.py to pass execution_dates=None
to DagStateTrigger when run_ids is provided, since run_id alone is sufficient and
globally unique - Added test test_handle_trigger_dag_run_deferred_with_reset_uses_run_id_ only to
verify the fix and prevent regression
The fix ensures that both deferrable and non-deferrable modes use identical logic
for determining DAG run completion - querying by run_id and state only, without
filtering by logical_date which can become stale when resets are involved.
(cherry picked from commit 4f3d0c5)
Co-authored-by: Mykola Shyshov mykola.shyshov@gmail.com
When TriggerDagRunOperator is used with deferrable=True, wait_for_completion=True,
reset_dag_run=True, and a fixed trigger_run_id, the operator becomes permanently
stuck in deferred state after clearing and re-running.
Root cause:
When reset_dag_run=True is used with a fixed run_id, the database preserves the
original logical_date from the first run. However, on subsequent runs after clearing,
the operator calculates a NEW logical_date based on the current time. The DagStateTrigger
was being created with this newly calculated logical_date, causing a mismatch when
querying the database - the trigger looked for a DAG run with the new logical_date
but the database contained the original logical_date, causing the query to return
zero results indefinitely.
Solution:
- Modified _handle_trigger_dag_run() in task_runner.py to pass execution_dates=None
to DagStateTrigger when run_ids is provided, since run_id alone is sufficient and
globally unique
- Added test test_handle_trigger_dag_run_deferred_with_reset_uses_run_id_ only to
verify the fix and prevent regression
The fix ensures that both deferrable and non-deferrable modes use identical logic
for determining DAG run completion - querying by run_id and state only, without
filtering by logical_date which can become stale when resets are involved.
(cherry picked from commit 4f3d0c5)
Co-authored-by: Mykola Shyshov
When TriggerDagRunOperator is used with deferrable=True, wait_for_completion=True,
reset_dag_run=True, and a fixed trigger_run_id, the operator becomes permanently
stuck in deferred state after clearing and re-running.
Root cause:
When reset_dag_run=True is used with a fixed run_id, the database preserves the
original logical_date from the first run. However, on subsequent runs after clearing,
the operator calculates a NEW logical_date based on the current time. The DagStateTrigger
was being created with this newly calculated logical_date, causing a mismatch when
querying the database - the trigger looked for a DAG run with the new logical_date
but the database contained the original logical_date, causing the query to return
zero results indefinitely.
Solution:
- Modified _handle_trigger_dag_run() in task_runner.py to pass execution_dates=None
to DagStateTrigger when run_ids is provided, since run_id alone is sufficient and
globally unique
- Added test test_handle_trigger_dag_run_deferred_with_reset_uses_run_id_ only to
verify the fix and prevent regression
The fix ensures that both deferrable and non-deferrable modes use identical logic
for determining DAG run completion - querying by run_id and state only, without
filtering by logical_date which can become stale when resets are involved.
(cherry picked from commit 4f3d0c5)
Co-authored-by: Mykola Shyshov
When TriggerDagRunOperator is used with deferrable=True, wait_for_completion=True,
reset_dag_run=True, and a fixed trigger_run_id, the operator becomes permanently
stuck in deferred state after clearing and re-running.
Root cause:
When reset_dag_run=True is used with a fixed run_id, the database preserves the
original logical_date from the first run. However, on subsequent runs after clearing,
the operator calculates a NEW logical_date based on the current time. The DagStateTrigger
was being created with this newly calculated logical_date, causing a mismatch when
querying the database - the trigger looked for a DAG run with the new logical_date
but the database contained the original logical_date, causing the query to return
zero results indefinitely.
Solution:
- Modified _handle_trigger_dag_run() in task_runner.py to pass execution_dates=None
to DagStateTrigger when run_ids is provided, since run_id alone is sufficient and
globally unique
- Added test test_handle_trigger_dag_run_deferred_with_reset_uses_run_id_ only to
verify the fix and prevent regression
The fix ensures that both deferrable and non-deferrable modes use identical logic
for determining DAG run completion - querying by run_id and state only, without
filtering by logical_date which can become stale when resets are involved.
(cherry picked from commit 4f3d0c5)
Co-authored-by: Mykola Shyshov
When TriggerDagRunOperator is used with deferrable=True, wait_for_completion=True,
reset_dag_run=True, and a fixed trigger_run_id, the operator becomes permanently
stuck in deferred state after clearing and re-running.
Root cause:
When reset_dag_run=True is used with a fixed run_id, the database preserves the
original logical_date from the first run. However, on subsequent runs after clearing,
the operator calculates a NEW logical_date based on the current time. The DagStateTrigger
was being created with this newly calculated logical_date, causing a mismatch when
querying the database - the trigger looked for a DAG run with the new logical_date
but the database contained the original logical_date, causing the query to return
zero results indefinitely.
Solution:
- Modified _handle_trigger_dag_run() in task_runner.py to pass execution_dates=None
to DagStateTrigger when run_ids is provided, since run_id alone is sufficient and
globally unique
- Added test test_handle_trigger_dag_run_deferred_with_reset_uses_run_id_ only to
verify the fix and prevent regression
The fix ensures that both deferrable and non-deferrable modes use identical logic
for determining DAG run completion - querying by run_id and state only, without
filtering by logical_date which can become stale when resets are involved.
(cherry picked from commit 4f3d0c5)
Co-authored-by: Mykola Shyshov
When TriggerDagRunOperator is used with deferrable=True, wait_for_completion=True,
reset_dag_run=True, and a fixed trigger_run_id, the operator becomes permanently
stuck in deferred state after clearing and re-running.
Root cause:
When reset_dag_run=True is used with a fixed run_id, the database preserves the
original logical_date from the first run. However, on subsequent runs after clearing,
the operator calculates a NEW logical_date based on the current time. The DagStateTrigger
was being created with this newly calculated logical_date, causing a mismatch when
querying the database - the trigger looked for a DAG run with the new logical_date
but the database contained the original logical_date, causing the query to return
zero results indefinitely.
Solution:
- Modified _handle_trigger_dag_run() in task_runner.py to pass execution_dates=None
to DagStateTrigger when run_ids is provided, since run_id alone is sufficient and
globally unique
- Added test test_handle_trigger_dag_run_deferred_with_reset_uses_run_id_ only to
verify the fix and prevent regression
The fix ensures that both deferrable and non-deferrable modes use identical logic
for determining DAG run completion - querying by run_id and state only, without
filtering by logical_date which can become stale when resets are involved.
(cherry picked from commit 4f3d0c5)
Co-authored-by: Mykola Shyshov
When TriggerDagRunOperator is used with deferrable=True, wait_for_completion=True,
reset_dag_run=True, and a fixed trigger_run_id, the operator becomes permanently
stuck in deferred state after clearing and re-running.
Root cause:
When reset_dag_run=True is used with a fixed run_id, the database preserves the
original logical_date from the first run. However, on subsequent runs after clearing,
the operator calculates a NEW logical_date based on the current time. The DagStateTrigger
was being created with this newly calculated logical_date, causing a mismatch when
querying the database - the trigger looked for a DAG run with the new logical_date
but the database contained the original logical_date, causing the query to return
zero results indefinitely.
Solution:
- Modified _handle_trigger_dag_run() in task_runner.py to pass execution_dates=None
to DagStateTrigger when run_ids is provided, since run_id alone is sufficient and
globally unique
- Added test test_handle_trigger_dag_run_deferred_with_reset_uses_run_id_ only to
verify the fix and prevent regression
The fix ensures that both deferrable and non-deferrable modes use identical logic
for determining DAG run completion - querying by run_id and state only, without
filtering by logical_date which can become stale when resets are involved.
(cherry picked from commit 4f3d0c5)
Co-authored-by: Mykola Shyshov