Skip to content

Commit ddef2fb

Browse files
authored
Update airflow_db_cleanup.py to also clean sessions table (#10775)
* Update airflow_db_cleanup.py to also clean sessions table * Update airflow_db_cleanup.py
1 parent ef6bca5 commit ddef2fb

1 file changed

Lines changed: 27 additions & 1 deletion

File tree

composer/workflows/airflow_db_cleanup.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@
6868
from airflow.version import version as airflow_version
6969

7070
import dateutil.parser
71-
from sqlalchemy import and_, func
71+
from sqlalchemy import and_, func, text
7272
from sqlalchemy.exc import ProgrammingError
7373
from sqlalchemy.orm import load_only
7474

@@ -461,16 +461,42 @@ def cleanup_function(**context):
461461
session.close()
462462

463463

464+
def cleanup_sessions():
465+
session = settings.Session()
466+
467+
try:
468+
logging.info("Deleting sessions...")
469+
before = len(session.execute(text("SELECT * FROM session WHERE expiry > now()::timestamp(0);")).mappings().all())
470+
session.execute(text("DELETE FROM session WHERE expiry > now()::timestamp(0);"))
471+
after = len(session.execute(text("SELECT * FROM session WHERE expiry > now()::timestamp(0);")).mappings().all())
472+
logging.info("Deleted {} expired sessions.".format(before-after))
473+
except Exception as e:
474+
logging.error(e)
475+
476+
session.commit()
477+
session.close()
478+
479+
464480
def analyze_db():
465481
session = settings.Session()
466482
session.execute("ANALYZE")
467483
session.commit()
484+
session.close()
468485

469486

470487
analyze_op = PythonOperator(
471488
task_id="analyze_query", python_callable=analyze_db, provide_context=True, dag=dag
472489
)
473490

491+
cleanup_session_op = PythonOperator(
492+
task_id="cleanup_sessions",
493+
python_callable=cleanup_sessions,
494+
provide_context=True,
495+
dag=dag
496+
)
497+
498+
cleanup_session_op.set_downstream(analyze_op)
499+
474500
for db_object in DATABASE_OBJECTS:
475501
cleanup_op = PythonOperator(
476502
task_id="cleanup_" + str(db_object["airflow_db_model"].__name__),

0 commit comments

Comments
 (0)