Running Airflow in production almost always ends with one predictable problem caused by intensive task logging: a disk full of logs. The common 'admin' approach is to run a dedicated process that recursively scans the log folders for old files. If you have ever tried this, you know how long it takes just to find those files. Airflow offers a more elegant solution: query old items through Airflow's own models and rely on the naming convention for local logs described here: Logging Tasks
If you check the description of the TaskInstance model, all attributes required by the naming convention are already there:
{dag_id}/{task_id}/{logical_date}/{try_number}.log
for versions >= 2.2.0, or
{dag_id}/{task_id}/{execution_date}/{try_number}.log
for versions up to 2.1.4
The default log folder is /opt/airflow/logs, where logs are stored in subfolders following this naming convention. Double-check your folder and note the format of the logical (or execution) date: it ends with +00:00, so this is clearly the ISO 8601 format produced by isoformat().
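You can reproduce one such path segment yourself to confirm the date format (a minimal sketch using the pre-2.2 convention; the dag_id, task_id, and date values are made up for illustration):

```python
from datetime import datetime, timezone

# An execution_date as Airflow stores it: a timezone-aware UTC datetime.
execution_date = datetime(2021, 10, 1, 22, 0, tzinfo=timezone.utc)

# The per-try log path relative to the log folder:
# {dag_id}/{task_id}/{execution_date}/{try_number}.log
log_path = f"example_dag/example_task/{execution_date.isoformat()}/1.log"
print(log_path)  # example_dag/example_task/2021-10-01T22:00:00+00:00/1.log
```

The `+00:00` suffix in the folder name is exactly what `datetime.isoformat()` emits for a UTC-aware datetime, which is why the cleanup code below can rebuild the directory name directly from the model's attributes.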
Let's put this together and build a maintenance DAG (for version 2.1.4):
```python
import logging
import shutil

from airflow import DAG, settings
from airflow.models import TaskInstance
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

args = {
    'owner': 'airflow',
}

LOG_DIR = '/opt/airflow/logs'
MAX_DAYS = 50


def old_logs_delete(ti, **kwargs):
    session = settings.Session()
    # Select every task instance older than the retention window and
    # remove its log directory, including all try-number log files.
    for task in session.query(TaskInstance).filter(
        TaskInstance.execution_date < days_ago(MAX_DAYS)
    ):
        log_dir = f"{LOG_DIR}/{task.dag_id}/{task.task_id}/{task.execution_date.isoformat()}"
        logging.info(f"Deleting {log_dir}")
        try:
            shutil.rmtree(log_dir)
            logging.info(f"Deleted directory and log contents: {log_dir}")
        except OSError as e:
            logging.info(f"Unable to delete: {e.filename} - {e.strerror}")


with DAG(
    dag_id='airflow-maintenance',
    concurrency=1,
    default_args=args,
    schedule_interval='0 22 * * *',
    start_date=days_ago(2),
    tags=['airflow', 'maintenance'],
    catchup=False,
) as dag:
    dag.doc_md = __doc__

    t1 = PythonOperator(
        task_id="old_logs_delete",
        python_callable=old_logs_delete,
    )

if __name__ == "__main__":
    dag.cli()
```
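The filter works because days_ago(MAX_DAYS) returns a timezone-aware UTC datetime, so the query clause reduces to a plain datetime comparison. A minimal stdlib-only sketch of that cutoff logic, with no Airflow session involved (the dates here are illustrative):

```python
from datetime import datetime, timedelta, timezone

MAX_DAYS = 50

# The cutoff mirrors days_ago(MAX_DAYS): "now" in UTC minus the retention window.
cutoff = datetime.now(timezone.utc) - timedelta(days=MAX_DAYS)

# A task run from ~60 days ago falls before the cutoff, so its
# log directory would be selected for deletion.
old_run = datetime.now(timezone.utc) - timedelta(days=60)
recent_run = datetime.now(timezone.utc) - timedelta(days=10)

print(old_run < cutoff)     # True  -> deleted
print(recent_run < cutoff)  # False -> kept
```

Tune MAX_DAYS to your retention policy; anything the query matches is gone for good, so it is worth replacing shutil.rmtree with a logging.info dry run on the first execution.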