-
Notifications
You must be signed in to change notification settings - Fork 16.7k
Fix upgrade failure when xcom contains NaN in string values#57614
Fix upgrade failure when xcom contains NaN in string values#57614kaxil merged 1 commit intoapache:mainfrom
Conversation
closes: #57608
Problem
When migrating from Airflow 2.x to 3.0, the migration 0049_3_0_0_remove_pickled_data_from_xcom_table was failing.
This occurred when XCOM data contained the string "NaN" as part of a value, for example:
The migration was using a simple REPLACE() function that replaced all occurrences of 'NaN' with '"nan"', which broke the JSON structure when NaN appeared inside string values. The broken JSON would become:
And things would break.
I changed the NaN sanitization to use regex with word boundaries to match only standalone NaN tokens (like "key": NaN) while preserving NaN that appears inside string values.
- PostgreSQL: Uses
regexp_replacewith word boundaries (\bNaN\b) (https://neon.com/postgresql/postgresql-string-functions/regexp_replace) - MySQL: Uses
REGEXP_REPLACEwith word boundaries ([[:<:]]NaN[[:>:]]) (https://dev.mysql.com/doc/refman/8.4/en/regexp.html) - SQLite: Kept simple with original
REPLACE()approach as sqlite doesn't have regex support(?)
Testing
- Installed 2.11 Airflow using:
breeze start-airflow --backend postgres --executor CeleryExecutor --use-airflow-version 2.11.0 --db-reset - Ran this DAG:
from airflow.decorators import dag, task
from pendulum import datetime
import requests
# Define the basic parameters of the DAG, like schedule and start_date
@dag(
start_date=datetime(2024, 1, 1),
schedule="@daily",
catchup=False,
doc_md=__doc__,
default_args={"owner": "Astro", "retries": 3},
tags=["example"],
)
def example_astronauts():
@task
def get_astronauts(**context) -> list[dict]:
number_of_people_in_space = 1
list_of_people_in_space = [
{"craft": "Tiangong", "name": "Ye GuangfuNaN"},
]
context["ti"].xcom_push(
key="number_of_people_in_space", value=number_of_people_in_space
)
return list_of_people_in_space
@task
def print_astronaut_craft(greeting: str, person_in_space: dict) -> None:
"""
This task creates a print statement with the name of an
Astronaut in space and the craft they are flying on from
the API request results of the previous task, along with a
greeting which is hard-coded in this example.
"""
craft = person_in_space["craft"]
name = person_in_space["name"]
print(f"{name} is currently in space flying on the {craft}! {greeting}")
print_astronaut_craft.partial(greeting="Hello! :)").expand(
person_in_space=get_astronauts() # Define dependencies using TaskFlow API syntax
)
# Instantiate the DAG
example_astronauts()
- Upgraded airflow to 3.11 using breeze:
breeze start-airflow --backend postgres --executor CeleryExecutor --use-airflow-version 3.1.1
No migration error:
Performing upgrade to the metadata database postgresql+psycopg2://postgres:***@postgres/airflow
2025-10-31T10:40:09.197045Z [info ] Context impl PostgresqlImpl. [alembic.runtime.migration] loc=migration.py:211
2025-10-31T10:40:09.197133Z [info ] Will assume transactional DDL. [alembic.runtime.migration] loc=migration.py:214
2025-10-31T10:40:09.197938Z [info ] Context impl PostgresqlImpl. [alembic.runtime.migration] loc=migration.py:211
2025-10-31T10:40:09.197991Z [info ] Will assume transactional DDL. [alembic.runtime.migration] loc=migration.py:214
2025-10-31T10:40:09.198247Z [info ] Creating Airflow database tables from the ORM [airflow.utils.db] loc=db.py:684
2025-10-31T10:40:09.198296Z [info ] Creating context [airflow.utils.db] loc=db.py:689
2025-10-31T10:40:09.200174Z [info ] Binding engine [airflow.utils.db] loc=db.py:691
2025-10-31T10:40:09.200245Z [info ] Pool status: Pool size: 5 Connections in pool: 0 Current Overflow: -3 Current Checked out connections: 2 [airflow.utils.db] loc=db.py:693
2025-10-31T10:40:09.200284Z [info ] Creating metadata [airflow.utils.db] loc=db.py:695
2025-10-31T10:40:09.424717Z [info ] Getting alembic config [airflow.utils.db] loc=db.py:698
2025-10-31T10:40:09.425147Z [info ] Stamping migration head [airflow.utils.db] loc=db.py:705
2025-10-31T10:40:09.426209Z [info ] Context impl PostgresqlImpl. [alembic.runtime.migration] loc=migration.py:211
2025-10-31T10:40:09.426272Z [info ] Will assume transactional DDL. [alembic.runtime.migration] loc=migration.py:214
2025-10-31T10:40:09.450286Z [info ] Running stamp_revision -> cc92b33c6709 [alembic.runtime.migration] loc=migration.py:622
2025-10-31T10:40:09.451776Z [info ] Airflow database tables created [airflow.utils.db] loc=db.py:708
^ Add meaningful description above
Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#pull
-request-guidelines) for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.