Skip to content

Commit a1ac061

Browse files
fix: pre-warm listing file statistics cache during listing table creation
Signed-off-by: bharath-techie <[email protected]>
1 parent fb14d7c commit a1ac061

File tree

1 file changed

+99
-5
lines changed

1 file changed

+99
-5
lines changed

datafusion/core/src/datasource/listing_table_factory.rs

Lines changed: 99 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,12 @@ impl TableProviderFactory for ListingTableFactory {
190190
.with_definition(cmd.definition.clone())
191191
.with_constraints(cmd.constraints.clone())
192192
.with_column_defaults(cmd.column_defaults.clone());
193+
194+
// Pre-warm statistics cache if collect_statistics is enabled
195+
if session_state.config().collect_statistics() {
196+
let _ = table.list_files_for_scan(state, &[], None).await?;
197+
}
198+
193199
Ok(Arc::new(table))
194200
}
195201
}
@@ -205,17 +211,21 @@ fn get_extension(path: &str) -> String {
205211

206212
#[cfg(test)]
207213
mod tests {
214+
use super::*;
215+
use crate::{
216+
datasource::file_format::csv::CsvFormat, execution::context::SessionContext,
217+
test_util::parquet_test_data,
218+
};
219+
use datafusion_execution::cache::cache_manager::CacheManagerConfig;
220+
use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
221+
use datafusion_execution::cache::CacheAccessor;
208222
use datafusion_execution::config::SessionConfig;
223+
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
209224
use glob::Pattern;
210225
use std::collections::HashMap;
211226
use std::fs;
212227
use std::path::PathBuf;
213228

214-
use super::*;
215-
use crate::{
216-
datasource::file_format::csv::CsvFormat, execution::context::SessionContext,
217-
};
218-
219229
use datafusion_common::parsers::CompressionTypeVariant;
220230
use datafusion_common::{Constraints, DFSchema, TableReference};
221231

@@ -519,4 +529,88 @@ mod tests {
519529
let listing_options = listing_table.options();
520530
assert!(listing_options.table_partition_cols.is_empty());
521531
}
532+
533+
#[tokio::test]
534+
async fn test_statistics_cache_prewarming() {
535+
let factory = ListingTableFactory::new();
536+
537+
// Test with collect_statistics enabled
538+
let file_statistics_cache = Arc::new(DefaultFileStatisticsCache::default());
539+
let cache_config = CacheManagerConfig::default()
540+
.with_files_statistics_cache(Some(file_statistics_cache.clone()));
541+
let runtime = RuntimeEnvBuilder::new()
542+
.with_cache_manager(cache_config)
543+
.build_arc()
544+
.unwrap();
545+
546+
let mut config = SessionConfig::new();
547+
config.options_mut().execution.collect_statistics = true;
548+
let context = SessionContext::new_with_config_rt(config, runtime);
549+
let state = context.state();
550+
let name = TableReference::bare("test");
551+
552+
let cmd = CreateExternalTable {
553+
name,
554+
location: parquet_test_data(),
555+
file_type: "parquet".to_string(),
556+
schema: Arc::new(DFSchema::empty()),
557+
table_partition_cols: vec![],
558+
if_not_exists: false,
559+
or_replace: false,
560+
temporary: false,
561+
definition: None,
562+
order_exprs: vec![],
563+
unbounded: false,
564+
options: HashMap::new(),
565+
constraints: Constraints::default(),
566+
column_defaults: HashMap::new(),
567+
};
568+
569+
let _table_provider = factory.create(&state, &cmd).await.unwrap();
570+
571+
assert!(
572+
file_statistics_cache.len() > 0,
573+
"Statistics cache should be pre-warmed when collect_statistics is enabled"
574+
);
575+
576+
// Test with collect_statistics disabled
577+
let file_statistics_cache = Arc::new(DefaultFileStatisticsCache::default());
578+
let cache_config = CacheManagerConfig::default()
579+
.with_files_statistics_cache(Some(file_statistics_cache.clone()));
580+
let runtime = RuntimeEnvBuilder::new()
581+
.with_cache_manager(cache_config)
582+
.build_arc()
583+
.unwrap();
584+
585+
let mut config = SessionConfig::new();
586+
config.options_mut().execution.collect_statistics = false;
587+
let context = SessionContext::new_with_config_rt(config, runtime);
588+
let state = context.state();
589+
let name = TableReference::bare("test");
590+
591+
let cmd = CreateExternalTable {
592+
name,
593+
location: parquet_test_data(),
594+
file_type: "parquet".to_string(),
595+
schema: Arc::new(DFSchema::empty()),
596+
table_partition_cols: vec![],
597+
if_not_exists: false,
598+
or_replace: false,
599+
temporary: false,
600+
definition: None,
601+
order_exprs: vec![],
602+
unbounded: false,
603+
options: HashMap::new(),
604+
constraints: Constraints::default(),
605+
column_defaults: HashMap::new(),
606+
};
607+
608+
let _table_provider = factory.create(&state, &cmd).await.unwrap();
609+
610+
assert_eq!(
611+
file_statistics_cache.len(),
612+
0,
613+
"Statistics cache should not be pre-warmed when collect_statistics is disabled"
614+
);
615+
}
522616
}

0 commit comments

Comments
 (0)