feat: late materialization of vectors in filtered vector search #5205
Conversation
```rust
/// materialization to avoid fetching vector data for rows that will be filtered out.
/// If the filter selects more rows than this threshold, we do a single scan with vectors
/// to avoid the random access overhead of the take operation.
pub const LATE_MATERIALIZE_SELECTIVITY_THRESHOLD: f64 = 0.005; // 0.5%
```
I think to be conservative we are gonna want to set this to the appropriate value for cloud storage. This will be too low for some kinds of storage, but should be an improvement over current behavior in all cases (which wouldn't be the case if we went too high).
I'm not sure if 0.005 is right, I think the real number may be lower.
also related to this - this mechanism is very blunt. There are ways we can do much better by estimating row widths and incorporating the math on the actual IOs we perform, but I'm not aware we have an approach for that today (essentially a cost model).
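For the record, a toy sketch of the direction such a cost model could take (everything here is a hypothetical placeholder, not anything Lance implements today — constants would need per-backend calibration):

```rust
/// Toy cost model comparing a full scan against scan-filter-then-take.
/// Object storage has high per-request latency; local NVMe does not.
struct IoProfile {
    request_latency_s: f64,  // cost of one random read request
    throughput_bytes_s: f64, // sequential read throughput
}

fn full_scan_cost(io: &IoProfile, total_rows: usize, row_width: usize) -> f64 {
    (total_rows * row_width) as f64 / io.throughput_bytes_s
}

fn late_mat_cost(
    io: &IoProfile,
    total_rows: usize,
    filter_col_width: usize, // estimated width of the filter columns
    vector_width: usize,     // estimated width of the vector column
    matched_rows: usize,     // rows surviving the filter
) -> f64 {
    // Scan the cheap filter columns, then pay roughly one random read per match.
    let scan = (total_rows * filter_col_width) as f64 / io.throughput_bytes_s;
    let take = matched_rows as f64
        * (io.request_latency_s + vector_width as f64 / io.throughput_bytes_s);
    scan + take
}
```

Under a model like that, late materialization wins whenever `late_mat_cost < full_scan_cost`, which recovers the selectivity threshold as the break-even point for a given storage medium.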
on further consideration, I don't think we want this at all. We should just always late materialize here. The reason is that takes and scans are both going to do lots of small object storage requests and at the limit they become pretty indistinguishable. That is why we see convergence in the results across even high selectivities:
```
Selectivity      Rows    Late Mat   Full Scan   Winner     Speedup
------------------------------------------------------------------
  0.01%           100     0.0372s     3.8490s   late_mat   103.55x
  0.05%           500     0.0659s     3.8437s   late_mat    58.36x
  0.10%         1,000     0.1098s     3.7994s   late_mat    34.61x
  0.20%         2,000     0.1987s     3.8193s   late_mat    19.22x
  0.50%         5,000     0.4633s     3.8555s   late_mat     8.32x
  1.00%        10,000     0.7899s     3.8544s   late_mat     4.88x
  2.00%        20,000     1.3052s     3.9145s   late_mat     3.00x
  5.00%        50,000     2.0625s     4.0943s   late_mat     1.99x
 10.00%       100,000     2.5498s     4.4041s   late_mat     1.73x
 20.00%       200,000     4.2165s     5.2398s   late_mat     1.24x
 40.00%       400,000     5.9357s     7.1053s   late_mat     1.20x
 80.00%       800,000     8.1393s    10.8025s   late_mat     1.33x
```
I'll rip this out and simplify the patch a bit.
Pull Request Overview
This PR implements late materialization for filtered vector searches to reduce I/O overhead when filters are highly selective. When executing KNN searches on unindexed data with selective filters, the new approach first scans filter columns to check selectivity, then either uses late materialization (collect row IDs, then fetch vectors) for selective filters or falls back to a full scan for non-selective ones.
Key changes:
- Added adaptive late materialization with configurable selectivity threshold (default 0.5%)
- Implemented early termination when filter selectivity exceeds threshold to avoid collecting all row IDs
- Added stats collection for late materialization strategy decisions
Reviewed Changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| rust/lance/src/dataset/scanner.rs | Implements adaptive late materialization logic, adds threshold configuration, updates KNN search paths to use late materialization when appropriate, and adds comprehensive test coverage |
| rust/lance/src/dataset.rs | Removes extraneous None parameters in function calls (unrelated cleanup) |
I have got a script here: https://gist.github.com/wkalt/a40d824f3770dfc46f01bcbbb722804b that analyzes various selectivity thresholds for the storage you point it at. It uses an unindexed table with a scalar column and a 1024d vector column, with 1M rows. I have some local modifications that expose the tuning parameter through the Python SDK for testing; it's unclear to me whether we will actually want those. The queries here are KNN vector searches with a scalar filter.

This is the output it reports on my minio (NVMe-backed over LAN): [output elided]

Here is what it produces on my local NVMe: [output elided]

In both of the instances above, late materialization wins every time, and the speedup trend reverses at the end. I am thinking we probably have some inefficiencies in the full scan path that account for this. I still need to test on slow S3 storage; I was hoping to replicate that with minio but I think it's too fast.
I also have this script, which motivated the investigation: https://gist.github.com/wkalt/aa62e5fc4cb1b90cdaa3c9ffce7099a9

On current lancedb, it outputs this for me: [output elided]

On lancedb patched with this lance change, it produces this: [output elided]

The partially indexed and unindexed cases now use this optimization, and all plans are fast.
```rust
/// # Errors
///
/// Returns an error if the threshold is not finite or is outside the range [0.0, 1.0].
pub fn late_materialize_selectivity_threshold(&mut self, threshold: f64) -> Result<&mut Self> {
```
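For context, a minimal sketch of how a caller might use this from the Rust scanner (the dataset path and filter are made up for illustration; the builder methods are the public lance scanner API plus the new method above):

```rust
use arrow_array::Float32Array;
use lance::dataset::Dataset;

async fn filtered_knn() -> lance::Result<()> {
    let dataset = Dataset::open("/tmp/example.lance").await?; // hypothetical path
    let query = Float32Array::from(vec![0.1_f32; 1024]);
    let mut scanner = dataset.scan();
    scanner
        .filter("category = 'widgets'")? // cheap scalar filter column
        .nearest("vector", &query, 10)?  // KNN over the expensive vector column
        // Give up on late materialization once >1% of rows match the filter.
        .late_materialize_selectivity_threshold(0.01)?;
    let _stream = scanner.try_into_stream().await?;
    Ok(())
}
```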
Should we expose this in the python SDK? I'm not sure if we tend to mirror exactly or not. Similar question for lancedb SDK.
I think it's useful to expose and I have done it locally for testing, but it's also an implementation detail that will likely change as our optimizer becomes more sophisticated.
wjones127
left a comment
Main concern is we should validate filtering on nested columns doesn't break.
rust/lance/src/dataset/scanner.rs (Outdated)
```rust
    vector_column: &str,
    skip_recheck: bool,
) -> Result<Arc<dyn ExecutionPlan>> {
    let mut filter_columns_set: BTreeSet<String> = BTreeSet::new();
```
Column names feel a bit fragile. It might be nice to collect field ids. Does this work if we are filtering on nested columns?
I feel like FilterPlan should have a method fn full_projection(&self) -> Projection or something like that to make this easy.
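Something like the following, perhaps — a sketch assuming the FilterPlan fields and Projection helpers that appear elsewhere in this diff (a dataset parameter is added here because building a Projection seems to need one):

```rust
use std::collections::BTreeSet;

impl FilterPlan {
    /// Hypothetical helper: gather every column referenced by the refine and
    /// full filter expressions into one projection.
    pub fn full_projection(&self, dataset: &Dataset) -> Result<Projection> {
        let mut cols: BTreeSet<String> = BTreeSet::new();
        if let Some(refine_expr) = self.refine_expr.as_ref() {
            cols.extend(Planner::column_names_in_expr(refine_expr));
        }
        if let Some(full_expr) = self.full_expr.as_ref() {
            cols.extend(Planner::column_names_in_expr(full_expr));
        }
        let cols: Vec<String> = cols.into_iter().collect();
        dataset
            .empty_projection()
            .union_columns(&cols, OnMissing::Error)
    }
}
```

That would collapse the repeated dedup-and-union logic in this patch into one call site; tracking field ids instead of names (per the comment above) would also make it robust for nested columns.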
Eventually, I think we want to move this logic within FilteredReadExec. I don't think we want to have create_plan() actually be running part of the plan in the end.
Here's how we handled late materialization in the scan with V1 files:
Codecov Report

❌ Patch coverage is … (percentage elided). Additional details and impacted files:

```
@@            Coverage Diff             @@
##             main    #5205      +/-   ##
==========================================
+ Coverage   81.98%   82.07%    +0.09%
==========================================
  Files         341      342        +1
  Lines      141199   141709      +510
  Branches   141199   141709      +510
==========================================
+ Hits       115758   116311      +553
+ Misses      21628    21539       -89
- Partials     3813     3859       +46
```

View full report in Codecov by Sentry.
westonpace
left a comment
I like the idea of moving late materialization into the filtered read. It simplifies the scanner logic and it makes sense. This node reads from a source, with a filter. Late materialization is part of that.
Minor nit: this approach makes a lot of sense for small selectivity thresholds. If the selectivity threshold is large (or the dataset is very large) then it seems like we could do a lot of work before we start scanning expensive columns, because we need to read in at least N * s rows (where N is total rows and s is the selectivity threshold). Since we can't return data to the user until we have all columns, this means the time to first batch increases.
What do you think of this alternative approach? It might be more complicated, and potentially prone to filter bias in the beginning / end of the data.
- Read the first 10K rows (the 10K is counted pre-filter here, and we could pick a different number)
- Estimate the selectivity by calculating # of matched rows / 10K
- Compare estimated selectivity with selectivity threshold to determine if we scan or take
The 10K is just some heuristic. Larger values will take more time before we start reading expensive columns, smaller values will be more prone to estimation error and less able to handle very small selectivity thresholds.
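A compact sketch of that heuristic (all names here are hypothetical), to make the trade-off concrete:

```rust
enum Strategy {
    LateMaterialize, // scan cheap columns, then take expensive ones by row id
    FullScan,        // scan all columns in one pass
}

/// Estimate selectivity from a pre-filter prefix of the table and pick a plan.
/// `sample_size` (e.g. 10_000) trades startup latency against estimation error,
/// and the estimate is biased if matches cluster at either end of the table.
fn choose_strategy(sample_matches: usize, sample_size: usize, threshold: f64) -> Strategy {
    let estimated_selectivity = sample_matches as f64 / sample_size as f64;
    if estimated_selectivity < threshold {
        Strategy::LateMaterialize
    } else {
        Strategy::FullScan
    }
}
```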
```python
def late_materialize_selectivity_threshold(
    self, threshold: float
) -> ScannerBuilder:
    """
    Set the selectivity threshold for late materialization in filtered KNN searches.

    When a filter is present in a KNN search, Lance first executes it to measure
    selectivity. If the filter selects fewer than this percentage of rows, Lance
    uses late materialization (scan scalars first, then take vectors for filtered
    rows only). If the filter selects this percentage or more rows, Lance does a
    single scan with both filter and vector columns to avoid the random access
    overhead of the take operation.

    The optimal value depends on your storage medium:

    - **Object storage (S3, GCS, Azure)**: Use a low threshold like 0.005 (0.5%)
      since random access is very expensive
    - **Local SSD**: Can use a higher threshold like 0.05 (5%) since random
      access is cheaper
    - **NVMe**: Can use even higher thresholds like 0.1 (10%)

    The default is 0.005 (0.5%), which is conservative for object storage.

    Parameters
    ----------
    threshold : float
        The selectivity threshold as a fraction (e.g., 0.005 for 0.5%)

    Returns
    -------
    ScannerBuilder
        Returns self for method chaining
    """
    if not isinstance(threshold, (int, float)):
        raise TypeError(
            f"late_materialize_selectivity_threshold must be a number, "
            f"got {type(threshold)}"
        )
    if not (0.0 <= threshold <= 1.0):
        raise ValueError(
            f"late_materialize_selectivity_threshold must be between 0.0 and "
            f"1.0 (inclusive), got {threshold}"
        )
    self._late_materialize_selectivity_threshold = float(threshold)
    return self
```
Can we make this a property of the ObjectStore? We already have fields like io_parallelism, max_iop_size, and block_size which describe how an object store should be interacted with. I think this could be added to the list. Then the defaults will work for most users. Users with a custom storage scenario will need to create a custom object store.
This way, we hide the parameter from most users, and get a better default.
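A sketch of the shape this could take (the struct and numbers below are hypothetical; io_parallelism, max_iop_size, and block_size are the existing fields referenced above):

```rust
/// Hypothetical per-store tuning profile: the existing knobs plus a
/// storage-appropriate late-materialization threshold.
pub struct StoreProfile {
    pub io_parallelism: usize,
    pub max_iop_size: u64,
    pub block_size: usize,
    /// Filter selectivity below which taking expensive columns beats scanning.
    pub late_materialize_selectivity_threshold: f64,
}

/// Illustrative defaults only; real values would come from benchmarks.
pub fn default_threshold_for_scheme(scheme: &str) -> f64 {
    match scheme {
        "s3" | "gs" | "az" => 0.005, // object storage: random reads are costly
        "file" => 0.05,              // local disk: random reads are cheaper
        _ => 0.005,
    }
}
```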
that seems like a great idea, thanks.
```rust
/// Adaptive late materialization for filtered vector scans.
///
/// When a filter is present, this method:
/// 1. Scans with scalar columns first to check selectivity
/// 2. If selective (< threshold): uses late materialization (collect row IDs, then take vectors)
/// 3. If not selective (>= threshold): does a full scan with both filter and vector columns
///
/// This avoids expensive random access for non-selective filters while benefiting
/// from late materialization for selective ones.
async fn adaptive_column_scan(
    &self,
    filter_plan: &FilterPlan,
    frags: Option<Arc<Vec<Fragment>>>,
    take_column: &str,
    skip_recheck: bool,
) -> Result<Arc<dyn ExecutionPlan>> {
    // FilteredRead doesn't support v1/legacy files, so fall back for legacy datasets
    if self.dataset.is_legacy_storage() {
        let mut filter_cols: BTreeSet<String> = BTreeSet::new();
        if let Some(refine_expr) = filter_plan.refine_expr.as_ref() {
            filter_cols.extend(Planner::column_names_in_expr(refine_expr));
        }
        if let Some(full_expr) = filter_plan.full_expr.as_ref() {
            filter_cols.extend(Planner::column_names_in_expr(full_expr));
        }
        let mut filter_cols: Vec<String> = filter_cols.into_iter().collect();
        filter_cols.sort();
        return self
            .full_scan_with_filter(filter_plan, frags, take_column, filter_cols, skip_recheck)
            .await;
    }

    // Build full projection (filter columns + vector column)
    let mut filter_cols: BTreeSet<String> = BTreeSet::new();
    if let Some(refine_expr) = filter_plan.refine_expr.as_ref() {
        filter_cols.extend(Planner::column_names_in_expr(refine_expr));
    }
    if let Some(full_expr) = filter_plan.full_expr.as_ref() {
        filter_cols.extend(Planner::column_names_in_expr(full_expr));
    }
    let mut filter_cols: Vec<String> = filter_cols.into_iter().collect();
    filter_cols.sort();
    let mut full_projection = self
        .dataset
        .empty_projection()
        .with_row_id()
        .union_columns(&filter_cols, OnMissing::Error)?
        .union_column(take_column, OnMissing::Error)?;
    full_projection.with_row_addr = self.projection_plan.physical_projection.with_row_addr;

    // Use new_filtered_read but add adaptive config
    let threshold = self
        .late_materialize_selectivity_threshold
        .unwrap_or(LATE_MATERIALIZE_SELECTIVITY_THRESHOLD);

    let total_count = self.get_total_row_count(frags.as_ref());

    let mut plan = self
        .new_filtered_read(
            filter_plan,
            full_projection,
            /*make_deletions_null=*/ false,
            frags.clone(),
            /*scan_range=*/ None,
        )
        .await?;

    // Unwrap FilteredReadExec to add adaptive config
    if let Some(filtered_exec) = plan.as_any().downcast_ref::<FilteredReadExec>() {
        let mut opts = filtered_exec.options().clone();
        opts.adaptive_expensive_column = Some(AdaptiveColumnConfig {
            expensive_column: take_column.to_string(),
            threshold,
            total_row_count: total_count,
        });

        plan = Arc::new(FilteredReadExec::try_new(
            filtered_exec.dataset().clone(),
            opts,
            filtered_exec.index_input().cloned(),
        )?);
    }

    // Apply refine filter if needed
    if !skip_recheck {
        if let Some(refine_expr) = &filter_plan.refine_expr {
            plan = Arc::new(LanceFilterExec::try_new(refine_expr.clone(), plan)?);
        }
    }

    Ok(plan)
}
```
Do we need the adaptive column scan both here and in filtered read? I wasn't sure if Will's comment had been addressed yet and, if it has, whether we still need something in the scanner.
oh, sorry. I think I started moving it into filtered_read and stopped halfway through. I think it's better in there.
```rust
fn obtain_adaptive_stream(
    &self,
    partition: usize,
    context: Arc<TaskContext>,
) -> SendableRecordBatchStream {
```
This is a very big method (admittedly, pot calling kettle black scenario here), can we break it up?
```rust
// Concatenate all cheap batches into one
use arrow_select::concat::concat_batches;
let cheap_schema = cheap_batches[0].schema();
let cheap_batch_combined =
    concat_batches(&cheap_schema, &cheap_batches).map_err(|e| Error::Arrow {
        message: format!("Failed to concatenate cheap batches: {}", e),
        location: location!(),
    })?;

// Collect all expensive batches
let mut expensive_batches = Vec::new();
while let Some(batch_result) = expensive_stream.next().await {
    let batch = batch_result.map_err(Error::from)?;
    expensive_batches.push(batch);
}
```
Is this materializing the entire read into memory? We will need an approach that can work iteratively.
Also, there is always the dumb easy alternative (what we do today) which is just pick one of take/scan and always use it for expensive columns. I'm not actually sure the two should have different performance characteristics anymore.
Nevermind, I kind of confused myself. There is still some possibility here, but it isn't as simple as I described. We can talk it through offline but I'm not sure we need to worry about it.
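On the iterative question above, a minimal sketch of a bounded-memory alternative (names hypothetical): instead of collecting every batch, pair each cheap batch with a take of just its matching expensive rows and emit the combined result as it completes.

```rust
use arrow_array::RecordBatch;
use futures::{Stream, TryStreamExt};

/// For each cheap (filter-column) batch, fetch only the matching expensive
/// (vector) rows and yield the combined batch immediately. Peak memory is
/// bounded by the batch size rather than the whole filtered result set.
fn zip_incrementally<S, F, Fut, E>(
    cheap_stream: S,
    fetch_expensive: F, // e.g. wraps a take() of vector rows by row id
) -> impl Stream<Item = Result<RecordBatch, E>>
where
    S: Stream<Item = Result<RecordBatch, E>>,
    F: FnMut(RecordBatch) -> Fut,
    Fut: std::future::Future<Output = Result<RecordBatch, E>>,
{
    cheap_stream.and_then(fetch_expensive)
}
```

A real version would probably want something like `try_buffered` so several takes can be in flight while the cheap scan continues.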
@westonpace regarding the risk of reading a lot - yeah, I should do some better math to actually quantify what it'll look like. I think in general, the cases where this approach becomes expensive are cases where the scanning approach is also expensive. But there are various thresholds to consider, and also a question of whether the filter results are too expensive to materialize for the super-large table case.

I also think the time to first batch in the previous setup is very dependent on where the first matching record sits in the table. If you are lucky you get one immediately and scanning is a win, but if you are not lucky then the approach here could win out even if you need to read a lot of (cheaper) rows.

For your heuristic, I think the main concern is what you say -- it's really biased toward the first 10K rows of the table. I think that could be a serious limitation. Another thing I need to examine is how things work when the filter is very large/complicated.
Another idea that occurs to me for limiting the downside, once we have a better analysis: we could limit this to filter expressions with only one or two columns, or put some heuristics in place to abort early if the expected size of the materialized X% is assessed to be too large. My guess is the majority of filters are on small data types and small values, and don't include very many conditions.
KNN search is performed when a vector index is not present. When a table is partially covered by a vector index, we perform a union of an ANN search over the indexed data, and a KNN search over the unindexed data. If the table is completely unindexed it is just a KNN search on the data.
Prior to this commit, when we would execute the KNN portion of a filtered vector search, we would perform a scan of all columns and remove results that did not match the filter. For large vectors, this amounts to a lot of overfetch from storage.
When filters are selective, it is more efficient to read the filter column (typically much smaller than the vector), apply the filter, and then select matching vectors by row ID.
This patch implements that strategy as well as an adaptive mechanism for deciding when to apply it. There is a new configuration concept in the scanner for specifying the filter selectivity at which it will be cheaper to do a scan. We will compute a target rowcount based on that threshold and scan the filter column for matches. If we encounter more matches than the target, we will give up and switch to a scan.
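To make the mechanism concrete, here is a compact sketch of the early-termination check described above (names hypothetical; the real implementation lives in the scanner and filtered-read changes in this PR):

```rust
/// Scan the cheap filter column, counting matches per batch. If matches exceed
/// the target implied by the selectivity threshold, abandon late
/// materialization and fall back to a single scan that includes the vectors.
fn should_late_materialize(
    match_counts_per_batch: impl IntoIterator<Item = usize>,
    total_rows: usize,
    threshold: f64, // e.g. 0.005 => give up past 0.5% selectivity
) -> bool {
    let target = (total_rows as f64 * threshold).ceil() as usize;
    let mut matched = 0usize;
    for count in match_counts_per_batch {
        matched += count;
        if matched > target {
            return false; // too many matches: one combined scan is cheaper
        }
    }
    true // selective enough: take vectors by row id
}
```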