-
Notifications
You must be signed in to change notification settings - Fork 16.6k
[v3-1-test] Fix ApprovalOperator with SimpleAuthManager when all_admi... #60116
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking "Sign up for GitHub", you agree to our terms of service and privacy statement. We'll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
[v3-1-test] Fix ApprovalOperator with SimpleAuthManager when all_admi... #60116
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -201,6 +201,7 @@ The following methods aren't required to override to have a functional Airflow a | |
| * ``batch_is_authorized_pool``: Batch version of ``is_authorized_pool``. If not overridden, it will call ``is_authorized_pool`` for every single item. | ||
| * ``batch_is_authorized_variable``: Batch version of ``is_authorized_variable``. If not overridden, it will call ``is_authorized_variable`` for every single item. | ||
| * ``get_authorized_dag_ids``: Return the list of Dag IDs the user has access to. If not overridden, it will call ``is_authorized_dag`` for every single Dag available in the environment. | ||
| * ``is_authorized_hitl_task``: Return whether the user is authorized to approve or reject a Human-in-the-loop (HITL) task. Override this method to implement custom authorization logic for HITL tasks. If not overridden, it checks if the user's ID is in the assigned users list. | ||
|
|
||
| * Note: To filter the results of ``get_authorized_dag_ids``, it is recommended that you define the filtering logic in your ``filter_authorized_dag_ids`` method. For example, this may be useful if you rely on per-Dag access controls derived from one or more fields on a given Dag (e.g. Dag tags). | ||
| * This method requires an active session with the Airflow metadata database. As such, overriding the ``get_authorized_dag_ids`` method is an advanced use case, which should be considered carefully -- it is recommended you refer to the :doc:`../../database-erd-ref`. | ||
|
|
||
1 change: 1 addition & 0 deletions
airflow-core/newsfragments/59399.feature.rst
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| Add ``is_authorized_hitl_task()`` method to check whether a user a is authorized to approve a HITL task |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -318,6 +318,18 @@ def filter_authorized_menu_items(self, menu_items: list[MenuItem], *, user: T) - | |
| :param user: the user | ||
| """ | ||
|
|
||
| def is_authorized_hitl_task(self, *, assigned_users: set[str], user: T) -> bool: | ||
| """ | ||
| Check if a user is allowed to approve/reject a HITL task. | ||
|
|
||
| By default, checks if the user's ID is in the assigned_users set. | ||
| Auth managers can override this method to implement custom logic. | ||
|
|
||
| :param assigned_users: set of user IDs assigned to the task | ||
| :param user: the user to check authorization for | ||
| """ | ||
| return user.get_id() in assigned_users | ||
|
|
||
| def batch_is_authorized_connection( | ||
| self, | ||
| requests: Sequence[IsAuthorizedConnectionRequest], | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -272,6 +272,26 @@ def filter_authorized_menu_items( | |
| ) -> list[MenuItem]: | ||
| return menu_items | ||
|
|
||
| def is_authorized_hitl_task(self, *, assigned_users: set[str], user: SimpleAuthManagerUser) -> bool: | ||
| """ | ||
| Check if a user is allowed to approve/reject a HITL task. | ||
|
|
||
| When simple_auth_manager_all_admins=True, all authenticated users are allowed | ||
| to approve/reject any task. Otherwise, the user must be in the assigned_users set. | ||
| """ | ||
| is_simple_auth_manager_all_admins = conf.getboolean("core", "simple_auth_manager_all_admins") | ||
|
|
||
| if is_simple_auth_manager_all_admins: | ||
| # In all-admin mode, everyone is allowed | ||
| return True | ||
|
|
||
| # If no assigned_users specified, allow access | ||
| if not assigned_users: | ||
| return True | ||
|
|
||
| # Delegate to parent class for the actual authorization check | ||
| return super().is_authorized_hitl_task(assigned_users=assigned_user s, user=user) | ||
|
|
||
| def get_fastapi_app(self) -> FastAPI | None: | ||
| """ | ||
| Specify a sub FastAPI application specific to the auth manager. | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -51,7 +51,12 @@ | |
| UpdateHITLDetailPayload, | ||
| ) | ||
| from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc | ||
| from airflow.api_fastapi.core_api.security import GetUserDep, ReadableTIFilterDep, requires_access_dag | ||
| from airflow.api_fastapi.core_api.security import ( | ||
| GetUserDep, | ||
| ReadableTIFilterDep, | ||
| get_auth_manager, | ||
| requires_access_dag, | ||
| ) | ||
| from airflow.api_fastapi.logging.decorators import action_logging | ||
| from airflow.models.dag_version import DagVersion | ||
| from airflow.models.dagrun import DagRun | ||
|
|
@@ -155,7 +160,9 @@ def update_hitl_detail( | |
| user_id = str(user_id) | ||
| hitl_user = HITLUser(id=user_id, name=user_name) | ||
| if hitl_detail_model.assigned_users: | ||
| if hitl_user not in hitl_detail_model.assigned_users: | ||
| # Convert assigned_users list to set of user IDs for authorization check | ||
| assigned_user_ids = {assigned_user["id"] for assigned_user in hitl_detail_model.assigned_users} | ||
| if not get_auth_manager().is_authorized_hitl_task(assigned_users=as signed_user_ids, user=user): | ||
| log.error("User=%s (id=%s) is not a respondent for the task", user_name, user_id) | ||
| raise HTTPException( | ||
| status.HTTP_403_FORBIDDEN, | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -268,3 +268,25 @@ def test_filter_authorized_menu_items(self, auth_manager): | |
| items, user=SimpleAuthManagerUser(username="test", role=None) | ||
| ) | ||
| assert results == items | ||
|
|
||
| @pytest.mark.parametrize( | ||
| ("all_admins", "user_id", "assigned_users", "expected"), | ||
| [ | ||
| # When simple_auth_manager_all_admins=True, any user should be allowed | ||
| (True, "user1", {"user2"}, True), | ||
| (True, "user2", {"user2"}, True), | ||
| (True, "admin", {"test_user"}, True), | ||
| # When simple_auth_manager_all_admins=False, user must be in assigned_users | ||
| (False, "user1", {"user1"}, True), | ||
| (False, "user2", {"user1"}, False), | ||
| (False, "admin", {"test_user"}, False), | ||
| # When no assigned_users, allow access | ||
| (False, "user1", set(), True), | ||
| ], | ||
| ) | ||
| def test_is_authorized_hitl_task(self, auth_manager, all_admins, user_id, assigned_users, expected): | ||
| """Test is_authorized_hitl_task method with different configurations.""" | ||
| with conf_vars({("core", "simple_auth_manager_all_admins"): str(all_admins)}): | ||
| user = SimpleAuthManagerUser(username=user_id, role="user") | ||
| result = auth_manager.is_authorized_hitl_task(assigned_users=assigned _users, user=user) | ||
| assert result == expected | ||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.