Airflow Log Maintenance

(30. 10. 2021) Apache Airflow

Running Airflow in a production environment sooner or later leads to one predictable problem caused by intensive task logging: a disk full of logs. The common 'admin' approach is to run a dedicated process that recursively scans the log folders for old files. If you have ever tried this, you know how long it takes just to find those files. Airflow offers a more elegant solution: use the Airflow models to query old task instances and rely on the naming convention for local logs described here: Logging Tasks
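For comparison, the naive filesystem scan might look like the sketch below. The helper name and constants are illustrative, not part of Airflow; the point is that every file in the tree must be stat-ed, which is slow on large log directories:

```python
import os
import time

LOG_DIR = "/opt/airflow/logs"   # default Airflow log folder
MAX_AGE = 50 * 24 * 3600        # 50 days, in seconds

def find_old_logs(root, max_age):
    """Walk the whole log tree and collect files older than max_age seconds."""
    cutoff = time.time() - max_age
    old_files = []
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            path = os.path.join(dirpath, name)
            # One stat() call per file - this is what makes the scan slow
            if os.path.getmtime(path) < cutoff:
                old_files.append(path)
    return old_files
```

Deleting is then a second pass over the returned list, and the whole walk has to be repeated on every run.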

If you check the description of the TaskInstance model, all attributes required by the naming convention are already there:

{dag_id}/{task_id}/{logical_date}/{try_number}.log

for version >= 2.2.0 or

{dag_id}/{task_id}/{execution_date}/{try_number}.log

for versions up to 2.1.4

The default log folder is /opt/airflow/logs, where logs are stored in subfolders following this naming convention. Double-check your folder and note the format of the logical or execution date: it ends with 00:00, so it is clearly an ISO 8601 timestamp.
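A quick way to confirm the date format: a timezone-aware midnight-UTC timestamp serialized with isoformat() produces exactly the trailing 00:00 seen in the folder names.

```python
from datetime import datetime, timezone

# A midnight UTC execution date, as Airflow stores it (timezone-aware)
run = datetime(2021, 10, 1, tzinfo=timezone.utc)

# isoformat() yields the string used in the log folder name
print(run.isoformat())  # 2021-10-01T00:00:00+00:00
```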

Let's put this together and build a maintenance DAG (for version 2.1.4):

from datetime import timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from airflow.models import TaskInstance
from airflow import settings
import logging
import shutil

args = {
    'owner': 'airflow',
}


LOG_DIR = '/opt/airflow/logs'
MAX_DAYS = 50


def old_logs_delete(**kwargs):
    # Query task instances older than MAX_DAYS and remove their log folders
    session = settings.Session()
    try:
        for task in session.query(TaskInstance).filter(TaskInstance.execution_date < days_ago(MAX_DAYS)):
            log_dir = f"{LOG_DIR}/{task.dag_id}/{task.task_id}/{task.execution_date.isoformat()}"
            logging.info(f"Deleting {log_dir}")
            try:
                shutil.rmtree(log_dir)
                logging.info(f"Deleted directory and log contents: {log_dir}")
            except OSError as e:
                logging.warning(f"Unable to delete: {e.filename} - {e.strerror}")
    finally:
        session.close()


with DAG(
    dag_id='airflow-maintenance',
    concurrency=1,
    default_args=args,
    schedule_interval='0 22 * * *',
    start_date=days_ago(2),
    tags=['airflow', 'maintenance'],
    catchup=False,
) as dag:

    dag.doc_md = __doc__

    t1 = PythonOperator(
        task_id="old_logs_delete",
        python_callable=old_logs_delete
    )


if __name__ == "__main__":
    dag.cli()

About the author

Ivan Ignáth, BI Team Leader