Dark Mode

Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Implement async version of databricks_conn in BaseDatabricksHook#55568

Open
BasPH wants to merge 18 commits intoapache:mainfrom
BasPH:async-dbx-base-hook
Open

Implement async version of databricks_conn in BaseDatabricksHook#55568
BasPH wants to merge 18 commits intoapache:mainfrom
BasPH:async-dbx-base-hook

Conversation

Copy link
Contributor

BasPH commented Sep 12, 2025 *
edited
Loading

I bumped into this error when running the DatabricksSubmitRunOperator on Airflow 3.0.6 using apache-airflow-providers-databricks==7.7.1:

ERROR - Trigger failed:
Traceback (most recent call last):

File "/usr/local/lib/python3.12/site-packages/airflow/jobs/trigge rer_job_runner.py", line 963, in cleanup_finished_triggers
result = details["task"].result()
^^^^^^^^^^^^^^^^^^^^^^^^

File "/usr/local/lib/python3.12/site-packages/airflow/jobs/trigge rer_job_runner.py", line 1072, in run_trigger
async for event in trigger.run():

File "/usr/local/lib/python3.12/site-packages/airflow/providers/d atabricks/triggers/databricks.py", line 90, in run
run_state = await self.hook.a_get_run_state(self.run_id)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

File "/usr/local/lib/python3.12/site-packages/airflow/providers/d atabricks/hooks/databricks.py", line 514, in a_get_run_state
response = await self._a_do_api_call(GET_RUN_ENDPOINT, json)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

File "/usr/local/lib/python3.12/site-packages/airflow/providers/d atabricks/hooks/databricks_base.py", line 713, in _a_do_api_call
url = self._endpoint_url(full_endpoint)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

File "/usr/local/lib/python3.12/site-packages/airflow/providers/d atabricks/hooks/databricks_base.py", line 623, in _endpoint_url
port = f":{self.databricks_conn.port}" if self.databricks_conn.port else ""
^^^^^^^^^^^^^^^^^^^^

File "/usr/local/lib/python3.12/functools.py", line 998, in __get__
val = self.func(instance)
^^^^^^^^^^^^^^^^^^^

File "/usr/local/lib/python3.12/site-packages/airflow/providers/d atabricks/hooks/databricks_base.py", line 142, in databricks_conn
return self.get_connection(self.databricks_conn_id) # type: ignore[return-value]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

File "/usr/local/lib/python3.12/site-packages/airflow/hooks/base. py", line 64, in get_connection
conn = Connection.get_connection_from_secrets(conn_id)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

File "/usr/local/lib/python3.12/site-packages/airflow/models/conn ection.py", line 478, in get_connection_from_secrets
conn = TaskSDKConnection.get(conn_id=conn_id)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

File "/usr/local/lib/python3.12/site-packages/airflow/sdk/definit ions/connection.py", line 144, in get
return _get_connection(conn_id)
^^^^^^^^^^^^^^^^^^^^^^^^

File "/usr/local/lib/python3.12/site-packages/airflow/sdk/executi on_time/context.py", line 160, in _get_connection
msg = SUPERVISOR_COMMS.send(GetConnection(conn_id=conn_id))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

File "/usr/local/lib/python3.12/site-packages/airflow/jobs/trigge rer_job_runner.py", line 740, in send
return async_to_sync(self.asend)(msg)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

