Dark Mode

Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Fixup to structlog migration - logging folder not being created correctly. #55431

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking "Sign up for GitHub", you agree to our terms of service and privacy statement. We'll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
ashb merged 2 commits into apache:main from astronomer:fix-log-folder-not-created
Sep 9, 2025
Merged

Fixup to structlog migration - logging folder not being created correctly. #55431

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion airflow-core/src/airflow/logging_config.py
View file
Open in desktop
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def load_logging_config() -> tuple[dict[str, Any], str]:


def configure_logging():
from airflow._shared.logging.structlog import configure_logging
from airflow._shared.logging import configure_logging, init_log_folder

logging_config, logging_class_path = load_logging_config()
try:
Expand All @@ -100,6 +100,18 @@ def configure_logging():

validate_logging_config()

new_folder_permissions = int(
conf.get("logging", "file_task_handler_new_folder_permissions", fallback="0o775"),
8,
)

base_log_folder = conf.get("logging", "base_log_folder")

return init_log_folder(
base_log_folder,
new_folder_permissions=new_folder_permissions,
)

return logging_class_path


Expand Down
8 changes: 7 additions & 1 deletion shared/logging/src/airflow_shared/logging/__init__.py
View file
Open in desktop
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,10 @@
# under the License.
from __future__ import annotations

from .structlog import configure_logging as configure_logging
__all__ = [
"configure_logging",
"init_log_file",
"init_log_folder",
]

from .structlog import configure_logging, init_log_file, init_log_folder
57 changes: 57 additions & 0 deletions shared/logging/src/airflow_shared/logging/structlog.py
View file
Open in desktop
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import sys
from collections.abc import Callable, Mapping, Sequence
from functools import cache, cached_property, partial
from pathlib import Path
from typing import TYPE_CHECKING, Any, BinaryIO, Generic, TextIO, TypeVar, cast

import pygtrie
Expand Down Expand Up @@ -505,6 +506,62 @@ def configure_logging(
logging.config.dictConfig(config)


def init_log_folder(directory: str | os.PathLike[str], new_folder_permissions: int):
"""
Prepare the log folder and ensure its mode is as configured.

To handle log writing when tasks are impersonated, the log files need to
be writable by the user that runs the Airflow command and the user
that is impersonated. This is mainly to handle corner cases with the
SubDagOperator. When the SubDagOperator is run, all of the operators
run under the impersonated user and create appropriate log files
as the impersonated user. However, if the user manually runs tasks
of the SubDagOperator through the UI, then the log files are created
by the user that runs the Airflow command. For example, the Airflow
run command may be run by the `airflow_sudoable` user, but the Airflow
tasks may be run by the `airflow` user. If the log files are not
writable by both users, then it's possible that re-running a task
via the UI (or vice versa) results in a permission error as the task
tries to write to a log file created by the other user.

We leave it up to the user to manage their permissions by exposing configuration for both
new folders and new log files. Default is to make new log folders and files group-writeable
to handle most common impersonation use cases. The requirement in this case will be to make
sure that the same group is set as default group for both - impersonated user and main airflow
user.
"""
directory = Path(directory)
for parent in reversed(Path(directory).parents):
parent.mkdir(mode=new_folder_permissions, exist_ok=True)
directory.mkdir(mode=new_folder_permissions, exist_ok=True)


def init_log_file(
base_log_folder: str | os.PathLike[str],
local_relative_path: str | os.PathLike[str],
*,
new_folder_permissions: int = 0o775,
new_file_permissions: int = 0o664,
) -> Path:
"""
Ensure log file and parent directories are created with the correct permissions.

Any directories that are missing are created with the right permission bits.

See above ``init_log_folder`` method for more detailed explanation.
"""
full_path = Path(base_log_folder, local_relative_path)
init_log_folder(full_path.parent, new_folder_permissions)

try:
full_path.touch(new_file_permissions)
except OSError as e:
log = structlog.get_logger(__name__)
log.warning("OSError while changing ownership of the log file. %s", e)

return full_path


if __name__ == "__main__":
configure_logging(
# json_output=True,
Expand Down
50 changes: 7 additions & 43 deletions task-sdk/src/airflow/sdk/log.py
View file
Open in desktop
Original file line number Diff line number Diff line change
Expand Up @@ -222,46 +222,14 @@ def logger_at_level(name: str, level: int) -> Logger:
)


def _prepare_log_folder(directory: Path, mode: int):
"""
Prepare the log folder and ensure its mode is as configured.

To handle log writing when tasks are impersonated, the log files need to
be writable by the user that runs the Airflow command and the user
that is impersonated. This is mainly to handle corner cases with the
SubDagOperator. When the SubDagOperator is run, all of the operators
run under the impersonated user and create appropriate log files
as the impersonated user. However, if the user manually runs tasks
of the SubDagOperator through the UI, then the log files are created
by the user that runs the Airflow command. For example, the Airflow
run command may be run by the `airflow_sudoable` user, but the Airflow
tasks may be run by the `airflow` user. If the log files are not
writable by both users, then it's possible that re-running a task
via the UI (or vice versa) results in a permission error as the task
tries to write to a log file created by the other user.

We leave it up to the user to manage their permissions by exposing configuration for both
new folders and new log files. Default is to make new log folders and files group-writeable
to handle most common impersonation use cases. The requirement in this case will be to make
sure that the same group is set as default group for both - impersonated user and main airflow
user.
"""
for parent in reversed(directory.parents):
parent.mkdir(mode=mode, exist_ok=True)
directory.mkdir(mode=mode, exist_ok=True)


def init_log_file(local_relative_path: str) -> Path:
"""
Ensure log file and parent directories are created.

Any directories that are missing are created with the right permission bits.

See above ``_prepare_log_folder`` method for more detailed explanation.
"""
# NOTE: This is duplicated from airflow.utils.log.file_task_handler:FileTaskHandler._init_fi le, but we
# want to remove that
from airflow.configuration import conf
from airflow.sdk._shared.logging import init_log_file

new_file_permissions = int(
conf.get("logging", "file_task_handler_new_file_permissions", fallback="0o664"),
Expand All @@ -273,17 +241,13 @@ def init_log_file(local_relative_path: str) -> Path:
)

base_log_folder = conf.get("logging", "base_log_folder")
full_path = Path(base_log_folder, local_relative_path)

_prepare_log_folder(full_path.parent, new_folder_permissions)

try:
full_path.touch(new_file_permissions)
except OSError as e:
log = structlog.get_logger(__name__)
log.warning("OSError while changing ownership of the log file. %s", e)

return full_path
return init_log_file(
base_log_folder,
local_relative_path,
new_folder_permissions=new_folder_permissions,
new_file_permissions=new_file_permissions,
)


def load_remote_log_handler() -> RemoteLogIO | None:
Expand Down