Skip to content

Commit f2f4df8

Browse files
committed
POC for DefaultListFilesCache
- Implements a POC version of a default ListFilesCache - Refactors the existing ListFilesCache to mirror the MetadataCache by defining a new trait instead of a fixed type wrapping a trait - Bounds the size of the cache based on number of entries - Expires entries in the cache after a default timeout duration
1 parent f1ecacc commit f2f4df8

File tree

5 files changed

+284
-45
lines changed

5 files changed

+284
-45
lines changed

datafusion/core/tests/datasource/object_store_access.rs

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,7 @@ async fn query_multi_csv_file() {
117117
+---------+-------+-------+
118118
------- Object Store Request Summary -------
119119
RequestCountingObjectStore()
120-
Total Requests: 4
121-
- LIST prefix=data
120+
Total Requests: 3
122121
- GET (opts) path=data/file_0.csv
123122
- GET (opts) path=data/file_1.csv
124123
- GET (opts) path=data/file_2.csv
@@ -145,8 +144,7 @@ async fn query_partitioned_csv_file() {
145144
+---------+-------+-------+---+----+-----+
146145
------- Object Store Request Summary -------
147146
RequestCountingObjectStore()
148-
Total Requests: 4
149-
- LIST prefix=data
147+
Total Requests: 3
150148
- GET (opts) path=data/a=1/b=10/c=100/file_1.csv
151149
- GET (opts) path=data/a=2/b=20/c=200/file_2.csv
152150
- GET (opts) path=data/a=3/b=30/c=300/file_3.csv
@@ -183,8 +181,7 @@ async fn query_partitioned_csv_file() {
183181
+---------+-------+-------+---+----+-----+
184182
------- Object Store Request Summary -------
185183
RequestCountingObjectStore()
186-
Total Requests: 2
187-
- LIST prefix=data
184+
Total Requests: 1
188185
- GET (opts) path=data/a=2/b=20/c=200/file_2.csv
189186
"
190187
);
@@ -201,8 +198,7 @@ async fn query_partitioned_csv_file() {
201198
+---------+-------+-------+---+----+-----+
202199
------- Object Store Request Summary -------
203200
RequestCountingObjectStore()
204-
Total Requests: 2
205-
- LIST prefix=data
201+
Total Requests: 1
206202
- GET (opts) path=data/a=2/b=20/c=200/file_2.csv
207203
"
208204
);
@@ -237,8 +233,7 @@ async fn query_partitioned_csv_file() {
237233
+---------+-------+-------+---+----+-----+
238234
------- Object Store Request Summary -------
239235
RequestCountingObjectStore()
240-
Total Requests: 2
241-
- LIST prefix=data
236+
Total Requests: 1
242237
- GET (opts) path=data/a=1/b=10/c=100/file_1.csv
243238
"
244239
);

datafusion/execution/src/cache/cache_manager.rs

Lines changed: 80 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@ use std::any::Any;
2424
use std::collections::HashMap;
2525
use std::fmt::{Debug, Formatter};
2626
use std::sync::Arc;
27+
use std::time::Duration;
28+
29+
use super::cache_unit::{
30+
DefaultListFilesCache, DEFAULT_LIST_FILES_CACHE_LIMIT, DEFAULT_LIST_FILES_CACHE_TTL,
31+
};
2732

2833
/// A cache for [`Statistics`].
2934
///
@@ -42,8 +47,18 @@ pub type FileStatisticsCache =
4247
/// especially when done over remote object stores.
4348
///
4449
/// See [`crate::runtime_env::RuntimeEnv`] for more details
45-
pub type ListFilesCache =
46-
Arc<dyn CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = ObjectMeta>>;
50+
pub trait ListFilesCache:
51+
CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = ObjectMeta>
52+
{
53+
// Returns the cache's object limit.
54+
fn cache_limit(&self) -> usize;
55+
56+
// Returns the cache's object ttl.
57+
fn cache_ttl(&self) -> Duration;
58+
59+
// Updates the cache with a new object limit.
60+
fn update_cache_limit(&self, limit: usize);
61+
}
4762

4863
/// Generic file-embedded metadata used with [`FileMetadataCache`].
4964
///
@@ -109,7 +124,7 @@ impl Debug for dyn CacheAccessor<Path, Arc<Statistics>, Extra = ObjectMeta> {
109124
}
110125
}
111126

112-
impl Debug for dyn CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = ObjectMeta> {
127+
impl Debug for dyn ListFilesCache {
113128
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
114129
write!(f, "Cache name: {} with length: {}", self.name(), self.len())
115130
}
@@ -131,7 +146,7 @@ impl Debug for dyn FileMetadataCache {
131146
#[derive(Debug)]
132147
pub struct CacheManager {
133148
file_statistic_cache: Option<FileStatisticsCache>,
134-
list_files_cache: Option<ListFilesCache>,
149+
list_files_cache: Option<Arc<dyn ListFilesCache>>,
135150
file_metadata_cache: Arc<dyn FileMetadataCache>,
136151
}
137152

@@ -140,7 +155,17 @@ impl CacheManager {
140155
let file_statistic_cache =
141156
config.table_files_statistics_cache.as_ref().map(Arc::clone);
142157

143-
let list_files_cache = config.list_files_cache.as_ref().map(Arc::clone);
158+
let list_files_cache = config
159+
.list_files_cache
160+
.as_ref()
161+
.map(Arc::clone)
162+
.unwrap_or_else(|| {
163+
Arc::new(DefaultListFilesCache::new(
164+
// TODO: config
165+
512 * 1024,
166+
Duration::new(600, 0),
167+
))
168+
});
144169

145170
let file_metadata_cache = config
146171
.file_metadata_cache
@@ -155,7 +180,7 @@ impl CacheManager {
155180

156181
Ok(Arc::new(CacheManager {
157182
file_statistic_cache,
158-
list_files_cache,
183+
list_files_cache: Some(list_files_cache), // TODO: reinstate optionality
159184
file_metadata_cache,
160185
}))
161186
}
@@ -166,10 +191,24 @@ impl CacheManager {
166191
}
167192

168193
/// Get the cache for storing the result of listing [`ObjectMeta`]s under the same path.
169-
pub fn get_list_files_cache(&self) -> Option<ListFilesCache> {
194+
pub fn get_list_files_cache(&self) -> Option<Arc<dyn ListFilesCache>> {
170195
self.list_files_cache.clone()
171196
}
172197

198+
/// Get the object limit of the list files cache.
199+
pub fn get_list_files_cache_limit(&self) -> usize {
200+
self.list_files_cache
201+
.as_ref()
202+
.map_or(DEFAULT_LIST_FILES_CACHE_LIMIT, |c| c.cache_limit())
203+
}
204+
205+
/// Get the TTL (time-to-live) of entries in the list files cache.
206+
pub fn get_list_files_cache_ttl(&self) -> Duration {
207+
self.list_files_cache
208+
.as_ref()
209+
.map_or(DEFAULT_LIST_FILES_CACHE_TTL, |c| c.cache_ttl())
210+
}
211+
173212
/// Get the file embedded metadata cache.
174213
pub fn get_file_metadata_cache(&self) -> Arc<dyn FileMetadataCache> {
175214
Arc::clone(&self.file_metadata_cache)
@@ -189,13 +228,20 @@ pub struct CacheManagerConfig {
189228
/// Avoid get same file statistics repeatedly in same datafusion session.
190229
/// Default is disabled. For now only supports Parquet files.
191230
pub table_files_statistics_cache: Option<FileStatisticsCache>,
192-
/// Enable cache of file metadata when listing files.
193-
/// This setting avoids listing file meta of the same path repeatedly
194-
/// in same session, which may be expensive in certain situations (e.g. remote object storage).
231+
/// Enable caching of file metadata when listing files.
232+
/// Enabling the cache avoids repeated list and metadata fetch operations, which may be expensive
233+
/// in certain situations (e.g. remote object storage), for objects under paths that are
234+
/// cached.
195235
/// Note that if this option is enabled, DataFusion will not see any updates to the underlying
196-
/// location.
197-
/// Default is disable.
198-
pub list_files_cache: Option<ListFilesCache>,
236+
/// storage for at least `list_files_cache_ttl` duration.
237+
/// Default is disabled.
238+
pub list_files_cache: Option<Arc<dyn ListFilesCache>>,
239+
/// Limit the number of objects to keep in the `list_files_cache`. Default: ~125k objects
240+
pub list_files_cache_limit: usize,
241+
/// The duration the list files cache will consider an entry valid after insertion. Note that
242+
/// changes to the underlying storage system, such as adding or removing data, will not be
243+
/// visible until an entry expires. Default: 10 minutes.
244+
pub list_files_cache_ttl: Duration,
199245
/// Cache of file-embedded metadata, used to avoid reading it multiple times when processing a
200246
/// data file (e.g., Parquet footer and page metadata).
201247
/// If not provided, the [`CacheManager`] will create a [`DefaultFilesMetadataCache`].
@@ -209,6 +255,8 @@ impl Default for CacheManagerConfig {
209255
Self {
210256
table_files_statistics_cache: Default::default(),
211257
list_files_cache: Default::default(),
258+
list_files_cache_limit: DEFAULT_LIST_FILES_CACHE_LIMIT,
259+
list_files_cache_ttl: DEFAULT_LIST_FILES_CACHE_TTL,
212260
file_metadata_cache: Default::default(),
213261
metadata_cache_limit: DEFAULT_METADATA_CACHE_LIMIT,
214262
}
@@ -228,13 +276,30 @@ impl CacheManagerConfig {
228276
}
229277

230278
/// Set the cache for listing files.
231-
///
279+
///
232280
/// Default is `None` (disabled).
233-
pub fn with_list_files_cache(mut self, cache: Option<ListFilesCache>) -> Self {
281+
pub fn with_list_files_cache(
282+
mut self,
283+
cache: Option<Arc<dyn ListFilesCache>>,
284+
) -> Self {
234285
self.list_files_cache = cache;
235286
self
236287
}
237288

289+
pub fn with_list_files_cache_limit(mut self, limit: usize) -> Self {
290+
self.list_files_cache_limit = limit;
291+
self
292+
}
293+
294+
pub fn with_list_files_cache_ttl(mut self, ttl: Duration) -> Self {
295+
self.list_files_cache_ttl = ttl;
296+
if ttl.is_zero() {
297+
self.list_files_cache = None
298+
}
299+
300+
self
301+
}
302+
238303
/// Sets the cache for file-embedded metadata.
239304
///
240305
/// Default is a [`DefaultFilesMetadataCache`].

0 commit comments

Comments
 (0)