
Scheduler crash when clearing a dagrun with conf as null due to offline migration #56707

@tirkarthi

Description


Apache Airflow version

main (development)

If "Other Airflow 2/3 version" selected, which one?

No response

What happened?

As part of the Airflow 3 migration using offline SQL, we resorted to setting conf to NULL since it was not required. We noticed that clearing dag runs migrated with conf as NULL causes the scheduler to crash, since None is not a valid type for conf. The column itself is nullable, and other data models accept None.

The DagRunContext field annotation does not accept None:

conf: Annotated[dict[str, Any], Field(default_factory=dict)]
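A minimal sketch of why this annotation rejects a NULL column value (DagRunModel here is a stand-in, not Airflow's actual DagRunContext):

```python
from typing import Annotated, Any
from pydantic import BaseModel, Field, ValidationError

class DagRunModel(BaseModel):
    # Same annotation as the field above: a dict is required,
    # defaulting to {} only when the field is omitted entirely.
    conf: Annotated[dict[str, Any], Field(default_factory=dict)]

assert DagRunModel().conf == {}  # omitted field -> default {}
try:
    DagRunModel(conf=None)       # explicit None, as loaded from a NULL column
except ValidationError as e:
    print(e.errors()[0]["type"])  # dict_type
```

The default_factory only applies when conf is absent; an explicit None still goes through dict validation and fails with the dict_type error seen in the traceback.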

What you think should happen instead?

No response

How to reproduce

  1. Trigger an example dag such as example_callback.
  2. Set conf to NULL in the database, which is possible since the column is nullable:
sqlite> update dag_run set conf=NULL where dag_id = "example_callback";
sqlite> select * from dag_run where dag_id = "example_callback";
                      id = 8
                  dag_id = example_callback
               queued_at = 2025-10-16 08:02:14.770940
            logical_date = 2025-10-16 08:02:13.000000
              start_date = 2025-10-16 08:02:15.161386
                end_date = 2025-10-16 08:02:18.564532
                   state = success
                  run_id = manual__2025-10-16T08:02:13+00:00
         creating_job_id = 
                run_type = manual
            triggered_by = UI
    triggering_user_name = admin
                    conf = 
     data_interval_start = 2025-10-16 08:02:13.000000
       data_interval_end = 2025-10-16 08:02:13.000000
               run_after = 2025-10-16 08:02:13.000000
last_scheduling_decision = 2025-10-16 08:02:18.561214
         log_template_id = 1
              updated_at = 2025-10-16 08:02:18.565788
            clear_number = 0
             backfill_id = 
          bundle_version = 
     scheduled_by_job_id = 15
         context_carrier = {"__var": {}, "__type": "dict"}
             span_status = ended
  created_dag_version_id = 01999970192a77c383c76f3b7ac653f1
  3. Clear the dag run; the scheduler crashes:
[2025-10-16T08:04:05.714515Z] {local_executor.py:226} INFO - Shutting down LocalExecutor; waiting for running tasks to finish.  Signal again if you don't want to wait.
[2025-10-16T08:04:05.737568Z] {scheduler_job_runner.py:1068} INFO - Exited execute loop
Traceback (most recent call last):
  File "/home/karthikeyan/stuff/python/airflow/.venv/bin/airflow", line 10, in <module>
    sys.exit(main())
             ^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/__main__.py", line 55, in main
    args.func(args)
  File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/utils/cli.py", line 115, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/utils/providers_configuration_loader.py", line 54, in wrapped_function
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/cli/commands/scheduler_command.py", line 52, in scheduler
    run_command_with_daemon_option(
  File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/cli/commands/daemon_utils.py", line 86, in run_command_with_daemon_option
    callback()
  File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/cli/commands/scheduler_command.py", line 55, in <lambda>
    callback=lambda: _run_scheduler_job(args),
                     ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/cli/commands/scheduler_command.py", line 43, in _run_scheduler_job
    run_job(job=job_runner.job, execute_callable=job_runner._execute)
  File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/utils/session.py", line 100, in wrapper
    return func(*args, session=session, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/jobs/job.py", line 368, in run_job
    return execute_job(job, execute_callable=execute_callable)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/jobs/job.py", line 397, in execute_job
    ret = execute_callable()
          ^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1052, in _execute
    self._run_scheduler_loop()
  File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1342, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1452, in _do_scheduling
    callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/utils/retries.py", line 97, in wrapped_function
    for attempt in run_with_db_retries(max_retries=retries, logger=logger, **retry_kwargs):
  File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/tenacity/__init__.py", line 445, in __iter__
    do = self.iter(retry_state=retry_state)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/tenacity/__init__.py", line 378, in iter
    result = action(retry_state)
             ^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/tenacity/__init__.py", line 400, in <lambda>
    self._add_action_func(lambda rs: rs.outcome.result())
                                     ^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/utils/retries.py", line 106, in wrapped_function
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1860, in _schedule_all_dag_runs
    callback_tuples = [(run, self._schedule_dag_run(run, session=session)) for run in dag_runs]
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1860, in <listcomp>
    callback_tuples = [(run, self._schedule_dag_run(run, session=session)) for run in dag_runs]
                             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1976, in _schedule_dag_run
    schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
                                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/utils/session.py", line 98, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/models/dagrun.py", line 1197, in update_state
    context_from_server=DagRunContext(
                        ^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/pydantic/main.py", line 253, in __init__
    validated_self = self.__pydantic_validator__.validate_python(data, self_instance=self)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
pydantic_core._pydantic_core.ValidationError: 1 validation error for DagRunContext
dag_run.conf
  Input should be a valid dictionary [type=dict_type, input_value=None, input_type=NoneType]
    For further information visit https://errors.pydantic.dev/2.11/v/dict_type
INFO:     Shutting down
INFO:     Waiting for application shutdown.
INFO:     Application shutdown complete.
INFO:     Finished server process [10591]
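One possible direction for a fix (a sketch only, under the assumption that coercing a NULL conf to an empty dict is acceptable; this is not the actual Airflow patch, and _none_to_dict and DagRunModel are hypothetical names) is a pydantic BeforeValidator on the field:

```python
from typing import Annotated, Any
from pydantic import BaseModel, BeforeValidator, Field

def _none_to_dict(v: Any) -> Any:
    # A NULL conf column arrives as None; treat it as an empty conf
    # before dict validation runs.
    return {} if v is None else v

class DagRunModel(BaseModel):
    # Illustrative stand-in for the real datamodel field.
    conf: Annotated[
        dict[str, Any],
        BeforeValidator(_none_to_dict),
        Field(default_factory=dict),
    ]

print(DagRunModel(conf=None).conf)  # {}
```

With this, rows migrated with conf as NULL would validate to an empty dict instead of crashing the scheduler, matching the behavior of an omitted conf.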

Operating System

Ubuntu 20.04

Versions of Apache Airflow Providers

No response

Deployment

Virtualenv installation

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