File "/usr/local/lib/python3.12/site-packages/asgiref/sync.py", line 187, in __call__
raise RuntimeError(

RuntimeError: You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly.
: source="airflow.task.operators.airflow.providers.databricks.operators.databricks.DatabricksSubmitRunOperator"
[2025-09-11, 15:56:36] ERROR - Task failed with exception: source="task"
TaskDeferralError: Trigger failure
File "/usr/local/lib/python3.12/site-packages/airflow/sdk/executi on_time/task_runner.py", line 920 in run

File "/usr/local/lib/python3.12/site-packages/airflow/sdk/executi on_time/task_runner.py", line 1215 in _execute_task

File "/usr/local/lib/python3.12/site-packages/airflow/sdk/bases/o perator.py", line 1603 in resume_execution

Searching for the key message RuntimeError: You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly. led me to several related issues/PRs:

I didn't test the exact version in which deferrable mode on the DatabricksSubmitRunOperator broke, but I believe it's Airflow 3.0.3.

This PR adds an async version of the databricks_conn method and changes all async methods to use this new a_databricks_conn method for fetching the connection.

Tested by fixing all tests. I don't have a real Databricks instance to test against, but also tested this locally by monkeypatching several calls in the DatabricksHook and BaseDatabricksHook to the point where the AsyncToSync error was reached, then applied the changes from this PR, and a different error was reached because I don't have connectivity to a real Databricks instance.

Also: mypy was complaining about several usernames/passwords being None where a string was expected. I learned that an empty username/password is valid according to RFC 2617, so decided to default to "" in case it's None.


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.

dabla reacted with rocket emoji
BasPH changed the title Run get_connection in BaseDatabricksHook async Implement async version of get_connection in BaseDatabricksHook Sep 12, 2025
BasPH changed the title Implement async version of get_connection in BaseDatabricksHook Implement async version of databricks_conn in BaseDatabricksHook Sep 12, 2025
ashb reviewed Sep 12, 2025
ashb reviewed Sep 12, 2025
dstandish mentioned this pull request Sep 17, 2025
kaxil mentioned this pull request Sep 17, 2025
dstandish added a commit to astronomer/airflow that referenced this pull request Sep 19, 2025
In 2.x sometimes get_connection (which goes to the database) might be called without wrapping in sync_to_async.

This did not fail, though it was not good behavior, since it can block the event loop.

In 3.0, since we now route db calls through an API, triggers that do this fail. The reason is, the code to hit the API wraps the get_connection call with async_to_sync, which is forbidden in the asyncio event loop.

Related: apache#55568

(cherry picked from commit f5b1eb4)
dstandish added a commit to astronomer/airflow that referenced this pull request Sep 19, 2025
In 2.x sometimes get_connection (which goes to the database) might be called without wrapping in sync_to_async.

This did not fail, though it was not good behavior, since it can block the event loop.

In 3.0, since we now route db calls through an API, triggers that do this fail. The reason is, the code to hit the API wraps the get_connection call with async_to_sync, which is forbidden in the asyncio event loop.

Related: apache#55568

(cherry picked from commit f5b1eb4)
dstandish added a commit to astronomer/airflow that referenced this pull request Sep 19, 2025
In 2.x sometimes get_connection (which goes to the database) might be called without wrapping in sync_to_async.

This did not fail, though it was not good behavior, since it can block the event loop.

In 3.0, since we now route db calls through an API, triggers that do this fail. The reason is, the code to hit the API wraps the get_connection call with async_to_sync, which is forbidden in the asyncio event loop.

Related: apache#55568

(cherry picked from commit f5b1eb4)
kaxil added a commit to astronomer/airflow that referenced this pull request Oct 23, 2025
When deferrable operators run in the triggerer's async event loop and
synchronously access connections (e.g., via @cached_property), the
`ExecutionAPISecretsBackend` failed silently. This occurred because
`SUPERVISOR_COMMS.send()` uses `async_to_sync`, which raises `RuntimeError`
when called within an existing event loop in a greenback portal context.

Add specific RuntimeError handling in `ExecutionAPISecretsBackend` that
detects this scenario and uses `greenback.await_()` to call the async
versions (aget_connection/aget_variable) as a fallback.

It was originally fixed in apache#55799 for 3.1.0
but apache#56602 introduced a bug.

Ideally all providers handle this better and have better written Triggers. Example
PR for Databricks: apache#55568

Fixes apache#57145
kaxil mentioned this pull request Oct 23, 2025
kaxil added a commit to astronomer/airflow that referenced this pull request Oct 23, 2025
When deferrable operators run in the triggerer's async event loop and
synchronously access connections (e.g., via @cached_property), the
`ExecutionAPISecretsBackend` failed silently. This occurred because
`SUPERVISOR_COMMS.send()` uses `async_to_sync`, which raises `RuntimeError`
when called within an existing event loop in a greenback portal context.

Add specific RuntimeError handling in `ExecutionAPISecretsBackend` that
detects this scenario and uses `greenback.await_()` to call the async
versions (aget_connection/aget_variable) as a fallback.

It was originally fixed in apache#55799 for 3.1.0
but apache#56602 introduced a bug.

Ideally all providers handle this better and have better written Triggers. Example
PR for Databricks: apache#55568

Fixes apache#57145
kaxil added a commit to astronomer/airflow that referenced this pull request Oct 23, 2025
When deferrable operators run in the triggerer's async event loop and
synchronously access connections (e.g., via @cached_property), the
`ExecutionAPISecretsBackend` failed silently. This occurred because
`SUPERVISOR_COMMS.send()` uses `async_to_sync`, which raises `RuntimeError`
when called within an existing event loop in a greenback portal context.

Add specific RuntimeError handling in `ExecutionAPISecretsBackend` that
detects this scenario and uses `greenback.await_()` to call the async
versions (aget_connection/aget_variable) as a fallback.

It was originally fixed in apache#55799 for 3.1.0
but apache#56602 introduced a bug.

Ideally all providers handle this better and have better written Triggers. Example
PR for Databricks: apache#55568

Fixes apache#57145
kaxil added a commit to astronomer/airflow that referenced this pull request Oct 23, 2025
When deferrable operators run in the triggerer's async event loop and
synchronously access connections (e.g., via @cached_property), the
`ExecutionAPISecretsBackend` failed silently. This occurred because
`SUPERVISOR_COMMS.send()` uses `async_to_sync`, which raises `RuntimeError`
when called within an existing event loop in a greenback portal context.

Add specific RuntimeError handling in `ExecutionAPISecretsBackend` that
detects this scenario and uses `greenback.await_()` to call the async
versions (aget_connection/aget_variable) as a fallback.

It was originally fixed in apache#55799 for 3.1.0
but apache#56602 introduced a bug.

Ideally all providers handle this better and have better written Triggers. Example
PR for Databricks: apache#55568

Fixes apache#57145
kaxil added a commit that referenced this pull request Oct 23, 2025
When deferrable operators run in the triggerer's async event loop and
synchronously access connections (e.g., via @cached_property), the
`ExecutionAPISecretsBackend` failed silently. This occurred because
`SUPERVISOR_COMMS.send()` uses `async_to_sync`, which raises `RuntimeError`
when called within an existing event loop in a greenback portal context.

Add specific RuntimeError handling in `ExecutionAPISecretsBackend` that
detects this scenario and uses `greenback.await_()` to call the async
versions (aget_connection/aget_variable) as a fallback.

It was originally fixed in #55799 for 3.1.0
but #56602 introduced a bug.

Ideally all providers handle this better and have better written Triggers. Example
PR for Databricks: #55568

Fixes #57145
kaxil added a commit that referenced this pull request Oct 23, 2025
When deferrable operators run in the triggerer's async event loop and
synchronously access connections (e.g., via @cached_property), the
`ExecutionAPISecretsBackend` failed silently. This occurred because
`SUPERVISOR_COMMS.send()` uses `async_to_sync`, which raises `RuntimeError`
when called within an existing event loop in a greenback portal context.

Add specific RuntimeError handling in `ExecutionAPISecretsBackend` that
detects this scenario and uses `greenback.await_()` to call the async
versions (aget_connection/aget_variable) as a fallback.

It was originally fixed in #55799 for 3.1.0
but #56602 introduced a bug.

Ideally all providers handle this better and have better written Triggers. Example
PR for Databricks: #55568

Fixes #57145

(cherry picked from commit da32b68)
Copy link
Member

kaxil commented Oct 23, 2025

Ping @BasPH to rebase & resolve conflicts

Copy link

github-actions bot commented Dec 24, 2025

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

github-actions bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Dec 24, 2025
github-actions bot closed this Dec 29, 2025
Copy link

dmnpignaud commented Dec 30, 2025

Hi @BasPH thank you for the PR, this is super useful, could you re-open it please ?

potiuk removed the stale Stale PRs per the .github/workflows/stale.yml policy file label Dec 30, 2025
potiuk reopened this Dec 30, 2025
Copy link
Member

potiuk commented Dec 30, 2025

I reopened it

tirkarthi mentioned this pull request Jan 29, 2026
2 tasks
Copy link

github-actions bot commented Feb 14, 2026

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

github-actions bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Feb 14, 2026
github-actions bot closed this Feb 20, 2026
Copy link

zerodarkzone commented Feb 20, 2026

This is not solved, please reopen

kaxil reopened this Feb 20, 2026
kaxil removed the stale Stale PRs per the .github/workflows/stale.yml policy file label Feb 20, 2026
kosteev pushed a commit to GoogleCloudPlatform/composer-airflow that referenced this pull request Mar 2, 2026
When deferrable operators run in the triggerer's async event loop and
synchronously access connections (e.g., via @cached_property), the
`ExecutionAPISecretsBackend` failed silently. This occurred because
`SUPERVISOR_COMMS.send()` uses `async_to_sync`, which raises `RuntimeError`
when called within an existing event loop in a greenback portal context.

Add specific RuntimeError handling in `ExecutionAPISecretsBackend` that
detects this scenario and uses `greenback.await_()` to call the async
versions (aget_connection/aget_variable) as a fallback.

It was originally fixed in apache/airflow#55799 for 3.1.0
but apache/airflow#56602 introduced a bug.

Ideally all providers handle this better and have better written Triggers. Example
PR for Databricks: apache/airflow#55568

Fixes apache/airflow#57145

(cherry picked from commit da32b682d1b0df5d5e2078392cf8626f8fdb00ff)

GitOrigin-RevId: f969e6374daa8469938169be16a28f7c073a5ce9
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Reviewers

ashb ashb left review comments

At least 1 approving review is required to merge this pull request.

Assignees

No one assigned

Projects

None yet

Milestone

No milestone

Development

Successfully merging this pull request may close these issues.

6 participants