Skip to content

Commit 6a341a9

Browse files
Merge branch 'main' into add-filterexec-with-projection
2 parents f9a2710 + 1037f0a commit 6a341a9

18 files changed

Lines changed: 768 additions & 430 deletions

File tree

.asf.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ github:
6262
branch-51:
6363
required_pull_request_reviews:
6464
required_approving_review_count: 1
65+
branch-52:
66+
required_pull_request_reviews:
67+
required_approving_review_count: 1
6568
pull_requests:
6669
# enable updating head branches of pull requests
6770
allow_update_branch: true

.github/workflows/audit.yml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,4 +46,7 @@ jobs:
4646
with:
4747
tool: cargo-audit
4848
- name: Run audit check
49-
run: cargo audit
49+
# RUSTSEC-2026-0001: https://rustsec.org/advisories/RUSTSEC-2026-0001.html
50+
# underlying rkyv is patched, but rustsec database not yet updated
51+
# Can remove when this is merged: https://github.com/rustsec/advisory-db/pull/2565
52+
run: cargo audit --ignore RUSTSEC-2026-0001

Cargo.lock

Lines changed: 8 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion-cli/src/functions.rs

Lines changed: 162 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,18 @@
1717

1818
//! Functions that are query-able and searchable via the `\h` command
1919
20+
use datafusion_common::instant::Instant;
2021
use std::fmt;
2122
use std::fs::File;
2223
use std::str::FromStr;
2324
use std::sync::Arc;
2425

