Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 31 additions & 20 deletions rust/log-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1434,15 +1434,13 @@ impl LogServer {
Status::new(err.code().into(), format!("Failed to load cursor: {}", err))
})?;
// This is the existing compaction_offset, which is the next record to compact.
let offset = witness
.map(|x| x.cursor.position)
.unwrap_or(LogPosition::from_offset(1));
tracing::event!(Level::INFO, offset = ?offset);
let cursor = witness.map(|x| x.cursor.position);
tracing::event!(Level::INFO, offset = ?cursor);
wal3::copy(
&storage,
&options,
&log_reader,
offset,
cursor.unwrap_or(LogPosition::from_offset(1)),
target_prefix.clone(),
)
.await
Expand All @@ -1463,34 +1461,47 @@ impl LogServer {
})?
.ok_or_else(|| Status::internal("Unable to find copied manifest"))?;
let first_copied_offset = new_manifest.oldest_timestamp();
if offset < first_copied_offset {
return Err(Status::internal(format!(
"Compaction cursor [{offset:?}] is behind start of manifest [{first_copied_offset:?}]",
)));
}
// This is the next record to insert, so we'll have to adjust downwards.
let max_offset = new_manifest.next_write_timestamp();
if max_offset < offset {
return Err(Status::new(
chroma_error::ErrorCodes::Internal.into(),
format!("max_offset={:?} < offset={:?}", max_offset, offset),
));
if let Some(cursor) = cursor {
if cursor < first_copied_offset {
return Err(Status::internal(format!(
"Compaction cursor {} is behind start of manifest {}",
cursor.offset(),
first_copied_offset.offset()
)));
}
if max_offset < cursor {
return Err(Status::new(
chroma_error::ErrorCodes::Internal.into(),
format!(
"Compaction cursor {} is after end of manifest {}",
cursor.offset(),
max_offset.offset()
),
));
}
}
if offset != max_offset {

let cursor = cursor.unwrap_or(LogPosition::from_offset(1));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[BestPractice]

For clarity and to avoid potential confusion from variable shadowing, consider using a different name for the unwrapped cursor. The variable `cursor` is used as an `Option<LogPosition>`, then shadowed inside the `if let` as a `LogPosition`, and then shadowed again here as a `LogPosition`. Using a new name like `effective_cursor` would make the type change explicit and the code easier to follow.

For example:

```rust
        let effective_cursor = cursor.unwrap_or(LogPosition::from_offset(1));
        if effective_cursor != max_offset {
            let mark_dirty = MarkDirty {
                collection_id: target_collection_id,
                dirty_log: self.dirty_log.clone(),
            };
            let _ = mark_dirty
                .mark_dirty(effective_cursor, (max_offset - effective_cursor) as usize)
                .await;
        }

        let compaction_offset = (effective_cursor - 1u64).offset();
        let enumeration_offset = (max_offset - 1u64).offset();
        tracing::event!(Level::INFO, compaction_offset, enumeration_offset);
```
Context for Agents
[**BestPractice**]

For clarity and to avoid potential confusion from variable shadowing, consider using a different name for the unwrapped cursor. The variable `cursor` is used as an `Option<LogPosition>`, then shadowed inside the `if let` as a `LogPosition`, and then shadowed again here as a `LogPosition`. Using a new name like `effective_cursor` would make the type change explicit and the code easier to follow.

For example:
```rust
        let effective_cursor = cursor.unwrap_or(LogPosition::from_offset(1));
        if effective_cursor != max_offset {
            let mark_dirty = MarkDirty {
                collection_id: target_collection_id,
                dirty_log: self.dirty_log.clone(),
            };
            let _ = mark_dirty
                .mark_dirty(effective_cursor, (max_offset - effective_cursor) as usize)
                .await;
        }

        let compaction_offset = (effective_cursor - 1u64).offset();
        let enumeration_offset = (max_offset - 1u64).offset();
        tracing::event!(Level::INFO, compaction_offset, enumeration_offset);
```

File: rust/log-service/src/lib.rs
Line: 1486

if cursor != max_offset {
let mark_dirty = MarkDirty {
collection_id: target_collection_id,
dirty_log: self.dirty_log.clone(),
};
let _ = mark_dirty
.mark_dirty(offset, (max_offset - offset) as usize)
.mark_dirty(cursor, (max_offset - cursor) as usize)
.await;
}
tracing::event!(Level::INFO, compaction_offset =? offset.offset() - 1, enumeration_offset =? (max_offset - 1u64).offset());

let compaction_offset = (cursor - 1u64).offset();
let enumeration_offset = (max_offset - 1u64).offset();
tracing::event!(Level::INFO, compaction_offset, enumeration_offset);
Ok(Response::new(ForkLogsResponse {
// NOTE: The upstream service expects the last compacted offset as compaction offset
compaction_offset: (offset - 1u64).offset(),
compaction_offset,
// NOTE: The upstream service expects the last uncompacted offset as enumeration offset
enumeration_offset: (max_offset - 1u64).offset(),
enumeration_offset,
}))
}

Expand Down
Loading