Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ arrow-row = "56.1"
arrow-schema = "56.1"
arrow-select = "56.1"
async-recursion = "1.0"
async-stream = "0.3.6"
async-trait = "0.1"
aws-config = "1.2.0"
aws-credential-types = "1.2.0"
Expand Down
23 changes: 23 additions & 0 deletions java/lance-jni/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 23 additions & 0 deletions python/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust/lance-index/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ arrow-schema.workspace = true
arrow-select.workspace = true
arrow-arith.workspace = true
async-recursion.workspace = true
async-stream.workspace = true
async-trait.workspace = true
bitvec.workspace = true
datafusion-common.workspace = true
Expand Down
49 changes: 49 additions & 0 deletions rust/lance-index/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ use std::{any::Any, ops::Bound, sync::Arc};
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::Expr;
use deepsize::DeepSizeOf;
use futures::future::BoxFuture;
use futures::{FutureExt, Stream};
use inverted::query::{fill_fts_query_column, FtsQuery, FtsQueryNode, FtsSearchParams, MatchQuery};
use lance_core::utils::mask::RowIdTreeMap;
use lance_core::{Error, Result};
Expand Down Expand Up @@ -198,6 +200,53 @@ pub trait IndexReader: Send + Sync {
fn schema(&self) -> &lance_core::datatypes::Schema;
}

/// A stream that reads the original training data back out of the index
///
/// This is used for updating the index
pub struct IndexReaderStream {
reader: Arc<dyn IndexReader>,
batch_size: u64,
num_batches: u32,
batch_idx: u32,
}

impl IndexReaderStream {
async fn new(reader: Arc<dyn IndexReader>, batch_size: u64) -> Self {
let num_batches = reader.num_batches(batch_size).await;
Self {
reader,
batch_size,
num_batches,
batch_idx: 0,
}
}
}

impl Stream for IndexReaderStream {
type Item = BoxFuture<'static, Result<RecordBatch>>;

fn poll_next(
self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let this = self.get_mut();
if this.batch_idx >= this.num_batches {
return std::task::Poll::Ready(None);
}
let batch_num = this.batch_idx;
this.batch_idx += 1;
let reader_copy = this.reader.clone();
let batch_size = this.batch_size;
let read_task = async move {
reader_copy
.read_record_batch(batch_num as u64, batch_size)
.await
}
.boxed();
std::task::Poll::Ready(Some(read_task))
}
}

/// Trait abstracting I/O away from index logic
///
/// Scalar indices are currently serialized as indexable arrow record batches stored in
Expand Down
Loading
Loading