-
Notifications
You must be signed in to change notification settings - Fork 16.6k
Fix grid view task ordering by correcting topological_sort implementation#56321
Fix grid view task ordering by correcting topological_sort implementation#56321dheerajturaga wants to merge 2 commits intoapache:mainfrom
Conversation
The SerializedTaskGroup.topological_sort() method had a critical bug in its
topological sorting algorithm. After checking if an upstream dependency's parent
task group was still in the unsorted graph, the code failed to verify whether
such a parent was found before proceeding. This caused the else clause to execute
even when nodes had unresolved parent task group dependencies, resulting in tasks
being sorted out of dependency order in the grid view.
The fix adds the missing logic:
- Check if a parent task group dependency exists (if tg:) and break if found
- Track progress with an acyclic flag to detect cycles or stuck states
- Break the loop if no nodes are resolved in an iteration
Also added the missing hierarchical_alphabetical_sort() method to support the
alternative grid_view_sorting_order configuration option.
This ensures tasks are displayed in the correct dependency order in the grid view,
matching how they are executed.
Before:
After:
8e50430 to
0befe88
Compare
|
@ashb , Im not sure if the CI check fails are related to my change... |
|
Did this change in 3.1? The test failures look unrelated -- try rebasing to see if that fixes it |
Looks like the task ordering changed between 3.0.6 and 3.1 Reg test failures, I already rebased, I suspect CI is broken at the moment |
#55169 (comment) bit us? |
0befe88 to
0263a87
Compare
The SerializedTaskGroup.topological_sort() method had a critical bug in its
topological sorting algorithm. After checking if an upstream dependency's parent
task group was still in the unsorted graph, the code failed to verify whether
such a parent was found before proceeding. This caused the else clause to execute
even when nodes had unresolved parent task group dependencies, resulting in tasks
being sorted out of dependency order in the grid view.
The fix adds the missing logic:
1. Check if a parent task group dependency exists (if tg:) and break if found
2. Track progress with an acyclic flag to detect cycles or stuck states
3. Break the loop if no nodes are resolved in an iteration
Also added the missing hierarchical_alphabetical_sort() method to support the
alternative grid_view_sorting_order configuration option.
This ensures tasks are displayed in the correct dependency order in the grid view,
matching how they are executed.
The SerializedTaskGroup.topological_sort() method had critical bugs that caused
tasks to display in incorrect order in the grid view:
1. Missing logic to check if parent task group dependencies exist before adding
nodes to the sorted list, causing premature sorting of dependent tasks.
2. Failed to handle task groups differently from tasks when checking upstream
dependencies. Task groups use upstream_group_ids/upstream_task_ids attributes,
while tasks use upstream_list. The original implementation only checked
upstream_list, causing task groups to appear to have no dependencies.
The fix:
- Added missing hierarchical_alphabetical_sort() method to support the alternative
grid_view_sorting_order configuration option
- Fixed topological_sort() to properly detect upstream dependencies for both tasks
(via upstream_list) and task groups (via upstream_group_ids/upstream_task_ids)
- Added check after the parent task group search loop to break if a dependency
was found
- Added acyclic flag tracking and handling for cycle/stuck states
Updated unit tests to reflect correct topological ordering where tasks appear
after their dependencies rather than in arbitrary order.
This ensures tasks are displayed in correct dependency order in the grid view,
matching how they are executed.
0263a87 to
a99c496
Compare
potiuk
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As mentioned in the other PR, I think #56963 should be favored, the implementation is simpler and more consistent with the sdk counterpart
Closes #56321
Similarly to https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/definitions/taskgroup.py#L561, we need an exit condition if the taskgroup is found in usorted graph.
Adjusted test, which indeed were not in the correct topological order.
Testing dag code:
```python
from __future__ import annotations
import datetime
import pendulum
from airflow.sdk import dag, task, task_group
@task
def get_nums() -> list[int]:
return [1, 2, 4]
@task
def times_2(n: int) -> int:
return n * 2
@task_group(group_id="process_number")
def process_number(n: int):
value = times_2(n)
return value
@task
def log_success() -> None:
print("Processed successful!")
@dag(
schedule=None,
catchup=False,
start_date=pendulum.datetime(2025, 4, 1, tz="Europe/Copenhagen"),
dagrun_timeout=datetime.timedelta(minutes=30),
dag_id="55899_bug",
)
def test():
nums = get_nums()
processed = process_number.expand(n=nums)
processed >> log_success()
test()
```
### Before
### After
(cherry picked from commit c3f53b1)
Co-authored-by: Pierre Jeambrun
Closes apache#56321
Similarly to https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/definitions/taskgroup.py#L561, we need an exit condition if the taskgroup is found in usorted graph.
Adjusted test, which indeed were not in the correct topological order.
Testing dag code:
```python
from __future__ import annotations
import datetime
import pendulum
from airflow.sdk import dag, task, task_group
@task
def get_nums() -> list[int]:
return [1, 2, 4]
@task
def times_2(n: int) -> int:
return n * 2
@task_group(group_id="process_number")
def process_number(n: int):
value = times_2(n)
return value
@task
def log_success() -> None:
print("Processed successful!")
@dag(
schedule=None,
catchup=False,
start_date=pendulum.datetime(2025, 4, 1, tz="Europe/Copenhagen"),
dagrun_timeout=datetime.timedelta(minutes=30),
dag_id="55899_bug",
)
def test():
nums = get_nums()
processed = process_number.expand(n=nums)
processed >> log_success()
test()
```
### Before
### After
Closes #56321
Similarly to https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/definitions/taskgroup.py#L561, we need an exit condition if the taskgroup is found in usorted graph.
Adjusted test, which indeed were not in the correct topological order.
Testing dag code:
```python
from __future__ import annotations
import datetime
import pendulum
from airflow.sdk import dag, task, task_group
@task
def get_nums() -> list[int]:
return [1, 2, 4]
@task
def times_2(n: int) -> int:
return n * 2
@task_group(group_id="process_number")
def process_number(n: int):
value = times_2(n)
return value
@task
def log_success() -> None:
print("Processed successful!")
@dag(
schedule=None,
catchup=False,
start_date=pendulum.datetime(2025, 4, 1, tz="Europe/Copenhagen"),
dagrun_timeout=datetime.timedelta(minutes=30),
dag_id="55899_bug",
)
def test():
nums = get_nums()
processed = process_number.expand(n=nums)
processed >> log_success()
test()
```
### Before
### After
(cherry picked from commit c3f53b1)
Closes apache/airflow#56321
Similarly to https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/definitions/taskgroup.py#L561, we need an exit condition if the taskgroup is found in usorted graph.
Adjusted test, which indeed were not in the correct topological order.
Testing dag code:
```python
from __future__ import annotations
import datetime
import pendulum
from airflow.sdk import dag, task, task_group
@task
def get_nums() -> list[int]:
return [1, 2, 4]
@task
def times_2(n: int) -> int:
return n * 2
@task_group(group_id="process_number")
def process_number(n: int):
value = times_2(n)
return value
@task
def log_success() -> None:
print("Processed successful!")
@dag(
schedule=None,
catchup=False,
start_date=pendulum.datetime(2025, 4, 1, tz="Europe/Copenhagen"),
dagrun_timeout=datetime.timedelta(minutes=30),
dag_id="55899_bug",
)
def test():
nums = get_nums()
processed = process_number.expand(n=nums)
processed >> log_success()
test()
```
### Before
### After
(cherry picked from commit c3f53b1d598a55df42ba588fbd1dd10fab2f2ae8)
GitOrigin-RevId: ccc33ffd109b64c6e41512d9cbaa38c53cabef7d