diff --git a/rust/worker/src/execution/operators/register.rs b/rust/worker/src/execution/operators/register.rs index edd414da690..498a5acc57b 100644 --- a/rust/worker/src/execution/operators/register.rs +++ b/rust/worker/src/execution/operators/register.rs @@ -51,6 +51,8 @@ pub struct RegisterInput { log: Log, schema: Option, attached_function_context: Option, + /// Input collection's pulled log offset (used for attached function completion_offset calculation) + input_pulled_log_offset: i64, } impl RegisterInput { @@ -68,6 +70,7 @@ impl RegisterInput { log: Log, schema: Option, attached_function_context: Option, + input_pulled_log_offset: i64, ) -> Self { RegisterInput { tenant, @@ -81,6 +84,7 @@ impl RegisterInput { log, schema, attached_function_context, + input_pulled_log_offset, } } } @@ -145,15 +149,11 @@ impl Operator for RegisterOperator { ) })?; - // log_position is "up to which offset we've compacted" + // input_pulled_log_offset is "up to which offset we've compacted from INPUT collection" // completion_offset is "last offset processed" - // In practice, log_position means "next offset to start compacting from" - // So to get "last offset processed", we subtract 1 - let last_offset_processed = if input.log_position > 0 { - (input.log_position - 1).max(0) as u64 - } else { - 0u64 - }; + // In practice, input_pulled_log_offset means "next offset to start compacting from" + // So to get "last offset processed"/"completion_offset", we subtract 1 + let last_offset_processed = (input.input_pulled_log_offset - 1).max(0) as u64; let attach_function_update = chroma_types::AttachedFunctionUpdateInfo { attached_function_id: attached_function.id, attached_function_run_nonce: attached_function_context.execution_nonce.0, @@ -338,8 +338,9 @@ mod tests { size_bytes_post_compaction, sysdb.clone(), log.clone(), - None, // schema - None, // attached_function_context + None, // schema + None, // attached_function_context + log_position, // input_pulled_log_offset (same as log_position for non-task compaction) ); let result = operator.run(&input).await; diff --git a/rust/worker/src/execution/orchestration/compact.rs b/rust/worker/src/execution/orchestration/compact.rs index 8a9f8b11df7..49a537c2251 100644 --- a/rust/worker/src/execution/orchestration/compact.rs +++ b/rust/worker/src/execution/orchestration/compact.rs @@ -171,14 +171,14 @@ pub struct CompactOrchestrator { input_collection_id: CollectionUuid, input_collection: OnceCell, input_segments: OnceCell>, - /// How much to pull from fetch_logs for INPUT collection - pulled_log_offset: i64, + input_pulled_log_offset: i64, // === Output Collection (write compacted data to) === /// Collection to write compacted segments to output_collection_id: OnceCell, output_collection: OnceCell, output_segments: OnceCell>, + output_pulled_log_offset: i64, // === Writers & Results === writers: OnceCell, @@ -366,10 +366,11 @@ impl CompactOrchestrator { input_collection_id, input_collection: OnceCell::new(), input_segments: OnceCell::new(), - pulled_log_offset: 0, + input_pulled_log_offset: 0, output_collection_id: output_collection_cell, output_collection: OnceCell::new(), output_segments: OnceCell::new(), + output_pulled_log_offset: 0, writers: OnceCell::new(), flush_results: Vec::new(), result_channel, @@ -742,7 +743,7 @@ impl CompactOrchestrator { let input = RegisterInput::new( collection.tenant, collection.collection_id, - self.pulled_log_offset, + self.output_pulled_log_offset, collection.version, self.flush_results.clone().into(), self.total_records_post_compaction, @@ -751,6 +752,7 @@ impl CompactOrchestrator { self.log.clone(), self.schema.clone(), self.attached_function_context.clone(), + self.input_pulled_log_offset, ); let task = wrap( @@ -873,6 +875,17 @@ impl CompactOrchestrator { )) } + /// Set input_pulled_log_offset to the given position. + /// For regular compaction (input == output), also updates output_pulled_log_offset. + /// For task compaction (input != output), output collection keeps its own log position. + fn set_input_log_offset(&mut self, log_offset: i64) { + self.input_pulled_log_offset = log_offset; + // Only update output offset if input and output are the same collection + if Some(self.input_collection_id) == self.output_collection_id.get().copied() { + self.output_pulled_log_offset = log_offset; + } + } + /// Get output_segments or return error fn get_output_segments(&self) -> Result, CompactionError> { self.output_segments @@ -1120,6 +1133,9 @@ impl Handler> for CompactOrchestrator tracing::info!("Pulled Records: {}", output.len()); match output.iter().last() { Some((rec, _)) => { - self.pulled_log_offset = rec.log_offset; - tracing::info!("Pulled Logs Up To Offset: {:?}", self.pulled_log_offset); + self.set_input_log_offset(rec.log_offset); + tracing::info!( + "Pulled Logs Up To Offset: {:?}", + self.input_pulled_log_offset + ); } None => { tracing::warn!("No logs were pulled from the log service, this can happen when the log compaction offset is behing the sysdb."); @@ -1542,7 +1561,7 @@ impl Handler> Some(collection) => collection, None => return, }; - self.pulled_log_offset = input_collection.log_position; + self.set_input_log_offset(input_collection.log_position); self.do_attached_function(output, ctx).await; } else { self.partition(output, ctx).await;