Dark Mode

Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Fix topological sort#56963

Merged
kaxil merged 1 commit intoapache:mainfrom
astronomer:fix-topological-sort-odering
Oct 21, 2025
Merged

Fix topological sort#56963
kaxil merged 1 commit intoapache:mainfrom
astronomer:fix-topological-sort-odering

Conversation

Copy link
Member

pierrejeambrun commented Oct 21, 2025 *
edited by kaxil
Loading

Fixes: #55899
Closes #56321

Similarly to https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/definitions/taskgroup.py#L561, we need an exit condition if the taskgroup is found in usorted graph.

Adjusted test, which indeed were not in the correct topological order.

Testing dag code:

from __future__ import annotations

import datetime

import pendulum

from airflow.sdk import dag, task, task_group


@task
def get_nums() -> list[int]:
return [1, 2, 4]


@task
def times_2(n: int) -> int:
return n * 2


@task_group(group_id="process_number")
def process_number(n: int):
value = times_2(n)
return value


@task
def log_success() -> None:
print("Processed successful!")


@dag(
schedule=None,
catchup=False,
start_date=pendulum.datetime(2025, 4, 1, tz="Europe/Copenhagen"),
dagrun_timeout=datetime.timedelta(minutes=30),
dag_id="55899_bug",
)
def test():
nums = get_nums()
processed = process_number.expand(n=nums)
processed >> log_success()


test()

Before

After

pierrejeambrun added this to the Airflow 3.1.1 milestone Oct 21, 2025
pierrejeambrun self-assigned this Oct 21, 2025
pierrejeambrun requested review from ashb and bolkedebruin as code owners October 21, 2025 15:57
pierrejeambrun added the backport-to-v3-1-test Mark PR with this label to backport to v3-1-test branch label Oct 21, 2025
boring-cyborg bot added the area:serialization label Oct 21, 2025
Copy link
Member

dheerajturaga commented Oct 21, 2025

I tried to fix this in #56321
however seems like your implementation is simpler. let me know if I need to close mine

Copy link
Member Author

pierrejeambrun commented Oct 21, 2025 *
edited
Loading

Yes, I Think this implementation is simpler and should be favored, also it is more in lined with the sdk implementation

dheerajturaga reacted with thumbs up emoji

Copy link
Member Author

pierrejeambrun commented Oct 21, 2025

Cc: @kaxil

kaxil approved these changes Oct 21, 2025
dheerajturaga approved these changes Oct 21, 2025
Copy link
Member

dheerajturaga commented Oct 21, 2025

Yes, I Think this implementation is simpler and should be favored, also it is more in lined with the sdk implementation

Agreed! Thanks for fixing this!

kaxil merged commit c3f53b1 into apache:main Oct 21, 2025
114 checks passed
kaxil deleted the fix-topological-sort-odering branch October 21, 2025 17:27
github-actions bot pushed a commit that referenced this pull request Oct 21, 2025
Fixes: #55899
Closes #56321

Similarly to https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/definitions/taskgroup.py#L561, we need an exit condition if the taskgroup is found in usorted graph.

Adjusted test, which indeed were not in the correct topological order.

Testing dag code:
```python
from __future__ import annotations

import datetime

import pendulum

from airflow.sdk import dag, task, task_group

@task
def get_nums() -> list[int]:
return [1, 2, 4]

@task
def times_2(n: int) -> int:
return n * 2

@task_group(group_id="process_number")
def process_number(n: int):
value = times_2(n)
return value

@task
def log_success() -> None:
print("Processed successful!")

@dag(
schedule=None,
catchup=False,
start_date=pendulum.datetime(2025, 4, 1, tz="Europe/Copenhagen"),
dagrun_timeout=datetime.timedelta(minutes=30),
dag_id="55899_bug",
)
def test():
nums = get_nums()
processed = process_number.expand(n=nums)
processed >> log_success()

test()
```

### Before
Screenshot 2025-10-21 at 17 57 20https://github.com/user-attachments/assets/d5220b87-a23b-40f7-8ecf-cb1b39d72f53" />

### After
Screenshot 2025-10-21 at 17 56 57https://github.com/user-attachments/assets/37f75b19-2e80-4765-b9cb-e425f9054b78" />
(cherry picked from commit c3f53b1)

Co-authored-by: Pierre Jeambrun
Copy link

github-actions bot commented Oct 21, 2025

Backport successfully created: v3-1-test

Status Branch Result
v3-1-test

kaxil pushed a commit that referenced this pull request Oct 21, 2025
Fixes: #55899
Closes #56321

Similarly to https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/definitions/taskgroup.py#L561, we need an exit condition if the taskgroup is found in usorted graph.

Adjusted test, which indeed were not in the correct topological order.

Testing dag code:
```python
from __future__ import annotations

import datetime

import pendulum

from airflow.sdk import dag, task, task_group

@task
def get_nums() -> list[int]:
return [1, 2, 4]

@task
def times_2(n: int) -> int:
return n * 2

@task_group(group_id="process_number")
def process_number(n: int):
value = times_2(n)
return value

@task
def log_success() -> None:
print("Processed successful!")

@dag(
schedule=None,
catchup=False,
start_date=pendulum.datetime(2025, 4, 1, tz="Europe/Copenhagen"),
dagrun_timeout=datetime.timedelta(minutes=30),
dag_id="55899_bug",
)
def test():
nums = get_nums()
processed = process_number.expand(n=nums)
processed >> log_success()

test()
```

### Before
Screenshot 2025-10-21 at 17 57 20https://github.com/user-attachments/assets/d5220b87-a23b-40f7-8ecf-cb1b39d72f53" />

### After
Screenshot 2025-10-21 at 17 56 57https://github.com/user-attachments/assets/37f75b19-2e80-4765-b9cb-e425f9054b78" />

(cherry picked from commit c3f53b1)
potiuk mentioned this pull request Nov 8, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Reviewers

kaxil kaxil approved these changes

dheerajturaga dheerajturaga approved these changes

ashb Awaiting requested review from ashb ashb is a code owner

bolkedebruin Awaiting requested review from bolkedebruin bolkedebruin is a code owner

Labels

area:serialization backport-to-v3-1-test Mark PR with this label to backport to v3-1-test branch

Projects

None yet

Milestone

Airflow 3.1.1

Development

Successfully merging this pull request may close these issues.

Order tasks topologically in Grid view

3 participants