21 changes: 11 additions & 10 deletions rust/worker/src/execution/operators/register.rs
@@ -51,6 +51,8 @@ pub struct RegisterInput {
log: Log,
schema: Option<Schema>,
attached_function_context: Option<AttachedFunctionContext>,
/// Input collection's pulled log offset (used for attached function completion_offset calculation)
input_pulled_log_offset: i64,
}

impl RegisterInput {
@@ -68,6 +70,7 @@ impl RegisterInput {
log: Log,
schema: Option<Schema>,
attached_function_context: Option<AttachedFunctionContext>,
input_pulled_log_offset: i64,
) -> Self {
RegisterInput {
tenant,
@@ -81,6 +84,7 @@ impl RegisterInput {
log,
schema,
attached_function_context,
input_pulled_log_offset,
}
}
}
@@ -145,15 +149,11 @@ impl Operator<RegisterInput, RegisterOutput> for RegisterOperator {
)
})?;

// log_position is "up to which offset we've compacted"
// input_pulled_log_offset is "up to which offset we've compacted from INPUT collection"
// completion_offset is "last offset processed"
// In practice, log_position means "next offset to start compacting from"
// So to get "last offset processed", we subtract 1
let last_offset_processed = if input.log_position > 0 {
(input.log_position - 1).max(0) as u64
} else {
0u64
};
// In practice, input_pulled_log_offset means "next offset to start compacting from"
// So to get "last offset processed"/"completion_offset", we subtract 1
let last_offset_processed = (input.input_pulled_log_offset - 1).max(0) as u64;
let attach_function_update = chroma_types::AttachedFunctionUpdateInfo {
attached_function_id: attached_function.id,
attached_function_run_nonce: attached_function_context.execution_nonce.0,
@@ -338,8 +338,9 @@ mod tests {
size_bytes_post_compaction,
sysdb.clone(),
log.clone(),
None, // schema
None, // attached_function_context
None, // schema
None, // attached_function_context
log_position, // input_pulled_log_offset (same as log_position for non-task compaction)
);

let result = operator.run(&input).await;
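The core of the register.rs change is the off-by-one between the pulled log offset ("next offset to start compacting from") and the completion_offset ("last offset processed"). A minimal standalone sketch of that conversion, using a hypothetical `completion_offset` helper that is not part of the PR:

```rust
/// Hypothetical helper mirroring the conversion done in RegisterOperator:
/// the pulled log offset is the next offset to compact from, so the last
/// offset actually processed is one less, clamped at zero.
fn completion_offset(input_pulled_log_offset: i64) -> u64 {
    (input_pulled_log_offset - 1).max(0) as u64
}

fn main() {
    // Nothing pulled yet: the completion offset stays at 0.
    assert_eq!(completion_offset(0), 0);
    // Pulled up to (but not including) offset 10: last processed is 9.
    assert_eq!(completion_offset(10), 9);
    println!("completion_offset checks passed");
}
```

Clamping at zero covers the case where nothing has been pulled yet, so the subtraction never produces a negative completion offset.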
37 changes: 28 additions & 9 deletions rust/worker/src/execution/orchestration/compact.rs
@@ -171,14 +171,14 @@ pub struct CompactOrchestrator {
input_collection_id: CollectionUuid,
input_collection: OnceCell<Collection>,
input_segments: OnceCell<Vec<Segment>>,
/// How much to pull from fetch_logs for INPUT collection
pulled_log_offset: i64,
input_pulled_log_offset: i64,

// === Output Collection (write compacted data to) ===
/// Collection to write compacted segments to
output_collection_id: OnceCell<CollectionUuid>,
output_collection: OnceCell<Collection>,
output_segments: OnceCell<Vec<Segment>>,
output_pulled_log_offset: i64,

// === Writers & Results ===
writers: OnceCell<CompactWriters>,
@@ -366,10 +366,11 @@ impl CompactOrchestrator {
input_collection_id,
input_collection: OnceCell::new(),
input_segments: OnceCell::new(),
pulled_log_offset: 0,
input_pulled_log_offset: 0,
output_collection_id: output_collection_cell,
output_collection: OnceCell::new(),
output_segments: OnceCell::new(),
output_pulled_log_offset: 0,
writers: OnceCell::new(),
flush_results: Vec::new(),
result_channel,
@@ -742,7 +743,7 @@ impl CompactOrchestrator {
let input = RegisterInput::new(
collection.tenant,
collection.collection_id,
self.pulled_log_offset,
self.output_pulled_log_offset,
collection.version,
self.flush_results.clone().into(),
self.total_records_post_compaction,
@@ -751,6 +752,7 @@ impl CompactOrchestrator {
self.log.clone(),
self.schema.clone(),
self.attached_function_context.clone(),
self.input_pulled_log_offset,
);

let task = wrap(
Expand Down Expand Up @@ -873,6 +875,17 @@ impl CompactOrchestrator {
))
}

/// Set input_pulled_log_offset to the given position.
/// For regular compaction (input == output), also updates output_pulled_log_offset.
/// For task compaction (input != output), output collection keeps its own log position.
fn set_input_log_offset(&mut self, log_offset: i64) {
self.input_pulled_log_offset = log_offset;
// Only update output offset if input and output are the same collection
if Some(self.input_collection_id) == self.output_collection_id.get().copied() {
self.output_pulled_log_offset = log_offset;
}
}

/// Get output_segments or return error
fn get_output_segments(&self) -> Result<Vec<Segment>, CompactionError> {
self.output_segments
@@ -1120,6 +1133,9 @@ impl Handler<TaskResult<GetCollectionAndSegmentsOutput, GetCollectionAndSegments
return;
}

// Initialize output_pulled_log_offset from OUTPUT collection's log position
self.output_pulled_log_offset = output_collection.log_position;

// Create output segments vec from individual segment fields
let output_segments = vec![
output.output.metadata_segment.clone(),
@@ -1166,8 +1182,8 @@ impl Handler<TaskResult<GetCollectionAndSegmentsOutput, GetCollectionAndSegments
input_collection.log_position = result;
}

// Set pulled_log_offset from INPUT collection's log position
self.pulled_log_offset = input_collection.log_position;
// Initialize input_pulled_log_offset from INPUT collection's log position (last compacted offset)
self.input_pulled_log_offset = input_collection.log_position;

// Create record reader from INPUT segments (for reading existing data)
let input_record_reader = match self
@@ -1440,8 +1456,11 @@ impl Handler<TaskResult<FetchLogOutput, FetchLogError>> for CompactOrchestrator
tracing::info!("Pulled Records: {}", output.len());
match output.iter().last() {
Some((rec, _)) => {
self.pulled_log_offset = rec.log_offset;
tracing::info!("Pulled Logs Up To Offset: {:?}", self.pulled_log_offset);
self.set_input_log_offset(rec.log_offset);
tracing::info!(
"Pulled Logs Up To Offset: {:?}",
self.input_pulled_log_offset
);
}
None => {
tracing::warn!("No logs were pulled from the log service, this can happen when the log compaction offset is behing the sysdb.");
@@ -1542,7 +1561,7 @@ impl Handler<TaskResult<SourceRecordSegmentOutput, SourceRecordSegmentError>>
Some(collection) => collection,
None => return,
};
self.pulled_log_offset = input_collection.log_position;
self.set_input_log_offset(input_collection.log_position);
self.do_attached_function(output, ctx).await;
} else {
self.partition(output, ctx).await;
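Taken together, the compact.rs changes split the single pulled_log_offset into an input and an output offset that only move in lockstep when the compaction reads and writes the same collection. The following self-contained sketch reduces that bookkeeping to plain structs; `CollectionId` and `Offsets` are illustrative stand-ins for the crate's CollectionUuid and the CompactOrchestrator state, not real APIs:

```rust
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct CollectionId(u64); // stand-in for the crate's CollectionUuid

/// Illustrative reduction of CompactOrchestrator's offset bookkeeping.
struct Offsets {
    input_collection_id: CollectionId,
    output_collection_id: Option<CollectionId>,
    input_pulled_log_offset: i64,
    output_pulled_log_offset: i64,
}

impl Offsets {
    /// Mirrors set_input_log_offset: the output offset follows the input
    /// offset only when input and output are the same collection.
    fn set_input_log_offset(&mut self, log_offset: i64) {
        self.input_pulled_log_offset = log_offset;
        if Some(self.input_collection_id) == self.output_collection_id {
            self.output_pulled_log_offset = log_offset;
        }
    }
}

fn main() {
    // Regular compaction: input == output, both offsets advance together.
    let mut regular = Offsets {
        input_collection_id: CollectionId(1),
        output_collection_id: Some(CollectionId(1)),
        input_pulled_log_offset: 0,
        output_pulled_log_offset: 0,
    };
    regular.set_input_log_offset(42);
    assert_eq!(regular.output_pulled_log_offset, 42);

    // Task compaction: input != output, the output collection keeps its own
    // position (initialized from the OUTPUT collection's log_position).
    let mut task = Offsets {
        input_collection_id: CollectionId(1),
        output_collection_id: Some(CollectionId(2)),
        input_pulled_log_offset: 0,
        output_pulled_log_offset: 7,
    };
    task.set_input_log_offset(42);
    assert_eq!(task.input_pulled_log_offset, 42);
    assert_eq!(task.output_pulled_log_offset, 7);
    println!("offset bookkeeping checks passed");
}
```

For task compaction the output collection's offset stays at whatever log_position it was initialized from, which is exactly why RegisterInput now carries input_pulled_log_offset as a separate field.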