-
Notifications
You must be signed in to change notification settings - Fork 16.6k
Fixup to structlog migration - logging folder not being created correctly. #55431
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking "Sign up for GitHub", you agree to our terms of service and privacy statement. We'll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Fixup to structlog migration - logging folder not being created correctly. #55431
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -85,7 +85,7 @@ def load_logging_config() -> tuple[dict[str, Any], str]: | |
|
|
||
|
|
||
| def configure_logging(): | ||
| from airflow._shared.logging.structlog import configure_logging | ||
| from airflow._shared.logging import configure_logging, init_log_folder | ||
|
|
||
| logging_config, logging_class_path = load_logging_config() | ||
| try: | ||
|
|
@@ -100,6 +100,18 @@ def configure_logging(): | |
|
|
||
| validate_logging_config() | ||
|
|
||
| new_folder_permissions = int( | ||
| conf.get("logging", "file_task_handler_new_folder_permissions", fallback="0o775"), | ||
| 8, | ||
| ) | ||
|
|
||
| base_log_folder = conf.get("logging", "base_log_folder") | ||
|
|
||
| return init_log_folder( | ||
| base_log_folder, | ||
| new_folder_permissions=new_folder_permissions, | ||
| ) | ||
|
|
||
| return logging_class_path | ||
|
|
||
|
|
||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,4 +16,10 @@ | |
| # under the License. | ||
| from __future__ import annotations | ||
|
|
||
| from .structlog import configure_logging as configure_logging | ||
| __all__ = [ | ||
| "configure_logging", | ||
| "init_log_file", | ||
| "init_log_folder", | ||
| ] | ||
|
|
||
| from .structlog import configure_logging, init_log_file, init_log_folder | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,6 +25,7 @@ | |
| import sys | ||
| from collections.abc import Callable, Mapping, Sequence | ||
| from functools import cache, cached_property, partial | ||
| from pathlib import Path | ||
| from typing import TYPE_CHECKING, Any, BinaryIO, Generic, TextIO, TypeVar, cast | ||
|
|
||
| import pygtrie | ||
|
|
@@ -505,6 +506,62 @@ def configure_logging( | |
| logging.config.dictConfig(config) | ||
|
|
||
|
|
||
| def init_log_folder(directory: str | os.PathLike[str], new_folder_permissions: int): | ||
| """ | ||
| Prepare the log folder and ensure its mode is as configured. | ||
|
|
||
| To handle log writing when tasks are impersonated, the log files need to | ||
| be writable by the user that runs the Airflow command and the user | ||
| that is impersonated. This is mainly to handle corner cases with the | ||
| SubDagOperator. When the SubDagOperator is run, all of the operators | ||
| run under the impersonated user and create appropriate log files | ||
| as the impersonated user. However, if the user manually runs tasks | ||
| of the SubDagOperator through the UI, then the log files are created | ||
| by the user that runs the Airflow command. For example, the Airflow | ||
| run command may be run by the `airflow_sudoable` user, but the Airflow | ||
| tasks may be run by the `airflow` user. If the log files are not | ||
| writable by both users, then it's possible that re-running a task | ||
| via the UI (or vice versa) results in a permission error as the task | ||
| tries to write to a log file created by the other user. | ||
|
|
||
| We leave it up to the user to manage their permissions by exposing configuration for both | ||
| new folders and new log files. Default is to make new log folders and files group-writeable | ||
| to handle most common impersonation use cases. The requirement in this case will be to make | ||
| sure that the same group is set as default group for both - impersonated user and main airflow | ||
| user. | ||
| """ | ||
| directory = Path(directory) | ||
| for parent in reversed(Path(directory).parents): | ||
| parent.mkdir(mode=new_folder_permissions, exist_ok=True) | ||
| directory.mkdir(mode=new_folder_permissions, exist_ok=True) | ||
|
|
||
|
|
||
| def init_log_file( | ||
| base_log_folder: str | os.PathLike[str], | ||
| local_relative_path: str | os.PathLike[str], | ||
| *, | ||
| new_folder_permissions: int = 0o775, | ||
| new_file_permissions: int = 0o664, | ||
| ) -> Path: | ||
| """ | ||
| Ensure log file and parent directories are created with the correct permissions. | ||
|
|
||
| Any directories that are missing are created with the right permission bits. | ||
|
|
||
| See above ``init_log_folder`` method for more detailed explanation. | ||
| """ | ||
| full_path = Path(base_log_folder, local_relative_path) | ||
| init_log_folder(full_path.parent, new_folder_permissions) | ||
|
|
||
| try: | ||
| full_path.touch(new_file_permissions) | ||
| except OSError as e: | ||
| log = structlog.get_logger(__name__) | ||
| log.warning("OSError while changing ownership of the log file. %s", e) | ||
|
|
||
| return full_path | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| configure_logging( | ||
| # json_output=True, | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -222,46 +222,14 @@ def logger_at_level(name: str, level: int) -> Logger: | |
| ) | ||
|
|
||
|
|
||
| def _prepare_log_folder(directory: Path, mode: int): | ||
| """ | ||
| Prepare the log folder and ensure its mode is as configured. | ||
|
|
||
| To handle log writing when tasks are impersonated, the log files need to | ||
| be writable by the user that runs the Airflow command and the user | ||
| that is impersonated. This is mainly to handle corner cases with the | ||
| SubDagOperator. When the SubDagOperator is run, all of the operators | ||
| run under the impersonated user and create appropriate log files | ||
| as the impersonated user. However, if the user manually runs tasks | ||
| of the SubDagOperator through the UI, then the log files are created | ||
| by the user that runs the Airflow command. For example, the Airflow | ||
| run command may be run by the `airflow_sudoable` user, but the Airflow | ||
| tasks may be run by the `airflow` user. If the log files are not | ||
| writable by both users, then it's possible that re-running a task | ||
| via the UI (or vice versa) results in a permission error as the task | ||
| tries to write to a log file created by the other user. | ||
|
|
||
| We leave it up to the user to manage their permissions by exposing configuration for both | ||
| new folders and new log files. Default is to make new log folders and files group-writeable | ||
| to handle most common impersonation use cases. The requirement in this case will be to make | ||
| sure that the same group is set as default group for both - impersonated user and main airflow | ||
| user. | ||
| """ | ||
| for parent in reversed(directory.parents): | ||
| parent.mkdir(mode=mode, exist_ok=True) | ||
| directory.mkdir(mode=mode, exist_ok=True) | ||
|
|
||
|
|
||
| def init_log_file(local_relative_path: str) -> Path: | ||
| """ | ||
| Ensure log file and parent directories are created. | ||
|
|
||
| Any directories that are missing are created with the right permission bits. | ||
|
|
||
| See above ``_prepare_log_folder`` method for more detailed explanation. | ||
| """ | ||
| # NOTE: This is duplicated from airflow.utils.log.file_task_handler:FileTaskHandler._init_fi le, but we | ||
| # want to remove that | ||
| from airflow.configuration import conf | ||
| from airflow.sdk._shared.logging import init_log_file | ||
|
|
||
| new_file_permissions = int( | ||
| conf.get("logging", "file_task_handler_new_file_permissions", fallback="0o664"), | ||
|
|
@@ -273,17 +241,13 @@ def init_log_file(local_relative_path: str) -> Path: | |
| ) | ||
|
|
||
| base_log_folder = conf.get("logging", "base_log_folder") | ||
| full_path = Path(base_log_folder, local_relative_path) | ||
|
|
||
| _prepare_log_folder(full_path.parent, new_folder_permissions) | ||
|
|
||
| try: | ||
| full_path.touch(new_file_permissions) | ||
| except OSError as e: | ||
| log = structlog.get_logger(__name__) | ||
| log.warning("OSError while changing ownership of the log file. %s", e) | ||
|
|
||
| return full_path | ||
| return init_log_file( | ||
| base_log_folder, | ||
| local_relative_path, | ||
| new_folder_permissions=new_folder_permissions, | ||
| new_file_permissions=new_file_permissions, | ||
| ) | ||
|
|
||
|
|
||
| def load_remote_log_handler() -> RemoteLogIO | None: | ||
|
|
||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.