From b26776ba3c1c223daef13bbe5b4a869a6da899a8 Mon Sep 17 00:00:00 2001
From: YueZhang
Date: Thu, 13 Nov 2025 16:24:38 +0800
Subject: [PATCH 1/2] feat: strategized plan compaction

---
 rust/lance/src/dataset/optimize.rs | 312 ++++++++++++++++++-----------
 1 file changed, 195 insertions(+), 117 deletions(-)

diff --git a/rust/lance/src/dataset/optimize.rs b/rust/lance/src/dataset/optimize.rs
index a6e0061421..b1c16c9e3e 100644
--- a/rust/lance/src/dataset/optimize.rs
+++ b/rust/lance/src/dataset/optimize.rs
@@ -205,9 +205,157 @@ impl AddAssign for CompactionMetrics {
     }
 }
 
+/// Trait for implementing custom compaction planning strategies.
+///
+/// This trait allows users to define their own compaction strategies by implementing
+/// the `plan` method. The default implementation is provided by [`DefaultCompactionPlanner`].
+#[async_trait::async_trait]
+pub trait CompactionPlanner: Send + Sync {
+    // get all fragments by default
+    fn get_fragments(&self, dataset: &Dataset, _options: &CompactionOptions) -> Vec<FileFragment> {
+        // get_fragments should be returning fragments in sorted order (by id)
+        // and fragment ids should be unique
+        dataset.get_fragments()
+    }
+
+    // no filter by default
+    async fn filter_fragments(
+        &self,
+        _dataset: &Dataset,
+        fragments: Vec<FileFragment>,
+        _options: &CompactionOptions,
+    ) -> Result<Vec<FileFragment>> {
+        Ok(fragments)
+    }
+
+    async fn plan(&self, dataset: &Dataset, options: &CompactionOptions) -> Result<CompactionPlan>;
+}
+
+/// Formulate a plan to compact the files in a dataset
+///
+/// The compaction plan will contain a list of tasks to execute. Each task
+/// will contain approximately `target_rows_per_fragment` rows and will be
+/// rewriting fragments that are adjacent in the dataset's fragment list. Some
+/// tasks may contain a single fragment when that fragment has deletions that
+/// are being materialized and doesn't have any neighbors that need to be
+/// compacted.
+#[derive(Debug, Clone, Default)]
+pub struct DefaultCompactionPlanner;
+
+#[async_trait::async_trait]
+impl CompactionPlanner for DefaultCompactionPlanner {
+    async fn plan(&self, dataset: &Dataset, options: &CompactionOptions) -> Result<CompactionPlan> {
+        let fragments = self.get_fragments(dataset, options);
+        debug_assert!(
+            fragments.windows(2).all(|w| w[0].id() < w[1].id()),
+            "fragments in manifest are not sorted"
+        );
+        let mut fragment_metrics = futures::stream::iter(fragments)
+            .map(|fragment| async move {
+                match collect_metrics(&fragment).await {
+                    Ok(metrics) => Ok((fragment.metadata, metrics)),
+                    Err(e) => Err(e),
+                }
+            })
+            .buffered(dataset.object_store().io_parallelism());
+
+        let index_fragmaps = load_index_fragmaps(dataset).await?;
+        let indices_containing_frag = |frag_id: u32| {
+            index_fragmaps
+                .iter()
+                .enumerate()
+                .filter(|(_, bitmap)| bitmap.contains(frag_id))
+                .map(|(pos, _)| pos)
+                .collect::<Vec<_>>()
+        };
+
+        let mut candidate_bins: Vec<CandidateBin> = Vec::new();
+        let mut current_bin: Option<CandidateBin> = None;
+        let mut i = 0;
+
+        while let Some(res) = fragment_metrics.next().await {
+            let (fragment, metrics) = res?;
+
+            let candidacy = if options.materialize_deletions
+                && metrics.deletion_percentage() > options.materialize_deletions_threshold
+            {
+                Some(CompactionCandidacy::CompactItself)
+            } else if metrics.physical_rows < options.target_rows_per_fragment {
+                // Only want to compact if there are neighbors to compact such that
+                // we can get a larger fragment.
+                Some(CompactionCandidacy::CompactWithNeighbors)
+            } else {
+                // Not a candidate
+                None
+            };
+
+            let indices = indices_containing_frag(fragment.id as u32);
+
+            match (candidacy, &mut current_bin) {
+                (None, None) => {} // keep searching
+                (Some(candidacy), None) => {
+                    // Start a new bin
+                    current_bin = Some(CandidateBin {
+                        fragments: vec![fragment],
+                        pos_range: i..(i + 1),
+                        candidacy: vec![candidacy],
+                        row_counts: vec![metrics.num_rows()],
+                        indices,
+                    });
+                }
+                (Some(candidacy), Some(bin)) => {
+                    // We cannot mix "indexed" and "non-indexed" fragments and so we only consider
+                    // the existing bin if it contains the same indices
+                    if bin.indices == indices {
+                        // Add to current bin
+                        bin.fragments.push(fragment);
+                        bin.pos_range.end += 1;
+                        bin.candidacy.push(candidacy);
+                        bin.row_counts.push(metrics.num_rows());
+                    } else {
+                        // Index set is different. Complete previous bin and start new one
+                        candidate_bins.push(current_bin.take().unwrap());
+                        current_bin = Some(CandidateBin {
+                            fragments: vec![fragment],
+                            pos_range: i..(i + 1),
+                            candidacy: vec![candidacy],
+                            row_counts: vec![metrics.num_rows()],
+                            indices,
+                        });
+                    }
+                }
+                (None, Some(_)) => {
+                    // Bin is complete
+                    candidate_bins.push(current_bin.take().unwrap());
+                }
+            }
+
+            i += 1;
+        }
+
+        // Flush the last bin
+        if let Some(bin) = current_bin {
+            candidate_bins.push(bin);
+        }
+
+        let final_bins = candidate_bins
+            .into_iter()
+            .filter(|bin| !bin.is_noop())
+            .flat_map(|bin| bin.split_for_size(options.target_rows_per_fragment))
+            .map(|bin| TaskData {
+                fragments: bin.fragments,
+            });
+
+        let mut compaction_plan = CompactionPlan::new(dataset.manifest.version, options.clone());
+        compaction_plan.extend_tasks(final_bins);
+
+        Ok(compaction_plan)
+    }
+}
+
 /// Compacts the files in the dataset without reordering them.
 ///
-/// This does a few things:
+/// By default, his does a few things:
 /// * Removes deleted rows from fragments.
 /// * Removes dropped columns from fragments.
 /// * Merges fragments that are too small.
@@ -216,14 +364,24 @@ impl AddAssign for CompactionMetrics {
 ///
 /// If no compaction is needed, this method will not make a new version of the table.
 pub async fn compact_files(
+    dataset: &mut Dataset,
+    options: CompactionOptions,
+    remap_options: Option<Arc<dyn IndexRemapperOptions>>, // These will be deprecated later
+) -> Result<CompactionMetrics> {
+    let planner = DefaultCompactionPlanner;
+    compact_files_with_planner(dataset, options, remap_options, &planner).await
+}
+
+pub async fn compact_files_with_planner(
     dataset: &mut Dataset,
     mut options: CompactionOptions,
     remap_options: Option<Arc<dyn IndexRemapperOptions>>, // These will be deprecated later
+    planner: &dyn CompactionPlanner,
 ) -> Result<CompactionMetrics> {
     info!(target: TRACE_DATASET_EVENTS, event=DATASET_COMPACTING_EVENT, uri = &dataset.uri);
     options.validate();
 
-    let compaction_plan: CompactionPlan = plan_compaction(dataset, &options).await?;
+    let compaction_plan: CompactionPlan = planner.plan(dataset, &options).await?;
 
     // If nothing to compact, don't make a commit.
     if compaction_plan.tasks().is_empty() {
@@ -457,125 +615,12 @@ async fn load_index_fragmaps(dataset: &Dataset) -> Result<Vec<RoaringBitmap>> {
     Ok(index_fragmaps)
 }
 
-/// Formulate a plan to compact the files in a dataset
-///
-/// The compaction plan will contain a list of tasks to execute. Each task
-/// will contain approximately `target_rows_per_fragment` rows and will be
-/// rewriting fragments that are adjacent in the dataset's fragment list. Some
-/// tasks may contain a single fragment when that fragment has deletions that
-/// are being materialized and doesn't have any neighbors that need to be
-/// compacted.
 pub async fn plan_compaction(
     dataset: &Dataset,
     options: &CompactionOptions,
 ) -> Result<CompactionPlan> {
-    // get_fragments should be returning fragments in sorted order (by id)
-    // and fragment ids should be unique
-    let fragments = dataset.get_fragments();
-    debug_assert!(
-        fragments.windows(2).all(|w| w[0].id() < w[1].id()),
-        "fragments in manifest are not sorted"
-    );
-    let mut fragment_metrics = futures::stream::iter(fragments)
-        .map(|fragment| async move {
-            match collect_metrics(&fragment).await {
-                Ok(metrics) => Ok((fragment.metadata, metrics)),
-                Err(e) => Err(e),
-            }
-        })
-        .buffered(dataset.object_store().io_parallelism());
-
-    let index_fragmaps = load_index_fragmaps(dataset).await?;
-    let indices_containing_frag = |frag_id: u32| {
-        index_fragmaps
-            .iter()
-            .enumerate()
-            .filter(|(_, bitmap)| bitmap.contains(frag_id))
-            .map(|(pos, _)| pos)
-            .collect::<Vec<_>>()
-    };
-
-    let mut candidate_bins: Vec<CandidateBin> = Vec::new();
-    let mut current_bin: Option<CandidateBin> = None;
-    let mut i = 0;
-
-    while let Some(res) = fragment_metrics.next().await {
-        let (fragment, metrics) = res?;
-
-        let candidacy = if options.materialize_deletions
-            && metrics.deletion_percentage() > options.materialize_deletions_threshold
-        {
-            Some(CompactionCandidacy::CompactItself)
-        } else if metrics.physical_rows < options.target_rows_per_fragment {
-            // Only want to compact if their are neighbors to compact such that
-            // we can get a larger fragment.
-            Some(CompactionCandidacy::CompactWithNeighbors)
-        } else {
-            // Not a candidate
-            None
-        };
-
-        let indices = indices_containing_frag(fragment.id as u32);
-
-        match (candidacy, &mut current_bin) {
-            (None, None) => {} // keep searching
-            (Some(candidacy), None) => {
-                // Start a new bin
-                current_bin = Some(CandidateBin {
-                    fragments: vec![fragment],
-                    pos_range: i..(i + 1),
-                    candidacy: vec![candidacy],
-                    row_counts: vec![metrics.num_rows()],
-                    indices,
-                });
-            }
-            (Some(candidacy), Some(bin)) => {
-                // We cannot mix "indexed" and "non-indexed" fragments and so we only consider
-                // the existing bin if it contains the same indices
-                if bin.indices == indices {
-                    // Add to current bin
-                    bin.fragments.push(fragment);
-                    bin.pos_range.end += 1;
-                    bin.candidacy.push(candidacy);
-                    bin.row_counts.push(metrics.num_rows());
-                } else {
-                    // Index set is different. Complete previous bin and start new one
-                    candidate_bins.push(current_bin.take().unwrap());
-                    current_bin = Some(CandidateBin {
-                        fragments: vec![fragment],
-                        pos_range: i..(i + 1),
-                        candidacy: vec![candidacy],
-                        row_counts: vec![metrics.num_rows()],
-                        indices,
-                    });
-                }
-            }
-            (None, Some(_)) => {
-                // Bin is complete
-                candidate_bins.push(current_bin.take().unwrap());
-            }
-        }
-
-        i += 1;
-    }
-
-    // Flush the last bin
-    if let Some(bin) = current_bin {
-        candidate_bins.push(bin);
-    }
-
-    let final_bins = candidate_bins
-        .into_iter()
-        .filter(|bin| !bin.is_noop())
-        .flat_map(|bin| bin.split_for_size(options.target_rows_per_fragment))
-        .map(|bin| TaskData {
-            fragments: bin.fragments,
-        });
-
-    let mut compaction_plan = CompactionPlan::new(dataset.manifest.version, options.clone());
-    compaction_plan.extend_tasks(final_bins);
-
-    Ok(compaction_plan)
+    let planner = DefaultCompactionPlanner;
+    planner.plan(dataset, options).await
 }
 
 /// The result of a single compaction task.
@@ -3519,4 +3564,37 @@ mod tests {
             plan
         );
     }
+
+    #[tokio::test]
+    async fn test_default_compaction_planner() {
+        let test_dir = TempStrDir::default();
+        let test_uri = &test_dir;
+
+        let data = sample_data();
+        let schema = data.schema();
+
+        // Create dataset with multiple small fragments
+        let reader = RecordBatchIterator::new(vec![Ok(data.clone())], schema.clone());
+        let write_params = WriteParams {
+            max_rows_per_file: 2000,
+            ..Default::default()
+        };
+        let dataset = Dataset::write(reader, test_uri, Some(write_params))
+            .await
+            .unwrap();
+
+        assert_eq!(dataset.get_fragments().len(), 5);
+
+        // Test default planner
+        let planner = DefaultCompactionPlanner;
+        let options = CompactionOptions {
+            target_rows_per_fragment: 5000,
+            ..Default::default()
+        };
+        let plan = planner.plan(&dataset, &options).await.unwrap();
+
+        // Should create tasks to compact small fragments
+        assert!(!plan.tasks.is_empty());
+        assert_eq!(plan.read_version, dataset.manifest.version);
+    }
 }

From 18eea6d1f8b560a6211ae50377e24ddf23798a9e Mon Sep 17 00:00:00 2001
From: YueZhang
Date: Mon, 17 Nov 2025 15:18:13 +0800
Subject: [PATCH 2/2] code review

---
 rust/lance/src/dataset/optimize.rs | 43 +++++++++++++++++++-----------
 1 file changed, 28 insertions(+), 15 deletions(-)

diff --git a/rust/lance/src/dataset/optimize.rs b/rust/lance/src/dataset/optimize.rs
index b1c16c9e3e..37aec220bf 100644
--- a/rust/lance/src/dataset/optimize.rs
+++ b/rust/lance/src/dataset/optimize.rs
@@ -218,17 +218,22 @@ pub trait CompactionPlanner: Send + Sync {
         dataset.get_fragments()
     }
 
-    // no filter by default
-    async fn filter_fragments(
+    /// Build a compaction plan.
+    ///
+    /// This method analyzes the dataset's fragments and generates a [`CompactionPlan`]
+    /// containing a list of compaction tasks to execute.
+    ///
+    /// # Arguments
+    ///
+    /// * `dataset` - Reference to the dataset to be compacted
+    /// * `options` - Compaction options including target row count, deletion thresholds, etc.
+    /// * `configs` - Additional configuration parameters as key-value pairs
+    async fn plan(
         &self,
-        _dataset: &Dataset,
-        fragments: Vec<FileFragment>,
-        _options: &CompactionOptions,
-    ) -> Result<Vec<FileFragment>> {
-        Ok(fragments)
-    }
-
-    async fn plan(&self, dataset: &Dataset, options: &CompactionOptions) -> Result<CompactionPlan>;
+        dataset: &Dataset,
+        options: &CompactionOptions,
+        configs: HashMap<String, String>,
+    ) -> Result<CompactionPlan>;
 }
 
 /// Formulate a plan to compact the files in a dataset
@@ -244,7 +249,12 @@ pub struct DefaultCompactionPlanner;
 
 #[async_trait::async_trait]
 impl CompactionPlanner for DefaultCompactionPlanner {
-    async fn plan(&self, dataset: &Dataset, options: &CompactionOptions) -> Result<CompactionPlan> {
+    async fn plan(
+        &self,
+        dataset: &Dataset,
+        options: &CompactionOptions,
+        _configs: HashMap<String, String>,
+    ) -> Result<CompactionPlan> {
         let fragments = self.get_fragments(dataset, options);
         debug_assert!(
             fragments.windows(2).all(|w| w[0].id() < w[1].id()),
             "fragments in manifest are not sorted"
         );
@@ -355,7 +365,7 @@ impl CompactionPlanner for DefaultCompactionPlanner {
 
 /// Compacts the files in the dataset without reordering them.
 ///
-/// By default, his does a few things:
+/// By default, this does a few things:
 /// * Removes deleted rows from fragments.
 /// * Removes dropped columns from fragments.
 /// * Merges fragments that are too small.
@@ -381,7 +391,7 @@ pub async fn compact_files_with_planner(
     info!(target: TRACE_DATASET_EVENTS, event=DATASET_COMPACTING_EVENT, uri = &dataset.uri);
     options.validate();
 
-    let compaction_plan: CompactionPlan = planner.plan(dataset, &options).await?;
+    let compaction_plan: CompactionPlan = planner.plan(dataset, &options, HashMap::new()).await?;
 
     // If nothing to compact, don't make a commit.
     if compaction_plan.tasks().is_empty() {
@@ -620,7 +630,7 @@ pub async fn plan_compaction(
     options: &CompactionOptions,
 ) -> Result<CompactionPlan> {
     let planner = DefaultCompactionPlanner;
-    planner.plan(dataset, options).await
+    planner.plan(dataset, options, HashMap::new()).await
 }
 
 /// The result of a single compaction task.
@@ -3591,7 +3601,10 @@ mod tests {
             target_rows_per_fragment: 5000,
             ..Default::default()
         };
-        let plan = planner.plan(&dataset, &options).await.unwrap();
+        let plan = planner
+            .plan(&dataset, &options, HashMap::new())
+            .await
+            .unwrap();
 
         // Should create tasks to compact small fragments
         assert!(!plan.tasks.is_empty());
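
Taken together, the two patches leave `CompactionPlanner` with two hooks: `get_fragments`
(defaulting to every fragment, in id order) and `plan` (which receives a per-call `configs`
map). Below is a minimal sketch of a custom planner built on those hooks. It is not part of
the patches: `TunableCompactionPlanner`, `plan_with_override`, and the
"target_rows_per_fragment" config key are illustrative names, and the import paths assume
these items are exported from `lance::dataset::optimize`. Also note that
`compact_files_with_planner` passes `HashMap::new()` to `plan`, so per-call configs only
take effect when `plan` is invoked directly, as the new test does.

use std::collections::HashMap;

use lance::dataset::optimize::{
    CompactionOptions, CompactionPlan, CompactionPlanner, DefaultCompactionPlanner,
};
use lance::dataset::Dataset;
use lance::Result;

/// Delegates to the default binning strategy, but lets each call override the
/// row target through the configs map instead of rebuilding CompactionOptions.
#[derive(Debug, Default)]
struct TunableCompactionPlanner {
    inner: DefaultCompactionPlanner,
}

#[async_trait::async_trait]
impl CompactionPlanner for TunableCompactionPlanner {
    async fn plan(
        &self,
        dataset: &Dataset,
        options: &CompactionOptions,
        configs: HashMap<String, String>,
    ) -> Result<CompactionPlan> {
        // Fall back to the value in `options` when the key is absent or unparsable.
        let mut opts = options.clone();
        if let Some(n) = configs
            .get("target_rows_per_fragment")
            .and_then(|v| v.parse().ok())
        {
            opts.target_rows_per_fragment = n;
        }
        // Reuse the default planner's binning logic with the adjusted options.
        self.inner.plan(dataset, &opts, configs).await
    }
}

async fn plan_with_override(dataset: &Dataset) -> Result<CompactionPlan> {
    let planner = TunableCompactionPlanner::default();
    let mut configs = HashMap::new();
    configs.insert("target_rows_per_fragment".into(), "1000000".into());
    // Call plan() directly so the configs map actually reaches the planner.
    planner
        .plan(dataset, &CompactionOptions::default(), configs)
        .await
}

Because `plan` consumes the map by value, a wrapper like this can read its own keys and
still forward the remaining entries to the inner planner unchanged.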