diff --git a/airflow-core/src/airflow/utils/log/log_reader.py b/airflow-core/src/airflow/utils/log/log_reader.py index 6b596b7d621ff..c238c7d4d4d09 100644 --- a/airflow-core/src/airflow/utils/log/log_reader.py +++ b/airflow-core/src/airflow/utils/log/log_reader.py @@ -147,7 +147,8 @@ def read_log_stream( empty_iterations += 1 if empty_iterations >= self.STREAM_LOOP_STOP_AFTER_EMPTY_ITERATIONS: # we have not received any logs for a while, so we stop the stream - yield "(Log stream stopped - End of log marker not found; logs may be incomplete.)\n" + # this is emitted as json to avoid breaking the ndjson stream format + yield '{"event": "Log stream stopped - End of log marker not found; logs may be incomplete."}\n' return else: # https://mypy.readthedocs.io/en/stable/typed_dict.html#supported-operations diff --git a/airflow-core/tests/unit/utils/log/test_log_reader.py b/airflow-core/tests/unit/utils/log/test_log_reader.py index 1818e1b5d6d1d..c32635d59df6a 100644 --- a/airflow-core/tests/unit/utils/log/test_log_reader.py +++ b/airflow-core/tests/unit/utils/log/test_log_reader.py @@ -268,7 +268,7 @@ def test_read_log_stream_no_end_of_log_marker(self, mock_read): log_stream = task_log_reader.read_log_stream(ti=self.ti, try_number=1, metadata={}) assert list(log_stream) == [ '{"timestamp":null,"event":"hello"}\n', - "(Log stream stopped - End of log marker not found; logs may be incomplete.)\n", + '{"event": "Log stream stopped - End of log marker not found; logs may be incomplete."}\n', ] assert mock_read.call_count == 11