This repository has been archived by the owner on Aug 4, 2023. It is now read-only.

Update the log cleaner DAG, and add a Python-based alternative log cleaner DAG #142

Merged · 21 commits · Aug 20, 2021
2 changes: 2 additions & 0 deletions .github/workflows/push_pull_request_test.yml
@@ -11,6 +11,8 @@ jobs:
    runs-on: ubuntu-latest

    steps:
      - name: Change dir owner to working user
        run: sudo chown -R $USER:$USER $GITHUB_WORKSPACE
      - uses: actions/checkout@v2
      - name: Build the stack
        run: |
1 change: 1 addition & 0 deletions .gitignore
@@ -137,6 +137,7 @@ airflow.cfg
airflow.db
logs/
unittests.cfg
!*/util/test_resources/logs/*

# large SI sample data
si_samples_*
104 changes: 104 additions & 0 deletions openverse_catalog/dags/airflow_log_cleanup_workflow.py
@@ -0,0 +1,104 @@
"""
A maintenance workflow that you can deploy into Airflow to periodically clean
out the task logs to avoid those getting too big. By default, this will also
clean child process logs from the 'scheduler' directory.

Can remove all log files by setting "maxLogAgeInDays" to -1.
If you want to test the DAG in the Airflow Web UI, you can also set
enableDelete to `false`, and then you will see a list of log folders
that can be deleted, but will not actually delete them.

This should all go on one line:
```airflow dags trigger --conf
'{"maxLogAgeInDays":-1, "enableDelete": "false"}' airflow_log_cleanup```
--conf options:
maxLogAgeInDays:<INT> - Optional
enableDelete:<BOOLEAN> - Optional
"""
import logging
from datetime import timedelta, datetime

import jinja2
from airflow.configuration import conf
from airflow.models import DAG
from airflow.operators.python import PythonOperator

import util.operator_util as ops
from util import log_cleanup

logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s: %(message)s',
level=logging.INFO
)

logger = logging.getLogger(__name__)

DAG_ID = 'airflow_log_cleanup'
BASE_LOG_FOLDER = conf.get("logging", "BASE_LOG_FOLDER").rstrip("/")
# Whether the job should delete the logs or not. Included if you want to
# temporarily avoid deleting the logs
DEFAULT_MAX_LOG_AGE_IN_DAYS = 7
ENABLE_DELETE = True
CONCURRENCY = 1
# should we send someone an email when this DAG fails?
ALERT_EMAIL_ADDRESSES = ''
DAG_DEFAULT_ARGS = {
'owner': 'data-eng-admin',
'depends_on_past': False,
'start_date': datetime(2020, 6, 15),
'template_undefined': jinja2.Undefined,
'email_on_retry': False,
'retries': 2,
'retry_delay': timedelta(minutes=1),
}


def get_log_cleaner_operator(
dag,
base_log_folder,
):
return PythonOperator(
task_id='log_cleaner_operator',
python_callable=log_cleanup.clean_up,
op_args=[
base_log_folder,
"{{ params.get('maxLogAgeInDays') }}",
"{{ params.get('enableDelete') }}"
],
dag=dag
)


def create_dag(
dag_id=DAG_ID,
args=DAG_DEFAULT_ARGS,
concurrency=CONCURRENCY,
max_active_runs=CONCURRENCY,
):
dag = DAG(
dag_id=dag_id,
default_args=args,
concurrency=concurrency,
schedule_interval='@weekly',
max_active_runs=max_active_runs,
# If this was True, airflow would run this DAG in the beginning
# for each day from the start day to now
catchup=False,
# Use the docstring at the top of the file as md docs in the UI
doc_md=__doc__,
)

with dag:
start_task = ops.get_log_operator(dag, dag.dag_id, 'Starting')
run_task = get_log_cleaner_operator(
dag,
BASE_LOG_FOLDER,
)
end_task = ops.get_log_operator(dag, dag.dag_id, 'Finished')

start_task >> run_task >> end_task

return dag


globals()[DAG_ID] = create_dag()
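A note on the templated arguments above: the two `op_args` values render through Jinja and therefore always arrive in `clean_up` as strings; `log_cleanup.get_params` then falls back to the raw values in the task's `params` dict. A minimal sketch of that coercion, as a standalone call (not part of this PR):

```python
from util import log_cleanup

# Non-int/non-bool positional values are ignored in favour of the
# raw params dict, which carries the dag-run --conf values.
age, delete = log_cleanup.get_params(
    "None", "None", {"maxLogAgeInDays": "30", "enableDelete": "false"}
)
assert age == 30 and delete is False
```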
183 changes: 183 additions & 0 deletions openverse_catalog/dags/util/log_cleanup.py
@@ -0,0 +1,183 @@
import argparse
import logging
import shutil
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Tuple, Union

logging.basicConfig(
    format='%(asctime)s: [%(levelname)s - Log cleanup] %(message)s',
    level=logging.DEBUG)

logger = logging.getLogger(__name__)

MAX_LOG_AGE_IN_DAYS = 7
ENABLE_DELETE = True


def is_older_than_cutoff(file_or_folder: Path, cutoff: int):
    last_modified = file_or_folder.stat().st_mtime
    cutoff_time = datetime.now() - timedelta(days=cutoff)

    return datetime.fromtimestamp(last_modified) <= cutoff_time


def dir_size_in_mb(dir_paths: Union[List[Path], Path]):
    if not isinstance(dir_paths, list):
        dir_paths = [dir_paths]
    size_in_bytes = sum(sum(f.stat().st_size for f in folder.glob('**/*')
                            if f.is_file()) for folder in dir_paths)
    return size_in_bytes / (1024 * 1024)


def get_folders_to_delete(
    dag_log_folder: Path, max_log_age_in_days: int
) -> List[Path]:
    """Returns a list of log folders that are older than
    `max_log_age_in_days`.
    The folder structure is as follows:
    `{dag_id}/{task_id}/{timestamp}/{try}.log`
    This function iterates over all `{timestamp}` folders, detects the
    ones that are older than the cutoff, and appends them to the result.
    :param dag_log_folder: Log folder for a DAG
    :param max_log_age_in_days: Logs that are older than this will be
    returned
    :return: List of old log folders that can be deleted
    """
    task_log_folders = [folder for folder in dag_log_folder.iterdir()
                        if folder.is_dir()]
    folders_to_delete = []
    for task_log_folder in task_log_folders:
        run_log_folders_to_delete = [
            folder for folder in task_log_folder.iterdir()
            if (folder.is_dir()
                and is_older_than_cutoff(folder, max_log_age_in_days))
        ]
        folders_to_delete.extend(run_log_folders_to_delete)
    return folders_to_delete


def delete_folders(folders_to_delete: List[Path]) -> None:
    for dag_log_folder in folders_to_delete:
        logger.info(f"Deleting {dag_log_folder}")
        shutil.rmtree(dag_log_folder)


def get_params(
    log_age: Union[int, str], enable_delete: Union[bool, str], params: Dict
) -> Tuple[int, bool]:
    if not isinstance(log_age, int):
        log_age_param = params.get('maxLogAgeInDays')
        try:
            log_age = int(log_age_param)
            logger.info(f"Maximum log age overwritten with the dag run "
                        f"parameter of {log_age_param} days")
        except (TypeError, ValueError):
            # The parameter is missing or is not a valid integer
            log_age = MAX_LOG_AGE_IN_DAYS
    if not isinstance(enable_delete, bool):
        enable_delete_param = params.get('enableDelete', ENABLE_DELETE)
        if isinstance(enable_delete_param, bool):
            enable_delete = enable_delete_param
        else:
            enable_delete = {
                'true': True, 'false': False
            }.get(enable_delete_param.lower(), ENABLE_DELETE)

    return log_age, enable_delete


def clean_up(
    base_log_folder: Union[str, Path],
    max_log_age_in_days: Union[int, str],
    should_delete: Union[bool, str],
    **kwargs,
) -> List[Path]:
    """Finds all log folders that were modified more than
    `max_log_age_in_days` days ago, and either deletes them
    (if `should_delete` is True) or only logs them
    (if `should_delete` is False).

    :param base_log_folder: the folder in which dag log folders
    are located.
    :param max_log_age_in_days: Logs older than this number of
    days will be cleaned up. Can be set manually, or be 'None',
    in which case the default value is used.
    :param should_delete: Will delete the old log folders if True, and
    only log the folders that need to be deleted if set to False.
    Can be set manually, or be 'None', in which case the default value
    is used.
    """
    log_base = Path(base_log_folder)

    max_log_age_in_days, should_delete = get_params(
        max_log_age_in_days, should_delete, kwargs.get('params', {})
    )
    logger.info(f"Cleaning up log files, using parameters:"
                f"\nBASE_LOG_FOLDER: {base_log_folder}"
                f"\nMAX_LOG_AGE_IN_DAYS: {max_log_age_in_days}"
                f"\nENABLE_DELETE: {should_delete}")
    log_folders = [item for item in log_base.iterdir()
                   if item.is_dir()]
    size_before = dir_size_in_mb(log_base)
    folders_to_delete = []
    for dag_log_folder in log_folders:
        if dag_log_folder.name == 'dag_processor_manager':
            continue
        elif dag_log_folder.name == 'scheduler':
            # The scheduler creates a folder for each date, and keeps
            # the schedule logs for each dag in a separate file
            scheduler_log_folders_to_delete = [
                date_log_dir
                for date_log_dir in dag_log_folder.iterdir()
                if (date_log_dir.is_dir()
                    and not date_log_dir.is_symlink()
                    and is_older_than_cutoff(
                        date_log_dir, max_log_age_in_days
                    ))
            ]
            folders_to_delete.extend(scheduler_log_folders_to_delete)
        else:
            task_log_folders_to_delete = get_folders_to_delete(
                dag_log_folder, max_log_age_in_days
            )
            folders_to_delete.extend(task_log_folders_to_delete)
    size_to_delete = dir_size_in_mb(folders_to_delete)
    if should_delete:
        delete_folders(folders_to_delete)
        size_after = dir_size_in_mb(log_base)
        logger.info(f"Deleted {size_to_delete:.2f}MB in "
                    f"{len(folders_to_delete)} folders\n"
                    f"Log directory size before: {size_before:.2f}MB, "
                    f"after: {size_after:.2f}MB")
    else:
        logger.info(f"Found {len(folders_to_delete)} log folders to delete: "
                    f"{[str(folder) for folder in folders_to_delete]}"
                    f"\nRun this DAG with ENABLE_DELETE set to True "
                    f"to free {size_to_delete:.2f}MB.")
    return folders_to_delete


if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description='Log Cleanup Job',
        add_help=True
    )
    parser.add_argument(
        '--maxLogAgeInDays',
        help='Logs older than maxLogAgeInDays days will be deleted. '
             'Default is 7.')
    parser.add_argument(
        '--enableDelete',
        help='Will only log the folders that need to be deleted if '
             'set to False. Default is True.'
    )
    args = parser.parse_args()
    if args.maxLogAgeInDays:
        log_age_in_days_arg = args.maxLogAgeInDays
    else:
        log_age_in_days_arg = MAX_LOG_AGE_IN_DAYS
    if args.enableDelete:
        enable_delete = args.enableDelete
    else:
        enable_delete = ENABLE_DELETE
    log_folder = Path(__file__).parent / 'test_resources' / 'logs'
    log_folder.mkdir(parents=True, exist_ok=True)
    clean_up(log_folder, log_age_in_days_arg, enable_delete)
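The module can also be exercised outside Airflow, assuming it is imported from the `dags` directory so that `util` resolves. A minimal dry-run sketch (the log path below is a placeholder, not taken from this PR):

```python
from pathlib import Path

from util import log_cleanup

# Dry run: list folders older than 7 days without deleting anything.
old_folders = log_cleanup.clean_up(Path("/opt/airflow/logs"), 7, False)
print(f"{len(old_folders)} folders would be removed")
```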
83 changes: 83 additions & 0 deletions openverse_catalog/dags/util/test_log_cleanup.py
@@ -0,0 +1,83 @@
from datetime import datetime, timedelta
from pathlib import Path

from util import log_cleanup

TEST_LOGS_FOLDER = Path(__file__).parent / 'test_resources' / 'logs'
# Total number of log files in the `test_resources/logs` folder
INITIAL_LOG_FILE_COUNT = 13
# Number of log folders in `test_resources/logs` that are older
# than August 20
OLD_LOG_FOLDER_COUNT = 2
# 1 log file (`dag_processor_manager.log`) is never deleted
NON_DELETED_FILE_COUNT = 1
ENABLE_DELETE = False

OLD_TIMESTAMP = datetime.fromisoformat('2021-08-10')
RECENT_TIMESTAMP = datetime.fromisoformat('2021-08-20')


def is_older_than_cutoff_by_name(file_or_folder: Path, cutoff: int):
    # The first 10 characters of the folder name are an ISO date
    last_modified = datetime.fromisoformat(file_or_folder.name[:10])
    cutoff_time = datetime.now() - timedelta(days=cutoff)
    return last_modified <= cutoff_time


def calculate_cutoffs():
    now = datetime.now()
    one_day_before = OLD_TIMESTAMP - timedelta(days=1)
    one_day_after = OLD_TIMESTAMP + timedelta(days=1)
    delta_before = (now - one_day_before).days
    delta_after = (now - one_day_after).days
    return delta_before, delta_after


# Normally, the age of a log file or folder is detected using the
# system modification date. However, it is difficult to control that
# in CI, so in tests the log folder name (which is a timestamp)
# is used instead.
log_cleanup.is_older_than_cutoff = is_older_than_cutoff_by_name
cutoffs_in_days = calculate_cutoffs()


def test_log_cleaner_leaves_new_files():
    """If all the log files are newer than maxLogAgeInDays,
    no log files are deleted"""
    log_files_count = len(list(TEST_LOGS_FOLDER.glob('**/*.log')))
    assert log_files_count == INITIAL_LOG_FILE_COUNT

    deleted_folders = log_cleanup.clean_up(
        TEST_LOGS_FOLDER, cutoffs_in_days[0], ENABLE_DELETE
    )
    deleted_count = len(deleted_folders)
    expected_count = 0

    assert deleted_count == expected_count


def test_log_cleaner_deletes_only_old_files():
    """The log cleaner deletes all the log files that are older than
    maxLogAgeInDays, but leaves the files that are newer"""
    deleted_folders = log_cleanup.clean_up(
        TEST_LOGS_FOLDER, cutoffs_in_days[1], ENABLE_DELETE
    )
    deleted_count = len(deleted_folders)

    expected_log_count = OLD_LOG_FOLDER_COUNT

    print(f"Deleted folders: {deleted_folders}\ncutoff: {cutoffs_in_days[1]}")
    assert deleted_count == expected_log_count


def test_log_cleaner_deletes_all_but_one_files_if_max_is_minus_1():
    """If maxLogAgeInDays is set to -1, all log folders except for
    `dag_processor_manager` are deleted.
    TODO: find out whether `dag_processor_manager.log` is recreated
    every day, or whether a single log file is used for all time.
    """

    deleted_folders = log_cleanup.clean_up(TEST_LOGS_FOLDER, -1, ENABLE_DELETE)
    deleted_folder_count = len(deleted_folders)
    expected_count = 4

    assert deleted_folder_count == expected_count
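These tests depend on the fixture logs checked in under `util/test_resources/logs` (re-included by the `.gitignore` change above). A sketch of how they can presumably be invoked; the working directory is an assumption based on the imports:

```python
import pytest

# Run from openverse_catalog/dags so that `from util import log_cleanup`
# resolves (an assumption about the repo layout).
pytest.main(["util/test_log_cleanup.py"])
```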
@@ -0,0 +1 @@
Dags processor manager log
@@ -0,0 +1 @@
Test log 1
@@ -0,0 +1 @@
Test log 2