Triggerer crash when migrating Airflow 2 to 3 with async dagrun already in deferred state #55713

Closed

Description

Apache Airflow version

3.1.0b2

If "Other Airflow 2 version" selected, which one?

No response

What happened?

  File "/opt/airflow/airflow-core/src/airflow/utils/providers_configuration_loader.py", line 54, in wrapped_function
    return func(*args, **kwargs)
  File "/opt/airflow/airflow-core/src/airflow/cli/commands/triggerer_command.py", line 69, in triggerer
    run_command_with_daemon_option(
  File "/opt/airflow/airflow-core/src/airflow/cli/commands/daemon_utils.py", line 86, in run_command_with_daemon_option
    callback()
  File "/opt/airflow/airflow-core/src/airflow/cli/commands/triggerer_command.py", line 72, in <lambda>
    callback=lambda: triggerer_run(args.skip_serve_logs, args.capacity, triggerer_heartrate),
  File "/opt/airflow/airflow-core/src/airflow/cli/commands/triggerer_command.py", line 55, in triggerer_run
    run_job(job=triggerer_job_runner.job, execute_callable=triggerer_job_runner._execute)
  File "/opt/airflow/airflow-core/src/airflow/utils/session.py", line 100, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/airflow/airflow-core/src/airflow/jobs/job.py", line 368, in run_job
    return execute_job(job, execute_callable=execute_callable)
  File "/opt/airflow/airflow-core/src/airflow/jobs/job.py", line 397, in execute_job
    ret = execute_callable()
  File "/opt/airflow/airflow-core/src/airflow/jobs/triggerer_job_runner.py", line 170, in _execute
    self.trigger_runner.run()
  File "/opt/airflow/airflow-core/src/airflow/jobs/triggerer_job_runner.py", line 526, in run
    self.load_triggers()
  File "/opt/airflow/airflow-core/src/airflow/traces/tracer.py", line 58, in wrapper
    return func(*args, **kwargs)
  File "/opt/airflow/airflow-core/src/airflow/jobs/triggerer_job_runner.py", line 549, in load_triggers
    self.update_triggers(set(ids))
  File "/opt/airflow/airflow-core/src/airflow/jobs/triggerer_job_runner.py", line 653, in update_triggers
    ser_ti = workloads.TaskInstance.model_validate(
  File "/usr/python/lib/python3.10/site-packages/pydantic/main.py", line 705, in model_validate
    return cls.__pydantic_validator__.validate_python(
pydantic_core._pydantic_core.ValidationError: 1 validation error for TaskInstance
dag_version_id
  UUID input should be a string, bytes or UUID object [type=uuid_type, input_value=None, input_type=NoneType]
    For further information visit https://errors.pydantic.dev/2.11/v/uuid_type
INFO: Shutting down
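
The validation error at the bottom of the trace can be reproduced in isolation. The following is a minimal sketch, assuming pydantic v2 is installed. `StrictTaskInstance` and `LenientTaskInstance` are hypothetical stand-ins, not Airflow's actual `workloads.TaskInstance`; the lenient variant only illustrates how the field type changes the outcome and is not necessarily how this issue was resolved.

```python
import uuid
from typing import Optional

from pydantic import BaseModel, ValidationError


class StrictTaskInstance(BaseModel):
    """Hypothetical stand-in: dag_version_id must be a UUID."""

    dag_version_id: uuid.UUID


class LenientTaskInstance(BaseModel):
    """Hypothetical variant that accepts rows without a DAG version."""

    dag_version_id: Optional[uuid.UUID] = None


def validation_error_types(model, row):
    """Return the pydantic error type codes raised when validating `row`."""
    try:
        model.model_validate(row)
    except ValidationError as exc:
        return [err["type"] for err in exc.errors()]
    return []


# A task instance row with no DAG version, as reported by the trace
# (input_value=None for dag_version_id).
legacy_row = {"dag_version_id": None}

strict_errors = validation_error_types(StrictTaskInstance, legacy_row)
lenient_errors = validation_error_types(LenientTaskInstance, legacy_row)
print(strict_errors, lenient_errors)
```

The strict model rejects the row with the same `uuid_type` error code shown in the trace, while the variant with an optional field accepts it.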

Note: The triggerer works fine if the async DAG run was not in a deferred state during the migration.
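
To check whether a deployment has rows that could hit this, one option is to look for deferred task instances that have no DAG version. The sketch below uses an in-memory SQLite database with a simplified, hypothetical `task_instance` table. The column set is an assumption for illustration and the run IDs are made up; the real Airflow metadata schema has many more columns and should be inspected before adapting the query.

```python
import sqlite3

# Simplified, hypothetical stand-in for the Airflow metadata table.
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE task_instance (
        dag_id TEXT,
        task_id TEXT,
        run_id TEXT,
        state TEXT,
        trigger_id INTEGER,
        dag_version_id TEXT
    )
    """
)
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?, ?, ?)",
    [
        # Deferred before the upgrade: no DAG version recorded.
        ("async_trigger_sleep", "wait_for_start", "manual__2025-01-01", "deferred", 1, None),
        # Finished task: not loaded by the triggerer, so not a concern here.
        ("other_dag", "some_task", "manual__2025-01-02", "success", None, None),
        # Deferred after the upgrade: DAG version is present.
        ("new_dag", "wait", "manual__2025-01-03", "deferred", 2,
         "0198f6a2-0000-7000-8000-000000000001"),
    ],
)


def find_deferred_without_dag_version(connection):
    """Return (dag_id, task_id, run_id) for deferred rows lacking a DAG version."""
    query = """
        SELECT dag_id, task_id, run_id
        FROM task_instance
        WHERE state = 'deferred' AND dag_version_id IS NULL
        ORDER BY dag_id, task_id
    """
    return connection.execute(query).fetchall()


for row in find_deferred_without_dag_version(conn):
    print(row)
```

Only the row deferred before the upgrade is returned, which matches the observation that DAG runs not deferred during the migration are unaffected.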

What you think should happen instead?

No response

How to reproduce

  1. Start Airflow 2.11.0 (breeze start-airflow --executor CeleryExecutor --backend postgres --use-airflow-version 2.11.0)
  2. Trigger the DAG below
  3. Migrate to 3.1.0b2 (breeze start-airflow --executor CeleryExecutor --backend postgres)
  4. Notice the error in the triggerer logs

from datetime import datetime

from airflow import DAG
from airflow.sensors.date_time import DateTimeSensorAsync
from airflow.utils.dates import days_ago

# schedule_interval and days_ago are Airflow 2.x APIs (both removed in Airflow 3).
with DAG(
    "async_trigger_sleep",
    schedule_interval=None,
    start_date=days_ago(1),
    tags=["async_migration"],
) as dag:
    DateTimeSensorAsync(
        task_id="wait_for_start",
        # The target is far in the future, so the task stays deferred.
        target_time=datetime(2028, 12, 10),
    )

Works fine with the below dag:

from airflow import DAG
from pendulum import today
from airflow.sensors.date_time import DateTimeSensorAsync

with DAG(
    "async_trigger_sleep",
    schedule=None,
    start_date=today("UTC").add(days=-1),
    tags=["async_migration"],
) as dag:
    DateTimeSensorAsync(
        task_id="wait_for_start",
        # wait for 1 minute from the DAG execution date
        target_time="{{ (execution_date + macros.timedelta(minutes=1)).isoformat() }}",
    )

Operating System

Linux

Versions of Apache Airflow Providers

No response

Deployment

Other

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