25-
use arrow::array::{Int64Array, StringArray, TimestampMillisecondArray, UInt64Array};
26-
use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
26+
use arrow::array::{
27+
DurationMillisecondArray, GenericListArray, Int64Array, StringArray, StructArray,
28+
TimestampMillisecondArray, UInt64Array,
29+
};
30+
use arrow::buffer::{Buffer, OffsetBuffer, ScalarBuffer};
31+
use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef, TimeUnit};
2732
use arrow::record_batch::RecordBatch;
2833
use arrow::util::pretty::pretty_format_batches;
2934
use datafusion::catalog::{Session, TableFunctionImpl};
@@ -697,3 +702,158 @@ impl TableFunctionImpl for StatisticsCacheFunc {
697702
Ok(Arc::new(statistics_cache))
698703
}
699704
}
705+
706+
/// In-memory table backing the `list_files_cache()` table function:
/// holds a pre-built [`RecordBatch`] snapshot of the list-files cache
/// together with its schema.
#[derive(Debug)]
struct ListFilesCacheTable {
    // Schema of `batch`; also what `TableProvider::schema` returns.
    schema: SchemaRef,
    // Single batch with one row per cached listing entry.
    batch: RecordBatch,
}
711+
712+
#[async_trait]
713+
impl TableProvider for ListFilesCacheTable {
714+
fn as_any(&self) -> &dyn std::any::Any {
715+
self
716+
}
717+
718+
fn schema(&self) -> arrow::datatypes::SchemaRef {
719+
self.schema.clone()
720+
}
721+
722+
fn table_type(&self) -> datafusion::logical_expr::TableType {
723+
datafusion::logical_expr::TableType::Base
724+
}
725+
726+
async fn scan(
727+
&self,
728+
_state: &dyn Session,
729+
projection: Option<&Vec<usize>>,
730+
_filters: &[Expr],
731+
_limit: Option<usize>,
732+
) -> Result<Arc<dyn ExecutionPlan>> {
733+
Ok(MemorySourceConfig::try_new_exec(
734+
&[vec![self.batch.clone()]],
735+
TableProvider::schema(self),
736+
projection.cloned(),
737+
)?)
738+
}
739+
}
740+
741+
#[derive(Debug)]
742+
pub struct ListFilesCacheFunc {
743+
cache_manager: Arc<CacheManager>,
744+
}
745+
746+
impl ListFilesCacheFunc {
747+
pub fn new(cache_manager: Arc<CacheManager>) -> Self {
748+
Self { cache_manager }
749+
}
750+
}
751+
752+
impl TableFunctionImpl for ListFilesCacheFunc {
753+
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
754+
if !exprs.is_empty() {
755+
return plan_err!("list_files_cache should have no arguments");
756+
}
757+
758+
let nested_fields = Fields::from(vec![
759+
Field::new("file_path", DataType::Utf8, false),
760+
Field::new(
761+
"file_modified",
762+
DataType::Timestamp(TimeUnit::Millisecond, None),
763+
false,
764+
),
765+
Field::new("file_size_bytes", DataType::UInt64, false),
766+
Field::new("e_tag", DataType::Utf8, true),
767+
Field::new("version", DataType::Utf8, true),
768+
]);
769+
770+
let metadata_field =
771+
Field::new("metadata", DataType::Struct(nested_fields.clone()), true);
772+
773+
let schema = Arc::new(Schema::new(vec![
774+
Field::new("path", DataType::Utf8, false),
775+
Field::new("metadata_size_bytes", DataType::UInt64, false),
776+
// expires field in ListFilesEntry has type Instant when set, from which we cannot get "the number of seconds", hence using Duration instead of Timestamp as data type.
777+
Field::new(
778+
"expires_in",
779+
DataType::Duration(TimeUnit::Millisecond),
780+
true,
781+
),
782+
Field::new(
783+
"metadata_list",
784+
DataType::List(Arc::new(metadata_field.clone())),
785+
true,
786+
),
787+
]));
788+
789+
let mut path_arr = vec![];
790+
let mut metadata_size_bytes_arr = vec![];
791+
let mut expires_arr = vec![];
792+
793+
let mut file_path_arr = vec![];
794+
let mut file_modified_arr = vec![];
795+
let mut file_size_bytes_arr = vec![];
796+
let mut etag_arr = vec![];
797+
let mut version_arr = vec![];
798+
let mut offsets: Vec<i32> = vec![0];
799+
800+
if let Some(list_files_cache) = self.cache_manager.get_list_files_cache() {
801+
let now = Instant::now();
802+
let mut current_offset: i32 = 0;
803+
804+
for (path, entry) in list_files_cache.list_entries() {
805+
path_arr.push(path.to_string());
806+
metadata_size_bytes_arr.push(entry.size_bytes as u64);
807+
// calculates time left before entry expires
808+
expires_arr.push(
809+
entry
810+
.expires
811+
.map(|t| t.duration_since(now).as_millis() as i64),
812+
);
813+
814+
for meta in entry.metas.iter() {
815+
file_path_arr.push(meta.location.to_string());
816+
file_modified_arr.push(meta.last_modified.timestamp_millis());
817+
file_size_bytes_arr.push(meta.size);
818+
etag_arr.push(meta.e_tag.clone());
819+
version_arr.push(meta.version.clone());
820+
}
821+
current_offset += entry.metas.len() as i32;
822+
offsets.push(current_offset);
823+
}
824+
}
825+
826+
let struct_arr = StructArray::new(
827+
nested_fields,
828+
vec![
829+
Arc::new(StringArray::from(file_path_arr)),
830+
Arc::new(TimestampMillisecondArray::from(file_modified_arr)),
831+
Arc::new(UInt64Array::from(file_size_bytes_arr)),
832+
Arc::new(StringArray::from(etag_arr)),
833+
Arc::new(StringArray::from(version_arr)),
834+
],
835+
None,
836+
);
837+
838+
let offsets_buffer: OffsetBuffer<i32> =
839+
OffsetBuffer::new(ScalarBuffer::from(Buffer::from_vec(offsets)));
840+
841+
let batch = RecordBatch::try_new(
842+
schema.clone(),
843+
vec![
844+
Arc::new(StringArray::from(path_arr)),
845+
Arc::new(UInt64Array::from(metadata_size_bytes_arr)),
846+
Arc::new(DurationMillisecondArray::from(expires_arr)),
847+
Arc::new(GenericListArray::new(
848+
Arc::new(metadata_field),
849+
offsets_buffer,
850+
Arc::new(struct_arr),
851+
None,
852+
)),
853+
],
854+
)?;
855+
856+
let list_files_cache = ListFilesCacheTable { schema, batch };
857+
Ok(Arc::new(list_files_cache))
858+
}
859+
}

0 commit comments

Comments
 (0)