fix(session-db): advance flush cursor per-message to prevent duplicate writes on partial failure#19407
Closed
Bartok9 wants to merge 2 commits intoNousResearch:mainfrom
Closed
Conversation
…e writes on partial failure
`_flush_messages_to_session_db` iterates `messages[flush_from:]` and calls
`SessionDB.append_message` for each row, then sets
`self._last_flushed_db_idx = len(messages)` AFTER the loop completes. If
any individual `append_message` raises mid-loop (typical triggers: SQLite
"database is locked" from concurrent Hermes processes sharing the same
state.db, transient disk-full, schema-evolution race), control jumps to
the broad `except Exception` clause without the cursor having advanced.
The next flush — usually called from the next exit path of the same
agent run a few hundred ms later, since `_persist_session` fans out to
multiple call sites — re-evaluates `flush_from = max(start_idx,
self._last_flushed_db_idx)`, gets the original (un-advanced) value, and
re-writes the rows that DID commit before the failure. The user observes
their transcript silently growing duplicates: each user/assistant pair
appears twice, FTS5 surfaces the same message in search results twice,
and `message_count` on the session row drifts to 2× the true
conversation length. Compounds across runs because every retry doubles
the unflushed window again.
The fix is mechanical: advance `self._last_flushed_db_idx = flush_from + i + 1`
inside the loop immediately after each successful `append_message` call,
and remove the now-redundant post-loop assignment. With per-row
advancement, a mid-loop failure on row N+1 leaves the cursor at N, so
the next flush correctly skips rows 0..N and only re-attempts N+1
onward — no duplicates of the rows that succeeded.
Equivalent to the old behavior in the success path: when the loop
completes normally, the final per-iter assignment writes
`flush_from + (len(messages[flush_from:]) - 1) + 1 = len(messages)`,
matching the prior post-loop value.
Test coverage: new
`tests/run_agent/test_860_dedup.py::TestFlushDeduplication
::test_flush_advances_cursor_per_message_on_partial_failure` builds a
5-message conversation, monkey-patches `db.append_message` to raise
`sqlite3.OperationalError("database is locked")` on the 3rd invocation,
calls flush, then asserts:
1. The first 2 messages were committed (cursor at 2, not 0).
2. After the broken provider is replaced and flush re-runs with the
same message list, the 3rd message onward gets written exactly once.
3. Total session rows == len(messages) — no duplicates of rows 1 and 2.
4. Row order/content match the original send order.
Verified bug repro: against an unmodified upstream `run_agent.py` the
new test fails at the cursor assertion ("got 0, expected 2"). With the
fix applied, all 4 existing dedup tests + 1 new regression test pass,
plus 21 adjacent compression/persistence tests pass with no
regressions.
Targeted run:
`pytest tests/run_agent/test_860_dedup.py
tests/run_agent/test_compression_persistence.py
tests/run_agent/test_413_compression.py
tests/run_agent/test_compress_focus_plugin_fallback.py -o addopts=''`
→ **31 passed**.
Fixes NousResearch#12563
Co-authored-by: Cursor <cursoragent@cursor.com>
Collaborator
|
Related to PR #17146 (flush cursor recovery from persisted prefix) — both address SessionDB flush cursor bugs but from different failure modes. |
1 similar comment
Collaborator
|
Related to PR #17146 (flush cursor recovery from persisted prefix) — both address SessionDB flush cursor bugs but from different failure modes. |
The original test tried to instantiate AIAgent without the required patches (get_tool_definitions, check_toolset_requirements, OpenAI) which caused CI failures. Rewrote using the same fixture pattern as test_413_compression.py and replaced SessionDB dependency with a MagicMock so tests don't depend on internal DB schema. Covers the same scenarios including the NousResearch#12563 partial-failure cursor regression test.
Contributor
Author
|
Changes present in upstream main after rebase. Closing as resolved. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
_flush_messages_to_session_db(run_agent.py) iteratesmessages[flush_from:]and callsSessionDB.append_messagefor each row, then setsself._last_flushed_db_idx = len(messages)after the loop completes. If any individualappend_messageraises mid-loop — typical triggers: SQLite "database is locked" from concurrent Hermes processes sharing the samestate.db, transient disk-full, schema-evolution race — control jumps to the broadexcept Exceptionclause without the cursor having advanced.The next flush — usually called from the next exit path of the same agent run a few hundred ms later, since
_persist_sessionfans out to multiple call sites — re-evaluatesflush_from = max(start_idx, self._last_flushed_db_idx), gets the original (un-advanced) value, and re-writes the rows that DID commit before the failure. The user observes their transcript silently growing duplicates: each user/assistant pair appears twice, FTS5 surfaces the same message in search results twice, andmessage_counton the session row drifts to 2× the true conversation length. Compounds across runs because every retry doubles the unflushed window again.What changed
The fix is mechanical: advance
self._last_flushed_db_idx = flush_from + i + 1inside the loop immediately after each successfulappend_messagecall, and remove the now-redundant post-loop assignment. With per-row advancement, a mid-loop failure on row N+1 leaves the cursor at N, so the next flush correctly skips rows 0..N and only re-attempts N+1 onward — no duplicates of the rows that succeeded.Equivalent to the prior behaviour in the success path: when the loop completes normally, the final per-iter assignment writes
flush_from + (len(messages[flush_from:]) - 1) + 1 = len(messages), matching the old post-loop value. The 4 existing dedup tests confirm this — they all still pass unchanged.Test coverage
New
tests/run_agent/test_860_dedup.py::TestFlushDeduplication::test_flush_advances_cursor_per_message_on_partial_failurebuilds a 5-message conversation, monkey-patchesdb.append_messageto raisesqlite3.OperationalError(\"database is locked\")on the 3rd invocation, calls flush, then asserts:Verified bug repro: against an unmodified upstream
run_agent.pythe new test fails at the cursor assertion ("got 0, expected 2"). With the fix applied, all 4 existing dedup tests + 1 new regression test pass, plus 21 adjacent compression/persistence tests pass with no regressions.Targeted run:
```
pytest tests/run_agent/test_860_dedup.py \
tests/run_agent/test_compression_persistence.py \
tests/run_agent/test_413_compression.py \
tests/run_agent/test_compress_focus_plugin_fallback.py \
-o addopts=''
```
→ 31 passed.
Fixes #12563
Made with Cursor