Skip to content

Commit 12db1ea

Browse files
authored
[BUG]: Fix offset flushed to output collection after fn invocation (#5770)
## Description of changes

_Summarize the changes made by this PR._

During attached function invocation we were flushing the input collection's pulled log offset into the output collection's compacted offset field. This change fixes that by tracking the two offsets separately.

- Improvements & Bug fixes
  - The register step now receives the output collection's own log position as `log_position`, and receives the input collection's pulled log offset separately (`input_pulled_log_offset`), which is used to compute the attached function's `completion_offset`.
- New functionality
  - ...

## Test plan

_How are these changes tested?_

- [ ] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust

## Migration plan

_Are there any migrations, or any forwards/backwards compatibility changes needed in order to make sure this change deploys reliably?_

## Observability plan

_What is the plan to instrument and monitor this change?_

## Documentation Changes

_Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the [docs section](https://github.com/chroma-core/chroma/tree/main/docs/docs.trychroma.com)?_
1 parent 5ca19f7 commit 12db1ea

File tree

2 files changed

+39
-19
lines changed

2 files changed

+39
-19
lines changed

rust/worker/src/execution/operators/register.rs

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ pub struct RegisterInput {
5151
log: Log,
5252
schema: Option<Schema>,
5353
attached_function_context: Option<AttachedFunctionContext>,
54+
/// Input collection's pulled log offset (used for attached function completion_offset calculation)
55+
input_pulled_log_offset: i64,
5456
}
5557

5658
impl RegisterInput {
@@ -68,6 +70,7 @@ impl RegisterInput {
6870
log: Log,
6971
schema: Option<Schema>,
7072
attached_function_context: Option<AttachedFunctionContext>,
73+
input_pulled_log_offset: i64,
7174
) -> Self {
7275
RegisterInput {
7376
tenant,
@@ -81,6 +84,7 @@ impl RegisterInput {
8184
log,
8285
schema,
8386
attached_function_context,
87+
input_pulled_log_offset,
8488
}
8589
}
8690
}
@@ -145,15 +149,11 @@ impl Operator<RegisterInput, RegisterOutput> for RegisterOperator {
145149
)
146150
})?;
147151

148-
// log_position is "up to which offset we've compacted"
152+
// input_pulled_log_offset is "up to which offset we've compacted from INPUT collection"
149153
// completion_offset is "last offset processed"
150-
// In practice, log_position means "next offset to start compacting from"
151-
// So to get "last offset processed", we subtract 1
152-
let last_offset_processed = if input.log_position > 0 {
153-
(input.log_position - 1).max(0) as u64
154-
} else {
155-
0u64
156-
};
154+
// In practice, input_pulled_log_offset means "next offset to start compacting from"
155+
// So to get "last offset processed"/"completion_offset", we subtract 1
156+
let last_offset_processed = (input.input_pulled_log_offset - 1).max(0) as u64;
157157
let attach_function_update = chroma_types::AttachedFunctionUpdateInfo {
158158
attached_function_id: attached_function.id,
159159
attached_function_run_nonce: attached_function_context.execution_nonce.0,
@@ -338,8 +338,9 @@ mod tests {
338338
size_bytes_post_compaction,
339339
sysdb.clone(),
340340
log.clone(),
341-
None, // schema
342-
None, // attached_function_context
341+
None, // schema
342+
None, // attached_function_context
343+
log_position, // input_pulled_log_offset (same as log_position for non-task compaction)
343344
);
344345

345346
let result = operator.run(&input).await;

rust/worker/src/execution/orchestration/compact.rs

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -171,14 +171,14 @@ pub struct CompactOrchestrator {
171171
input_collection_id: CollectionUuid,
172172
input_collection: OnceCell<Collection>,
173173
input_segments: OnceCell<Vec<Segment>>,
174-
/// How much to pull from fetch_logs for INPUT collection
175-
pulled_log_offset: i64,
174+
input_pulled_log_offset: i64,
176175

177176
// === Output Collection (write compacted data to) ===
178177
/// Collection to write compacted segments to
179178
output_collection_id: OnceCell<CollectionUuid>,
180179
output_collection: OnceCell<Collection>,
181180
output_segments: OnceCell<Vec<Segment>>,
181+
output_pulled_log_offset: i64,
182182

183183
// === Writers & Results ===
184184
writers: OnceCell<CompactWriters>,
@@ -366,10 +366,11 @@ impl CompactOrchestrator {
366366
input_collection_id,
367367
input_collection: OnceCell::new(),
368368
input_segments: OnceCell::new(),
369-
pulled_log_offset: 0,
369+
input_pulled_log_offset: 0,
370370
output_collection_id: output_collection_cell,
371371
output_collection: OnceCell::new(),
372372
output_segments: OnceCell::new(),
373+
output_pulled_log_offset: 0,
373374
writers: OnceCell::new(),
374375
flush_results: Vec::new(),
375376
result_channel,
@@ -742,7 +743,7 @@ impl CompactOrchestrator {
742743
let input = RegisterInput::new(
743744
collection.tenant,
744745
collection.collection_id,
745-
self.pulled_log_offset,
746+
self.output_pulled_log_offset,
746747
collection.version,
747748
self.flush_results.clone().into(),
748749
self.total_records_post_compaction,
@@ -751,6 +752,7 @@ impl CompactOrchestrator {
751752
self.log.clone(),
752753
self.schema.clone(),
753754
self.attached_function_context.clone(),
755+
self.input_pulled_log_offset,
754756
);
755757

756758
let task = wrap(
@@ -873,6 +875,17 @@ impl CompactOrchestrator {
873875
))
874876
}
875877

878+
/// Record `log_offset` as the pulled log offset of the INPUT collection (`input_pulled_log_offset`).
879+
/// For regular compaction (input == output), `output_pulled_log_offset` is set to the same value.
880+
/// For task compaction (input != output), `output_pulled_log_offset` is left untouched, so the output collection keeps its own log position.
881+
fn set_input_log_offset(&mut self, log_offset: i64) {
882+
self.input_pulled_log_offset = log_offset;
883+
// Only mirror the offset onto the output when both ids are equal; if `output_collection_id` has not been set yet, `get()` yields `None`, the comparison is false, and the output offset is not changed.
884+
if Some(self.input_collection_id) == self.output_collection_id.get().copied() {
885+
self.output_pulled_log_offset = log_offset;
886+
}
887+
}
888+
876889
/// Get output_segments or return error
877890
fn get_output_segments(&self) -> Result<Vec<Segment>, CompactionError> {
878891
self.output_segments
@@ -1120,6 +1133,9 @@ impl Handler<TaskResult<GetCollectionAndSegmentsOutput, GetCollectionAndSegments
11201133
return;
11211134
}
11221135

1136+
// Initialize output_pulled_log_offset from OUTPUT collection's log position
1137+
self.output_pulled_log_offset = output_collection.log_position;
1138+
11231139
// Create output segments vec from individual segment fields
11241140
let output_segments = vec![
11251141
output.output.metadata_segment.clone(),
@@ -1166,8 +1182,8 @@ impl Handler<TaskResult<GetCollectionAndSegmentsOutput, GetCollectionAndSegments
11661182
input_collection.log_position = result;
11671183
}
11681184

1169-
// Set pulled_log_offset from INPUT collection's log position
1170-
self.pulled_log_offset = input_collection.log_position;
1185+
// Initialize input_pulled_log_offset from INPUT collection's log position (last compacted offset)
1186+
self.input_pulled_log_offset = input_collection.log_position;
11711187

11721188
// Create record reader from INPUT segments (for reading existing data)
11731189
let input_record_reader = match self
@@ -1440,8 +1456,11 @@ impl Handler<TaskResult<FetchLogOutput, FetchLogError>> for CompactOrchestrator
14401456
tracing::info!("Pulled Records: {}", output.len());
14411457
match output.iter().last() {
14421458
Some((rec, _)) => {
1443-
self.pulled_log_offset = rec.log_offset;
1444-
tracing::info!("Pulled Logs Up To Offset: {:?}", self.pulled_log_offset);
1459+
self.set_input_log_offset(rec.log_offset);
1460+
tracing::info!(
1461+
"Pulled Logs Up To Offset: {:?}",
1462+
self.input_pulled_log_offset
1463+
);
14451464
}
14461465
None => {
14471466
tracing::warn!("No logs were pulled from the log service, this can happen when the log compaction offset is behing the sysdb.");
@@ -1542,7 +1561,7 @@ impl Handler<TaskResult<SourceRecordSegmentOutput, SourceRecordSegmentError>>
15421561
Some(collection) => collection,
15431562
None => return,
15441563
};
1545-
self.pulled_log_offset = input_collection.log_position;
1564+
self.set_input_log_offset(input_collection.log_position);
15461565
self.do_attached_function(output, ctx).await;
15471566
} else {
15481567
self.partition(output, ctx).await;

0 commit comments

Comments
 (0)