diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs index cbe538ae35cc..c25e9ff141bb 100644 --- a/datafusion/catalog-listing/src/table.rs +++ b/datafusion/catalog-listing/src/table.rs @@ -578,6 +578,11 @@ impl TableProvider for ListingTable { let keep_partition_by_columns = state.config_options().execution.keep_partition_by_columns; + // Invalidate cache entries for this table if they exist + if let Some(lfc) = state.runtime_env().cache_manager.get_list_files_cache() { + let _ = lfc.remove(table_path.prefix()); + } + // Sink related option, apart from format let config = FileSinkConfig { original_url: String::default(), diff --git a/datafusion/core/tests/datasource/object_store_access.rs b/datafusion/core/tests/datasource/object_store_access.rs index 4a2b68fb11b0..2e1b1484076d 100644 --- a/datafusion/core/tests/datasource/object_store_access.rs +++ b/datafusion/core/tests/datasource/object_store_access.rs @@ -98,6 +98,59 @@ async fn create_multi_file_csv_file() { ); } +#[tokio::test] +async fn multi_query_multi_file_csv_file() { + let test = Test::new().with_multi_file_csv().await; + assert_snapshot!( + test.query("select * from csv_table").await, + @r" + ------- Query Output (6 rows) ------- + +---------+-------+-------+ + | c1 | c2 | c3 | + +---------+-------+-------+ + | 0.0 | 0.0 | true | + | 0.00003 | 5e-12 | false | + | 0.00001 | 1e-12 | true | + | 0.00003 | 5e-12 | false | + | 0.00002 | 2e-12 | true | + | 0.00003 | 5e-12 | false | + +---------+-------+-------+ + ------- Object Store Request Summary ------- + RequestCountingObjectStore() + Total Requests: 4 + - LIST prefix=data + - GET (opts) path=data/file_0.csv + - GET (opts) path=data/file_1.csv + - GET (opts) path=data/file_2.csv + " + ); + + // the second query should re-use the cached LIST results and should not reissue LIST + assert_snapshot!( + test.query("select * from csv_table").await, + @r" + ------- Query Output (6 rows) ------- + +---------+-------+-------+ + | c1 | c2 | c3 | + +---------+-------+-------+ + | 0.0 | 0.0 | true | + | 0.00003 | 5e-12 | false | + | 0.00001 | 1e-12 | true | + | 0.00003 | 5e-12 | false | + | 0.00002 | 2e-12 | true | + | 0.00003 | 5e-12 | false | + +---------+-------+-------+ + ------- Object Store Request Summary ------- + RequestCountingObjectStore() + Total Requests: 4 + - LIST prefix=data + - GET (opts) path=data/file_0.csv + - GET (opts) path=data/file_1.csv + - GET (opts) path=data/file_2.csv + " + ); +} + #[tokio::test] async fn query_multi_csv_file() { let test = Test::new().with_multi_file_csv().await; diff --git a/datafusion/core/tests/parquet/file_statistics.rs b/datafusion/core/tests/parquet/file_statistics.rs index db2715956d2f..22f015d802aa 100644 --- a/datafusion/core/tests/parquet/file_statistics.rs +++ b/datafusion/core/tests/parquet/file_statistics.rs @@ -30,9 +30,8 @@ use datafusion::prelude::SessionContext; use datafusion_common::stats::Precision; use datafusion_common::DFSchema; use datafusion_execution::cache::cache_manager::CacheManagerConfig; -use datafusion_execution::cache::cache_unit::{ - DefaultFileStatisticsCache, DefaultListFilesCache, -}; +use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache; +use datafusion_execution::cache::DefaultListFilesCache; use datafusion_execution::config::SessionConfig; use datafusion_execution::runtime_env::RuntimeEnvBuilder; use datafusion_expr::{col, lit, Expr}; diff --git a/datafusion/execution/src/cache/cache_manager.rs b/datafusion/execution/src/cache/cache_manager.rs index 98be9ce34b28..4b93cf1dcd5a 100644 --- a/datafusion/execution/src/cache/cache_manager.rs +++ b/datafusion/execution/src/cache/cache_manager.rs @@ -24,6 +24,9 @@ use std::any::Any; use std::collections::HashMap; use std::fmt::{Debug, Formatter}; use std::sync::Arc; +use std::time::Duration; + +use super::list_files_cache::DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT; /// A cache for [`Statistics`]. /// @@ -41,9 +44,19 @@ pub type FileStatisticsCache = /// command on the local filesystem. This operation can be expensive, /// especially when done over remote object stores. /// -/// See [`crate::runtime_env::RuntimeEnv`] for more details -pub type ListFilesCache = - Arc>, Extra = ObjectMeta>>; +/// See [`crate::runtime_env::RuntimeEnv`] for more details. +pub trait ListFilesCache: + CacheAccessor>, Extra = ObjectMeta> +{ + /// Returns the cache's memory limit in bytes. + fn cache_limit(&self) -> usize; + + /// Returns the TTL (time-to-live) for cache entries, if configured. + fn cache_ttl(&self) -> Option; + + /// Updates the cache with a new memory limit in bytes. + fn update_cache_limit(&self, limit: usize); +} /// Generic file-embedded metadata used with [`FileMetadataCache`]. /// @@ -109,7 +122,7 @@ impl Debug for dyn CacheAccessor, Extra = ObjectMeta> { } } -impl Debug for dyn CacheAccessor>, Extra = ObjectMeta> { +impl Debug for dyn ListFilesCache { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "Cache name: {} with length: {}", self.name(), self.len()) } @@ -131,7 +144,7 @@ impl Debug for dyn FileMetadataCache { #[derive(Debug)] pub struct CacheManager { file_statistic_cache: Option, - list_files_cache: Option, + list_files_cache: Option>, file_metadata_cache: Arc, } @@ -166,10 +179,22 @@ impl CacheManager { } /// Get the cache for storing the result of listing [`ObjectMeta`]s under the same path. - pub fn get_list_files_cache(&self) -> Option { + pub fn get_list_files_cache(&self) -> Option> { self.list_files_cache.clone() } + /// Get the memory limit of the list files cache. + pub fn get_list_files_cache_limit(&self) -> usize { + self.list_files_cache + .as_ref() + .map_or(DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, |c| c.cache_limit()) + } + + /// Get the TTL (time-to-live) of the list files cache. + pub fn get_list_files_cache_ttl(&self) -> Option { + self.list_files_cache.as_ref().and_then(|c| c.cache_ttl()) + } + /// Get the file embedded metadata cache. pub fn get_file_metadata_cache(&self) -> Arc { Arc::clone(&self.file_metadata_cache) @@ -185,17 +210,24 @@ pub const DEFAULT_METADATA_CACHE_LIMIT: usize = 50 * 1024 * 1024; // 50M #[derive(Clone)] pub struct CacheManagerConfig { - /// Enable cache of files statistics when listing files. - /// Avoid get same file statistics repeatedly in same datafusion session. - /// Default is disable. Fow now only supports Parquet files. + /// Enable caching of file statistics when listing files. + /// Enabling the cache avoids repeatedly reading file statistics in a DataFusion session. + /// Default is disabled. Currently only Parquet files are supported. pub table_files_statistics_cache: Option, - /// Enable cache of file metadata when listing files. - /// This setting avoids listing file meta of the same path repeatedly - /// in same session, which may be expensive in certain situations (e.g. remote object storage). + /// Enable caching of file metadata when listing files. + /// Enabling the cache avoids repeat list and object metadata fetch operations, which may be + /// expensive in certain situations (e.g. remote object storage), for objects under paths that + /// are cached. /// Note that if this option is enabled, DataFusion will not see any updates to the underlying - /// location. - /// Default is disable. - pub list_files_cache: Option, + /// storage for at least `list_files_cache_ttl` duration. + /// Default is disabled. + pub list_files_cache: Option>, + /// Limit of the `list_files_cache`, in bytes. Default: 1MiB. + pub list_files_cache_limit: usize, + /// The duration the list files cache will consider an entry valid after insertion. Note that + /// changes to the underlying storage system, such as adding or removing data, will not be + /// visible until an entry expires. Default: None (infinite). + pub list_files_cache_ttl: Option, /// Cache of file-embedded metadata, used to avoid reading it multiple times when processing a /// data file (e.g., Parquet footer and page metadata). /// If not provided, the [`CacheManager`] will create a [`DefaultFilesMetadataCache`]. @@ -209,6 +241,8 @@ impl Default for CacheManagerConfig { Self { table_files_statistics_cache: Default::default(), list_files_cache: Default::default(), + list_files_cache_limit: DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, + list_files_cache_ttl: None, file_metadata_cache: Default::default(), metadata_cache_limit: DEFAULT_METADATA_CACHE_LIMIT, } @@ -228,13 +262,32 @@ impl CacheManagerConfig { } /// Set the cache for listing files. - /// + /// /// Default is `None` (disabled). - pub fn with_list_files_cache(mut self, cache: Option) -> Self { + pub fn with_list_files_cache( + mut self, + cache: Option>, + ) -> Self { self.list_files_cache = cache; self } + /// Sets the limit of the list files cache, in bytes. + /// + /// Default: 1MiB (1,048,576 bytes). + pub fn with_list_files_cache_limit(mut self, limit: usize) -> Self { + self.list_files_cache_limit = limit; + self + } + + /// Sets the TTL (time-to-live) for entries in the list files cache. + /// + /// Default: None (infinite). + pub fn with_list_files_cache_ttl(mut self, ttl: Duration) -> Self { + self.list_files_cache_ttl = Some(ttl); + self + } + /// Sets the cache for file-embedded metadata. /// /// Default is a [`DefaultFilesMetadataCache`]. diff --git a/datafusion/execution/src/cache/cache_unit.rs b/datafusion/execution/src/cache/cache_unit.rs index 0f9cff54d587..d55645a9bde0 100644 --- a/datafusion/execution/src/cache/cache_unit.rs +++ b/datafusion/execution/src/cache/cache_unit.rs @@ -107,71 +107,6 @@ impl CacheAccessor> for DefaultFileStatisticsCache { } } -/// Default implementation of [`ListFilesCache`] -/// -/// Collected files metadata for listing files. -/// -/// Cache is not invalided until user calls [`Self::remove`] or [`Self::clear`]. -/// -/// [`ListFilesCache`]: crate::cache::cache_manager::ListFilesCache -#[derive(Default)] -pub struct DefaultListFilesCache { - statistics: DashMap>>, -} - -impl CacheAccessor>> for DefaultListFilesCache { - type Extra = ObjectMeta; - - fn get(&self, k: &Path) -> Option>> { - self.statistics.get(k).map(|x| Arc::clone(x.value())) - } - - fn get_with_extra( - &self, - _k: &Path, - _e: &Self::Extra, - ) -> Option>> { - panic!("Not supported DefaultListFilesCache get_with_extra") - } - - fn put( - &self, - key: &Path, - value: Arc>, - ) -> Option>> { - self.statistics.insert(key.clone(), value) - } - - fn put_with_extra( - &self, - _key: &Path, - _value: Arc>, - _e: &Self::Extra, - ) -> Option>> { - panic!("Not supported DefaultListFilesCache put_with_extra") - } - - fn remove(&self, k: &Path) -> Option>> { - self.statistics.remove(k).map(|x| x.1) - } - - fn contains_key(&self, k: &Path) -> bool { - self.statistics.contains_key(k) - } - - fn len(&self) -> usize { - self.statistics.len() - } - - fn clear(&self) { - self.statistics.clear() - } - - fn name(&self) -> String { - "DefaultListFilesCache".to_string() - } -} - /// Handles the inner state of the [`DefaultFilesMetadataCache`] struct. struct DefaultFilesMetadataCacheState { lru_queue: LruQueue)>, @@ -433,7 +368,7 @@ mod tests { FileMetadata, FileMetadataCache, FileMetadataCacheEntry, }; use crate::cache::cache_unit::{ - DefaultFileStatisticsCache, DefaultFilesMetadataCache, DefaultListFilesCache, + DefaultFileStatisticsCache, DefaultFilesMetadataCache, }; use crate::cache::CacheAccessor; use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; @@ -486,28 +421,6 @@ mod tests { assert!(cache.get_with_extra(&meta2.location, &meta2).is_none()); } - #[test] - fn test_list_file_cache() { - let meta = ObjectMeta { - location: Path::from("test"), - last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00") - .unwrap() - .into(), - size: 1024, - e_tag: None, - version: None, - }; - - let cache = DefaultListFilesCache::default(); - assert!(cache.get(&meta.location).is_none()); - - cache.put(&meta.location, vec![meta.clone()].into()); - assert_eq!( - cache.get(&meta.location).unwrap().first().unwrap().clone(), - meta.clone() - ); - } - pub struct TestFileMetadata { metadata: String, } diff --git a/datafusion/execution/src/cache/list_files_cache.rs b/datafusion/execution/src/cache/list_files_cache.rs new file mode 100644 index 000000000000..2bec2a1e70fc --- /dev/null +++ b/datafusion/execution/src/cache/list_files_cache.rs @@ -0,0 +1,731 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{ + sync::{Arc, Mutex}, + time::Duration, +}; + +use datafusion_common::instant::Instant; +use object_store::{path::Path, ObjectMeta}; + +use crate::cache::{cache_manager::ListFilesCache, lru_queue::LruQueue, CacheAccessor}; + +/// Default implementation of [`ListFilesCache`] +/// +/// Caches file metadata for file listing operations. +/// +/// # Internal details +/// +/// The `memory_limit` parameter controls the maximum size of the cache, which uses a Least +/// Recently Used eviction algorithm. When adding a new entry, if the total number of entries in +/// the cache exceeds `memory_limit`, the least recently used entries are evicted until the total +/// size is lower than the `memory_limit`. +/// +/// # `Extra` Handling +/// +/// Users should use the [`Self::get`] and [`Self::put`] methods. The +/// [`Self::get_with_extra`] and [`Self::put_with_extra`] methods simply call +/// `get` and `put`, respectively. +#[derive(Default)] +pub struct DefaultListFilesCache { + state: Mutex, +} + +impl DefaultListFilesCache { + /// Creates a new instance of [`DefaultListFilesCache`]. + /// + /// # Arguments + /// * `memory_limit` - The maximum size of the cache, in bytes. + /// * `ttl` - The TTL (time-to-live) of entries in the cache. + pub fn new(memory_limit: usize, ttl: Option) -> Self { + Self { + state: Mutex::new(DefaultListFilesCacheState::new(memory_limit, ttl)), + } + } + + /// Returns the cache's memory limit in bytes. + pub fn cache_limit(&self) -> usize { + self.state.lock().unwrap().memory_limit + } + + /// Updates the cache with a new memory limit in bytes. + pub fn update_cache_limit(&self, limit: usize) { + let mut state = self.state.lock().unwrap(); + state.memory_limit = limit; + state.evict_entries(); + } + + /// Returns the TTL (time-to-live) applied to cache entries. + pub fn cache_ttl(&self) -> Option { + self.state.lock().unwrap().ttl + } +} + +struct ListFilesEntry { + metas: Arc>, + size_bytes: usize, + expires: Option, +} + +impl ListFilesEntry { + fn try_new(metas: Arc>, ttl: Option) -> Option { + let size_bytes = (metas.capacity() * size_of::()) + + metas.iter().map(meta_heap_bytes).reduce(|acc, b| acc + b)?; + + Some(Self { + metas, + size_bytes, + expires: ttl.map(|t| Instant::now() + t), + }) + } +} + +/// Calculates the number of bytes an [`ObjectMeta`] occupies in the heap. +fn meta_heap_bytes(object_meta: &ObjectMeta) -> usize { + let mut size = object_meta.location.as_ref().len(); + + if let Some(e) = &object_meta.e_tag { + size += e.len(); + } + if let Some(v) = &object_meta.version { + size += v.len(); + } + + size +} + +/// The default memory limit for the [`DefaultListFilesCache`] +pub(super) const DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT: usize = 1024 * 1024; // 1MiB + +/// Handles the inner state of the [`DefaultListFilesCache`] struct. +pub struct DefaultListFilesCacheState { + lru_queue: LruQueue, + memory_limit: usize, + memory_used: usize, + ttl: Option, +} + +impl Default for DefaultListFilesCacheState { + fn default() -> Self { + Self { + lru_queue: LruQueue::new(), + memory_limit: DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, + memory_used: 0, + ttl: None, + } + } +} + +impl DefaultListFilesCacheState { + fn new(memory_limit: usize, ttl: Option) -> Self { + Self { + lru_queue: LruQueue::new(), + memory_limit, + memory_used: 0, + ttl, + } + } + + /// Returns the respective entry from the cache, if it exists and the entry has not expired. + /// If the entry exists it becomes the most recently used. If the entry has expired it is + /// removed from the cache + fn get(&mut self, key: &Path) -> Option>> { + let entry = self.lru_queue.get(key)?; + + match entry.expires { + Some(exp) if Instant::now() > exp => { + self.remove(key); + None + } + _ => Some(Arc::clone(&entry.metas)), + } + } + + /// Checks if the respective entry is currently cached. If the entry has expired it is removed + /// from the cache. + /// The LRU queue is not updated. + fn contains_key(&mut self, k: &Path) -> bool { + let Some(entry) = self.lru_queue.peek(k) else { + return false; + }; + + match entry.expires { + Some(exp) if Instant::now() > exp => { + self.remove(k); + false + } + _ => true, + } + } + + /// Adds a new key-value pair to cache, meaning LRU entries might be evicted if required. + /// If the key is already in the cache, the previous entry is returned. + /// If the size of the entry is greater than the `memory_limit`, the value is not inserted. + fn put( + &mut self, + key: &Path, + value: Arc>, + ) -> Option>> { + let entry = ListFilesEntry::try_new(value, self.ttl)?; + let entry_size = entry.size_bytes; + + // no point in trying to add this value to the cache if it cannot fit entirely + if entry_size > self.memory_limit { + return None; + } + + // if the key is already in the cache, the old value is removed + let old_value = self.lru_queue.put(key.clone(), entry); + self.memory_used += entry_size; + + if let Some(entry) = &old_value { + self.memory_used -= entry.size_bytes; + } + + self.evict_entries(); + + old_value.map(|v| v.metas) + } + + /// Evicts entries from the LRU cache until `memory_used` is lower than `memory_limit`. + fn evict_entries(&mut self) { + while self.memory_used > self.memory_limit { + if let Some(removed) = self.lru_queue.pop() { + self.memory_used -= removed.1.size_bytes; + } else { + // cache is empty while memory_used > memory_limit, cannot happen + debug_assert!( + false, + "cache is empty while memory_used > memory_limit, cannot happen" + ); + return; + } + } + } + + /// Removes an entry from the cache and returns it, if it exists. + fn remove(&mut self, k: &Path) -> Option>> { + if let Some(entry) = self.lru_queue.remove(k) { + self.memory_used -= entry.size_bytes; + Some(entry.metas) + } else { + None + } + } + + /// Returns the number of entries currently cached. + fn len(&self) -> usize { + self.lru_queue.len() + } + + /// Removes all entries from the cache. + fn clear(&mut self) { + self.lru_queue.clear(); + self.memory_used = 0; + } +} + +impl ListFilesCache for DefaultListFilesCache { + fn cache_limit(&self) -> usize { + let state = self.state.lock().unwrap(); + state.memory_limit + } + + fn cache_ttl(&self) -> Option { + let state = self.state.lock().unwrap(); + state.ttl + } + + fn update_cache_limit(&self, limit: usize) { + let mut state = self.state.lock().unwrap(); + state.memory_limit = limit; + state.evict_entries(); + } +} + +impl CacheAccessor>> for DefaultListFilesCache { + type Extra = ObjectMeta; + + fn get(&self, k: &Path) -> Option>> { + let mut state = self.state.lock().unwrap(); + state.get(k) + } + + fn get_with_extra(&self, k: &Path, _e: &Self::Extra) -> Option>> { + self.get(k) + } + + fn put( + &self, + key: &Path, + value: Arc>, + ) -> Option>> { + let mut state = self.state.lock().unwrap(); + state.put(key, value) + } + + fn put_with_extra( + &self, + key: &Path, + value: Arc>, + _e: &Self::Extra, + ) -> Option>> { + self.put(key, value) + } + + fn remove(&self, k: &Path) -> Option>> { + let mut state = self.state.lock().unwrap(); + state.remove(k) + } + + fn contains_key(&self, k: &Path) -> bool { + let mut state = self.state.lock().unwrap(); + state.contains_key(k) + } + + fn len(&self) -> usize { + let state = self.state.lock().unwrap(); + state.len() + } + + fn clear(&self) { + let mut state = self.state.lock().unwrap(); + state.clear(); + } + + fn name(&self) -> String { + String::from("DefaultListFilesCache") + } +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::DateTime; + use std::thread; + + /// Helper function to create a test ObjectMeta with a specific path and location string size + fn create_test_object_meta(path: &str, location_size: usize) -> ObjectMeta { + // Create a location string of the desired size by padding with zeros + let location_str = if location_size > path.len() { + format!("{}{}", path, "0".repeat(location_size - path.len())) + } else { + path.to_string() + }; + + ObjectMeta { + location: Path::from(location_str), + last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00") + .unwrap() + .into(), + size: 1024, + e_tag: None, + version: None, + } + } + + /// Helper function to create a vector of ObjectMeta with at least meta_size bytes + fn create_test_list_files_entry( + path: &str, + count: usize, + meta_size: usize, + ) -> (Path, Arc>, usize) { + let metas: Vec = (0..count) + .map(|i| create_test_object_meta(&format!("file{i}"), meta_size)) + .collect(); + let metas = Arc::new(metas); + + // Calculate actual size using the same logic as ListFilesEntry::try_new + let size = (metas.capacity() * size_of::()) + + metas.iter().map(meta_heap_bytes).sum::(); + + (Path::from(path), metas, size) + } + + #[test] + fn test_basic_operations() { + let cache = DefaultListFilesCache::default(); + let path = Path::from("test_path"); + + // Initially cache is empty + assert!(cache.get(&path).is_none()); + assert!(!cache.contains_key(&path)); + assert_eq!(cache.len(), 0); + + // Put an entry + let meta = create_test_object_meta("file1", 50); + let value = Arc::new(vec![meta.clone()]); + cache.put(&path, Arc::clone(&value)); + + // Entry should be retrievable + assert!(cache.contains_key(&path)); + assert_eq!(cache.len(), 1); + let retrieved = cache.get(&path).unwrap(); + assert_eq!(retrieved.len(), 1); + assert_eq!(retrieved[0].location, meta.location); + + // Remove the entry + let removed = cache.remove(&path).unwrap(); + assert_eq!(removed.len(), 1); + assert!(!cache.contains_key(&path)); + assert_eq!(cache.len(), 0); + + // Put multiple entries + let (path1, value1, _) = create_test_list_files_entry("path1", 2, 50); + let (path2, value2, _) = create_test_list_files_entry("path2", 3, 50); + cache.put(&path1, value1); + cache.put(&path2, value2); + assert_eq!(cache.len(), 2); + + // Clear all entries + cache.clear(); + assert_eq!(cache.len(), 0); + assert!(!cache.contains_key(&path1)); + assert!(!cache.contains_key(&path2)); + } + + #[test] + fn test_lru_eviction_basic() { + let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100); + let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100); + let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100); + + // Set cache limit to exactly fit all three entries + let cache = DefaultListFilesCache::new(size * 3, None); + + // All three entries should fit + cache.put(&path1, value1); + cache.put(&path2, value2); + cache.put(&path3, value3); + assert_eq!(cache.len(), 3); + assert!(cache.contains_key(&path1)); + assert!(cache.contains_key(&path2)); + assert!(cache.contains_key(&path3)); + + // Adding a new entry should evict path1 (LRU) + let (path4, value4, _) = create_test_list_files_entry("path4", 1, 100); + cache.put(&path4, value4); + + assert_eq!(cache.len(), 3); + assert!(!cache.contains_key(&path1)); // Evicted + assert!(cache.contains_key(&path2)); + assert!(cache.contains_key(&path3)); + assert!(cache.contains_key(&path4)); + } + + #[test] + fn test_lru_ordering_after_access() { + let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100); + let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100); + let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100); + + // Set cache limit to fit exactly three entries + let cache = DefaultListFilesCache::new(size * 3, None); + + cache.put(&path1, value1); + cache.put(&path2, value2); + cache.put(&path3, value3); + assert_eq!(cache.len(), 3); + + // Access path1 to move it to front (MRU) + // Order is now: path2 (LRU), path3, path1 (MRU) + cache.get(&path1); + + // Adding a new entry should evict path2 (the LRU) + let (path4, value4, _) = create_test_list_files_entry("path4", 1, 100); + cache.put(&path4, value4); + + assert_eq!(cache.len(), 3); + assert!(cache.contains_key(&path1)); // Still present (recently accessed) + assert!(!cache.contains_key(&path2)); // Evicted (was LRU) + assert!(cache.contains_key(&path3)); + assert!(cache.contains_key(&path4)); + } + + #[test] + fn test_reject_too_large() { + let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100); + let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100); + + // Set cache limit to fit both entries + let cache = DefaultListFilesCache::new(size * 2, None); + + cache.put(&path1, value1); + cache.put(&path2, value2); + assert_eq!(cache.len(), 2); + + // Try to add an entry that's too large to fit in the cache + let (path_large, value_large, _) = create_test_list_files_entry("large", 1, 1000); + cache.put(&path_large, value_large); + + // Large entry should not be added + assert!(!cache.contains_key(&path_large)); + assert_eq!(cache.len(), 2); + assert!(cache.contains_key(&path1)); + assert!(cache.contains_key(&path2)); + } + + #[test] + fn test_multiple_evictions() { + let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100); + let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100); + let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100); + + // Set cache limit for exactly 3 entries + let cache = DefaultListFilesCache::new(size * 3, None); + + cache.put(&path1, value1); + cache.put(&path2, value2); + cache.put(&path3, value3); + assert_eq!(cache.len(), 3); + + // Add a large entry that requires evicting 2 entries + let (path_large, value_large, _) = create_test_list_files_entry("large", 1, 200); + cache.put(&path_large, value_large); + + // path1 and path2 should be evicted (both LRU), path3 and path_large remain + assert_eq!(cache.len(), 2); + assert!(!cache.contains_key(&path1)); // Evicted + assert!(!cache.contains_key(&path2)); // Evicted + assert!(cache.contains_key(&path3)); + assert!(cache.contains_key(&path_large)); + } + + #[test] + fn test_cache_limit_resize() { + let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100); + let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100); + let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100); + + let cache = DefaultListFilesCache::new(size * 3, None); + + // Add three entries + cache.put(&path1, value1); + cache.put(&path2, value2); + cache.put(&path3, value3); + assert_eq!(cache.len(), 3); + + // Resize cache to only fit one entry + cache.update_cache_limit(size); + + // Should keep only the most recent entry (path3, the MRU) + assert_eq!(cache.len(), 1); + assert!(cache.contains_key(&path3)); + // Earlier entries (LRU) should be evicted + assert!(!cache.contains_key(&path1)); + assert!(!cache.contains_key(&path2)); + } + + #[test] + fn test_entry_update_with_size_change() { + let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100); + let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100); + let (path3, value3_v1, _) = create_test_list_files_entry("path3", 1, 100); + + let cache = DefaultListFilesCache::new(size * 3, None); + + // Add three entries + cache.put(&path1, value1); + cache.put(&path2, value2); + cache.put(&path3, value3_v1); + assert_eq!(cache.len(), 3); + + // Update path3 with same size - should not cause eviction + let (_, value3_v2, _) = create_test_list_files_entry("path3", 1, 100); + cache.put(&path3, value3_v2); + + assert_eq!(cache.len(), 3); + assert!(cache.contains_key(&path1)); + assert!(cache.contains_key(&path2)); + assert!(cache.contains_key(&path3)); + + // Update path3 with larger size that requires evicting path1 (LRU) + let (_, value3_v3, _) = create_test_list_files_entry("path3", 1, 200); + cache.put(&path3, value3_v3); + + assert_eq!(cache.len(), 2); + assert!(!cache.contains_key(&path1)); // Evicted (was LRU) + assert!(cache.contains_key(&path2)); + assert!(cache.contains_key(&path3)); + } + + #[test] + fn test_cache_with_ttl() { + let ttl = Duration::from_millis(100); + let cache = DefaultListFilesCache::new(10000, Some(ttl)); + + let (path1, value1, _) = create_test_list_files_entry("path1", 2, 50); + let (path2, value2, _) = create_test_list_files_entry("path2", 2, 50); + + cache.put(&path1, value1); + cache.put(&path2, value2); + + // Entries should be accessible immediately + assert!(cache.get(&path1).is_some()); + assert!(cache.get(&path2).is_some()); + assert!(cache.contains_key(&path1)); + assert!(cache.contains_key(&path2)); + assert_eq!(cache.len(), 2); + + // Wait for TTL to expire + thread::sleep(Duration::from_millis(150)); + + // Entries should now return None and be removed when observed through get or contains_key + assert!(cache.get(&path1).is_none()); + assert_eq!(cache.len(), 1); // path1 was removed by get() + assert!(!cache.contains_key(&path2)); + assert_eq!(cache.len(), 0); // path2 was removed by contains_key() + } + + #[test] + fn test_cache_with_ttl_and_lru() { + let ttl = Duration::from_millis(200); + let cache = DefaultListFilesCache::new(1000, Some(ttl)); + + let (path1, value1, _) = create_test_list_files_entry("path1", 1, 400); + let (path2, value2, _) = create_test_list_files_entry("path2", 1, 400); + let (path3, value3, _) = create_test_list_files_entry("path3", 1, 400); + + cache.put(&path1, value1); + thread::sleep(Duration::from_millis(50)); + cache.put(&path2, value2); + thread::sleep(Duration::from_millis(50)); + + // path3 should evict path1 due to size limit + cache.put(&path3, value3); + assert!(!cache.contains_key(&path1)); // Evicted by LRU + assert!(cache.contains_key(&path2)); + assert!(cache.contains_key(&path3)); + + // Wait for path2 to expire + thread::sleep(Duration::from_millis(150)); + assert!(!cache.contains_key(&path2)); // Expired + assert!(cache.contains_key(&path3)); // Still valid + } + + #[test] + fn test_meta_heap_bytes_calculation() { + // Test with minimal ObjectMeta (no e_tag, no version) + let meta1 = ObjectMeta { + location: Path::from("test"), + last_modified: chrono::Utc::now(), + size: 100, + e_tag: None, + version: None, + }; + assert_eq!(meta_heap_bytes(&meta1), 4); // Just the location string "test" + + // Test with e_tag + let meta2 = ObjectMeta { + location: Path::from("test"), + last_modified: chrono::Utc::now(), + size: 100, + e_tag: Some("etag123".to_string()), + version: None, + }; + assert_eq!(meta_heap_bytes(&meta2), 4 + 7); // location (4) + e_tag (7) + + // Test with version + let meta3 = ObjectMeta { + location: Path::from("test"), + last_modified: chrono::Utc::now(), + size: 100, + e_tag: None, + version: Some("v1.0".to_string()), + }; + assert_eq!(meta_heap_bytes(&meta3), 4 + 4); // location (4) + version (4) + + // Test with both e_tag and version + let meta4 = ObjectMeta { + location: Path::from("test"), + last_modified: chrono::Utc::now(), + size: 100, + e_tag: Some("tag".to_string()), + version: Some("ver".to_string()), + }; + assert_eq!(meta_heap_bytes(&meta4), 4 + 3 + 3); // location (4) + e_tag (3) + version (3) + } + + #[test] + fn test_entry_creation() { + // Test with empty vector + let empty_vec: Arc> = Arc::new(vec![]); + let entry = ListFilesEntry::try_new(empty_vec, None); + assert!(entry.is_none()); + + // Validate entry size + let metas: Vec = (0..5) + .map(|i| create_test_object_meta(&format!("file{i}"), 30)) + .collect(); + let metas = Arc::new(metas); + let entry = ListFilesEntry::try_new(metas, None).unwrap(); + assert_eq!(entry.metas.len(), 5); + // Size should be: capacity * sizeof(ObjectMeta) + (5 * 30) for heap bytes + let expected_size = + (entry.metas.capacity() * size_of::()) + (entry.metas.len() * 30); + assert_eq!(entry.size_bytes, expected_size); + + // Test with TTL + let meta = create_test_object_meta("file", 50); + let ttl = Duration::from_secs(10); + let entry = ListFilesEntry::try_new(Arc::new(vec![meta]), Some(ttl)).unwrap(); + let created = Instant::now(); + assert!(entry.expires.unwrap() > created); + } + + #[test] + fn test_memory_tracking() { + let cache = DefaultListFilesCache::new(1000, None); + + // Verify cache starts with 0 memory used + { + let state = cache.state.lock().unwrap(); + assert_eq!(state.memory_used, 0); + } + + // Add entry and verify memory tracking + let (path1, value1, size1) = create_test_list_files_entry("path1", 1, 100); + cache.put(&path1, value1); + { + let state = cache.state.lock().unwrap(); + assert_eq!(state.memory_used, size1); + } + + // Add another entry + let (path2, value2, size2) = create_test_list_files_entry("path2", 1, 200); + cache.put(&path2, value2); + { + let state = cache.state.lock().unwrap(); + assert_eq!(state.memory_used, size1 + size2); + } + + // Remove first entry and verify memory decreases + cache.remove(&path1); + { + let state = cache.state.lock().unwrap(); + assert_eq!(state.memory_used, size2); + } + + // Clear and verify memory is 0 + cache.clear(); + { + let state = cache.state.lock().unwrap(); + assert_eq!(state.memory_used, 0); + } + } +} diff --git a/datafusion/execution/src/cache/mod.rs b/datafusion/execution/src/cache/mod.rs index 6e0737154dec..461542732ae0 100644 --- a/datafusion/execution/src/cache/mod.rs +++ b/datafusion/execution/src/cache/mod.rs @@ -19,10 +19,15 @@ pub mod cache_manager; pub mod cache_unit; pub mod lru_queue; -/// The cache accessor, users usually working on this interface while manipulating caches. -/// This interface does not get `mut` references and thus has to handle its own -/// locking via internal mutability. It can be accessed via multiple concurrent queries -/// during planning and execution. +mod list_files_cache; + +pub use list_files_cache::DefaultListFilesCache; + +/// A trait that can be implemented to provide custom cache behavior for the caches managed by +/// [`cache_manager::CacheManager`]. +/// +/// Implementations must handle their own locking via internal mutability, as methods do not +/// take mutable references and may be accessed by multiple concurrent queries. pub trait CacheAccessor: Send + Sync { // Extra info but not part of the cache key or cache value. type Extra: Clone; diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs index abcbe897d2d3..82e93167f58f 100644 --- a/datafusion/execution/src/runtime_env.rs +++ b/datafusion/execution/src/runtime_env.rs @@ -33,12 +33,12 @@ use crate::cache::cache_manager::{CacheManager, CacheManagerConfig}; use crate::parquet_encryption::{EncryptionFactory, EncryptionFactoryRegistry}; use datafusion_common::{config::ConfigEntry, Result}; use object_store::ObjectStore; -use std::path::PathBuf; use std::sync::Arc; use std::{ fmt::{Debug, Formatter}, num::NonZeroUsize, }; +use std::{path::PathBuf, time::Duration}; use url::Url; #[derive(Clone)] @@ -387,6 +387,18 @@ impl RuntimeEnvBuilder { self } + /// Specifies the memory limit for the object list cache, in bytes. + pub fn with_object_list_cache_limit(mut self, limit: usize) -> Self { + self.cache_manager = self.cache_manager.with_list_files_cache_limit(limit); + self + } + + /// Specifies the duration entries in the object list cache will be considered valid. + pub fn with_object_list_cache_ttl(mut self, ttl: Duration) -> Self { + self.cache_manager = self.cache_manager.with_list_files_cache_ttl(ttl); + self + } + /// Build a RuntimeEnv pub fn build(self) -> Result { let Self { @@ -428,6 +440,10 @@ impl RuntimeEnvBuilder { .cache_manager .get_file_statistic_cache(), list_files_cache: runtime_env.cache_manager.get_list_files_cache(), + list_files_cache_limit: runtime_env + .cache_manager + .get_list_files_cache_limit(), + list_files_cache_ttl: runtime_env.cache_manager.get_list_files_cache_ttl(), file_metadata_cache: Some( runtime_env.cache_manager.get_file_metadata_cache(), ), diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md index 7fb12353b922..c555937837c6 100644 --- a/docs/source/library-user-guide/upgrading.md +++ b/docs/source/library-user-guide/upgrading.md @@ -40,6 +40,23 @@ directly on the `Field`. For example: + let field = Arc::clone(df_schema.field("my_column")); ``` +### ListingTableProvider now caches `LIST` commands + +In prior versions, `ListingTableProvider` would issue `LIST` commands to +the underlying object store each time it needed to list files for a query. +To improve performance, `ListingTableProvider` now caches the results of +`LIST` commands for the lifetime of the `ListingTableProvider` instance. + +Note that by default the cache has no expiration time, so if files are added or removed +from the underlying object store, the `ListingTableProvider` will not see +those changes until the `ListingTableProvider` instance is dropped and recreated. + +You will be able to configure the maximum cache size and cache expiration time via a configuration option: + +See for more details. + +Note that the internal API has changed to use a trait `ListFilesCache` instead of a type alias. + ### Removal of `pyarrow` feature The `pyarrow` feature flag has been removed. This feature has been migrated to