Skip to content

Commit 6067aa2

Browse files
committed
Adds memory-bound default ListFilesCache
- Moves the default ListFilesCache implementation into a new list_files_cache module
- Refactors the ListFilesCache to mirror the MetadataCache by defining a new trait instead of a fixed type wrapping a trait
- Applies a memory-based size limit on the cache
- Allows for an optional TTL on cache entries
- Implements unit tests for the new implementation
- New docs for new methods and some small cleanup on docs for existing cache doc strings
1 parent 7cbb443 commit 6067aa2

File tree

7 files changed

+835
-113
lines changed

7 files changed

+835
-113
lines changed

datafusion/catalog-listing/src/table.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -578,6 +578,11 @@ impl TableProvider for ListingTable {
578578
let keep_partition_by_columns =
579579
state.config_options().execution.keep_partition_by_columns;
580580

581+
// Invalidate cache entries for this table if they exist
582+
if let Some(lfc) = state.runtime_env().cache_manager.get_list_files_cache() {
583+
let _ = lfc.remove(table_path.prefix());
584+
}
585+
581586
// Sink related option, apart from format
582587
let config = FileSinkConfig {
583588
original_url: String::default(),

datafusion/core/tests/parquet/file_statistics.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,8 @@ use datafusion::prelude::SessionContext;
3030
use datafusion_common::stats::Precision;
3131
use datafusion_common::DFSchema;
3232
use datafusion_execution::cache::cache_manager::CacheManagerConfig;
33-
use datafusion_execution::cache::cache_unit::{
34-
DefaultFileStatisticsCache, DefaultListFilesCache,
35-
};
33+
use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
34+
use datafusion_execution::cache::DefaultListFilesCache;
3635
use datafusion_execution::config::SessionConfig;
3736
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
3837
use datafusion_expr::{col, lit, Expr};

datafusion/execution/src/cache/cache_manager.rs

Lines changed: 70 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ use std::any::Any;
2424
use std::collections::HashMap;
2525
use std::fmt::{Debug, Formatter};
2626
use std::sync::Arc;
27+
use std::time::Duration;
28+
29+
use super::list_files_cache::DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT;
2730

2831
/// A cache for [`Statistics`].
2932
///
@@ -41,9 +44,19 @@ pub type FileStatisticsCache =
4144
/// command on the local filesystem. This operation can be expensive,
4245
/// especially when done over remote object stores.
4346
///
44-
/// See [`crate::runtime_env::RuntimeEnv`] for more details
45-
pub type ListFilesCache =
46-
Arc<dyn CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = ObjectMeta>>;
47+
/// See [`crate::runtime_env::RuntimeEnv`] for more details.
48+
pub trait ListFilesCache:
49+
CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = ObjectMeta>
50+
{
51+
/// Returns the cache's memory limit in bytes.
52+
fn cache_limit(&self) -> usize;
53+
54+
/// Returns the TTL (time-to-live) for cache entries, if configured.
55+
fn cache_ttl(&self) -> Option<Duration>;
56+
57+
/// Updates the cache with a new memory limit in bytes.
58+
fn update_cache_limit(&self, limit: usize);
59+
}
4760

4861
/// Generic file-embedded metadata used with [`FileMetadataCache`].
4962
///
@@ -109,7 +122,7 @@ impl Debug for dyn CacheAccessor<Path, Arc<Statistics>, Extra = ObjectMeta> {
109122
}
110123
}
111124

112-
impl Debug for dyn CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = ObjectMeta> {
125+
impl Debug for dyn ListFilesCache {
113126
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
114127
write!(f, "Cache name: {} with length: {}", self.name(), self.len())
115128
}
@@ -131,7 +144,7 @@ impl Debug for dyn FileMetadataCache {
131144
#[derive(Debug)]
132145
pub struct CacheManager {
133146
file_statistic_cache: Option<FileStatisticsCache>,
134-
list_files_cache: Option<ListFilesCache>,
147+
list_files_cache: Option<Arc<dyn ListFilesCache>>,
135148
file_metadata_cache: Arc<dyn FileMetadataCache>,
136149
}
137150

@@ -166,10 +179,22 @@ impl CacheManager {
166179
}
167180

168181
/// Get the cache for storing the result of listing [`ObjectMeta`]s under the same path.
169-
pub fn get_list_files_cache(&self) -> Option<ListFilesCache> {
182+
pub fn get_list_files_cache(&self) -> Option<Arc<dyn ListFilesCache>> {
170183
self.list_files_cache.clone()
171184
}
172185

186+
/// Get the memory limit of the list files cache.
187+
pub fn get_list_files_cache_limit(&self) -> usize {
188+
self.list_files_cache
189+
.as_ref()
190+
.map_or(DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, |c| c.cache_limit())
191+
}
192+
193+
/// Get the TTL (time-to-live) of the list files cache.
194+
pub fn get_list_files_cache_ttl(&self) -> Option<Duration> {
195+
self.list_files_cache.as_ref().and_then(|c| c.cache_ttl())
196+
}
197+
173198
/// Get the file embedded metadata cache.
174199
pub fn get_file_metadata_cache(&self) -> Arc<dyn FileMetadataCache> {
175200
Arc::clone(&self.file_metadata_cache)
@@ -185,17 +210,24 @@ pub const DEFAULT_METADATA_CACHE_LIMIT: usize = 50 * 1024 * 1024; // 50M
185210

186211
#[derive(Clone)]
187212
pub struct CacheManagerConfig {
188-
/// Enable cache of files statistics when listing files.
189-
/// Avoid get same file statistics repeatedly in same datafusion session.
190-
/// Default is disable. Fow now only supports Parquet files.
213+
/// Enable caching of file statistics when listing files.
214+
/// Enabling the cache avoids repeatedly reading file statistics in a DataFusion session.
215+
/// Default is disabled. Currently only Parquet files are supported.
191216
pub table_files_statistics_cache: Option<FileStatisticsCache>,
192-
/// Enable cache of file metadata when listing files.
193-
/// This setting avoids listing file meta of the same path repeatedly
194-
/// in same session, which may be expensive in certain situations (e.g. remote object storage).
217+
/// Enable caching of file metadata when listing files.
218+
/// Enabling the cache avoids repeat list and object metadata fetch operations, which may be
219+
/// expensive in certain situations (e.g. remote object storage), for objects under paths that
220+
/// are cached.
195221
/// Note that if this option is enabled, DataFusion will not see any updates to the underlying
196-
/// location.
197-
/// Default is disable.
198-
pub list_files_cache: Option<ListFilesCache>,
222+
/// storage for at least `list_files_cache_ttl` duration.
223+
/// Default is disabled.
224+
pub list_files_cache: Option<Arc<dyn ListFilesCache>>,
225+
/// Limit of the `list_files_cache`, in bytes. Default: 1MiB.
226+
pub list_files_cache_limit: usize,
227+
/// The duration the list files cache will consider an entry valid after insertion. Note that
228+
/// changes to the underlying storage system, such as adding or removing data, will not be
229+
/// visible until an entry expires. Default: None (infinite).
230+
pub list_files_cache_ttl: Option<Duration>,
199231
/// Cache of file-embedded metadata, used to avoid reading it multiple times when processing a
200232
/// data file (e.g., Parquet footer and page metadata).
201233
/// If not provided, the [`CacheManager`] will create a [`DefaultFilesMetadataCache`].
@@ -209,6 +241,8 @@ impl Default for CacheManagerConfig {
209241
Self {
210242
table_files_statistics_cache: Default::default(),
211243
list_files_cache: Default::default(),
244+
list_files_cache_limit: DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT,
245+
list_files_cache_ttl: None,
212246
file_metadata_cache: Default::default(),
213247
metadata_cache_limit: DEFAULT_METADATA_CACHE_LIMIT,
214248
}
@@ -228,13 +262,32 @@ impl CacheManagerConfig {
228262
}
229263

230264
/// Set the cache for listing files.
231-
///
265+
///
232266
/// Default is `None` (disabled).
233-
pub fn with_list_files_cache(mut self, cache: Option<ListFilesCache>) -> Self {
267+
pub fn with_list_files_cache(
268+
mut self,
269+
cache: Option<Arc<dyn ListFilesCache>>,
270+
) -> Self {
234271
self.list_files_cache = cache;
235272
self
236273
}
237274

275+
/// Sets the limit of the list files cache, in bytes.
276+
///
277+
/// Default: 1MiB (1,048,576 bytes).
278+
pub fn with_list_files_cache_limit(mut self, limit: usize) -> Self {
279+
self.list_files_cache_limit = limit;
280+
self
281+
}
282+
283+
/// Sets the TTL (time-to-live) for entries in the list files cache.
284+
///
285+
/// Default: None (infinite).
286+
pub fn with_list_files_cache_ttl(mut self, ttl: Duration) -> Self {
287+
self.list_files_cache_ttl = Some(ttl);
288+
self
289+
}
290+
238291
/// Sets the cache for file-embedded metadata.
239292
///
240293
/// Default is a [`DefaultFilesMetadataCache`].

datafusion/execution/src/cache/cache_unit.rs

Lines changed: 1 addition & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -107,71 +107,6 @@ impl CacheAccessor<Path, Arc<Statistics>> for DefaultFileStatisticsCache {
107107
}
108108
}
109109

110-
/// Default implementation of [`ListFilesCache`]
111-
///
112-
/// Collected files metadata for listing files.
113-
///
114-
/// Cache is not invalided until user calls [`Self::remove`] or [`Self::clear`].
115-
///
116-
/// [`ListFilesCache`]: crate::cache::cache_manager::ListFilesCache
117-
#[derive(Default)]
118-
pub struct DefaultListFilesCache {
119-
statistics: DashMap<Path, Arc<Vec<ObjectMeta>>>,
120-
}
121-
122-
impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for DefaultListFilesCache {
123-
type Extra = ObjectMeta;
124-
125-
fn get(&self, k: &Path) -> Option<Arc<Vec<ObjectMeta>>> {
126-
self.statistics.get(k).map(|x| Arc::clone(x.value()))
127-
}
128-
129-
fn get_with_extra(
130-
&self,
131-
_k: &Path,
132-
_e: &Self::Extra,
133-
) -> Option<Arc<Vec<ObjectMeta>>> {
134-
panic!("Not supported DefaultListFilesCache get_with_extra")
135-
}
136-
137-
fn put(
138-
&self,
139-
key: &Path,
140-
value: Arc<Vec<ObjectMeta>>,
141-
) -> Option<Arc<Vec<ObjectMeta>>> {
142-
self.statistics.insert(key.clone(), value)
143-
}
144-
145-
fn put_with_extra(
146-
&self,
147-
_key: &Path,
148-
_value: Arc<Vec<ObjectMeta>>,
149-
_e: &Self::Extra,
150-
) -> Option<Arc<Vec<ObjectMeta>>> {
151-
panic!("Not supported DefaultListFilesCache put_with_extra")
152-
}
153-
154-
fn remove(&self, k: &Path) -> Option<Arc<Vec<ObjectMeta>>> {
155-
self.statistics.remove(k).map(|x| x.1)
156-
}
157-
158-
fn contains_key(&self, k: &Path) -> bool {
159-
self.statistics.contains_key(k)
160-
}
161-
162-
fn len(&self) -> usize {
163-
self.statistics.len()
164-
}
165-
166-
fn clear(&self) {
167-
self.statistics.clear()
168-
}
169-
170-
fn name(&self) -> String {
171-
"DefaultListFilesCache".to_string()
172-
}
173-
}
174-
175110
/// Handles the inner state of the [`DefaultFilesMetadataCache`] struct.
176111
struct DefaultFilesMetadataCacheState {
177112
lru_queue: LruQueue<Path, (ObjectMeta, Arc<dyn FileMetadata>)>,
@@ -433,7 +368,7 @@ mod tests {
433368
FileMetadata, FileMetadataCache, FileMetadataCacheEntry,
434369
};
435370
use crate::cache::cache_unit::{
436-
DefaultFileStatisticsCache, DefaultFilesMetadataCache, DefaultListFilesCache,
371+
DefaultFileStatisticsCache, DefaultFilesMetadataCache,
437372
};
438373
use crate::cache::CacheAccessor;
439374
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
@@ -486,28 +421,6 @@ mod tests {
486421
assert!(cache.get_with_extra(&meta2.location, &meta2).is_none());
487422
}
488423

489-
#[test]
490-
fn test_list_file_cache() {
491-
let meta = ObjectMeta {
492-
location: Path::from("test"),
493-
last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00")
494-
.unwrap()
495-
.into(),
496-
size: 1024,
497-
e_tag: None,
498-
version: None,
499-
};
500-
501-
let cache = DefaultListFilesCache::default();
502-
assert!(cache.get(&meta.location).is_none());
503-
504-
cache.put(&meta.location, vec![meta.clone()].into());
505-
assert_eq!(
506-
cache.get(&meta.location).unwrap().first().unwrap().clone(),
507-
meta.clone()
508-
);
509-
}
510-
511424
pub struct TestFileMetadata {
512425
metadata: String,
513426
}

0 commit comments

Comments
 (0)