Skip to content

Commit acfaee5

Browse files
authored
feat(memory): optimize collection preallocation where capacity is known (#3895)
# Description

Following [this](#3826), this work implements a first round of preallocating collections when their size is known, to avoid unnecessary reallocations. This applies to `Vec`, `HashMap` (via `with_capacity`), and other dynamically sized collections. Preallocating memory improves efficiency and reduces overhead when the final size is known.

Signed-off-by: Florian Valeye <[email protected]>
1 parent 4106d43 commit acfaee5

File tree

11 files changed

+26
-24
lines changed

11 files changed

+26
-24
lines changed

crates/core/src/delta_datafusion/find_files.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -290,10 +290,9 @@ async fn scan_memory_table(snapshot: &EagerSnapshot, predicate: &Expr) -> DeltaR
290290
.collect_vec();
291291

292292
let batch = snapshot.add_actions_table(true)?;
293-
let mut arrays = Vec::new();
294-
let mut fields = Vec::new();
295-
296293
let schema = batch.schema();
294+
let mut arrays = Vec::with_capacity(schema.fields().len());
295+
let mut fields = Vec::with_capacity(schema.fields().len());
297296

298297
arrays.push(
299298
batch

crates/core/src/delta_datafusion/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -597,7 +597,7 @@ impl DeltaDataChecker {
597597

598598
/// Return true if all the nullability checks are valid
599599
fn check_nullability(&self, record_batch: &RecordBatch) -> Result<bool, DeltaTableError> {
600-
let mut violations = Vec::new();
600+
let mut violations = Vec::with_capacity(self.non_nullable_columns.len());
601601
for col in self.non_nullable_columns.iter() {
602602
if let Some(arr) = record_batch.column_by_name(col) {
603603
if arr.null_count() > 0 {
@@ -633,7 +633,7 @@ impl DeltaDataChecker {
633633
let table_name: String = uuid::Uuid::new_v4().to_string();
634634
self.ctx.register_table(&table_name, Arc::new(table))?;
635635

636-
let mut violations: Vec<String> = Vec::new();
636+
let mut violations: Vec<String> = Vec::with_capacity(checks.len());
637637

638638
for check in checks {
639639
if check.get_name().contains('.') {

crates/core/src/delta_datafusion/table_provider.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -438,7 +438,7 @@ impl<'a> DeltaScanBuilder<'a> {
438438
)?;
439439

440440
let logical_schema = if let Some(used_columns) = self.projection {
441-
let mut fields = vec![];
441+
let mut fields = Vec::with_capacity(used_columns.len());
442442
for idx in used_columns {
443443
fields.push(logical_schema.field(*idx).to_owned());
444444
}
@@ -517,9 +517,9 @@ impl<'a> DeltaScanBuilder<'a> {
517517

518518
// needed to enforce limit and deal with missing statistics
519519
// rust port of https://github.com/delta-io/delta/pull/1495
520-
let mut pruned_without_stats = vec![];
520+
let mut pruned_without_stats = Vec::new();
521521
let mut rows_collected = 0;
522-
let mut files = vec![];
522+
let mut files = Vec::with_capacity(num_containers);
523523

524524
let file_actions: Vec<_> = self
525525
.snapshot

crates/core/src/kernel/snapshot/iterators.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -334,8 +334,8 @@ where
334334
Scalar::Timestamp(v) => Scalar::Timestamp(func(v)),
335335
Scalar::TimestampNtz(v) => Scalar::TimestampNtz(func(v)),
336336
Scalar::Struct(struct_data) => {
337-
let mut fields = Vec::new();
338-
let mut scalars = Vec::new();
337+
let mut fields = Vec::with_capacity(struct_data.fields().len());
338+
let mut scalars = Vec::with_capacity(struct_data.values().len());
339339

340340
for (field, value) in struct_data.fields().iter().zip(struct_data.values().iter()) {
341341
fields.push(field.clone());

crates/core/src/operations/merge/barrier.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use std::{
1616
task::{Context, Poll},
1717
};
1818

19-
use arrow::array::{builder::UInt64Builder, ArrayRef, RecordBatch};
19+
use arrow::array::{builder::UInt64Builder, Array, ArrayRef, RecordBatch};
2020
use arrow::datatypes::SchemaRef;
2121
use dashmap::DashSet;
2222
use datafusion::common::{DataFusionError, Result as DataFusionResult};
@@ -253,7 +253,7 @@ impl Stream for MergeBarrierStream {
253253
// However this approach exposes the cost of hashing so we want to minimize that as much as possible.
254254
// A map from an arrow dictionary key to the correct index of `file_partition` is created for each batch that's processed.
255255
// This ensures we only need to hash each file path at most once per batch.
256-
let mut key_map = Vec::new();
256+
let mut key_map = Vec::with_capacity(file_dictionary.len());
257257

258258
for file_name in file_dictionary.values().into_iter() {
259259
let key = match file_name {
@@ -273,9 +273,11 @@ impl Stream for MergeBarrierStream {
273273
key_map.push(key)
274274
}
275275

276-
let mut indices: Vec<_> = (0..(self.file_partitions.len()))
277-
.map(|_| UInt64Builder::with_capacity(batch.num_rows()))
278-
.collect();
276+
let mut indices: Vec<_> =
277+
Vec::with_capacity(self.file_partitions.len());
278+
for _ in 0..self.file_partitions.len() {
279+
indices.push(UInt64Builder::with_capacity(batch.num_rows()));
280+
}
279281

280282
for (idx, key) in file_dictionary.keys().iter().enumerate() {
281283
match key {

crates/core/src/operations/write/execution.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,7 @@ pub(crate) async fn write_execution_plan_v2(
327327
});
328328

329329
// spawn one worker per partition stream to drive DataFusion concurrently
330-
let mut worker_handles = Vec::new();
330+
let mut worker_handles = Vec::with_capacity(partition_streams.len());
331331
let scan_start = std::time::Instant::now();
332332
for mut partition_stream in partition_streams {
333333
let tx_clone = tx.clone();
@@ -444,7 +444,7 @@ pub(crate) async fn write_execution_plan_v2(
444444
});
445445

446446
// spawn partition workers that split batches and send to appropriate writer channel
447-
let mut worker_handles = Vec::new();
447+
let mut worker_handles = Vec::with_capacity(partition_streams.len());
448448
let scan_start = std::time::Instant::now();
449449
for mut partition_stream in partition_streams {
450450
let txn = tx_normal.clone();

crates/core/src/operations/write/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -508,7 +508,8 @@ impl std::future::IntoFuture for WriteBuilder {
508508
}
509509
}
510510
if let Some(new_schema) = new_schema {
511-
let mut schema_evolution_projection = Vec::new();
511+
let mut schema_evolution_projection =
512+
Vec::with_capacity(new_schema.fields().len());
512513
for field in new_schema.fields() {
513514
// If field exist in source data, we cast to new datatype
514515
if source_schema.index_of(field.name()).is_ok() {

crates/core/src/writer/json.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -375,7 +375,7 @@ impl DeltaWriter<Vec<Value>> for JsonWriter {
375375
#[instrument(skip(self), fields(writer_count = 0))]
376376
async fn flush(&mut self) -> Result<Vec<Add>, DeltaTableError> {
377377
let writers = std::mem::take(&mut self.arrow_writers);
378-
let mut actions = Vec::new();
378+
let mut actions = Vec::with_capacity(writers.len());
379379

380380
Span::current().record("writer_count", writers.len());
381381

@@ -432,8 +432,8 @@ fn quarantine_failed_parquet_rows(
432432
arrow_schema: Arc<ArrowSchema>,
433433
values: Vec<Value>,
434434
) -> Result<(Vec<Value>, Vec<BadValue>), DeltaWriterError> {
435-
let mut good: Vec<Value> = Vec::new();
436-
let mut bad: Vec<BadValue> = Vec::new();
435+
let mut good: Vec<Value> = Vec::with_capacity(values.len());
436+
let mut bad: Vec<BadValue> = Vec::with_capacity(values.len());
437437

438438
for value in values {
439439
let record_batch =

crates/core/src/writer/record_batch.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ impl DeltaWriter<RecordBatch> for RecordBatchWriter {
274274
/// Writes the existing parquet bytes to storage and resets internal state to handle another file.
275275
async fn flush(&mut self) -> Result<Vec<Add>, DeltaTableError> {
276276
let writers = std::mem::take(&mut self.arrow_writers);
277-
let mut actions = Vec::new();
277+
let mut actions = Vec::with_capacity(writers.len());
278278

279279
for (_, writer) in writers {
280280
let metadata = writer.arrow_writer.close()?;

crates/core/src/writer/utils.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ pub(crate) fn record_batch_without_partitions(
7575
record_batch: &RecordBatch,
7676
partition_columns: &[String],
7777
) -> Result<RecordBatch, DeltaWriterError> {
78-
let mut non_partition_columns = Vec::new();
78+
let mut non_partition_columns = Vec::with_capacity(record_batch.schema().fields().len());
7979
for (i, field) in record_batch.schema().fields().iter().enumerate() {
8080
if !partition_columns.contains(field.name()) {
8181
non_partition_columns.push(i);

0 commit comments

Comments
 (0)