5 changes: 5 additions & 0 deletions datafusion/catalog-listing/src/table.rs
@@ -578,6 +578,11 @@ impl TableProvider for ListingTable {
let keep_partition_by_columns =
state.config_options().execution.keep_partition_by_columns;

// Invalidate cache entries for this table if they exist
if let Some(lfc) = state.runtime_env().cache_manager.get_list_files_cache() {
let _ = lfc.remove(table_path.prefix());
}
Comment on lines +581 to +584
Contributor Author

For the initial implementation I chose the simple option of invalidating a table's cache entries on INSERT. Since the default TTL is infinite, any DataSink that only supports writing new files (as opposed to appending) would never pick up inserts without some action being taken here.

It would be very nice to handle this more elegantly in the future.
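The interaction described above can be sketched with a minimal, std-only stand-in for the list-files cache (names here are illustrative, not DataFusion's API): a query populates an entry on its first LIST, and an INSERT removes that entry so the next query re-lists and observes newly written files.

```rust
use std::collections::HashMap;

// Illustrative stand-in for the list-files cache: maps a table's path
// prefix to its cached file listing.
struct ListCache {
    entries: HashMap<String, Vec<String>>,
}

impl ListCache {
    fn new() -> Self {
        Self { entries: HashMap::new() }
    }

    // A query populates the cache after its first LIST request.
    fn put(&mut self, prefix: &str, files: Vec<String>) {
        self.entries.insert(prefix.to_string(), files);
    }

    fn get(&self, prefix: &str) -> Option<&Vec<String>> {
        self.entries.get(prefix)
    }

    // INSERT invalidates the table's entry; with an infinite default TTL,
    // this is the only way newly written files become visible to later
    // queries in this sketch.
    fn remove(&mut self, prefix: &str) -> Option<Vec<String>> {
        self.entries.remove(prefix)
    }
}
```

With this model, skipping the `remove` on INSERT would leave the stale listing cached forever, which is exactly the failure mode the comment describes.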

Contributor

makes sense. Thank you


// Sink related option, apart from format
let config = FileSinkConfig {
original_url: String::default(),
53 changes: 53 additions & 0 deletions datafusion/core/tests/datasource/object_store_access.rs
@@ -98,6 +98,59 @@ async fn create_multi_file_csv_file() {
);
}

#[tokio::test]
async fn multi_query_multi_file_csv_file() {
let test = Test::new().with_multi_file_csv().await;
assert_snapshot!(
test.query("select * from csv_table").await,
@r"
------- Query Output (6 rows) -------
+---------+-------+-------+
| c1 | c2 | c3 |
+---------+-------+-------+
| 0.0 | 0.0 | true |
| 0.00003 | 5e-12 | false |
| 0.00001 | 1e-12 | true |
| 0.00003 | 5e-12 | false |
| 0.00002 | 2e-12 | true |
| 0.00003 | 5e-12 | false |
+---------+-------+-------+
------- Object Store Request Summary -------
RequestCountingObjectStore()
Total Requests: 4
- LIST prefix=data
- GET (opts) path=data/file_0.csv
- GET (opts) path=data/file_1.csv
- GET (opts) path=data/file_2.csv
"
);

// the second query should re-use the cached LIST results and should not reissue LIST
assert_snapshot!(
test.query("select * from csv_table").await,
@r"
------- Query Output (6 rows) -------
+---------+-------+-------+
| c1 | c2 | c3 |
+---------+-------+-------+
| 0.0 | 0.0 | true |
| 0.00003 | 5e-12 | false |
| 0.00001 | 1e-12 | true |
| 0.00003 | 5e-12 | false |
| 0.00002 | 2e-12 | true |
| 0.00003 | 5e-12 | false |
+---------+-------+-------+
------- Object Store Request Summary -------
RequestCountingObjectStore()
Total Requests: 4
Contributor

@BlakeOrth this seems wrong to me -- now that we have a ListFilesCache, I would expect this query to NOT need to do a LIST command 🤔

Contributor Author

With the current state of this PR I think this result is actually expected. I chose to remove the code that enables the cache by default, as well as this test (both were in the POC for illustration purposes). Since the cache currently cannot be configured, and specifically cannot be disabled, I thought it best to leave it disabled by default to avoid breaking users.

Additionally, quite a few of the sqllogictest suites rely on the COPY functionality and the existing expectation that DataFusion will pick up changes on query. The easiest way to handle those tests would be to disable this cache for the duration of the tests, but that again relies on the cache having some configuration.

All that being said, I'm more than happy to move forward on this effort in whatever way you think is best! I can add a very simple configuration option (and docs etc.) to enable/disable this cache so we can keep this test you've written and have it show a reduction in LIST operations, or we can leave that as a portion of the "configuration issue" associated with this cache.

Contributor

I think it makes sense to do additional work in a follow-on PR -- I see there is already a ticket tracking the work to turn it on by default.

- LIST prefix=data
- GET (opts) path=data/file_0.csv
- GET (opts) path=data/file_1.csv
- GET (opts) path=data/file_2.csv
"
);
}

#[tokio::test]
async fn query_multi_csv_file() {
let test = Test::new().with_multi_file_csv().await;
5 changes: 2 additions & 3 deletions datafusion/core/tests/parquet/file_statistics.rs
@@ -30,9 +30,8 @@ use datafusion::prelude::SessionContext;
use datafusion_common::stats::Precision;
use datafusion_common::DFSchema;
use datafusion_execution::cache::cache_manager::CacheManagerConfig;
use datafusion_execution::cache::cache_unit::{
DefaultFileStatisticsCache, DefaultListFilesCache,
};
use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
use datafusion_execution::cache::DefaultListFilesCache;
use datafusion_execution::config::SessionConfig;
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
use datafusion_expr::{col, lit, Expr};
87 changes: 70 additions & 17 deletions datafusion/execution/src/cache/cache_manager.rs
@@ -24,6 +24,9 @@ use std::any::Any;
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use std::time::Duration;

use super::list_files_cache::DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT;

/// A cache for [`Statistics`].
///
@@ -41,9 +44,19 @@ pub type FileStatisticsCache =
/// command on the local filesystem. This operation can be expensive,
/// especially when done over remote object stores.
///
/// See [`crate::runtime_env::RuntimeEnv`] for more details
pub type ListFilesCache =
Arc<dyn CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = ObjectMeta>>;
/// See [`crate::runtime_env::RuntimeEnv`] for more details.
pub trait ListFilesCache:
Contributor

I think one of the things I would like to do in the CacheManager caches is to segregate the cache eviction policy. Personally, I think the user should be given an option for the eviction behaviour they want. wdyt @alamb @BlakeOrth? I can work on getting a draft out this weekend.

Contributor Author

I think this is a reasonable suggestion, and it looks like there may be some ongoing discussion around this topic here:

While that work would undoubtedly impact this (and other) default cache implementations, I think it should probably be discussed separately to this effort.

Contributor

I am personally in favor of keeping what is in DataFusion as simple to use / understand as possible (which I think this and the metadata cache do)

In terms of customizable eviction strategy, as we have mentioned elsewhere that is already possible today, but it requires effectively copying/forking the entire cache implementation which adds to the maintenance burden of downstream projects

However, adding more APIs to DataFusion increases the maintenance burden on the core project

So I see customizable eviction strategies as a tradeoff. If there are multiple users who are likely to use a customizable eviction strategy, I agree it makes sense to put it in the core repo. If there are not that many, I think it would be better to keep DataFusion simpler and move the burden downstream for those users who need it.

Contributor Author

@alamb After reading your thoughts on this I'm wondering if a more complex cache infrastructure would be a good project to start in https://github.com/datafusion-contrib and if it gains enough traction perhaps it could be promoted to the core repo?

Contributor

@alamb alamb Nov 21, 2025


Yes, that sounds like a great idea to me -- i think there is a wide wonderful world of complex caching strategies that people might want

Contributor

A bit of a side topic, @alamb: I was looking into the other caches and didn't understand why DefaultFileStatisticsCache isn't memory-bound yet. Do we want to do that incrementally?

Contributor

I am not sure what you are asking -- do you mean metadata_cache_limit on https://datafusion.apache.org/user-guide/configs.html#runtime-configuration-settings ?

Contributor

A little bit side topic. @alamb was looking into other caches, didn't understand why DefaultFileStatisticsCache isn't memory bound yet. Do we want to do it incrementally?

I have filed a ticket for this item:

CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = ObjectMeta>
{
/// Returns the cache's memory limit in bytes.
fn cache_limit(&self) -> usize;

/// Returns the TTL (time-to-live) for cache entries, if configured.
fn cache_ttl(&self) -> Option<Duration>;

/// Updates the cache with a new memory limit in bytes.
fn update_cache_limit(&self, limit: usize);
}
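Read together, `cache_limit` and `cache_ttl` bound the cache by total memory and by entry age. A minimal std-only sketch of those semantics (illustrative only, with assumed names; the real implementation lives in the new `list_files_cache` module and must also satisfy `CacheAccessor`):

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

// Illustrative TTL- and memory-bounded cache mirroring the trait's
// semantics: entries expire `ttl` after insertion (None = infinite),
// and inserts are rejected once the byte budget is exhausted.
struct BoundedCache {
    limit: usize,
    ttl: Option<Duration>,
    used: usize,
    entries: HashMap<String, (Instant, Vec<u8>)>,
}

impl BoundedCache {
    fn new(limit: usize, ttl: Option<Duration>) -> Self {
        Self { limit, ttl, used: 0, entries: HashMap::new() }
    }

    // Insert only while the value fits in the remaining byte budget.
    // (Replacing an existing key is not handled in this sketch.)
    fn put(&mut self, key: &str, value: Vec<u8>) -> bool {
        if self.used + value.len() > self.limit {
            return false;
        }
        self.used += value.len();
        self.entries.insert(key.to_string(), (Instant::now(), value));
        true
    }

    // A hit is returned only while the entry is within its TTL.
    fn get(&self, key: &str) -> Option<&Vec<u8>> {
        let (inserted, value) = self.entries.get(key)?;
        match self.ttl {
            Some(ttl) if inserted.elapsed() > ttl => None, // expired
            _ => Some(value),
        }
    }
}
```

This lazy-expiry style (checking the TTL on read rather than evicting in the background) is one plausible design; the PR's actual eviction behaviour is the subject of the discussion above.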

/// Generic file-embedded metadata used with [`FileMetadataCache`].
///
@@ -109,7 +122,7 @@ impl Debug for dyn CacheAccessor<Path, Arc<Statistics>, Extra = ObjectMeta> {
}
}

impl Debug for dyn CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = ObjectMeta> {
impl Debug for dyn ListFilesCache {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "Cache name: {} with length: {}", self.name(), self.len())
}
@@ -131,7 +144,7 @@ impl Debug for dyn FileMetadataCache {
#[derive(Debug)]
pub struct CacheManager {
file_statistic_cache: Option<FileStatisticsCache>,
list_files_cache: Option<ListFilesCache>,
list_files_cache: Option<Arc<dyn ListFilesCache>>,
file_metadata_cache: Arc<dyn FileMetadataCache>,
}

@@ -166,10 +179,22 @@ impl CacheManager {
}

/// Get the cache for storing the result of listing [`ObjectMeta`]s under the same path.
pub fn get_list_files_cache(&self) -> Option<ListFilesCache> {
pub fn get_list_files_cache(&self) -> Option<Arc<dyn ListFilesCache>> {
self.list_files_cache.clone()
}

/// Get the memory limit of the list files cache.
pub fn get_list_files_cache_limit(&self) -> usize {
self.list_files_cache
.as_ref()
.map_or(DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, |c| c.cache_limit())
}

/// Get the TTL (time-to-live) of the list files cache.
pub fn get_list_files_cache_ttl(&self) -> Option<Duration> {
self.list_files_cache.as_ref().and_then(|c| c.cache_ttl())
}

/// Get the file embedded metadata cache.
pub fn get_file_metadata_cache(&self) -> Arc<dyn FileMetadataCache> {
Arc::clone(&self.file_metadata_cache)
@@ -185,17 +210,24 @@ pub const DEFAULT_METADATA_CACHE_LIMIT: usize = 50 * 1024 * 1024; // 50M

#[derive(Clone)]
pub struct CacheManagerConfig {
/// Enable cache of files statistics when listing files.
/// Avoid get same file statistics repeatedly in same datafusion session.
/// Default is disable. Fow now only supports Parquet files.
/// Enable caching of file statistics when listing files.
/// Enabling the cache avoids repeatedly reading file statistics in a DataFusion session.
/// Default is disabled. Currently only Parquet files are supported.
pub table_files_statistics_cache: Option<FileStatisticsCache>,
/// Enable cache of file metadata when listing files.
/// This setting avoids listing file meta of the same path repeatedly
/// in same session, which may be expensive in certain situations (e.g. remote object storage).
/// Enable caching of file metadata when listing files.
/// Enabling the cache avoids repeat list and object metadata fetch operations, which may be
/// expensive in certain situations (e.g. remote object storage), for objects under paths that
/// are cached.
/// Note that if this option is enabled, DataFusion will not see any updates to the underlying
/// location.
/// Default is disable.
pub list_files_cache: Option<ListFilesCache>,
/// storage for at least `list_files_cache_ttl` duration.
/// Default is disabled.
pub list_files_cache: Option<Arc<dyn ListFilesCache>>,
/// Limit of the `list_files_cache`, in bytes. Default: 1MiB.
pub list_files_cache_limit: usize,
Contributor

I found it a little strange that the cache size is set on the cache manager itself rather than an options struct -- though now I see it is consistent with how metadata_cache_limit works

Contributor Author

Yes, it does seem like this API could be easier to use. It seems to me that this would be something that we might want to do as future work and fix up both the metadata and list cache at the same time since this PR has already grown to a pretty sizeable amount of code.

Contributor

I see this setup is used to support runtime configuration of the cache. See for example

"metadata_cache_limit" => {
let limit = Self::parse_memory_limit(value)?;
builder.with_metadata_cache_limit(limit)
}

I think what we should do (as a follow-on PR) is add runtime configuration settings for the max cache size and its TTL in https://datafusion.apache.org/user-guide/configs.html#runtime-configuration-settings

I filed a ticket to do so:
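If such a runtime setting were added, the string value (e.g. "5m") would need parsing into a `Duration`, analogous to how `parse_memory_limit` handles `metadata_cache_limit` in the snippet above. A hypothetical, std-only parser sketch — the setting name and the suffix grammar are assumptions for illustration, not an existing DataFusion API:

```rust
use std::time::Duration;

// Hypothetical parser for a TTL setting such as "30s", "5m", or "1h".
// The accepted suffixes are an assumption, not DataFusion's grammar.
fn parse_ttl(value: &str) -> Result<Duration, String> {
    let value = value.trim();
    if value.len() < 2 {
        return Err(format!("invalid TTL: {value}"));
    }
    // Split the trailing unit character from the numeric prefix.
    let (num, unit) = value.split_at(value.len() - 1);
    let n: u64 = num
        .parse()
        .map_err(|_| format!("invalid TTL number: {value}"))?;
    match unit {
        "s" => Ok(Duration::from_secs(n)),
        "m" => Ok(Duration::from_secs(n * 60)),
        "h" => Ok(Duration::from_secs(n * 3600)),
        _ => Err(format!("unknown TTL unit: {value}")),
    }
}
```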

/// The duration the list files cache will consider an entry valid after insertion. Note that
/// changes to the underlying storage system, such as adding or removing data, will not be
/// visible until an entry expires. Default: None (infinite).
pub list_files_cache_ttl: Option<Duration>,
/// Cache of file-embedded metadata, used to avoid reading it multiple times when processing a
/// data file (e.g., Parquet footer and page metadata).
/// If not provided, the [`CacheManager`] will create a [`DefaultFilesMetadataCache`].
@@ -209,6 +241,8 @@ impl Default for CacheManagerConfig {
Self {
table_files_statistics_cache: Default::default(),
list_files_cache: Default::default(),
list_files_cache_limit: DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT,
list_files_cache_ttl: None,
file_metadata_cache: Default::default(),
metadata_cache_limit: DEFAULT_METADATA_CACHE_LIMIT,
}
@@ -228,13 +262,32 @@ impl CacheManagerConfig {
}

/// Set the cache for listing files.
///
///
/// Default is `None` (disabled).
pub fn with_list_files_cache(mut self, cache: Option<ListFilesCache>) -> Self {
pub fn with_list_files_cache(
mut self,
cache: Option<Arc<dyn ListFilesCache>>,
) -> Self {
self.list_files_cache = cache;
self
}

/// Sets the limit of the list files cache, in bytes.
///
/// Default: 1MiB (1,048,576 bytes).
pub fn with_list_files_cache_limit(mut self, limit: usize) -> Self {
self.list_files_cache_limit = limit;
self
}

/// Sets the TTL (time-to-live) for entries in the list files cache.
///
/// Default: None (infinite).
pub fn with_list_files_cache_ttl(mut self, ttl: Duration) -> Self {
self.list_files_cache_ttl = Some(ttl);
self
}
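Taken together, the builder methods above would be chained roughly as follows. This is a sketch against the API introduced in this diff only: the import path for `CacheManagerConfig` is assumed, and the snippet is not verified against a released DataFusion version.

```rust
use std::time::Duration;

use datafusion_execution::cache::cache_manager::CacheManagerConfig;

// Sketch: a 10 MiB list-files cache whose entries expire after five
// minutes, so changes in object storage become visible once an entry's
// TTL elapses.
fn build_cache_config() -> CacheManagerConfig {
    CacheManagerConfig::default()
        .with_list_files_cache_limit(10 * 1024 * 1024)
        .with_list_files_cache_ttl(Duration::from_secs(300))
}
```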

/// Sets the cache for file-embedded metadata.
///
/// Default is a [`DefaultFilesMetadataCache`].
89 changes: 1 addition & 88 deletions datafusion/execution/src/cache/cache_unit.rs
@@ -107,71 +107,6 @@ impl CacheAccessor<Path, Arc<Statistics>> for DefaultFileStatisticsCache {
}
}

/// Default implementation of [`ListFilesCache`]
///
/// Collected files metadata for listing files.
///
/// Cache is not invalided until user calls [`Self::remove`] or [`Self::clear`].
///
/// [`ListFilesCache`]: crate::cache::cache_manager::ListFilesCache
#[derive(Default)]
pub struct DefaultListFilesCache {
statistics: DashMap<Path, Arc<Vec<ObjectMeta>>>,
}

impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for DefaultListFilesCache {
type Extra = ObjectMeta;

fn get(&self, k: &Path) -> Option<Arc<Vec<ObjectMeta>>> {
self.statistics.get(k).map(|x| Arc::clone(x.value()))
}

fn get_with_extra(
&self,
_k: &Path,
_e: &Self::Extra,
) -> Option<Arc<Vec<ObjectMeta>>> {
panic!("Not supported DefaultListFilesCache get_with_extra")
Contributor

I can't remember what the rationale for this panic was. It seems to have come in via

Maybe @Ted-Jiang or @suremarc have some thoughts here

}

fn put(
&self,
key: &Path,
value: Arc<Vec<ObjectMeta>>,
) -> Option<Arc<Vec<ObjectMeta>>> {
self.statistics.insert(key.clone(), value)
}

fn put_with_extra(
&self,
_key: &Path,
_value: Arc<Vec<ObjectMeta>>,
_e: &Self::Extra,
) -> Option<Arc<Vec<ObjectMeta>>> {
panic!("Not supported DefaultListFilesCache put_with_extra")
}

fn remove(&self, k: &Path) -> Option<Arc<Vec<ObjectMeta>>> {
self.statistics.remove(k).map(|x| x.1)
}

fn contains_key(&self, k: &Path) -> bool {
self.statistics.contains_key(k)
}

fn len(&self) -> usize {
self.statistics.len()
}

fn clear(&self) {
self.statistics.clear()
}

fn name(&self) -> String {
"DefaultListFilesCache".to_string()
}
}

/// Handles the inner state of the [`DefaultFilesMetadataCache`] struct.
struct DefaultFilesMetadataCacheState {
lru_queue: LruQueue<Path, (ObjectMeta, Arc<dyn FileMetadata>)>,
@@ -433,7 +368,7 @@ mod tests {
FileMetadata, FileMetadataCache, FileMetadataCacheEntry,
};
use crate::cache::cache_unit::{
DefaultFileStatisticsCache, DefaultFilesMetadataCache, DefaultListFilesCache,
DefaultFileStatisticsCache, DefaultFilesMetadataCache,
};
use crate::cache::CacheAccessor;
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
@@ -486,28 +421,6 @@
assert!(cache.get_with_extra(&meta2.location, &meta2).is_none());
}

#[test]
fn test_list_file_cache() {
let meta = ObjectMeta {
location: Path::from("test"),
last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00")
.unwrap()
.into(),
size: 1024,
e_tag: None,
version: None,
};

let cache = DefaultListFilesCache::default();
assert!(cache.get(&meta.location).is_none());

cache.put(&meta.location, vec![meta.clone()].into());
assert_eq!(
cache.get(&meta.location).unwrap().first().unwrap().clone(),
meta.clone()
);
}

pub struct TestFileMetadata {
metadata: String,
}