Skip to content

Commit 285a4d8

Browse files
authored
[ENH] Update fork log validation (#5778)
## Description of changes _Summarize the changes made by this PR._ - Improvements & Bug fixes - Update the fork log validation logic. We only validate that the compaction cursor is in the range of copied logs if it exists. Previously, we enforce this validation by assuming the compaction cursor is 1, which is not likely the case. - New functionality - N/A ## Test plan _How are these changes tested?_ - [ ] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust ## Migration plan _Are there any migrations, or any forwards/backwards compatibility changes needed in order to make sure this change deploys reliably?_ ## Observability plan _What is the plan to instrument and monitor this change?_ ## Documentation Changes _Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the_ [_docs section](https://github.com/chroma-core/chroma/tree/main/docs/docs.trychroma.com)?_
1 parent d007e73 commit 285a4d8

File tree

1 file changed

+31
-20
lines changed

1 file changed

+31
-20
lines changed

rust/log-service/src/lib.rs

Lines changed: 31 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1434,15 +1434,13 @@ impl LogServer {
14341434
Status::new(err.code().into(), format!("Failed to load cursor: {}", err))
14351435
})?;
14361436
// This is the existing compaction_offset, which is the next record to compact.
1437-
let offset = witness
1438-
.map(|x| x.cursor.position)
1439-
.unwrap_or(LogPosition::from_offset(1));
1440-
tracing::event!(Level::INFO, offset = ?offset);
1437+
let cursor = witness.map(|x| x.cursor.position);
1438+
tracing::event!(Level::INFO, offset = ?cursor);
14411439
wal3::copy(
14421440
&storage,
14431441
&options,
14441442
&log_reader,
1445-
offset,
1443+
cursor.unwrap_or(LogPosition::from_offset(1)),
14461444
target_prefix.clone(),
14471445
)
14481446
.await
@@ -1463,34 +1461,47 @@ impl LogServer {
14631461
})?
14641462
.ok_or_else(|| Status::internal("Unable to find copied manifest"))?;
14651463
let first_copied_offset = new_manifest.oldest_timestamp();
1466-
if offset < first_copied_offset {
1467-
return Err(Status::internal(format!(
1468-
"Compaction cursor [{offset:?}] is behind start of manifest [{first_copied_offset:?}]",
1469-
)));
1470-
}
14711464
// This is the next record to insert, so we'll have to adjust downwards.
14721465
let max_offset = new_manifest.next_write_timestamp();
1473-
if max_offset < offset {
1474-
return Err(Status::new(
1475-
chroma_error::ErrorCodes::Internal.into(),
1476-
format!("max_offset={:?} < offset={:?}", max_offset, offset),
1477-
));
1466+
if let Some(cursor) = cursor {
1467+
if cursor < first_copied_offset {
1468+
return Err(Status::internal(format!(
1469+
"Compaction cursor {} is behind start of manifest {}",
1470+
cursor.offset(),
1471+
first_copied_offset.offset()
1472+
)));
1473+
}
1474+
if max_offset < cursor {
1475+
return Err(Status::new(
1476+
chroma_error::ErrorCodes::Internal.into(),
1477+
format!(
1478+
"Compaction cursor {} is after end of manifest {}",
1479+
cursor.offset(),
1480+
max_offset.offset()
1481+
),
1482+
));
1483+
}
14781484
}
1479-
if offset != max_offset {
1485+
1486+
let cursor = cursor.unwrap_or(LogPosition::from_offset(1));
1487+
if cursor != max_offset {
14801488
let mark_dirty = MarkDirty {
14811489
collection_id: target_collection_id,
14821490
dirty_log: self.dirty_log.clone(),
14831491
};
14841492
let _ = mark_dirty
1485-
.mark_dirty(offset, (max_offset - offset) as usize)
1493+
.mark_dirty(cursor, (max_offset - cursor) as usize)
14861494
.await;
14871495
}
1488-
tracing::event!(Level::INFO, compaction_offset =? offset.offset() - 1, enumeration_offset =? (max_offset - 1u64).offset());
1496+
1497+
let compaction_offset = (cursor - 1u64).offset();
1498+
let enumeration_offset = (max_offset - 1u64).offset();
1499+
tracing::event!(Level::INFO, compaction_offset, enumeration_offset);
14891500
Ok(Response::new(ForkLogsResponse {
14901501
// NOTE: The upstream service expects the last compacted offset as compaction offset
1491-
compaction_offset: (offset - 1u64).offset(),
1502+
compaction_offset,
14921503
// NOTE: The upstream service expects the last uncompacted offset as enumeration offset
1493-
enumeration_offset: (max_offset - 1u64).offset(),
1504+
enumeration_offset,
14941505
}))
14951506
}
14961507

0 commit comments

Comments
 (0)